【大数据】flink 读取文件数据写入ElasticSearch

news/2024/4/29 9:15:49/文章来源:https://blog.csdn.net/congge_study/article/details/127822270

前言

es是大数据存储的必备中间件之一,通过flink可以读取来自日志文件,kafka等外部数据源的数据,然后写入到es中,本篇将通过实例演示下完整的操作过程;

一、前置准备

1、提前搭建并开启es服务(本文使用docker搭建的es7.6的服务);

2、提前搭建并开启kibana服务(便于操作es的索引数据);

3、提前创建一个测试用的索引

PUT test_index

注意点:

使用docker搭建的es,可能会出现创建完毕索引后,插入数据报错的问题,即提示无操作权限的问题,如果出现这个问题,请执行下面的这段,否则在运行flink代码的时候也会报错;

PUT _settings
{"index": {"blocks": {"read_only_allow_delete": "false"}}
}

二、编写程序

1、导入基础的pom依赖

    <dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.12.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.12.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.12.0</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.3</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.49</version></dependency><dependency><groupId>com.alibaba.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>1.2.0</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.75</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.12</artifactId><version>1.12.0</version></dependency><!--新引入的包--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-elasticsearch7_2.11</artifactId><version>1.12.0</version></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-core</artifactId><version>2.11.2</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_2.12</artifactId><version>1.12.0</version></dependency></dependencies>

2、准备一个外部文件用于程序读取

csv 文件内容如下

3、核心程序代码

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;import java.util.ArrayList;
import java.util.HashMap;public class SinkEs {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//从环境的集合中获取数据String path = "E:\\code-self\\flink_study\\src\\main\\resources\\userinfo.txt";DataStreamSource<String> inputStream = env.readTextFile(path);SingleOutputStreamOperator<UserInfo> dataStream = inputStream.map(new MapFunction<String, UserInfo>() {@Overridepublic UserInfo map(String value) throws Exception {String[] fields = value.split(",");return new UserInfo(fields[0], fields[1]);}});ArrayList<HttpHost> httpHosts = new ArrayList<>();httpHosts.add(new HttpHost("IP",9200));ElasticsearchSink.Builder<UserInfo> result =new ElasticsearchSink.Builder<UserInfo>(httpHosts, new ElasticsearchSinkFunction<UserInfo>() {@Overridepublic void process(UserInfo element, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {//具体数据写入的操作HashMap<String, String> dataSource = new HashMap<>();dataSource.put("id",element.getId());dataSource.put("name",element.getName());//创建请求作为向es写入的请求命令IndexRequest indexRequest = Requests.indexRequest().index("test_index").source(dataSource);//发送请求requestIndexer.add(indexRequest);}});result.setBulkFlushMaxActions(1);dataStream.addSink(result.build());env.execute();System.out.println("数据写入es成功");}}

上面代码中涉及到的一个UserInfo对象

public class UserInfo {private String id;private String name;public UserInfo() {}public UserInfo(String id, String name) {this.id = id;this.name = name;}public String getId() {return id;}public void setId(String id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}
}

运行上面的程序,观察控制台输出

4、使用kibana查询结果

执行下面的查询语句

GET test_index/_search
{"query": {"match_all": {}}
}

看到下面的结果,说明数据成功写入到es 

 程序运行过程中的问题总结

本次编写代码向es导入数据时,遇到了2点问题,在此做一下记录,避免后面的踩坑

1、报错截图如下

大概的意思是通过flink程序写入到es的时候,时间类型对不上,解决办法是,在程序中添加如下的代码:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

2、报错截图如下

大概的意思是:ElasticSearch进入“只读”模式,只允许删除,网上给出了一些解决方案说是内存不足导致的,但是我设置了好像不行,最后的解决办法就是文章开头说的那样,做一下设置即可,即设置为false;

PUT _settings
{"index": {"blocks": {"read_only_allow_delete": "false"}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_224062.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

图像分割 - Hough变换直线检测

目录 1. Hough 直线检测 2. HoughLinesP 函数 1. Hough 直线检测 霍夫变换&#xff08;Hough 变换&#xff09;&#xff1a;利用对偶原理&#xff0c;把原空间的问题转换到对偶空间去求解 这里涉及到空间转换&#xff0c;将原来的笛卡尔空间&#xff08;xy空间&#xff09;…

App安全架构之前端安全防护

近年来&#xff0c;随着互联网、物联网、移动设备、5G通讯等技术的齐头发展&#xff0c;人类的生活和工作越来越离不开软件和互联网&#xff0c;正如人类社会文明发展到一定程度以后&#xff0c;会需要法律等社会规范来保护一样&#xff0c;线上环境也是一样道理。 Gartner 对…

Python学习小组课程-课程大纲与Python开发环境安装

一、前言 注意&#xff1a;此为内部小组学习资料&#xff0c;非售卖品&#xff0c;仅供学习参考。 为提升项目落地的逻辑思维能力&#xff0c;以及通过自我创造工具来提升工作效率&#xff0c;特成立Python学习小组。计划每周花一个小时进行在线会议直播学习&#xff0c;面向…

国内访问Github超级慢?那是你没有用我这个脚本。直接起飞。

导语 之前很多朋友咨询过国内访问Github较慢的问题&#xff0c;然后我一般让他们自己去知乎上找攻略&#xff0c;但今天我才发现网上竟然没有一个一键配置的脚本&#xff0c;一般都需要我们跟着教程一步步地去做才行。这也太麻烦了&#xff0c;于是自己动手写了个脚本&#xf…

ceph浅谈

总谈 ceph简介 用上ceph&#xff0c;多台机器的磁盘空间在一起了&#xff0c;在一台机器上就可以看到使用所有空间。 还可以保存多份安全备份 存储先ceph&#xff0c;自我管理修复&#xff0c;跨机房&#xff0c;节点越多&#xff0c;并行化&#xff0c;论上&#xff0c;节点越…

1-(3-磺酸基)丙基-1-甲基-2-吡咯烷酮三氟甲磺酸盐[C3SO3Hnmp]CF3SO3

1-(3-磺酸基)丙基-1-甲基-2-吡咯烷酮三氟甲磺酸盐[C3SO3Hnmp]CF3SO3 离子液体(IonicLiquids)是完全由离子组成&#xff0c;现在多指在低于100摄氏度时呈液体状态的熔盐。通常由特定的有机阳离子和无机阴离子&#xff08;或有机阴离子&#xff09;构成。 离子液体特点 蒸汽压…

C++基础——模板讲解

目录 一. 泛型编程 二. 函数模板 1.格式&#xff1a; 2.定义&#xff1a; 1.隐式实例化 2.显式实例化 3.解决方法3&#xff1a;使用多个T类型 4.在C中编译器允许非模板函数和模板函数同时存在 一. 泛型编程 先来看一段代码&#xff1a; void Swap(int& p1, int&am…

LeetCode:8. 字符串转换整数 (atoi)

8. 字符串转换整数 &#xff08;atoi&#xff09;1&#xff09;题目2&#xff09;思路3&#xff09;代码4&#xff09;结果1&#xff09;题目 请你来实现一个 myAtoi(string s) 函数&#xff0c;使其能将字符串转换成一个 32 位有符号整数&#xff08;类似 C/C 中的 atoi 函数…

逻辑判断与正则表达式文本处理

上一篇文章分享了正则表达式的操作&#xff0c;这一篇文章就让我们一起看看正则表达式与逻辑判断结合起来会发生什么吧&#xff01;感兴趣的小伙伴欢迎评论区或者是私信留言&#xff01; 一、题目描述&#xff1a; 输入一个字符串,检查其是否为合法的python变量。输入$$$结束:…

数据结构【队列】

文章目录&#xff08;一&#xff09;队列定义&#xff08;二&#xff09;队列实现&#xff08;1&#xff09;创建结构体&#xff08;2&#xff09;具体函数实现及解析1.1 初始化队列1.2入队列1.3出队列1.4取队首元素1.5取队尾元素1.6返回队列个数1.7判断是否为空1.8销毁队列&am…

FITC标记的STAT1-ASON,绿色荧光素标记STAT1反义寡核苷酸,FITC-STAT1-ASON

产品名称&#xff1a;FITC标记的STAT1-ASON&#xff0c;绿色荧光素标记STAT1反义寡核苷酸 ​​​​​​​英文名称&#xff1a;FITC-STAT1-ASON STAT1 是第一个被发现的 STATs 家族成员&#xff0c;其编码基因位于 2 号染色体上&#xff0c;由 750 个氨基酸残基组成&#xff…

随想录一刷Day55——动态规划

文章目录Day55_动态规划47. 判断子序列48. 不同的子序列Day55_动态规划 47. 判断子序列 392. 判断子序列 思路&#xff1a; 双指针很简单&#xff0c;O(n)O(n)O(n) 时间就能解决 这里还是用dp dp[i][j] 表示以 s[i - 1] 结尾的字符串和以 t[]i-1 为结尾的字符串的最大子序列长…

Linux篇【5】:Linux 进程概念(二)

目录 3.5、查看进程 3.6、通过系统调用接口获取正在进行的进程的标识符 3.7、通过系统调用接口创建子进程 - fork 初识 3.5、查看进程 [HJMhjmlcc ~]$ clear [HJMhjmlcc ~]$ pwd /home/HJM [HJMhjmlcc ~]$ ls [HJMhjmlcc ~]$ touch mytest.c [HJMhjmlcc ~]$ ls mytest.c [H…

基于51单片机的简易数字计算器Proteus仿真

资料编号&#xff1a;115 下面是相关功能视频演示&#xff1a; 115-基于51单片机的简易数字计算器Proteus仿真&#xff08;源码仿真全套资料&#xff09;功能说明&#xff1a; 该计算器系统51 系列的单片机进行的数字计算器系统设计&#xff0c;可以完成计算器的键盘输入&…

一看就会的Java方法

文章目录一、方法的定义和使用&#x1f351;1、为什么引入方法&#xff1f;&#x1f351;2、方法的定义&#x1f351;3、方法调用的执行过程&#x1f351;4、实参和形参的关系二、方法重载&#x1f351;1、为什么需要方法重载&#x1f351;2、方法重载的概念和特点&#x1f351…

用DIV+CSS技术设计的体育主题网站(足球介绍)

&#x1f389;精彩专栏推荐 &#x1f4ad;文末获取联系 ✍️ 作者简介: 一个热爱把逻辑思维转变为代码的技术博主 &#x1f482; 作者主页: 【主页——&#x1f680;获取更多优质源码】 &#x1f393; web前端期末大作业&#xff1a; 【&#x1f4da;毕设项目精品实战案例 (10…

【面经】之小鼠喝药问题

题目 现在有 10 只小白鼠和 1000 支药水&#xff0c;1000 支药水中有且仅有一支药水有毒&#xff0c;如果小白鼠喝下毒药&#xff0c;那么毒发的时间是两小时。 现在只给你两小时的时间&#xff0c;请问如何用这 10 只小白鼠测出哪支药水有毒&#xff1f;&#xff08;忽略小白…

linux系统文件权限

目录 shell命令以及运行原理 具体体现(命令行解释器) Linux权限的概念 Linux下有两种用户&#xff1a;超级用户&#xff08;root&#xff09;、普通用户 su指令 Linux权限管理方面 文件访问者的分类&#xff08;人&#xff09; 为什么要有所属组&#xff1f; 文件属性…

STM32 Bootloader开发记录 2

在《stm32 bootloader开发记录.md》文档中&#xff0c;已经实现了Bootloader下的升级功能。可以在Bootloader启动时&#xff0c;进入升级模式&#xff0c;使用串口传输数据&#xff0c;来下载固件到flash中。 但是&#xff0c;在实际应用中&#xff0c;一般是在应用运行过程中…

基于单片机的指纹门禁设计

功能&#xff1a; 研究内容&#xff1a;本课题以单片机为核心采用C语言来开发一指纹电子密码锁。系统拟在Altium Designer9开发平台上设计原理图&#xff0c;并绘制PCB并制成单片机开发板&#xff0c;然后根据原理图将相关元器件焊接到开发板上。软件部分在Keil uVision4开发…