🍊 Java学习:Java从入门到精通总结
🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想
🍊 绝对不一样的职场干货:大厂最佳实践经验指南
📆 最近更新:2023年4月9日
🍊 个人简介:通信工程本硕 for NJU💪、Java程序员🌕。做过科研paper,发过专利,优秀的程序员不应该只是CRUD
🍊 点赞 👍 收藏 ⭐留言 📝 都是我最大的动力!
文章目录
- 事务消息发送流程
- 发送事务消息源码分析
- 事务消息回查
- Broker发起
事务消息发送流程
半消息实现了分布式环境下的数据一致性的处理,生产者发送事务消息的流程如上图所示,通过对源码的学习,我们可以弄清楚下面几点,也是半消息机制的核心:
- 为什么prepare消息不会被
Consumer
消费? - 事务消息是如何提交和回滚的?
- 定时回查本地事务状态的实现细节。
发送事务消息源码分析
发送事务消息方法TransactionMQProducer.sendMessageInTransaction
:
msg
:消息tranExecuter
:本地事务执行器arg
:本地事务执行器参数
public TransactionSendResult sendMessageInTransaction(final Message msg,final LocalTransactionExecuter localTransactionExecuter, final Object arg)throws MQClientException {TransactionListener transactionListener = getCheckListener();if (null == localTransactionExecuter && null == transactionListener) {throw new MQClientException("tranExecutor is null", null);}// 忽视消息延迟的属性if (msg.getDelayTimeLevel() != 0) {MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);}Validators.checkMessage(msg, this.defaultMQProducer);// 发送半消息SendResult sendResult = null;MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());try {sendResult = this.send(msg);} catch (Exception e) {throw new MQClientException("send message Exception", e);}// 处理发送半消息的结果LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;Throwable localException = null;switch (sendResult.getSendStatus()) {// 发送半消息成功,执行本地事务逻辑case SEND_OK: {try {if (sendResult.getTransactionId() != null) {msg.putUserProperty("__transactionId__", sendResult.getTransactionId());}String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);if (null != transactionId && !"".equals(transactionId)) {msg.setTransactionId(transactionId);}// 执行本地事务逻辑if (null != localTransactionExecuter) {localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);} else if (transactionListener != null) {log.debug("Used new transaction API");localTransactionState = transactionListener.executeLocalTransaction(msg, arg);}if (null == localTransactionState) {localTransactionState = LocalTransactionState.UNKNOW;}if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {log.info("executeLocalTransactionBranch return {}", localTransactionState);log.info(msg.toString());}} catch (Throwable e) {log.info("executeLocalTransactionBranch exception", e);log.info(msg.toString());localException = e;}}break;// 发送半消息失败,标记本地事务状态为回滚case FLUSH_DISK_TIMEOUT:case FLUSH_SLAVE_TIMEOUT:case SLAVE_NOT_AVAILABLE:localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;break;default:break;}// 结束事务,设置消息 COMMIT / ROLLBACKtry {this.endTransaction(msg, sendResult, localTransactionState, localException);} catch (Exception e) {log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);}// 返回事务发送结果TransactionSendResult transactionSendResult = new TransactionSendResult();transactionSendResult.setSendStatus(sendResult.getSendStatus());transactionSendResult.setMessageQueue(sendResult.getMessageQueue());// 提取Prepared消息的uniqIDtransactionSendResult.setMsgId(sendResult.getMsgId());transactionSendResult.setQueueOffset(sendResult.getQueueOffset());transactionSendResult.setTransactionId(sendResult.getTransactionId());transactionSendResult.setLocalTransactionState(localTransactionState);return transactionSendResult;}
该方法的入参包含有一个需要用户实现本地事务的LocalTransactionExecuter executer
,executer
中会进行事务操作以保证本地事务和消息发送这两个操作的原子性。
由上面的源码可知:
Producer
会首先发送一个半消息到Broker
中:
- 半消息发送成功,执行事务
- 半消息发送失败,不执行事务
半消息发送到Broker
后不会被Consumer
消费掉的原因有以下两点:
Broker
在将消息写入CommitLog
时会判断消息类型,如果是prepare
或者rollback
消息,ConsumeQueue
的offset
不变Broker
在构造ConsumeQueue
时会判断是否是处于prepare
或者rollback
状态的消息,如果是则不会将该消息放入ConsumeQueue
里,Consumer
在拉取消息时也就不会拉取到这条消息
Producer
会根据半消息的发送结果和本地任务执行结果来决定如何处理事务(commit
或rollback
),方法最后调用了endTransaction
来处理事务的执行结果,源码如下:
sendResult
:发送半消息的结果localTransactionState
:本地事务状态localException
:执行本地事务逻辑产生的异常RemotingException
:远程调用异常MQBrokerException
:Broker
异常InterruptedException
:当线程中断异常UnknownHostException
:未知host异常
public void endTransaction(final Message msg,final SendResult sendResult,final LocalTransactionState localTransactionState,final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {// 解码消息idfinal MessageId id;if (sendResult.getOffsetMsgId() != null) {id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());} else {id = MessageDecoder.decodeMessageId(sendResult.getMsgId());}// 创建请求String transactionId = sendResult.getTransactionId();final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();requestHeader.setTransactionId(transactionId);requestHeader.setCommitLogOffset(id.getOffset());switch (localTransactionState) {case COMMIT_MESSAGE:requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);break;case ROLLBACK_MESSAGE:requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);break;case UNKNOW:requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);break;default:break;}doExecuteEndTransactionHook(msg, sendResult.getMsgId(), brokerAddr, localTransactionState, false);requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());requestHeader.setMsgId(sendResult.getMsgId());String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;// 提交 commit / rollback 消息 this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,this.defaultMQProducer.getSendMsgTimeout());}
该方法是将事务执行的结果发送给Broker
,再由Broker
决定是否进行消息投递,执行步骤如下:
- 收到消息后先检查是否是事务消息,如果不是事务消息则直接返回
- 根据请求头里的
offset
查询半消息,如果查询结果为空则直接返回 - 根据半消息构造新消息,新构造的消息会被重新写入到
CommitLog
里,rollback
消息的消息体为空 - 如果是
rollback
消息,则该消息不会被投递
具体原因上文中已经分析过:只有
commit
消息才会被Broker
投递给consumer
RocketMQ会将
commit
消息和rollback
消息都写入到commitLog
里,但rollback
消息的消息体为空且不会被投递,CommitLog
在删除过期消息时才会将其删除。当事务commit
成功之后,RocketMQ会重新封装半消息并将其投递给Consumer
端消费。
事务消息回查
Broker发起
相较于普通消息,事务消息主要依赖下面三个类:
TransactionStateService
:事务状态服务,负责对事务消息进行管理,包括存储和更新事务消息状态、回查状态等TranStateTable
:事务消息状态存储表,基于MappedFileQueue
实现TranRedoLog
:TranStateTable
的日志,每次写入操作都会记录日志,当Broker
宕机时,可以利用这个文件做数据恢复
存储半消息到CommitLog
时,使用offset
索引到对应的TranStateTable
的位置