Preface
Elasticsearch (es) is one of the staple middleware components for big data storage. With Flink you can read data from external sources such as log files or Kafka and write it into es. This article demonstrates the complete process with a working example.
I. Prerequisites
1. Set up and start an es service in advance (this article uses es 7.6 running in Docker);
2. Set up and start a Kibana service in advance (it makes working with the es index data much easier);
3. Create a test index in advance:
PUT test_index
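The bare PUT above relies on es creating the field mappings dynamically, which is all this walkthrough needs. If you prefer to declare the two fields the program writes explicitly, a hedged, optional alternative when creating the index is:

PUT test_index
{
  "mappings": {
    "properties": {
      "id":   { "type": "keyword" },
      "name": { "type": "keyword" }
    }
  }
}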
Note:
With a Docker-based es setup, index creation may succeed but inserting data can then fail with a permissions error (the index is read-only). If you hit this, run the following request first; otherwise the Flink job will fail with the same error at runtime:
PUT _settings
{
  "index": {
    "blocks": {
      "read_only_allow_delete": "false"
    }
  }
}
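As a side note (not from the original article): PUT _settings without an index name applies the change to every index. To scope it to the test index only, the per-index form of the same settings API is:

PUT test_index/_settings
{
  "index": {
    "blocks": {
      "read_only_allow_delete": "false"
    }
  }
}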
II. Writing the Program
1. Add the basic POM dependencies
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.49</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>1.2.0</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.75</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>
    <!-- newly added for the es sink; the Scala suffix must match the other _2.12 artifacts -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch7_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-core</artifactId>
        <version>2.11.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>
</dependencies>
2. Prepare an external file for the program to read
The csv file holds one record per line with two comma-separated fields: id and name.
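The original file content isn't reproduced here; a hypothetical three-row userinfo.txt consistent with the parsing code below (fields[0] = id, fields[1] = name) would be:

1001,zhangsan
1002,lisi
1003,wangwu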
3. Core program code
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.ArrayList;
import java.util.HashMap;

public class SinkEs {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // use event time (see the problem summary at the end of this article)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // read the source data from a local text file
        String path = "E:\\code-self\\flink_study\\src\\main\\resources\\userinfo.txt";
        DataStreamSource<String> inputStream = env.readTextFile(path);

        // parse each comma-separated line into a UserInfo object
        SingleOutputStreamOperator<UserInfo> dataStream = inputStream.map(new MapFunction<String, UserInfo>() {
            @Override
            public UserInfo map(String value) throws Exception {
                String[] fields = value.split(",");
                return new UserInfo(fields[0], fields[1]);
            }
        });

        ArrayList<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("IP", 9200)); // replace "IP" with the address of your es host

        ElasticsearchSink.Builder<UserInfo> result =
                new ElasticsearchSink.Builder<UserInfo>(httpHosts, new ElasticsearchSinkFunction<UserInfo>() {
                    @Override
                    public void process(UserInfo element, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
                        // build the document to write to es
                        HashMap<String, String> dataSource = new HashMap<>();
                        dataSource.put("id", element.getId());
                        dataSource.put("name", element.getName());

                        // create an index request as the write command sent to es
                        IndexRequest indexRequest = Requests.indexRequest()
                                .index("test_index")
                                .source(dataSource);

                        // hand the request to the indexer
                        requestIndexer.add(indexRequest);
                    }
                });

        // flush after every single record so the result is visible immediately
        result.setBulkFlushMaxActions(1);
        dataStream.addSink(result.build());

        env.execute();
        System.out.println("data written to es successfully");
    }
}
The UserInfo POJO referenced in the code above:
public class UserInfo {

    private String id;
    private String name;

    public UserInfo() {
    }

    public UserInfo(String id, String name) {
        this.id = id;
        this.name = name;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}
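setBulkFlushMaxActions(1) flushes every single record to es immediately, which is handy for a demo but chatty in production. As an optional, hedged sketch (none of this is in the original program), the same builder also exposes batching and retry knobs, and the connector ships a RetryRejectedExecutionFailureHandler:

import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
import org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler;

// applied to the same "result" builder created in SinkEs above
result.setBulkFlushMaxActions(100);   // batch up to 100 requests per bulk call
result.setBulkFlushInterval(5000);    // ...or flush every 5 seconds, whichever comes first
// re-queue requests that es rejected because its bulk queue was full
result.setFailureHandler(new RetryRejectedExecutionFailureHandler());
result.setBulkFlushBackoff(true);     // retry failed bulk requests with exponential backoff
result.setBulkFlushBackoffType(ElasticsearchSinkBase.FlushBackoffType.EXPONENTIAL);
result.setBulkFlushBackoffRetries(3);
result.setBulkFlushBackoffDelay(1000); // initial backoff delay in milliseconds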
Run the program and observe the console output.
4. Query the result with Kibana
Execute the following query:
GET test_index/_search
{
  "query": {
    "match_all": {}
  }
}
If you see results like the following, the data was written to es successfully.
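The original result screenshot isn't reproduced here. Abridged, and assuming the hypothetical sample rows from earlier, the response body would look roughly like:

{
  "hits": {
    "total": { "value": 3, "relation": "eq" },
    "hits": [
      { "_index": "test_index", "_source": { "id": "1001", "name": "zhangsan" } },
      { "_index": "test_index", "_source": { "id": "1002", "name": "lisi" } },
      { "_index": "test_index", "_source": { "id": "1003", "name": "wangwu" } }
    ]
  }
}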
Summary of problems encountered while running the program
I ran into two problems while writing data into es with this code; I'm recording them here so you can avoid the same pitfalls.
1. The first error (the screenshot is not reproduced here) roughly said that the time characteristic did not match when the Flink program wrote to es. The fix was to add the following line to the program:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

(In Flink 1.12 event time is already the default and this setter is deprecated, but the call is still accepted.)
2. The second error meant that Elasticsearch had switched the index into read-only mode, allowing only deletes. Some answers online blame insufficient memory, and tuning that didn't help in my case; es actually applies this block once its disk flood-stage watermark is exceeded, so freeing disk space is the durable fix. What immediately unblocked me was the setting already shown at the start of the article, i.e. resetting read_only_allow_delete to false:
PUT _settings
{
  "index": {
    "blocks": {
      "read_only_allow_delete": "false"
    }
  }
}
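To confirm the block is gone, you can re-check the index settings afterwards (a quick verification step, not from the original article):

GET test_index/_settings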