About-Flink
- 一、Flink简介
-
- 1.1、flink特点
- 1.2、分层Api
- 1.3、Flink vs Spark Streaming
- 二、Flink批处理应用
-
- 2.1、依赖的引入
- 2.2、准备批处理文件
- 2.3、wordCount编码
- 2.4、自定义类
- 三、Flink流处理应用
-
- 3.1、wordCount编码
- 3.2、设置并行度-默认为4
- 3.2、数据来源socket
- 3.3、配置文件参数提取
- 四、Standlone环境运行job
-
- 4.1、Standlone环境的搭建
- 4.2、配置文件说明
- 4.3、提交jar包入口
- 4.4、命令行提交Job
- 五、Flink On Yarn
-
- 5.1、session-Cluster模式
- 5.2、Per-Job-cluster模式
- 六、Flink 四大组件
-
- 6.1、Flink运行时的组件
- 6.2、任务提交流程
- 6.3、任务调度原理
- 6.4、并行度
- 6.5、程序和数据流
- 6.6、数据的传输形式
- 6.6、任务链
- 七、F流处理PAI
-
- 7.1、Environment
-
- 7.1.1、getExcuteionEnvironment
- 7.1.2、createLocalEnvironment
- 7.1.3、createRemoteEnvironment
- 7.2、Source
-
- 7.2.1、List
- 7.2.2、source from file
- 7.2.3、source from kafka
- 7.2.4、自定义source
- 7.3、Treansform
-
- 7.3.1、map
- 7.3.2、FlatMap
- 7.3.3、Fliter
- 7.3.4、Keyby
- 7.3.5、滚动聚合
- 7.3.6、Reduce
- 7.3.7、Split和select
一、Flink简介
1.1、flink特点
-
低延迟,高吞吐、结果精准,良好的容错
? 支持事件时间(event-time)和处理时间(processing-time)语义
? 精确一次(exactly-once)的状态一致性保证
? 低延迟,每秒处理数百万个事件,毫秒级延迟
? 与众多常用存储系统的连接
? 高可用,动态扩展,实现7*24小时全天候运行
1.2、分层Api
越顶层越抽象,表达含义越简明,使用越方便越底层越具体,表达能力越丰富,使用越灵活
1.3、Flink vs Spark Streaming
?数据模型
-spark采用RDD模型,spark streaming的DStream实际上也就是一组组小批数据RDD的集合
-flink基秀数据模型是数据流,以及事件(Event)序列?运行时架构
-spark是批计算,将DAG划分为不同的stage,一个完成后才可以计算下一个
-flink是标准的流执行模式,一个事件在一个节点处理完后可以直接发往下一个节
点进行处理
二、Flink批处理应用
2.1、依赖的引入
2.2、准备批处理文件
2.3、wordCount编码
2.4、自定义类
- 结果输出
三、Flink流处理应用
3.1、wordCount编码
3.2、设置并行度-默认为4
3.2、数据来源socket
- nc lk -7777
3.3、配置文件参数提取
四、Standlone环境运行job
4.1、Standlone环境的搭建
- 下载包 解压
- flink-1.10.1-bin.scala_2.12.tgz
4.2、配置文件说明
1、通常jobmanager 的配置比 taskmanager,因为干活的是taskmanager
2、并行度不一定比slots小,一定比集群总的slots小
- 启动一个jobmanage和一个taskmanager
- 配置参考
4.3、提交jar包入口
- 配置文件4个槽,只占用了2个槽。以并行度最高来群定soilt的使用个数。个数不够,超时后会报超时。
- 运行结果
4.4、命令行提交Job
- 查看运行的job列表
- 命令行取消job
- 查看运行的和取消的所有列表
五、Flink On Yarn
- flink提供了两种yarn上运行模式,分别为session-Cluster和per-Job-cluster的模式
- 以Yarn模式部署Flink任务时,要求Flink是有Haddop支持的版本,1.7以上版本,需要将整合hadoop支持的依赖放入Flink 的 lib下。
5.1、session-Cluster模式
- Flink bin 目录下启动 -n 可不指定
- 执行提交job命令 有Session 找 Session集群 没session找 Standlone
5.2、Per-Job-cluster模式
- 基本操作
六、Flink 四大组件
6.1、Flink运行时的组件
-
作业管理器 JobManager作用
1、控制一个应用程序执行的主进程,也就是说,每个应用程序都会被一个不同的JobManager所控制执行。 2、JobManager会先接收到要执行的应用程序,这个应用程序会包括:作业图(JobGraph)、逻辑数据流图(logical dataflow Graph)和打包了所有的类、库和其它资源的JAR包。 3、JobManager会把JobGraph转换成一个物理层面的数据流图,这个图被叫做"执行图"(ExecutionGraph),包含了所有可以并发执行的任务。 4、JobManager会向资源管理器(ResourceManager)请求执行任务必要的资源,也就是任务管理器(兀skManager)上的插槽(slot)。一旦它获取到了足够的资源,就会将执行图分发到真正运行它们的hskManager上。而在运行过程中,JobManager会负责所有需要中央协调的操作,比如说检查点(checkpoints)的协调。
-
任务管理器 TaskManager
1、Flink中的工作进程。通常在Flink中会有多个TaskManager运行,每一个TaskManager都包含了一定数量的插槽(slots),插槽的数量限制了TaskManager能够执行的任务数量。 2、启动之后,TaskManager会向资源管理器注册它的插槽;收到资源管理器的指令后,TaskManager就会将一个或者多个插槽提供给JobManager调用。JobManager就可以向插槽分配任务(tasks)来执行了。 3、在执行过程中,TgskManager可以跟其它运行同一应用程序的TaskManager交换数据。
-
资源管理器 ResourceManager
1、主要负责管理任务管理器(TaskManager)的插槽(slot),TaskManger插槽是Flink中定义的处理资源单元。 2、Flink为不同的环境和资源管理工具提供了不同资源管理器,比如SRN、Mesos、K8s, 以及Standalone部署。 3、当JobManager申请插槽资源时,ResourceManager会将有空闲插槽的TaskManager分酉已给JobManager。如果ResourceManager没有有足够的插槽来满足JobManager的请求,它还可以向资源提供平台发起会话,以提供启动心TaskManager进程的容器。
-
分发器
1、可以跨作业运行,它为应用提交提供了REST接口。 2、当一个应用被提交执行时,分发器就会启动并将应用移交给一个
JobManager
3、Dispatcher也会启动一个Web UI,用来方便地展示和监控作业执
行的信息。
4、Dispatcher在架构中可能并不是必需的,这取决于应用提交运行
的方式。
6.2、任务提交流程
6.3、任务调度原理
6.4、并行度
6.5、程序和数据流
6.6、数据的传输形式
6.6、任务链
七、F流处理PAI
7.1、Environment
7.1.1、getExcuteionEnvironment
7.1.2、createLocalEnvironment
7.1.3、createRemoteEnvironment
7.2、Source
7.2.1、List
import com.tan.flink.bean.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;public class SourceFromCollection {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<SensorReading> inputDataStream = env.fromCollection(Arrays.asList(new SensorReading("sensor_1", 1547718199L, 35.8),new SensorReading("sensor_6", 1547718201L, 15.4),new SensorReading("sensor_7", 1547718202L, 6.7),new SensorReading("sensor_10", 1547718205L, 38.1)));inputDataStream.print();env.execute();}
}
7.2.2、source from file
env.readTextFile(path);
7.2.3、source from kafka
-
pom 依赖
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.11_2.12</artifactId><version>1.10.1</version></dependency>
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import java.util.Properties;public class SourceFromKafka {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// kafka 配置Properties properties = new Properties();properties.setProperty("bootstrap.servers", "192.168.200.102:9092,192.168.200.102:9092,192.168.200.104:9092");properties.setProperty("group.id", "flink-kafka");properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");properties.setProperty("auto.offset.reset", "latest");DataStreamSource<String> inputDataStream = env.addSource(new FlinkKafkaConsumer011<String>("sensor",new SimpleStringSchema(),properties));inputDataStream.print();env.execute(); }
}
7.2.4、自定义source
-
需要实现SourceFunction 或者继承SourceFunction的富函数RichSourceFunction
import com.tan.flink.bean.SensorReading; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import java.util.Random; import java.util.UUID;public class SourceFromCustom {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<SensorReading> inputDataStream = env.addSource(new CustomSource());inputDataStream.print();env.execute();}public static class CustomSource implements SourceFunction<SensorReading> {boolean running = true;@Overridepublic void run(SourceContext<SensorReading> sourceContext) throws Exception {Random random = new Random();while (running) {// 每隔 100 秒数据for (int i = 0; i < 5; i++) {String id = UUID.randomUUID().toString().substring(0, 8);long timestamp = System.currentTimeMillis();double temperature = 60 + random.nextGaussian() * 20;sourceContext.collect(new SensorReading(id, timestamp, temperature));Thread.sleep(100L);}Thread.sleep(1000L);}}@Overridepublic void cancel() {running = false;}} }