Flink 输出至 Elasticsearch

【1】引入pom.xml依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.12</artifactId>
    <version>1.10.0</version>
</dependency>

【2】ES6 Scala代码,自动导入的scala包需要修改为scala._ 否则会出现错误。

package com.zzx.flink

import java.util

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests


object EsSinkTest {
  def main(args: Array[String]): Unit = {
    // 创建一个流处理执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //从文件中读取数据并转换为 类
    val inputStreamFromFile: DataStream[String] = env.readTextFile("E:\\Project\\flink\\src\\main\\resources\\wordcount.txt")
    //转换
    val dataStream: DataStream[SensorReading] = inputStreamFromFile
      .map( data => {
        var dataArray = data.split(",")
        SensorReading(dataArray(0),dataArray(1).toLong,dataArray(2).toDouble)
      })

    //定义一个 HttpHosts
    val httpHost = new util.ArrayList[HttpHost]()
    //默认 9200 我的修改为了 9201
    httpHost.add(new HttpHost("192.168.1.12",9200,"http"))
    httpHost.add(new HttpHost("127.0.0.1","http"))
    //定义一个 ElasticSearchFuntion 操作 es的function
    val esSinkFunc = new ElasticsearchSinkFunction[SensorReading] {
      //element 每一条数据 通过 index 发送
      override def process(element: SensorReading, runtimeContext: RuntimeContext, index: RequestIndexer): Unit = {
        //包装写入 es 的数据
        val dataSource = new util.HashMap[String,String]()
        dataSource.put("sensor_id",element.id)
        dataSource.put("temp",element.temperature.toString)
        dataSource.put("ts",element.timestamp.toString)

        //index
        val indexRequest = Requests.indexRequest()
            .index("sensor_temp")
            .`type`("readingdata")
            .source(dataSource)
        index.add(indexRequest)
        println("saved successfully " + element.toString)
      }
    }
    //输出值 es
    dataStream.addSink(new ElasticsearchSink.Builder[SensorReading](httpHost,esSinkFunc).build())
    env.execute("es")
  }
}

【3】ES6输出展示

​ [点击并拖拽以移动] ​​

原文地址:https://blog.csdn.net/zhengzhaoyang122/article/details/135299055

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


文章浏览阅读774次,点赞24次,收藏16次。typescript项目中我们使用typings-for-css-modules-loader来替代css-loader实现css modules。1、typings-for-css-modules-loader加载器介绍 Webpack加载器,用作css-loader的替代产品,可动态生成CSS模块的TypeScript类型这句话是什么意思呢?就是编译时处理css文件...
文章浏览阅读784次。react router redux antd eslint prettier less axios_react+antd+redux+less
文章浏览阅读3.9k次,点赞5次,收藏11次。需要删除.security-7索引文件。把在第1步中的被注释的配置打开。之后就是按照提示输入密码。执行bin目录下的文件。_failed to authenticate user 'elastic' against
文章浏览阅读1.2k次,点赞23次,收藏24次。Centos 8 安装es_centos8 yum elasticsearch
文章浏览阅读3.2k次。设置完之后,数据会⾃动同步到其他节点。修改密码时,将第⼀步配置删除,然后重启。单独使⽤⼀个节点⽣成证书;执⾏设置⽤户名和密码的命令。执⾏完上⾯命令以后就可以在。⽂件,在⾥⾯添加如下内容。这个⽂件复制到其他节点下。其中⼀个节点设置密码即可。依次对每个账户设置密码。全部节点都要重启⼀遍。需要在配置⽂件中开启。个⽤户分别设置密码,⽬录下,证书⽂件名为。功能,并指定证书位置。_es设置账号和密码
文章浏览阅读1.9k次,点赞2次,收藏7次。针对多数据源写入的场景,可以借助MQ实现异步的多源写入,这种情况下各个源的写入逻辑互不干扰,不会由于单个数据源写入异常或缓慢影响其他数据源的写入,虽然整体写入的吞吐量增大了,但是由于MQ消费是异步消费,所以不适合实时业务场景。不易出现数据丢失问题,主要基于MQ消息的消费保障机制,比如ES宕机或者写入失败,还能重新消费MQ消息。针对这种情况,有数据强一致性要求的,就必须双写放到事务中来处理,而一旦用上事物,则性能下降更加明显。可能出现延时问题:MQ是异步消费模型,用户写入的数据不一定可以马上看到,造成延时。_mysql同步es
文章浏览阅读3.6w次,点赞48次,收藏44次。【程序员洲洲送书福利-第十九期】《C++ Core Guidelines解析》
文章浏览阅读1.3k次。当我们在开发Vue应用时,经常需要对表单进行校验,以确保用户输入的数据符合预期。Vue提供了一个强大的校验规则机制,通过定义rules规则,可以方便地对表单进行验证,并给出相应的错误提示。_vue ruler校验
文章浏览阅读2k次,点赞16次,收藏12次。Linux内核源码下载地址及方式_linux源码下载
文章浏览阅读1k次。这样在每天自动生成的索引skywalking_log_xxx就会使用上述模版来生成,timestamp会被设置成date类型。然后此时在–>索引管理–>kibana–>索引模式添加skywalking_log*索引时就会有时间字段了。在通过skywalking将日志收集到es后,由于skywalking收集的日志(skywalking_log索引)没有date类型的字段导致在es上再索引模式中没有时间范围的查询。skywalking收集的日志有时间戳字段timestamp,只是默认为long类型。_skywalking timestamp
文章浏览阅读937次,点赞18次,收藏21次。1.初始化git仓库,使用git int命令。2.添加文件到git仓库,两步走:2.1 使用命令,注意,可反复多次使用,添加多个文件;2.2 使用命令,完成。此笔记是我个人学习记录笔记,通过廖雪峰的笔记进行学习,用自己能理解的笔记记录下来,如果侵权,联系删。不存在任何盈利性质,单纯发布后,用于自己学习回顾。
文章浏览阅读786次,点赞8次,收藏7次。上述示例中的 origin 是远程仓库的名称,https://github.com/example/repository.git 是远程仓库的 URL,(fetch) 表示该远程仓库用于获取更新,(push) 表示该远程仓库用于推送更新。你可以选择在本地仓库创建与远程仓库分支对应的本地分支,也可以直接将本地仓库的分支推送到远程仓库的对应分支。将 替换为远程仓库的名称(例如 origin), 替换为要推送的本地分支的名称, 替换为要推送到的远程分支的名称。_git remote 智能切换仓库
文章浏览阅读1.5k次。配置eslint校验代码工具_eslint 实时校验
文章浏览阅读1.2k次,点赞28次,收藏26次。Git入门基础介绍,什么是Git,如何使用Git,以及Git的工作的基本原理
文章浏览阅读2.7k次。基于官方给出的几种不同环境不同的安装方式,本文将会选择在使用.zip文件在Windows上安装Elasticsearch在Linux或macOS上从存档文件安装ElasticsearchInstall Elasticsearch with Docker (此种方式待定)使用Docker安装Elasticsearch。_elasticsearch安装部署windows
文章浏览阅读3.3k次,点赞5次,收藏11次。【Linux驱动】内核模块编译 —— make modules 的使用(单模块编译、多模块编译)_make modules
文章浏览阅读1k次。docker启动es报错_max virtual memory areas vm.max_map_count [65530] is too low, increase to at
文章浏览阅读4.2k次,点赞2次,收藏6次。使用docker单机安装elasticsearch后再安装kibana时找不到es。_unable to retrieve version information from elasticsearch nodes. security_ex
文章浏览阅读1.1k次。日志处理对于任何现代IT系统都是关键部分,本教程专为新手设计,通过详细解释Logstash的三大核心组件,为您展示如何从零开始搭建强大的日志处理系统。您还将学习如何同步MySQL数据到Elasticsearch,并通过一个"Hello World"示例快速入门。无论您是完全的新手还是有一些基础,本教程都将引导您顺利掌握Logstash的基本操作和高级应用。_logstash mysql
文章浏览阅读1.1w次,点赞5次,收藏25次。执行这条指令之后,你的本地项目就与远程Git仓库建立了连接,你就可以开始对你的代码进行版本追踪和协作开发了。使用“git remote add origin”指令,可以轻松地将本地项目连接到远程Git仓库。git remote set-url origin 执行这条指令之后,Git就会将已经添加的名为“origin”的仓库删除。git remote add origin 其中,是你的远程Git仓库的网址。_git remote add origin