RocketMQ 事务消息 详解

news/2024/5/19 18:49:10/文章来源:https://blog.csdn.net/HNU_Csee_wjw/article/details/124202444

🍊 Java学习:Java从入门到精通总结

🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想

🍊 绝对不一样的职场干货:大厂最佳实践经验指南


📆 最近更新:2023年4月9日

🍊 个人简介:通信工程本硕 for NJU💪、Java程序员🌕。做过科研paper,发过专利,优秀的程序员不应该只是CRUD

🍊 点赞 👍 收藏 ⭐留言 📝 都是我最大的动力!


文章目录

  • 事务消息发送流程
  • 发送事务消息源码分析
  • 事务消息回查
    • Broker发起

事务消息发送流程

在这里插入图片描述
半消息实现了分布式环境下的数据一致性的处理,生产者发送事务消息的流程如上图所示,通过对源码的学习,我们可以弄清楚下面几点,也是半消息机制的核心:

  1. 为什么prepare消息不会被Consumer消费?
  2. 事务消息是如何提交和回滚的?
  3. 定时回查本地事务状态的实现细节。

发送事务消息源码分析

发送事务消息方法TransactionMQProducer.sendMessageInTransaction

  • msg:消息
  • tranExecuter:本地事务执行器
  • arg:本地事务执行器参数
public TransactionSendResult sendMessageInTransaction(final Message msg,final LocalTransactionExecuter localTransactionExecuter, final Object arg)throws MQClientException {TransactionListener transactionListener = getCheckListener();if (null == localTransactionExecuter && null == transactionListener) {throw new MQClientException("tranExecutor is null", null);}// 忽视消息延迟的属性if (msg.getDelayTimeLevel() != 0) {MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);}Validators.checkMessage(msg, this.defaultMQProducer);// 发送半消息SendResult sendResult = null;MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());try {sendResult = this.send(msg);} catch (Exception e) {throw new MQClientException("send message Exception", e);}// 处理发送半消息的结果LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;Throwable localException = null;switch (sendResult.getSendStatus()) {// 发送半消息成功,执行本地事务逻辑case SEND_OK: {try {if (sendResult.getTransactionId() != null) {msg.putUserProperty("__transactionId__", sendResult.getTransactionId());}String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);if (null != transactionId && !"".equals(transactionId)) {msg.setTransactionId(transactionId);}// 执行本地事务逻辑if (null != localTransactionExecuter) {localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);} else if (transactionListener != null) {log.debug("Used new transaction API");localTransactionState = transactionListener.executeLocalTransaction(msg, arg);}if (null == localTransactionState) {localTransactionState = LocalTransactionState.UNKNOW;}if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {log.info("executeLocalTransactionBranch return {}", localTransactionState);log.info(msg.toString());}} catch (Throwable e) {log.info("executeLocalTransactionBranch exception", e);log.info(msg.toString());localException = e;}}break;// 发送半消息失败,标记本地事务状态为回滚case FLUSH_DISK_TIMEOUT:case FLUSH_SLAVE_TIMEOUT:case SLAVE_NOT_AVAILABLE:localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;break;default:break;}// 结束事务,设置消息 COMMIT / ROLLBACKtry {this.endTransaction(msg, sendResult, localTransactionState, localException);} catch (Exception e) {log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);}// 返回事务发送结果TransactionSendResult transactionSendResult = new TransactionSendResult();transactionSendResult.setSendStatus(sendResult.getSendStatus());transactionSendResult.setMessageQueue(sendResult.getMessageQueue());// 提取Prepared消息的uniqIDtransactionSendResult.setMsgId(sendResult.getMsgId());transactionSendResult.setQueueOffset(sendResult.getQueueOffset());transactionSendResult.setTransactionId(sendResult.getTransactionId());transactionSendResult.setLocalTransactionState(localTransactionState);return transactionSendResult;}

该方法的入参包含有一个需要用户实现本地事务的LocalTransactionExecuter executerexecuter中会进行事务操作以保证本地事务和消息发送这两个操作的原子性。


由上面的源码可知:

Producer会首先发送一个半消息到Broker中:

  1. 半消息发送成功,执行事务
  2. 半消息发送失败,不执行事务

半消息发送到Broker后不会被Consumer消费掉的原因有以下两点:

  1. Broker在将消息写入CommitLog时会判断消息类型,如果是prepare或者rollback消息,ConsumeQueueoffset不变
  2. Broker在构造ConsumeQueue时会判断是否是处于prepare或者rollback状态的消息,如果是则不会将该消息放入ConsumeQueue里,Consumer在拉取消息时也就不会拉取到这条消息

Producer会根据半消息的发送结果和本地任务执行结果来决定如何处理事务(commitrollback),方法最后调用了endTransaction来处理事务的执行结果,源码如下:

  • sendResult:发送半消息的结果
  • localTransactionState:本地事务状态
  • localException:执行本地事务逻辑产生的异常
  • RemotingException:远程调用异常
  • MQBrokerExceptionBroker异常
  • InterruptedException:当线程中断异常
  • UnknownHostException:未知host异常

public void endTransaction(final Message msg,final SendResult sendResult,final LocalTransactionState localTransactionState,final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {// 解码消息idfinal MessageId id;if (sendResult.getOffsetMsgId() != null) {id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());} else {id = MessageDecoder.decodeMessageId(sendResult.getMsgId());}// 创建请求String transactionId = sendResult.getTransactionId();final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();requestHeader.setTransactionId(transactionId);requestHeader.setCommitLogOffset(id.getOffset());switch (localTransactionState) {case COMMIT_MESSAGE:requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);break;case ROLLBACK_MESSAGE:requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);break;case UNKNOW:requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);break;default:break;}doExecuteEndTransactionHook(msg, sendResult.getMsgId(), brokerAddr, localTransactionState, false);requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());requestHeader.setMsgId(sendResult.getMsgId());String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;// 提交 commit / rollback 消息 this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,this.defaultMQProducer.getSendMsgTimeout());}

该方法是将事务执行的结果发送给Broker,再由Broker决定是否进行消息投递,执行步骤如下:

  1. 收到消息后先检查是否是事务消息,如果不是事务消息则直接返回
  2. 根据请求头里的offset查询半消息,如果查询结果为空则直接返回
  3. 根据半消息构造新消息,新构造的消息会被重新写入到CommitLog里,rollback消息的消息体为空
  4. 如果是rollback消息,则该消息不会被投递

具体原因上文中已经分析过:只有commit消息才会被Broker投递给consumer

RocketMQ会将commit消息和rollback消息都写入到commitLog里,但rollback消息的消息体为空且不会被投递,CommitLog在删除过期消息时才会将其删除。当事务commit成功之后,RocketMQ会重新封装半消息并将其投递给Consumer端消费。


事务消息回查

Broker发起

相较于普通消息,事务消息主要依赖下面三个类:

  1. TransactionStateService:事务状态服务,负责对事务消息进行管理,包括存储和更新事务消息状态、回查状态等
  2. TranStateTable:事务消息状态存储表,基于MappedFileQueue实现
  3. TranRedoLogTranStateTable的日志,每次写入操作都会记录日志,当Broker宕机时,可以利用这个文件做数据恢复

存储半消息到CommitLog时,使用offset索引到对应的TranStateTable的位置


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_283754.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

RSA非对称加密算法原理和代码实现 信息安全 密码学

一 欧拉数论定理 1. 欧拉函数 设n为一正整数,则欧拉函数φ(n)\varphi (n)φ(n)等于0∼n−10\sim n-10∼n−1中与n互素的整数个数 比如φ(5)4\varphi (5) 4φ(5)4,因为0~5中, 1,2,3,4均与5互素,即最大公约数为1 2. 欧拉定…

采集工具助力市场营销,让您的营销更加高效

随着市场竞争的日益激烈,企业的营销策略也在不断升级。而在这个信息爆炸的时代,采集数据成为了市场营销中不可或缺的一环。为了更好地服务客户,我们公司开发了一款高效、快捷的采集工具,为您的营销活动提供有力支持。 Msray-plus&…

计算机网络习题 | 第一章:计算机网络概述

文章目录概述1、以下关于OSI环境中数据传输的过程的描述中,错误的是( )2、以下关于广域网 WAN 特点的描述中 ,错误的是( )3、以下关于计算机网络定义的描述中,错误的是( &#xff09…

【分布式 论文】之 1. MapReduce——Simplified Data Processing on Large Clusters

文章目录1. 需求 / 现存问题2. 总述3. 实现3.1 概述1. 需求 / 现存问题 输入数据通常很大,为了在合理的时间内完成计算,必须将计算分布到数百或数千台机器上。 如何并行化计算、分发数据和处理故障等问题使得原本简单的计算变得晦涩难懂,需…

chatGPA的主要功能-chatGPT深度分析

ChatGPT功能介绍 ChatGPT是基于深度学习技术的自然语言处理算法,其主要用途是生成自然语言文本,能够应用于多个自然语言处理任务。以下是其主要功能介绍: 文本生成:ChatGPT能够生成高质量的自然语言文本,可以应用于大…

Mybatis-plus学习2

一、Mybatis-plus分页操作 1.配置拦截器即可 //分页插件Beanpublic PaginationInterceptor paginationInterceptor(){return new PaginationInterceptor();} 2.直接使用Page对象 //测试分页查询Testpublic void testPage(){//参数一:当前页//参数二:页面…

关键词采集软件在SEO优化中的应用与效果

搜索引擎的优化被广泛认为是提高网站排名和在线可见性的重要方法之一。SEO人员需要进行大量的工作以确保网站的内容和标签可以被搜索引擎正确地解析和索引。在这项任务中,使用搜索引擎关键词采集软件可以帮助SEO人员完成许多繁琐的任务并简化他们的工作流程。在本文…

【C语言】数组指针-c语言的任督二脉

视频链接:bilibili 关于指针需要注意的地方 只有以下两种情况数组名表示的是整个数组 1.sizeof(数组名) 2.&数组名 除此之外数组名表示的都是首元素地址 一、字符指针 是一个指向字符的指针 int main() {char ch w;char* p &ch;//char* ch2 "abcdef"…

【TreeSet】| 深度剥析Java SE 源码合集Ⅳ

目录一. 🦁 前言二. 🦁 剥析流程2.1 类图2.2 属性2.3 构造方法2.4 添加单个元素2.5 移除单个元素2.6 查找单个元素2.7 查找接近的元素2.8 获得首尾的元素2.9 清空2.10 克隆2.11 序列化2.12 反序列化2.13 获得迭代器2.14 转换成 Set/Collection2.15 查找范…

Python 进阶指南(编程轻松进阶):二、环境配置和命令行

原文:http://inventwithpython.com/beyond/chapter2.html 环境配置是配置你的计算机环境,以便你写代码的过程。这包括安装任何必要的工具,配置它们,以及处理安装过程中的任何问题。没有一键配置这种傻瓜式操作过程,因为…

分享一个智能的问答工具,刷题和学习的好帮手

使用了这个问答工具后,感觉前后端都要被替代了,太强了。 由于本人之前很想体验,但是一直难搞,最近发现了一个免梯子的,重要事情说一遍,免梯子!是我最近发现的最好用,最快的&#xff…

OpenCV实战——多尺度FAST特征检测

OpenCV实战——多尺度FAST特征检测0. 前言1. BRISK 特征检测器1.1 BRISK 检测关键点1.2 多尺度关键点快速检测2. ORB 特征检测算法3. 完整代码相关链接0. 前言 FAST 是用于快速检测图像中关键点的方法,而 SURF 和 SIFT 算法的设计重点是尺度不变性。为了同时实现快…

【软件设计师10】软件工程

软件工程 1. 瀑布模型SDLC - 结构化 优点:结构化方法模型,每个阶段分工明确;出现问题可以向上层回溯 缺点:需求阶段难以把控,在项目初期,软件的需求几乎是不明确的,等开发完用户往往再提出问…

微信小程序 | 网易云+ChatGPT实现一个智能音乐推荐小程序

文章目录* 效果预览** 分析用户的输入产生推荐** 分析用户的选择标签进行推荐一、需求背景二、项目原理及架构2.1 实现原理(1) 基于用户的喜欢歌手推荐(2)基于用户的兴趣标签推荐(3)改进上一步推荐的结果2.…

IM即时通讯-N-如何保证消息的可靠性展示

结论先行 客户端如何在推拉结合的模式下保证消息的可靠性展示? 原则: server拉取的消息一定是连续的原则: 端侧记录的消息的连续段有两个作用: 1. 记录消息的连续性, 即起始中间没有断层, 2. 消息连续&am…

【数据结构】树与二叉树的基本概念及性质

目录 一、树的基本概念 1️⃣树的定义 2️⃣基本术语 3️⃣树的性质 二、二叉树的概念 1️⃣二叉树的定义 2️⃣特殊二叉树 3️⃣二叉树的性质 参考资料 一、树的基本概念 1️⃣树的定义 数据结构中的树是什么❓ 树是 个结点的有限集。有且仅有一个特定的称为根(上图A结点…

零基础教学必会篇(详解字符函数和字符串函数)(完结版)

各位csdn的友友们好,上次阿博给大家讲了一些简单的字符串函数的功能和模拟实现,今天就和阿博一起再上一个台阶继续拿捏它们😊😊😊 文章目录1.strstr的功能介绍2.strstr函数的模拟实现3.strtok的功能介绍4.strerror和pe…

零基础学习Java 06

目录 String String构造方法 字符串查找 字符串截取 字符串替换 字符串拆分 字符串修改 String String类在java.lang包下,所以使用的时候不需要导包。 String构造方法 字符串查找 char charAt(int index),输入位置index,找单个字符 …

MAE论文笔记+Pytroch实现

Masked Autoencoders Are Scalable Vision Learners, 2021 近期在梳理Transformer在CV领域的相关论文,落脚点在于如何去使用Pytroch实现如ViT和MAE等。通过阅读源码,发现不少论文的源码都直接调用timm来实现ViT。故在此需要简单介绍一下timm…

Linux 中的 /dev/random 和 /dev/urandom 是什么?

在Linux系统中,/dev/random和/dev/urandom是两个特殊的设备文件,用于生成随机数。在本文中,我们将深入探讨这两个设备文件的区别,以及它们在Linux系统中的作用。 /dev/random /dev/random是一个随机数生成器设备文件,…