About-Flink

news/2024/5/19 23:47:35/文章来源:https://blog.csdn.net/web13116256725/article/details/126635975

About-Flink

  • 一、Flink简介
    • 1.1、flink特点
    • 1.2、分层Api
    • 1.3、Flink vs Spark Streaming
  • 二、Flink批处理应用
    • 2.1、依赖的引入
    • 2.2、准备批处理文件
    • 2.3、wordCount编码
    • 2.4、自定义类
  • 三、Flink流处理应用
    • 3.1、wordCount编码
    • 3.2、设置并行度-默认为4
    • 3.2、数据来源socket
    • 3.3、配置文件参数提取
  • 四、Standlone环境运行job
    • 4.1、Standlone环境的搭建
    • 4.2、配置文件说明
    • 4.3、提交jar包入口
    • 4.4、命令行提交Job
  • 五、Flink On Yarn
    • 5.1、session-Cluster模式
    • 5.2、Per-Job-cluster模式
  • 六、Flink 四大组件
    • 6.1、Flink运行时的组件
    • 6.2、任务提交流程
    • 6.3、任务调度原理
    • 6.4、并行度
    • 6.5、程序和数据流
    • 6.6、数据的传输形式
    • 6.6、任务链
  • 七、F流处理PAI
    • 7.1、Environment
      • 7.1.1、getExcuteionEnvironment
      • 7.1.2、createLocalEnvironment
      • 7.1.3、createRemoteEnvironment
    • 7.2、Source
      • 7.2.1、List
      • 7.2.2、source from file
      • 7.2.3、source from kafka
      • 7.2.4、自定义source
    • 7.3、Treansform
      • 7.3.1、map
      • 7.3.2、FlatMap
      • 7.3.3、Fliter
      • 7.3.4、Keyby
      • 7.3.5、滚动聚合
      • 7.3.6、Reduce
      • 7.3.7、Split和select

一、Flink简介

1.1、flink特点

  • 低延迟,高吞吐、结果精准,良好的容错

    ? 支持事件时间(event-time)和处理时间(processing-time)语义
    ? 精确一次(exactly-once)的状态一致性保证
    ? 低延迟,每秒处理数百万个事件,毫秒级延迟
    ? 与众多常用存储系统的连接
    ? 高可用,动态扩展,实现7*24小时全天候运行

1.2、分层Api

	越顶层越抽象,表达含义越简明,使用越方便越底层越具体,表达能力越丰富,使用越灵活

在这里插入图片描述

1.3、Flink vs Spark Streaming

?数据模型
-spark采用RDD模型,spark streaming的DStream实际上也就是一组组小批数据RDD的集合
-flink基秀数据模型是数据流,以及事件(Event)序列?运行时架构
-spark是批计算,将DAG划分为不同的stage,一个完成后才可以计算下一个
-flink是标准的流执行模式,一个事件在一个节点处理完后可以直接发往下一个节
点进行处理

二、Flink批处理应用

2.1、依赖的引入

在这里插入图片描述

2.2、准备批处理文件

在这里插入图片描述

2.3、wordCount编码

在这里插入图片描述

2.4、自定义类

在这里插入图片描述

  • 结果输出
    在这里插入图片描述

三、Flink流处理应用

3.1、wordCount编码

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

3.2、设置并行度-默认为4

在这里插入图片描述

3.2、数据来源socket

  • nc lk -7777
    在这里插入图片描述

3.3、配置文件参数提取

在这里插入图片描述
在这里插入图片描述

四、Standlone环境运行job

4.1、Standlone环境的搭建

  • 下载包 解压
  • flink-1.10.1-bin.scala_2.12.tgz

4.2、配置文件说明

在这里插入图片描述

1、通常jobmanager 的配置比 taskmanager,因为干活的是taskmanager
2、并行度不一定比slots小,一定比集群总的slots小
  • 启动一个jobmanage和一个taskmanager
    在这里插入图片描述
    在这里插入图片描述
  • 配置参考
    在这里插入图片描述

4.3、提交jar包入口

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

  • 配置文件4个槽,只占用了2个槽。以并行度最高来群定soilt的使用个数。个数不够,超时后会报超时。
    在这里插入图片描述
  • 运行结果
    在这里插入图片描述

4.4、命令行提交Job

在这里插入图片描述

  • 查看运行的job列表
    在这里插入图片描述
  • 命令行取消job
    在这里插入图片描述
  • 查看运行的和取消的所有列表
    在这里插入图片描述

五、Flink On Yarn

  • flink提供了两种yarn上运行模式,分别为session-Cluster和per-Job-cluster的模式
  • 以Yarn模式部署Flink任务时,要求Flink是有Haddop支持的版本,1.7以上版本,需要将整合hadoop支持的依赖放入Flink 的 lib下。

5.1、session-Cluster模式

在这里插入图片描述
在这里插入图片描述

  • Flink bin 目录下启动 -n 可不指定
    在这里插入图片描述
  • 执行提交job命令 有Session 找 Session集群 没session找 Standlone
    在这里插入图片描述
    在这里插入图片描述

5.2、Per-Job-cluster模式

在这里插入图片描述
在这里插入图片描述

  • 基本操作
    在这里插入图片描述

六、Flink 四大组件

6.1、Flink运行时的组件

在这里插入图片描述

  • 作业管理器 JobManager作用

    1、控制一个应用程序执行的主进程,也就是说,每个应用程序都会被一个不同的JobManager所控制执行。
    2、JobManager会先接收到要执行的应用程序,这个应用程序会包括:作业图(JobGraph)、逻辑数据流图(logical dataflow Graph)和打包了所有的类、库和其它资源的JAR包。
    3、JobManager会把JobGraph转换成一个物理层面的数据流图,这个图被叫做"执行图"(ExecutionGraph),包含了所有可以并发执行的任务。
    4、JobManager会向资源管理器(ResourceManager)请求执行任务必要的资源,也就是任务管理器(兀skManager)上的插槽(slot)。一旦它获取到了足够的资源,就会将执行图分发到真正运行它们的hskManager上。而在运行过程中,JobManager会负责所有需要中央协调的操作,比如说检查点(checkpoints)的协调。
    
  • 任务管理器 TaskManager

    1、Flink中的工作进程。通常在Flink中会有多个TaskManager运行,每一个TaskManager都包含了一定数量的插槽(slots),插槽的数量限制了TaskManager能够执行的任务数量。
    2、启动之后,TaskManager会向资源管理器注册它的插槽;收到资源管理器的指令后,TaskManager就会将一个或者多个插槽提供给JobManager调用。JobManager就可以向插槽分配任务(tasks)来执行了。
    3、在执行过程中,TgskManager可以跟其它运行同一应用程序的TaskManager交换数据。
    
  • 资源管理器 ResourceManager

    1、主要负责管理任务管理器(TaskManager)的插槽(slot),TaskManger插槽是Flink中定义的处理资源单元。
    2、Flink为不同的环境和资源管理工具提供了不同资源管理器,比如SRN、Mesos、K8s, 以及Standalone部署。
    3、当JobManager申请插槽资源时,ResourceManager会将有空闲插槽的TaskManager分酉已给JobManager。如果ResourceManager没有有足够的插槽来满足JobManager的请求,它还可以向资源提供平台发起会话,以提供启动心TaskManager进程的容器。
    
  • 分发器

    1、可以跨作业运行,它为应用提交提供了REST接口。
    2、当一个应用被提交执行时,分发器就会启动并将应用移交给一个
    

    JobManager
    3、Dispatcher也会启动一个Web UI,用来方便地展示和监控作业执
    行的信息。
    4、Dispatcher在架构中可能并不是必需的,这取决于应用提交运行
    的方式。

6.2、任务提交流程

在这里插入图片描述
在这里插入图片描述

6.3、任务调度原理

在这里插入图片描述

6.4、并行度

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

6.5、程序和数据流

在这里插入图片描述
在这里插入图片描述

6.6、数据的传输形式

在这里插入图片描述

6.6、任务链

在这里插入图片描述
在这里插入图片描述

七、F流处理PAI

7.1、Environment

7.1.1、getExcuteionEnvironment

在这里插入图片描述

7.1.2、createLocalEnvironment

在这里插入图片描述

7.1.3、createRemoteEnvironment

在这里插入图片描述

7.2、Source

7.2.1、List

import com.tan.flink.bean.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;public class SourceFromCollection {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<SensorReading> inputDataStream = env.fromCollection(Arrays.asList(new SensorReading("sensor_1", 1547718199L, 35.8),new SensorReading("sensor_6", 1547718201L, 15.4),new SensorReading("sensor_7", 1547718202L, 6.7),new SensorReading("sensor_10", 1547718205L, 38.1)));inputDataStream.print();env.execute();}
}

7.2.2、source from file

env.readTextFile(path);

7.2.3、source from kafka

  • pom 依赖

    	<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.11_2.12</artifactId><version>1.10.1</version></dependency>
    

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
    import java.util.Properties;

    public class SourceFromKafka {
    public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // kafka 配置Properties properties = new Properties();properties.setProperty("bootstrap.servers", "192.168.200.102:9092,192.168.200.102:9092,192.168.200.104:9092");properties.setProperty("group.id", "flink-kafka");properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");properties.setProperty("auto.offset.reset", "latest");DataStreamSource<String> inputDataStream = env.addSource(new FlinkKafkaConsumer011<String>("sensor",new SimpleStringSchema(),properties));inputDataStream.print();env.execute();
    }
    

    }

7.2.4、自定义source

  • 需要实现SourceFunction 或者继承SourceFunction的富函数RichSourceFunction

    import com.tan.flink.bean.SensorReading;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import java.util.Random;
    import java.util.UUID;public class SourceFromCustom {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<SensorReading> inputDataStream = env.addSource(new CustomSource());inputDataStream.print();env.execute();}public static class CustomSource implements SourceFunction<SensorReading> {boolean running = true;@Overridepublic void run(SourceContext<SensorReading> sourceContext) throws Exception {Random random = new Random();while (running) {// 每隔 100 秒数据for (int i = 0; i < 5; i++) {String id = UUID.randomUUID().toString().substring(0, 8);long timestamp = System.currentTimeMillis();double temperature = 60 + random.nextGaussian() * 20;sourceContext.collect(new SensorReading(id, timestamp, temperature));Thread.sleep(100L);}Thread.sleep(1000L);}}@Overridepublic void cancel() {running = false;}}
    }
    

7.3、Treansform

7.3.1、map

7.3.2、FlatMap

7.3.3、Fliter

7.3.4、Keyby

7.3.5、滚动聚合

7.3.6、Reduce

7.3.7、Split和select

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_381454.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

通过 replace() 和正则实现 将文本中的所有数字颜色高亮

实现的效果&#xff1a; 用到的知识点&#xff1a; replace() 方法用于在字符串中用一些字符替换另一些字符&#xff0c;或替换一个与正则表达式匹配的子串。 repalce&#xff08; a, b &#xff09; 必须传两个值&#xff0c;其中a 是要替换的文本&#xff0c;或者满足条件…

javaweb JAVA JSP球鞋销售系统购物系统ssm购物系统购物商城系统源码(ssm电子商务系统)

JSP球鞋销售系统购物系统ssm购物系统购物商城系统源码&#xff08;ssm电子商务系统&#xff09;

生产和同城存储双活架构下,发生脑裂问题影响数据库读写,如何快速分析问题和解决问题?

数据中心脑裂问题,简单说就是两个数据中心间的网络和存储链路同时发生中断,导致两个数据中心内的应用、数据库或者操作系统同时抢占和利用共享的资源,造成资源的数据不一致,产生重大影响。如何避免脑裂是每个存储双活方案都需要尤为重视的问题,脑裂会带来长时间的存储读写…

linux上redis单机的安装

1. 官网下载 https://github.com/redis/redis/archive/7.0.4.tar.gz 2. 上传到虚拟机/data/目录下、解压 tar -xzvf redis-7.0.4.tar.gz 3. 进入redis-7.0.4此目录 cd redis-7.0.4;ll 4. 安装到指定目录中 a. mkdir /usr/local/redis b. make PREFIX/usr/local/redis inst…

沃尔玛、eBay、wish、新蛋等美系平台对于测评风控点有哪些?怎么解决

很多人把各大平台风控想得过于简单&#xff0c;以为注册一批买家账号配一个IP就能进行下单上评&#xff0c;这也是导致市面上的测评现象杂乱无章。但是一定要明白一点各大电商平台都是一家数据公司他的算法一定是根据市场的变化而不断调整的。 平台检测的方式有很多种 1、平台…

RabbitMQ入门(二)

1.概述 RabbityMQ整体上是一个生产者和消费者模式。生产者生产消息到消息中间件的服务节点&#xff08;Broker&#xff09;,服务节点中包含交换器&#xff08;Exchange&#xff09;和队列&#xff08;Queue&#xff09;&#xff0c;生产的消息首先经过交换器&#xff0c;再由交…

搭建vue3项目

搭建vue3项目搭建准备创建项目选择所需配置运行项目vue3已经被大众所熟悉&#xff0c;很多公司都在做vue2到vue3的升级。 介绍vue3项目的搭建过程 搭建准备 前端开发环境需要node.js&npm node下载地址:http://nodejs.cn/download/ 根据自己电脑环境下载就行 安装vue-cli3…

2022/08/31 day14:企业级解决方案

文章目录目录缓存预热缓存雪崩缓存击穿缓存穿透性能指标监控总结目录 面试问题 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-EtBtkGNE-1661933471760)(en-resource://database/5507:1)] [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下…

抖音小程序模板全行业整理合集,抖音小程序制作平台分享

小弟我是来自第三方抖音小程序制作平台的打工人&#xff0c;给大家整合了一些我们平台的抖音小程序模板&#xff0c;大家可以根据需要来获取。 步骤就是点击下方的链接&#xff0c;选好自己的抖音小程序模板&#xff0c;在平台注册账号直接套用到自己的抖音小程序上&#xff0…

深入理解蓝牙BLE之“信道管理”

目录 一.BLE的调制解调&#xff1a; 二.BLE的信道&#xff1a; 三.BLE的广播信道&#xff1a; 四.BLE的数据信道&#xff1a; 五.BLE信道管理&#xff1a; 5.1广播信道的随机延时&#xff1a; 5.2数据信道的调频算法&#xff1a; 跳频算法1&#xff1a; 跳频算法2&…

02.Haoop 虚拟机 桥接与NAT之间区别 及桥接设置

首先说 我的硬件准备&#xff0c;1台windows系统&#xff0c;1台mac pro 。 在 物理机上使用了 VMWARE CENTOS 7 的 方式进行配置。 那么我希望能实现把 这2台机器连在一起&#xff0c;做Hadoop 的集群。 网络问题是首先需要解决的事情&#xff0c;主要不通物理主机之间一直…

02:入门及安装(狂神说RabbitMQ)

RabbitMQ入门及安装 https://www.bilibili.com/video/BV1dX4y1V73Gp27 概述 简单概述&#xff1a; RabbitMQ是一个开源的遵循 AMQP协议实现的基于 Erlang语言编写&#xff0c;支持多种客户端&#xff08;语言&#xff09;&#xff0c;用于在分布式系统中存储消息&#xff0…

Spring Security 入门之自定义表单登录开发实现(三)

文章目录1. 前言2. 自定义认证2.1 自定义登录页面2.2 后端认证逻辑3. 自定义登陆成功处理3.1 登陆成功原理3.2 自定义登陆成功响应处理4. 自定义登陆失败处理4.1 登陆失败原理4.2 自定义登陆失败响应处理5. 注销用户处理5.1 注销原理总结1. 前言 在弄懂HelloWorld案例后&#…

Node.js | 使用内置模块 event 实现发布订阅模式

&#x1f5a5;️ NodeJS专栏&#xff1a;Node.js从入门到精通 &#x1f5a5;️ 蓝桥杯真题解析&#xff1a;蓝桥杯Web国赛真题解析 &#x1f9e7; 加入社区领红包&#xff1a;海底烧烤店ai&#xff08;从前端到全栈&#xff09; &#x1f9d1;‍&#x1f4bc;个人简介&#xff…

自动化测试中的验证码问题

做自动化测试的同学在面试的时候经常会遇到这问题&#xff0c;而且我们在实际的工作中也会遇到这个问题&#xff0c;那么这问题到底该怎么处理&#xff1f; 下面给出了面试过程中常见的相关面试题供大家参考&#xff1a; 01 在做自动化登陆的同时&#xff0c;如何绕过验证码&a…

windows下安装docker

下载docker&#xff0c;通过Redirecting…这个下载docker 正在上传…重新上传取消 下载完安装 安装完成后&#xff0c;进入powershell&#xff0c;输入命令docker network ls,查看docker网络&#xff0c;如果没有bridge项目&#xff0c;创建容器会报错(Windows容器就是两…

3D格式转换神器HOOPS Exchange使用教程(一):打印组件结构

HOOPS Exchange是什么&#xff1f; HOOPS Exchange 是一组软件库&#xff0c;可以帮助开发人员在开发应用程序时读取和写入主流的 2D 和 3D 格式。HOOPS Exchange 支持在主流的3D 文件格式中读取 CAD 数据&#xff0c;并支持将 3D 数据转换为 PRC 数据格式&#xff0c;这是一种…

NGINX源码之:event与epoll

在进入正题之前&#xff0c;先来大概了解下epoll&#xff1a; 引入多路复用之前socket建立连接流程&#xff1a; 1、服务端先建立socket&#xff08;serversocket&#xff09;占用一个文件描述符fd,然后bind端口&#xff0c;开启监听listen accept事件&#xff1b; 2、客户端请…

有趣的前端项目——一个暴躁萌的大眼仔

有趣的前端项目——一个暴躁萌的大眼仔 众所周知&#xff0c;我是一个摆子前端&#xff08;真的 &#xff09;&#xff0c;闲来无事&#xff0c;网上冲浪 遇见了如此蠢萌的大眼 于是我&#xff0c;行也思&#xff0c;坐也思&#xff0c;可算把这个大眼给复刻出来了。 原文出…

01-Flink概述

1. 源起和设计理念https://flink.apache.org/在 Flink 官网主页的顶部可以看到,项目的核心目标,是“数据流上的有状态计算”(Stateful Computations over Data Streams)。 具体定位是:Apache Flink 是一个框架和分布式处理引擎,如下图所示,用于对无界和有界数据流进行有…