kafka和flink的入门到精通 4 生产数据流程

news/2024/5/19 7:21:53/文章来源:https://blog.csdn.net/qq_45956730/article/details/126951623

参考023 - 大数据 - Kafka - 生产者 - 生产数据的准备_哔哩哔哩_bilibili

链接:https://pan.baidu.com/s/1QMOJVkRy4nKkjzoDryvQXw
提取码:fcoe

本文接着上一篇kafka和flink的入门到精通 3 组件扩展,kafka-生产者_水w的博客-CSDN博客

目录

4.1 生产数据流程

◼  生产数据的准备

 ➢ KafkaProducer,ProducerRecod

 ◼  采集器

 ◼ 拦截器的实现

 ◼ 元数据请求和更新

◼  分区选择

◼  将数据缓存到采集器中

◼  sender从采集器中获取数据发送到服务器


4.1 生产数据流程

◼  生产数据的准备

 ➢ KafkaProducer,ProducerRecod

这个“KafkaProducer”下面是一些初始化的配置操作, 

  • clientId:;
  • partitioner:是可配置的,默认为DefaultPartitioner;
  • interceptors:拦截器;
  • maxRequestSize:最大请求大小;
  • compressionType:压缩方式;
  • accumulator:采集器;
  • metadata:元数据;
  • sender:发送对象,会随着线程的启动而执行;
  • ioThread;

因此,accumulator就是快递员-----缓冲区,sender就是运输工具。

ProducerRecord的参数:

 ◼  采集器

小人要发送快递,做的第一件事就是 send操作来发送数据,把record做了一系列的拦截interceptors(不是只有一个),做了一些定制化的操作,形成了新的Record。

当把消息拦截之后,那么把消息发给谁,我们不知道。那么这些东西在哪,就在元数据当中。需要从元数据当中得到一些信息,比如集群的信息。然后把Key做序列化,消息继续往下走。

但是Topic主题有多个分区,那么接下来就要问消息要放在哪个分区Partition?因为如果都往一个分区里放会导致过载。-------分区选择

现在Topic和分区Partition我都知道了,那么TopicPartition对象就会被构建出来,构建出来之后,开始做一些检验操作,会判断当前数据的大小有没有超过阈值。

 接下来,重点是一个RecordAccumulator缓冲区,快递就被快递员拿到了。

 这就是一个比较完整的流程了。那么接下来就详细说说。

 ◼ 拦截器的实现

生成一个自定义拦截器“MyProducerInterceptor”,继承“ProducerInterceptor”,在新的拦截器中定义onSend()方法,为了好区分,我们让key和Value不一样。(原本的拦截器中的onSend()方法,key和Value是一样的。)

 将生成的自定义拦截器“MyProducerInterceptor”进行配置,

 运行代码,刷新之后,会发现多了一条数据,

 这就叫拦截器。

 ◼ 元数据请求和更新

 此时,拦截器已经处理完了,

 Matadata不是频繁的从服务器得到数据,它是可以缓存下来的,缓存以后做一些事情。

可以看到,这儿的代码“watiOnMetadata” 等待元数据,那么为什么要等待元数据?因为元数据中包含很多信息和内容,我们需要知道里面有什么。

  • cache:缓存;
  • newTopics
  • cluster:集群;

那么集群中有什么?

可以看到, cluster集群中有:

  • nodes:服务器节点的集合;
  • controller;
  • partitionByTopicPartition:当亲的分区里的分区信息;
  • partitionByTopic:某一个主题topic当中的所有分区;
  • availablePartitionByTopic:可用的分区;
  • partitionByNode:某一个node节点当中的所有分区;

等等,里面东西太多了,这里不一一列举了。

 

剩下的,我们就需要怎么去等待元数据的?

那么接下来,我们就需要知道是怎么去等待元数据的?我们怎么就把元数据得到了呢?

默认情况下是肯定不知道元数据的,因为最开始的时候,第一次发送数据,我们都不知道服务器有哪些分区,那么我们要怎么划分分区?连元数据都没有,我们怎么做分区选择?

所以首先要把元数据得到。

可以看到,requestUpdateForTopic(topic) 就意味着我准备发送请求来更新元数据了。

可以看到代码中,sender唤醒,进行awaitUpdate()来等待更新, 什么时候更新完毕了,程序才会继续走下去。所以现在这个awaitUpdate()就会阻塞等待,sender是一个线程,sender.wakeup()就会执行,那么也就是说当前的生产线程(主线程)就会暂停,seder发送线程就会开始。

比如,我现在想发到广州一个快递,问快递员说能不能发到广州,快递员说你等一等我问一下。他得知道它们公司能不能发到广州。问了公司说可以发,快递员才会给你说可以,我就得到了这个信息。

  • sender会向集群的服务器Kafka Servers发送一个请求MatadataRequest(METADATA),传了一个ApiKeys叫METADATA;
  • doSend()向服务器发送request请求;
  • 请求发送到了Kafka Servers之后,放到了requestChnnel中,然后再从requestChnnel中把request请求取出来,KafkaApis会得到请求对象。
  • KafkaApis中, 到了handle这一步,就会取到请求对象了,拿到了ApiKey
  • 得到以后,到了handleTopicMetaRequest来处理这个请求,拿到了请求信息,拿到之后就可以把所有的Topic和分区都可以得到了。得到以后,发给客户端。
  • 此时,sender得到响应对象response,对响应处理,其中如果是metadataResonse,就会有一个处理成功的响应handleSuccessfulResonse。就完成了设定,进行更新update,当前就有数据了。

 

◼  分区选择

此时的“watiOnMetadata” 等待元数据已经完成,

我们继续往下,这儿的序列化操作直接按步骤进行就可以,

我们继续往下,接下来,就讲讲Kafka的数据到底应该放在哪个分区里?

 都往一个分区里放,会导致负载太大,出现问题,所以我们要均匀的。

可以从代码看出,

分区选择的判断:

  • 如果指定分区号,则直接使用;
  • 如果没有分区号,那么采用分区器决定分区;
    • 如果没有指定key,会从所有分区中随机选一个,进行存放;
    • 如果没有key,会将key进行散列后,再和分区数量取余;

 那么我们接着往下走,就到了数据大小检测。

◼  将数据缓存到采集器中

 那么我们接着往下走,就到了数据大小检测, 因为数据不能太大, 太大的话承载不了。

可以从这部分的代码中看到,数据检测的是两个大小:

  • maxRequestSize:最大请求字节数,大小为1MB;
  • totalRequestSize:总共的内存大小,大小为32MB; 

可以从代码中找到, maxRequestSize是来自一个“MAX_REQUEST_SIZE_CONFIG”的,大小为1MB;

totalRequestSize是来自一个“BIFFER_MEMORY_CONFIG”的,大小为32MB;

 如果超过了阈值就会有问题。

 

那么我们接着往下,数据大小检测完成之后,就到了这个位置 ,

可以看到,这里的“accumulator.append()”的accumulator,就是我们的快递员了。那么现在相当于我们已经把快递给到了快递员。

把快递给到了快递员,快递员一定会要么?不一定,因为有些东西还没填全,快递员是不会收我们的快递的。

从accumulator.append()的代码中,

我们可以看到有一个Deque双端队列(两边都可以放,都可以取),

如果这个时候,从Deque双端队列中取出一条数据发送出去,此时突然这条数据发送失败了怎么办?就得重新发送,否则数据就会没了。

Deque双端队列的特点就可以保证如果发送失败了,那么我还可以再把数据放回到头部位置。

Deque可不是只有一个,是根据分区信息来得到Deque的。Deque里面包含着ProducerBatch,这个就类似于箱子的感觉,把快递放到箱子里,只不过这个箱子外面包裹着一个更大的箱子。

所以现在就等同于我们拿到了一个装快递的箱子,现在就要把快递(消息)放到箱子里面去,

从代码中,可以找到这个“tryAppend()”,这个就是尝试把把快递(消息)放到箱子里面去,try是指不一定能放到箱子里。

 因为从大箱子里面取出最后一个箱子,取出来之后,想把key和vlaue放进去,发现箱子没那么大,放不下去。所以就会有if语句来判断两种情况,如果有空间则就可以放进去,没有空间就放不进去。

空间足够的话,就可以放进去了。放进去以后,发送成功, 就会等待响应。

那么此时的图如下所示:

把消息加到accumulator进去之后, 继续往下,可以看到,

 有一个对箱子的判断,如果箱子满了或者有新的被创建出来,那么意味着这个时候,要唤醒sender。也就是说我们的sender要准备发送数据了。

也即是说我们已经把快递放到了箱子里,这个时候要问sender有没有时间,有时间的话来一趟把箱子送走,所以此时sender被唤醒。

◼  sender从采集器中获取数据发送到服务器

 从sender的代码中可以看到,

现在这个地方,就准备把数据从缓冲区中取出来发送。从元数据中取到集群cluster的相关信息,把cluster中可用的节点准备好。

那么什么是可用的节点?

现在我们是要把消息归了类发给上海和深圳,可是上海和深圳有好几个区,我们得把发往上海和深圳不同的区的快递放一块儿。

我们在缓冲区是根据分区选择来将消息数据分类,到了这个车(sender)上就不一样了,会按照节点来分类,所以分类方式就会发送变化。

拿到leader之后,就知道给哪个分区发了。

这样的好处就是把发往多个分区的消息合成一个,请求就会好很多。

现在我们已经知道了准备给谁发,知道以后,往下,判断当前的元数据是不是需要更新。

继续往下,现在相当于把前面的小箱子拿到了sender里面,拿到以后,形成一个Map。

 构建了另外的请求:

  • ProduceRequest(PRODUCE)
  • ProduceRequestData(ACKS,DATA)
  • HandleProduceResponse

到了这一步,sender把请求就准备好了,就可以把请求发给服务器,服务器处理了操作之后,会发送响应response给sender。

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_11620.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

今天是虚拟机

乌班图打开终端的方法ctrl+alt+t 接下来就是关于使用xftp来实现连接虚拟机 这儿就拿重点来讲一下 获取虚拟机的ip地址 通过打开虚拟机的终端(别的虚拟机如何打开不清楚,但是这个打开的方法我放在上面) 输入ifconfig,如果非虚拟机的话就输入ipconfig 如果没有显示虚拟机的ip…

net二手手帐

开发工具(eclipse/idea/vscode等):vs2017 数据库(sqlite/mysql/sqlserver等):sqlserver 功能模块(请用文字描述,至少200字):本网站的基本内容是设计并实现一个二手手帐文具交易网,手帐文具爱好者们可以足不出户地交易自…

基于StarterWare的TMS320C6748裸机程序开发入门详解教程

LED裸机程序开发 本小结将讲解如何利用TI提供的StarterWare软件包开发一个基于DSP C6748的LED流水灯程序,以及如何查找芯片的技术参考手册和数据手册。文章内容主要涵盖LED裸机程序开发、工程建立、添加头文件和库文件、源代码编写和解析和按键中断裸机程序演示和解析等。 关…

Win10系统C++调用OpenCV实现网络摄像头录像和抓拍图片

1 前言 前边文章介绍了在WIN10系统上,分别用C和Python调用OpenCV接口,播放本地和网络摄像头视频。本篇我们来看一下,用C如何调用OpenCV接口,打开网络摄像头的视频,对其进行录像,并抓拍图片。 视频来源 视频…

公众号网课答案查题系统-全网都在使用

公众号网课答案查题系统-全网都在使用 本平台优点: 多题库查题、独立后台、响应速度快、全网平台可查、功能最全! 1.想要给自己的公众号获得查题接口,只需要两步! 2.题库: 题库:题库后台(点击…

深度强化学习-DQN算法

论文地址:https://arxiv.org/abs/1312.5602 先讲下在线,离线,同策略和异策略 同策略(on-policy)和异策略(off-policy)的根本区别在于生成样本的策略和参数更新时的策略是否相同。 对于同策略&am…

产品性能测试入门秘籍

前言 在《一体化测试指标可视工程实践》中,我们分享了以趣链BaaS系统为例的测试实践路径,在后台收到读者们关于性能测试的留言。为此,本期将围绕如何进行产品性能测试这一话题,展开详细描述。 众所周知,一个优秀的系统…

Cobalt Strike(八)权限提升

1.BypassUAC UAC 是微软在 Windows Vista 以后版本引入的一种安全机制,通过 UAC,应用程序和任务可始终在非管理员帐户的安全上下文中运行,除非管理员特别授予管理员级别的系统访问权限。UAC 可以阻止未经授权的应用程序自动进行安装&#xf…

【最新计算机毕业设计】Java springboot大学生体质测评系统

基于SpringBoot的大学生体质测评系统 提供最新的计算机毕业设计源代码及帮助指导,公众号:一点毕设! 大学生体质测试管理系统提供给用户一个简单方便体质测试管理信息,通过留言区互动更方便。本系统采用了B/S体系的结构&#xff0…

吔队列了你——写点单调队列优化DP

5_Lei:有没有变态一点的图啊单调队列优化DP(补) 前言: DP显然是OI中的一个重要且高频的考点,然而友善的出题人大多不会只考一个推转移方程,往往需要结合一些优化。 单调队列: 看这个的应该都会,不写了,扔个板子上去。 P1886 滑动窗口 /【模板】单调队列 优化DP: 显然…

行业话题 | 天天坐地铁,你知道BIM在地铁中的应用吗

近年来,随着经济水平的不断提高和城市化进程的加快,我国地铁建设规模也在不断加大,而地铁车站是地铁施工的难点和控制性工程,具有施工空间狭小,技术复杂等特点。 由于施工现场布置制约因素多,二维施工现场平…

究竟都是谁在使用?OpenMLDB 落地案例大起底

本文整理自第四范式资深架构师、OpenMLDB PMC 卢冕在第四范式技术日「高效落地的AI开源生态」分论坛的主题分享——《开源机器学习数据库 OpenMLDB:提供线上线下一致的生产特征平台》。内容包括: 感恩 OpenMLDB 贡献者OpenMLDB 发展历程OpenMLDB 架构设…

WinForms时代结束,报表控件FastReport.NET开启FastReport.Core.Skia 时代!

要创建高质量的报告并将其正确导出为不同的格式(PDF、Word、Excel 等),必须使用图形引擎。从 .NET Framework 的最早版本开始,Microsoft 就将 GDI 及其包装器用作 System.Drawing 库的一部分。FastReport.NET长期以来一直使用相同…

第一篇文章 mybatis 综述

mybatis框架可以让程序员只需专注于写sql语句 框架就是半成品,将公共的部分固定下来,非公共的部分你自己开发就行 三层架构: 界面层Conttroller层:用来接收客户端的输入,调用业务逻辑层Service层,返回结果…

关于Facebook营销的十个常见问题,一次性讲清楚!

--- NO.1--- 为什么做Facebook营销? 作为全球最大的社交媒体,Facebook月活用户已达到了惊人的29亿,并且这个数据还在持续增长中,这意味着全球几乎一半人都会出现在Facebook上。很多企业对Facebook的关注点,也从是否做…

VMware Explore 大会发布重磅云上技术之外,VMware 有哪些前沿探索?

编辑 | 宋慧 出品 | CSDN 云计算 最近,VMware 举办了年度技术大会 VMware Explore,重磅发布了其在多云趋势下的多个技术产品组合,包含了云基础架构、云原生、网络与安全、远程混合办公等等。不过,在这些优势领域的产品之外&#…

系统架构与设计(1)- 权限系统的设计以及主流的五种权限模型

作者:码猿技术专栏来源:https://juejin.cn/post/7121977695197970463 ------------------------------------------------------------------- 这篇文章就来介绍一下权限系统的设计以及主流的五种权限模型。权限管控可以通俗的理解为权力限制,即不同的人由于拥有不同权力,他…

阿里云国际站代理商:FFmpeg 处理音视频文件的常用方法

阿里云代理商(聚搜云)专业服务于阿里云ECS服务器采购、阿里云Ddos采购、阿里云waf采购、对象存储OSS、阿里云企业邮箱采购、阿里云国际站代理商、阿里云国际站充值、云安全中心(态势感知)、阿里云高可用云数据库RDS、web应用云waf…

YOLO系列目标检测算法-Scaled-YOLOv4

YOLO系列目标检测算法目录 YOLO系列目标检测算法总结对比YOLOv1YOLOv2YOLOv3YOLOv4 Scaled-YOLOv4- 文章链接 YOLOv5- 文章链接 YOLOv6- 文章链接 YOLOv7- 文章链接 本文总结: 提出一种网络缩放方法,使得基于CSP的YOLOv4可以上下伸缩,以适…

2019Linux系统教程189讲-08_基于LAMP架构部署商城系统

任务需求 1、任务具体要求 使用yum(dnf)工具一键部署LAMP环境 发布电商项目上线 ① 能够实现web界面注册会员账号 ② 能够实现web界面进行后台商品及会员的管理 2、项目选型 ㈠ PHPSHE商城系统概述 PHPSHE商城系统是将商品管理、品牌管理、规格管理、折扣管理、拼团管理、…