消息队列-kafka-消息发送流程(源码跟踪) 与消息可靠性

news/2024/7/27 8:59:42/文章来源:https://blog.csdn.net/qq_43259860/article/details/136464356

官方网址

源码:https://kafka.apache.org/downloads
快速开始:https://kafka.apache.org/documentation/#gettingStarted
springcloud整合

发送消息流程

在这里插入图片描述
主线程:主线程只负责组织消息,如果是同步发送会阻塞,如果是异步发送需要传入一个回调函数。
Map集合:存储了主线程的消息。
Sender线程:真正的发送其实是sender去发送到broker中。

源码阅读

1 首先打开Producer.send()可以看到里面的内容

// 返回值是一个 Future 参数为ProducerRecord
Future<RecordMetadata> send(ProducerRecord<K, V> record);
// ProducerRecord定义了这些信息
// 主题
private final String topic;
// 分区
private final Integer partition;
// header
private final Headers headers;
private final K key;
private final V value;
// 时间戳
private final Long timestamp;

2 发送之前的前置处理

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {// intercept the record, which can be potentially modified; this method does not throw exceptions// 这里给开发者提供了前置处理的勾子ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);// 我们最终发送的是经过处理后的消息 并且如果是异步发送会有callback 这个是用户定义的return doSend(interceptedRecord, callback);}

3 进入真正的发送逻辑Future doSend()

  • 由于是网络通信,所以我们要序列化,在这个函数里面就做了序列化的内容。
try {serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());} catch (ClassCastException cce) {throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +" to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +" specified in key.serializer", cce);}byte[] serializedValue;try {serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());} catch (ClassCastException cce) {throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +" specified in value.serializer", cce);}
  • 然后我们获取分区
// 然后这里又是一个策略者模式 也是由用户可以配置的  DefaultPartitioner UniformStickyPartitioner RoundRobinPartitioner 提供了这样三个分区器
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {Integer partition = record.partition();return partition != null ?partition :partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}

4 到了我们的RecordAccumulator,也就是先由主线程发送到了RecordAccumulator

// 也就是对图中的Map集合
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);

我们发现里面是用一个MAP存储的一个分区和ProducerBatch 是讲这个消息写到内存里面MemoryRecordsBuilder 通过这个进行写入

// 可以看到是一个链表实现的双向队列,也就是消息会按append的顺序写到 内存记录中去
private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;

5 接着我们看,我们append了以后,会有一个判断去唤醒sender线程,见下面的注释

// 如果说哦我们当前的 这个batch满了或者 我们创建了一个新的batch 这个时候唤醒 sender线程去发送数据
if (result.batchIsFull || result.newBatchCreated) {log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);// 唤醒sender 去发送数据this.sender.wakeup();}
// 实现了Runnable 所以我们去看一下RUN方法的逻辑
public class Sender implements Runnable 

好上来就是一个循环

while (running) {try {runOnce();} catch (Exception e) {log.error("Uncaught error in kafka producer I/O thread: ", e);}
}

接着进入runOnece方法,直接看核心逻辑

// 从RecordAccumulator 拿数据 然后发送
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);addToInflightBatches(batches);
// 中间省去了非核心逻辑
sendProduceRequests(batches, now);

如果继续跟踪的话最终是走到了selector.send()里面:

Send send = request.toSend(destination, header);InFlightRequest inFlightRequest = new InFlightRequest(clientRequest,header,isInternalRequest,request,send,now);this.inFlightRequests.add(inFlightRequest);selector.send(send);

6 接着我们就要看返回逻辑了,可以看到在sendRequest里面sendProduceRequest方法是通过传入了一个回调函数处理返回的。

RequestCompletionHandler callback = new RequestCompletionHandler() {public void onComplete(ClientResponse response) {handleProduceResponse(response, recordsByPartition, time.milliseconds());}};
// 如果有返回
if (response.hasResponse()) {ProduceResponse produceResponse = (ProduceResponse) response.responseBody();for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses().entrySet()) {TopicPartition tp = entry.getKey();ProduceResponse.PartitionResponse partResp = entry.getValue();ProducerBatch batch = batches.get(tp);completeBatch(batch, partResp, correlationId, now, receivedTimeMs + produceResponse.throttleTimeMs());}this.sensors.recordLatency(response.destination(), response.requestLatencyMs());} 

追踪到ProducerBatch

if (this.finalState.compareAndSet(null, tryFinalState)) {completeFutureAndFireCallbacks(baseOffset, logAppendTime, exception);return true;}
private void completeFutureAndFireCallbacks(long baseOffset, long logAppendTime, RuntimeException exception) {// Set the future before invoking the callbacks as we rely on its state for the `onCompletion` callproduceFuture.set(baseOffset, logAppendTime, exception);// execute callbacksfor (Thunk thunk : thunks) {try {if (exception == null) {RecordMetadata metadata = thunk.future.value();if (thunk.callback != null)thunk.callback.onCompletion(metadata, null);} else {if (thunk.callback != null)thunk.callback.onCompletion(null, exception);}} catch (Exception e) {log.error("Error executing user-provided callback on message for topic-partition '{}'", topicPartition, e);}}produceFuture.done();}

Thunk 这个其实就是我们在Append的时候的回调:
在这里插入图片描述
至此整个流程就完成了,从发送消息,到响应后回调我们的函数。

消息可靠性

// 所有消费者的配置都在ProducerConfig 里面
public static final String ACKS_CONFIG = "acks";

acks = 0:异步形式,单向发送,不会等待 broker 的响应
acks = 1:主分区保存成功,然后就响应了客户端,并不保证所有的副本分区保存成功
acks = all 或 -1:等待 broker 的响应,然后 broker 等待副本分区的响应,总之数据落地到所有的分区后,才能给到producer 一个响应

在可靠性的保证下,假设一些故障:

  • Broker 收到消息后,同步 ISR 异常:只要在 -1 的情况下,其实不会造成消息的丢失,因为有重试机制
  • Broker 收到消息,并同步 ISR 成功,但是响应超时:只要在 -1 的情况下,其实不会造成消息的丢失,因为有重试机制

可靠性能保证哪些,不能保障哪些?

  • 保证了消息不会丢失
  • 不保证消息一定不会重复(消息有重复的概率,需要消费者有幂等性控制机制)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_998574.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Sql Server】存储过程的创建和使用事务,常见运用场景,以及目前现状

欢迎来到《小5讲堂》&#xff0c;大家好&#xff0c;我是全栈小5。 这是《Sql Server》系列文章&#xff0c;每篇文章将以博主理解的角度展开讲解&#xff0c; 特别是针对知识点的概念进行叙说&#xff0c;大部分文章将会对这些概念进行实际例子验证&#xff0c;以此达到加深对…

2024宠物行业未来发展趋势:京东宠物健康(宠物营养保健和医疗)市场品类数据分析报告

近段时间&#xff0c;广州某知名宠物医院的医疗事故正在被大众热议&#xff0c;也让越来越多从业者开始关心宠物医疗行业的未来形势。 在2022年下半年&#xff0c;京东平台专门设立了一个一级大类目&#xff1a;宠物健康&#xff08;将其从原本的宠物生活类目中独立出来&#…

[递归、搜索、回溯]----递归

前言 作者&#xff1a;小蜗牛向前冲 专栏&#xff1a;小蜗牛算法之路 专栏介绍&#xff1a;"蜗牛之道&#xff0c;攀登大厂高峰&#xff0c;让我们携手学习算法。在这个专栏中&#xff0c;将涵盖动态规划、贪心算法、回溯等高阶技巧&#xff0c;不定期为你奉上基础数据结构…

OpenCASCADE+Qt创建建模平台

1、建模平台效果 2、三维控件OCCWidget 将V3d_View视图与控件句柄绑定即可实现3d视图嵌入Qt中&#xff0c;为了方便也可以基于QOpenGLWidget控件进行封装&#xff0c;方便嵌入各种窗体使用并自由缩放。 #ifndef OCCTWIDGET_H #define OCCTWIDGET_H#include <QWidget> #i…

产业园区如何实现数字化运营管理?

​在数字化浪潮席卷全球的今天&#xff0c;产业园区正经历着前所未有的变革&#xff0c;数字化运营管理成为各个园区转型升级的发力方向&#xff0c;它不仅能够提升园区的运营管理效率&#xff0c;还能够帮助园区提高服务效能、实现精准招商、增强决策效率&#xff0c;从而全面…

开源模型应用落地-工具使用篇-Ollama(六)

一、前言 在AI大模型百花齐放的时代&#xff0c;很多人都对新兴技术充满了热情&#xff0c;都想尝试一下。但是&#xff0c;实际上要入门AI技术的门槛非常高。除了需要高端设备&#xff0c;还需要面临复杂的部署和安装过程&#xff0c;这让很多人望而却步。不过&#xff0c;随着…

算法学习04:双指针、位运算

算法学习04&#xff1a;双指针、位运算 文章目录 算法学习04&#xff1a;双指针、位运算前言须要记忆的模版&#xff1a;一、双指针1.例题1注意&#xff1a;两个指针在一个序列 2.例题2 二、位运算1.例题1注意&#xff1a;从0开始数“第一位” 2.例题2注意&#xff1a;lowbit操…

蓝桥杯刷题(二)

参考大佬代码&#xff1a;&#xff08;区间合并二分&#xff09; import os import sysn, L map(int, input().split()) # 输入n,len arr [list(map(int, input().split())) for _ in range(n)] # 输入Li,Si def check(Ti, arr, L)->bool:sec [] # 存入已打开的阀门在…

如何恢复未保存的 Excel 文件

本周我们将 Office 恢复系列扩展到 Excel 恢复&#xff0c;并提出了最常见的问题&#xff1a;如何恢复 Excel 文件&#xff1f; 与 Office Word 不同&#xff0c;Excel 完全是关于表格和计算的。在处理Excel文件时&#xff0c;您可能会遇到更多问题。与往常一样&#xff0c;我们…

【JavaEE进阶】CSS选择器的常见用法

CSS选择器的主要功能就是选中页面指定的标签元素&#xff0c;选中了元素&#xff0c;才可以设置元素的属性。 CSS选择器主要有以下几种: 标签选择器类选择器id选择器复合选择器通配符选择器 接下来用代码来学习这几个选择器的使用。 <!DOCTYPE html> <html lang&q…

macos docker baota 宝塔 搭建 ,新增端口映射

拉取镜像仅拉取镜像保存到本地&#xff0c;不部署容器&#xff0c;仅需拉取一次&#xff0c;永久存储到本地镜像列表 docker pull akaishuichi/baota-m1:lnmp 其他可参考&#xff1a;宝塔面板7.9.2docker镜像发布-集成LN/AMP支持m1/m2 mac版本 - Linux面板 - 宝塔面板论坛 运行…

单细胞联合BulkRNA分析思路|加个MR锦上添花,增强验证~

今天给大家分享一篇IF7.3的单细胞MR的文章&#xff0c;2023年12月发表在Frontiers in Immunology&#xff1a;An integrative analysis of single-cell and bulk transcriptome and bidirectional mendelian randomization analysis identified C1Q as a novel stimulated risk…

JAVA虚拟机实战篇之内存调优[4](内存溢出问题案例)

文章目录 版权声明修复问题内存溢出问题分类 分页查询文章接口的内存溢出问题背景解决思路问题根源解决思路 Mybatis导致的内存溢出问题背景问题根源解决思路 导出大文件内存溢出问题背景问题根源解决思路 ThreadLocal占用大量内存问题背景问题根源解决思路 文章内容审核接口的…

python界面开发 - Menu (popupmenu) 右键菜单

文章目录 1. python图形界面开发1.1. Python图形界面开发——Tkinter1.2. Python图形界面开发——PyQt1.3. Python图形界面开发——wxPython1.4. Python图形界面开发—— PyGTK&#xff1a;基于GTK1.5. Python图形界面开发—— Kivy1.6. Python图形界面开发——可视化工具1.7. …

汽车大灯尾灯的车灯罩破损破裂裂纹等问题用什么胶可以修复??

汽车大灯尾灯破裂可以使用硅酮玻璃胶或者环氧树脂胶进行修复。 环氧树脂胶的优点主要包括&#xff1a; 粘接力强&#xff1a;环氧树脂胶也具有很高的粘接力&#xff0c;可以有效地将裂缝两侧的材料粘合在一起&#xff0c;确保牢固和持久的修复效果。内聚强度大&#xff1a;环…

Linux-信号3_sigaction、volatile与SIGCHLD

文章目录 前言一、sigaction__sighandler_t sa_handler;__sigset_t sa_mask; 二、volatile关键字三、SIGCHLD方法一方法二 前言 本章内容主要对之前的内容做一些补充。 一、sigaction #include <signal.h> int sigaction(int signum, const struct sigaction *act,struc…

1、Ajax、get、post、ajax,随机颜色

一、Ajax初始 1、什么是Ajax&#xff1f; 异步的JavaScript和xml 2、xml是什么&#xff1f; 一种标记语言&#xff0c;传输和存储数据----------现在用JSON传输数据 3、Ajax的作用 局部加载 可以使网页异步更新 4、Ajax的原理或者步骤(6步) 创建Ajax对象 if (window.X…

Whisper实现语音识别转文本

#教程 主要参考开源免费离线语音识别神器whisper如何安装&#xff0c; OpenAI开源模型Whisper——音频转文字 Whisper是一个开源的自动语音识别系统&#xff0c;它在网络上收集了680,000小时的多语种和多任务监督数据进行训练&#xff0c;使得它可以将多种语言的音频转文字。…

方阵的特征值与特征向量

目录 特征值 & 特征向量 相关性质 特征值 & 特征向量 相关性质

Vue-Router路由跳转

1.标签式 标签式是就是通过router-link 路由链接组件实现视图转换 2.编程式 this.$router.push(/about)