Flink广播流 BroadcastStream

news/2024/7/27 8:47:54/文章来源:https://blog.csdn.net/gwc791224/article/details/136716122

文章目录

  • 前言
  • BroadcastStream代码示例
  • Broadcast 使用注意事项


前言

Flink中的广播流(BroadcastStream)是一种特殊的流处理方式,它允许将一个流(通常是一个较小的流)广播到所有的并行任务中,从而实现在不同任务间共享数据的目的。广播流在处理配置信息、小数据集或者全局变量等场景下特别有用,因为这些数据需要在所有任务中保持一致且实时更新。

广播流的使用通常涉及以下步骤:

  1. 定义MapStateDescriptor:首先需要定义一个MapStateDescriptor来描述要广播的数据的格式。这个描述器指定了数据的键值对类型。

  2. 创建广播流:然后,需要将一个普通的流转换为广播流。这通常通过调用流的broadcast()方法实现,并将MapStateDescriptor作为参数传入。

  3. 连接广播流与非广播流:一旦有了广播流,就可以将其与一个或多个非广播流(无论是Keyed流还是Non-Keyed流)连接起来。这通过调用非广播流的connect()方法完成,并将广播流作为参数传入。连接后的流是一个BroadcastConnectedStream,它提供了process()方法用于处理数据。

  4. 处理数据:在process()方法中,可以编写逻辑来处理非广播流和广播流的数据。根据非广播流的类型(Keyed或Non-Keyed),需要传入相应的KeyedBroadcastProcessFunctionBroadcastProcessFunction类型的处理函数。

广播流的一个典型使用场景是在处理数据时需要实时动态改变配置。例如,当需要从MySQL数据库中实时查询和更新某些关键字过滤规则时,如果直接在计算函数中进行查询,可能会阻塞整个计算过程甚至导致任务停止。通过使用广播流,可以将这些配置信息广播到所有相关任务的实例中,然后在计算过程中直接使用这些配置信息,从而提高计算效率和实时性。

总的来说,Flink的广播流提供了一种有效的方式来实现不同任务间的数据共享和实时更新,适用于各种需要全局数据或配置的场景。


BroadcastStream代码示例

功能:将用户信息进行广播,从Kafka中读取用户访问记录,判断访问用户是否存在


import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import org.apache.flink.util.Collector;import flink.demo.data.UserVo;
/*** 多流connect,并进行join**/
public class BroadcastTest{public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties proterties = new Properties();proterties.setProperty("bootstrap.servers", "10.168.88.88:9092");proterties.setProperty("group.id", "test");proterties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");proterties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//        proterties.setProperty("auto.offset.reset", "latest");FlinkKafkaConsumer<ObjectNode> consumerVisit= new FlinkKafkaConsumer<>("test",new JSONKeyValueDeserializationSchema(false), proterties);DataStreamSource<ObjectNode> streamSource = env.addSource(consumerVisit);DataStreamSource<Tuple2<String, List<UserVo>>> userStreamSource = env.addSource(new UserListSource());MapStateDescriptor<String, List<UserVo>> descriptor =new MapStateDescriptor<>("userStream",BasicTypeInfo.STRING_TYPE_INFO,TypeInformation.of(new TypeHint<List<UserVo>>() {}));BroadcastStream<Tuple2<String, List<UserVo>>> broadcastStream = userStreamSource.broadcast(descriptor);// 将数据流和控制流进行连接,利用控制流中的数据来控制字符串的输出BroadcastConnectedStream<ObjectNode, Tuple2<String, List<UserVo>>> tmp=streamSource.connect(broadcastStream);tmp.process(new UserPvProcessor()).print();env.execute("kafkaTest");}private static class UserPvProcessorextends BroadcastProcessFunction<ObjectNode, Tuple2<String, List<UserVo>>, String> {private static final long serialVersionUID = 1L;MapStateDescriptor<String, List<UserVo>> descriptor =new MapStateDescriptor<>("userStream",BasicTypeInfo.STRING_TYPE_INFO,TypeInformation.of(new TypeHint<List<UserVo>>() {}));@Override//用户信息处理public void processBroadcastElement(Tuple2<String, List<UserVo>> value, Context ctx, Collector<String> out)throws Exception {// 将接收到的控制数据放到 broadcast state 中  ctx.getBroadcastState(descriptor).put(value.f0, value.f1);// 打印控制信息System.out.println(Thread.currentThread().getName() + " 接收到用户信息 : "+value.f0+"   " + value.f1);}@Override//数据流public void processElement(ObjectNode element, ReadOnlyContext ctx, Collector<String> out) throws Exception {// 从 broadcast state 中拿到用户列表信息List<UserVo> userList = ctx.getBroadcastState(descriptor).get("userList");String time=LocalDateTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss"));if(userList!=null&&userList.size()>0) {Map<String,String> userMap=new HashMap<>();for(UserVo vo:userList) {userMap.put(vo.getUserid(), vo.getUserName());}
//				System.out.println(userMap);JsonNode value = element.get("value");String userid=value.get("user").asText();String userName=userMap.get(userid);if (StringUtils.isNotBlank(userName)) {out.collect(Thread.currentThread().getName()+"存在用户"+userid+"  "+userName +" "+time);}else {out.collect(Thread.currentThread().getName()+"不存在用户"+userid+" "+time );}}else {out.collect(Thread.currentThread().getName()+"不存在用户"+element.get("value")+" "+time );}}}
}

Broadcast 使用注意事项

  • 同一个 operator 的各个 task 之间没有通信,广播流侧(processBroadcastElement)可以能修改 broadcast state,而数据流侧(processElement)只能读 broadcast state.;
  • 需要保证所有 Operator task 对 broadcast state 的修改逻辑是相同的,否则会导致非预期的结果;
  • Operator tasks 之间收到的广播流元素的顺序可能不同:虽然所有元素最终都会下发给下游tasks,但是元素到达的顺序可能不同,所以更新state时不能依赖元素到达的顺序;
  • 每个 task 对各自的 Broadcast state 都会做快照,防止热点问题;
  • 目前不支持 RocksDB 保存 Broadcast state:Broadcast state 目前只保存在内存中,需要为其预留合适的内存

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_1007461.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

VsCode 使用密钥连接 Centos

在 centos 下生成密钥 ssh-keygen 执行上述命令后&#xff0c;一路回车&#xff0c;直到出现如下界面&#xff1a; 查看密钥生成情况 cd /root/.ssh ls 结果如下所示&#xff1a; 服务器上安装公钥 cd /root/.ssh cat id_rsa.pub >> authorized_keys ls >查看确…

CVE-2024-27199 JetBrains TeamCity 身份验证绕过漏洞2

漏洞简介 TeamCity Web 服务器中发现了第二个身份验证绕过漏洞。这种身份验证旁路允许在没有身份验证的情况下访问有限数量的经过身份验证的端点。未经身份验证的攻击者可以利用此漏洞修改服务器上有限数量的系统设置&#xff0c;并泄露服务器上有限数量的敏感信息。 项目官网…

LAMP网站部署(Discuz论坛网站部署)

目录 mysql命令 语法 选项 参数 实例 安装php 安装Mariadb 关掉防火墙和selinux 启动HTTP服务 初始化数据库 查看数据库是否创建成功 修改HTTP的配置文件 浏览器打开 将以下所有目录都加上权限 最后首页效果 mysql命令 是MySQL数据库服务器的客户端工具&#xff0c;它工作在命…

tomcat的webapp文件中发布web应用

一、Web服务器 1.什么是Web 概述&#xff1a; web(World Wide Web)即全球广域网&#xff0c;也称为万维网&#xff0c;它是一种基于超文本和HTTP的、全球性的、动态交百的、跨平台的分布式图形信息系统。是建立在internet上的一种网络服务&#xff0c;为浏览者在Intern…

【深度学习笔记】9_5 多尺度目标检测

注&#xff1a;本文为《动手学深度学习》开源内容&#xff0c;部分标注了个人理解&#xff0c;仅为个人学习记录&#xff0c;无抄袭搬运意图 9.5 多尺度目标检测 在9.4节&#xff08;锚框&#xff09;中&#xff0c;我们在实验中以输入图像的每个像素为中心生成多个锚框。这些…

物联网技术助力智慧城市转型升级:智能、高效、可持续

目录 一、物联网技术概述及其在智慧城市中的应用 二、物联网技术助力智慧城市转型升级的路径 1、提升城市基础设施智能化水平 2、推动公共服务智能化升级 3、促进城市治理现代化 三、物联网技术助力智慧城市转型升级的成效与展望 1、成效显著 2、展望未来 四、物联网技…

机试:蛇形矩阵

问题描述: 代码示例: //蛇形矩阵 #include <bits/stdc.h> using namespace std;int main(){int n;cout << "输入样例" << endl; cin >> n;int k 1; for(int i 0; i < n; i){if( i %2 0){//单数行for(int j 0; j < n; j){ cout &…

运维自动化之ansible工具

目录 前言 一、Ansible 工具概述 1、Ansible 功能 2、Ansible 特性 3、Ansible 优缺点 4、Ansible 架构 4.1 Ansible 组成 4.2 Ansible 命令执行来源 二、Ansible 安装和基础用法 1、Ansible 安装 1.1 yum源安装 1.2 使用python编译安装 1.3 Git方式安装 2、Ansib…

《小程序从入门到入坑》框架语法

前言 哈喽大家好&#xff0c;我是 SuperYing&#xff0c;我们继续小程序入门系列&#xff0c;本文将对小程序框架语法进行比较全面的介绍。在《小程序从入门到入坑》简介及工程创建中&#xff0c;我们提到小程序项目结构&#xff0c;主要包括 app.json&#xff0c;app.js&…

Airtest-Selenium升级兼容Selenium 4.0,给你全新体验!

一、前言 在上期更新推文中提到&#xff0c;我们Airtest-Selenium更新到了1.0.6版本&#xff0c;新增支持Selenium4.0的语法&#xff0c;那么我们来看一下Airtest-Selenium更新后有什么新的内容吧~ 二、selenium 4.0有什么新功能 selenium4.0最主要的还是定位元素方法的更新…

使用 opencv 识别答题卡,生成填涂答案

一般答题卡设计时都在试卷4个角预留4个一样大小的黑块 仅能识别选择题判断题之类的填涂答题的题目&#xff0c;不能识别填空题应用题等其它主观题 使用 opencv 识别试卷图片中所有黑块&#xff0c;再根据黑块大小获取四个角的位置&#xff0c;根据四个黑块位置校正图像 将图…

给电脑加硬件的办法 先找电脑支持的接口,再买相同接口的

需求&#xff1a;我硬盘太小&#xff0c;换或加一个大硬盘 结论&#xff1a;接口是NVMe PCIe 3.0 x4 1.找到硬盘型号 主硬盘 三星 MZALQ512HALU-000L2 (512 GB / 固态硬盘) 2.上官网查 或用bing查 非官方渠道信息&#xff0c;不确定。

阿里云-云服务器ECS新手如何建网站?

租阿里云服务器一年要多少钱&#xff1f; 不同类型的服务器有不同的价格。 以ECS计算型c5为例&#xff1a;2核4G-1年518.40元&#xff0c;4核8G-1年948.00元。 阿里云ECS云服务器租赁价格由三部分组成&#xff1a; 也就是说&#xff0c;云服务器配置成本磁盘价格网络宽带价格…

SpringCloud(22)之Sentinel实战应用

一、Sentinel核心库 sentinel主页&#xff1a;主页 alibaba/Sentinel Wiki GitHub 1.1 Sentinel介绍 随着微服务的流行&#xff0c;服务和服务之间的稳定性变得越来越重要。Sentinel 是面向分布式、多语言异构化服务架构的流量治理组件&#xff0c;主要以流量为切入点&…

嵌入式单片机学习思路感想分享

今天看到了一个提问,原话如下: 曾经干了8年单片机工程师,对工程师从入门,到入行,再到普通,再到高级,整个路径还算清晰,比如什么阶段,会碰到什么瓶颈,怎么突破,我都经历过。 这个同学,有个典型的问题,就是学得太多且杂了,估计稍微复杂点的项目,做不出来。 现在…

软件设计和体系结构

软件设计和体系结构 一、引言 软件 定义&#xff1a;一系列按照特定顺序组织的计算机数据、指令的集合 特点&#xff1a; 软件不是生产制造&#xff0c;是设计开发软件不会磨损和老化软件需要根据实际情况进行定制开发 软件设计的基本原则 抽象方法 过程抽象&#xff1a;是指…

【数据结构和算法初阶(C语言)】队列实操(概念实现+oj题目栈和队列的双向实现,超级经典!!!)

1. 队列的概念及结构 队列&#xff1a;只允许在一端进行插入数据操作&#xff0c;在另一端进行删除数据操作的特殊线性表&#xff0c; 队列具有先进先出 FIFO(First In First Out) 入队列&#xff1a;进行插入操作的一端称为队尾 出队列&#xff1a;进行删除操作的一端称为…

ChatGPT提问技巧——对抗性提示

ChatGPT提问技巧——对抗性提示 对抗性提示是一种允许模型生成能够抵御某些类型的攻击或偏差的文本的技术。这种技术可用于训练更健壮、更能抵御某些类型的攻击或偏差的模型。 要在 ChatGPT 中使用对抗性提示&#xff0c;应为模型提供一个提示&#xff0c;该提示的设计应使模…

OS-Copilot:实现具有自我完善能力的通用计算机智能体

&#x1f349; CSDN 叶庭云&#xff1a;https://yetingyun.blog.csdn.net/ AI 缩小了人类间的知识和技术差距 论文标题&#xff1a;OS-Copilot: Towards Generalist Computer Agents with Self-Improvement 论文链接&#xff1a;https://arxiv.org/abs/2402.07456 项目主页&a…

Java NIO浅析

NIO&#xff08;Non-blocking I/O&#xff0c;在Java领域&#xff0c;也称为New I/O&#xff09;&#xff0c;是一种同步非阻塞的I/O模型&#xff0c;也是I/O多路复用的基础&#xff0c;已经被越来越多地应用到大型应用服务器&#xff0c;成为解决高并发与大量连接、I/O处理问题…