SpringBoot集成RocketMQ实现分布式事务

news/2024/4/26 22:31:30/文章来源:https://blog.csdn.net/weixin_39643007/article/details/126620127

基本概念

RocketMQ 事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ 的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致 

执行流程

 

(1) 发送方向 MQ 服务端发送消息。
(2) MQ Server 将消息持久化成功之后,向发送方 ACK 确认消息已经发送成功,此时消息为半消息。
(3) 发送方开始执行本地事务逻辑。
(4) 发送方根据本地事务执行结果向 MQ Server 提交二次确认(Commit 或是 Rollback),MQ Server 收到Commit 状态则将半消息标记为可投递,订阅方最终将收到该消息;MQ Server 收到 Rollback 状态则删除半消息,订阅方将不会接受该消息。
(5) 在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达 MQ Server,经过固定时间后MQ Server 将对该消息发起消息回查。
(6) 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
(7) 发送方根据检查得到的本地事务的最终状态再次提交二次确认,MQ Server 仍按照步骤4对半消息进行操作

项目实例

pom文件内容

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.3</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.example</groupId><artifactId>rocketmq</artifactId><version>0.0.1-SNAPSHOT</version><name>rocketmq</name><description>Demo project for Spring Boot</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.0</version></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.5.8</version></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

application.yml

server:port: 8088rocketmq:name-server: 127.0.0.1:9876access-channel: LOCALproducer:group: deer_message_pushsend-message-timeout: 3000compress-message-body-threshold: 4096max-message-size: 2048000retry-times-when-send-failed: 2retry-next-server: true

消息生产者工具类

package com.example;
import cn.hutool.core.util.IdUtil;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;import java.io.Serializable;/*** @Author 何志鹏* @Date 2022/8/30 11:44* @Version 1.0*/
@Component
public class RocketmqProducer {private final Logger logger = LoggerFactory.getLogger(RocketmqProducer.class);@Autowiredprivate RocketMQTemplate rocketMQTemplate;/*** MQ半消息** @param topic target topic* @param tag   topic's tag* @param msg   message* @return send status*/public <T extends Serializable> SendStatus txSend(T msg, String topic, String tag) {String destination = String.format("%s:%s", topic, tag);Message<T> message = MessageBuilder.withPayload(msg).setHeader("KEYS", IdUtil.simpleUUID()).setHeader("DESTINATION", destination).build();TransactionSendResult result =rocketMQTemplate.sendMessageInTransaction(destination, message, msg);//发送状态String sendStatus = result.getSendStatus().name();// 本地事务执行状态String localTxState = result.getLocalTransactionState().name();logger.info("send tx message sendStatus:{},localTXState:{}",sendStatus,localTxState);return result.getSendStatus();}/*** 同步消息** @param msg* @param topic* @param tag* @param <T>* @return*/public <T extends Serializable> SendStatus syncSend(T msg, String topic, String tag) {String destination = String.format("%s:%s", topic, tag);Message<T> message = MessageBuilder.withPayload(msg).setHeader("KEYS", IdUtil.simpleUUID()).setHeader("DESTINATION", destination).build();return rocketMQTemplate.syncSend(destination, message).getSendStatus();}/*** 异步消息** @param msg* @param topic* @param tag* @param <T>* @return*/public <T extends Serializable> void asyncSend(T msg, String topic, String tag) {String destination = String.format("%s:%s", topic, tag);Message<T> message = MessageBuilder.withPayload(msg).setHeader("KEYS", IdUtil.simpleUUID()).setHeader("DESTINATION", destination).build();rocketMQTemplate.asyncSend(destination, message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("============================发送成功=================================");}@Overridepublic void onException(Throwable e) {System.out.println("============================发送失败================================="+e.getMessage());}});}
}

创建user实体类

package com.example;import java.io.Serializable;/*** @Author 何志鹏* @Date 2022/8/30 17:05* @Version 1.0*/
public class User implements Serializable{private static final long serialVersionUID = 4247558661107952933L;private Integer id;private String name;private int age;public Integer getId() {return id;}public void setId(Integer id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}public int getAge() {return age;}public void setAge(int age) {this.age = age;}
}

生产者测试类

package com.example;import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
class RocketmqApplicationTests {@Autowiredprivate RocketmqProducer rocketmqProducer;@Testvoid contextLoads() {User user = new User();user.setId(1);user.setName("何志鹏555555555555555555555");user.setAge(18);rocketmqProducer.txSend(user, "topic1", "11111");System.out.println("==========================开始发送消息=========================================");}}

消费者

package com.example;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;/*** @Author 何志鹏* @Date 2022/8/30 14:16* @Version 1.0*/
@Component
@RocketMQMessageListener(consumerGroup = "deer_message_push", topic = "topic1")
public class RocketMQConsumer  implements RocketMQListener {@Overridepublic void onMessage(Object message) {System.out.println("收到的消息为 ================================================: " + message);}
}

自定义一个RocketmqTransactionListener注解  这边为了方便定义哪些消息需要实现事务  如下:

package com.example;import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface RocketmqTransactionListener {String topic();String selectorExpression();
}

事务监听实现

package com.example;import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;@Component
@RocketMQTransactionListener(corePoolSize = 2, maximumPoolSize = 10)
public class RocketmqTransactionListenerAdapter implements RocketMQLocalTransactionListener, InitializingBean {private final Logger logger = LoggerFactory.getLogger(RocketmqTransactionListenerAdapter.class);private final ApplicationContext applicationContext;private final ConcurrentHashMap<String, RocketMQLocalTransactionListener> listeners;public RocketmqTransactionListenerAdapter(ApplicationContext applicationContext) {this.applicationContext = applicationContext;this.listeners = new ConcurrentHashMap<>();}@Overridepublic void afterPropertiesSet() {applicationContext.getBeansWithAnnotation(RocketmqTransactionListener.class).values().stream().filter(m -> RocketMQLocalTransactionListener.class.isAssignableFrom(m.getClass())).collect(Collectors.toList()).forEach(listener -> {RocketmqTransactionListener annotation =listener.getClass().getAnnotation(RocketmqTransactionListener.class);Stream.of(annotation.selectorExpression().split("\\|\\|")).forEach(tag -> {String destination = String.format("%s:%s", annotation.topic(), tag);listeners.put(destination, (RocketMQLocalTransactionListener) listener);});});}@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {String destination = getDestination(msg);if (!listeners.containsKey(destination)) {logger.info("----------------------------------------------------");logger.warn("{} transaction message is not supported", destination);return RocketMQLocalTransactionState.ROLLBACK;} else {RocketMQLocalTransactionListener listener = listeners.get(destination);return listener.executeLocalTransaction(msg, arg);}}@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {String destination = getDestination(msg);if (!listeners.containsKey(destination)) {logger.info("----------------------------------------------------");logger.warn("{} transaction message is not supported", destination);return RocketMQLocalTransactionState.ROLLBACK;} else {RocketMQLocalTransactionListener listener = listeners.get(destination);return listener.checkLocalTransaction(msg);}}private String getDestination(Message msg) {String topic = msg.getHeaders().get("rocketmq_TOPIC", String.class);String tags = msg.getHeaders().get("rocketmq_TAGS", String.class);return String.format("%s:%s", topic, tags);}}

说明:
定义本地事务处理类,实现RocketMQLocalTransactionListener接口,以及加上@RocketMQTransactionListener注解,这个类似方法的调用是异步的;
executeLocalTransaction方法:当我们处理完业务后,可以根据业务处理情况,返回事务执行状态,有rollback, commit or unknown三种,分别是回滚事务,提交事务和未知;根据事务消息执行流程,如果返回rollback,则直接丢弃消息;如果是返回commit,则消费消息;如果是unknow,则继续等待,然后调用checkLocalTransaction方法,最多重试15次,超过了默认丢弃此消息;
checkLocalTransaction方法:是当MQ Server未得到MQ发送方应答,或者超时的情况,或者应答是unknown的情况,调用此方法进行检查确认,返回值和上面的方法一样;
 

自定义生产者事务监听类

package com.example;import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;/*** @Author 何志鹏* @Date 2022/8/30 16:56* @Version 1.0*/
@Service
@RocketmqTransactionListener(topic = "topic1", selectorExpression = "11111")
public class ProducerListener implements RocketMQLocalTransactionListener {@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {try {//todo 数据库相关逻辑  报错事务回滚 否则事务提交int count = 1;System.err.println("==================================开始执行事务===============================");return count > 0 ? RocketMQLocalTransactionState.COMMIT : RocketMQLocalTransactionState.ROLLBACK;} catch (Exception e) {System.err.println("=====================================事务执行失败============================================");return RocketMQLocalTransactionState.ROLLBACK;}}@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {try {//todo 数据库相关逻辑  报错事务回滚 否则事务提交boolean flag = true;return flag ? RocketMQLocalTransactionState.COMMIT : RocketMQLocalTransactionState.ROLLBACK;} catch (Exception e) {System.err.println("=====================================事务执行失败222222============================================");return RocketMQLocalTransactionState.ROLLBACK;}}
}

启动测试类RocketmqApplicationTests测试

当执行程序没有异常的情况下  可以看出,执行成功后,消息执行成功返回的结果为SEND_OK,本地事务执行的状态为COMMIT_MESSAGE

回滚测试

仿造异常情况下的程序  如改动以下程序即可

重启项目进行测试  打印日志如下:

 从执行的结果可以看出,消息执行成功返回的结果为SEND_OK,本地事务执行的状态为:ROLLBACK_MESSAGE,直接丢弃消息,所以消费端无法消费此消息  本地事务的状态分别为提交状态、回滚状态、未知状态  当生产者回调操作执行的结果为本地事务状态。其会发送给TC,而TC会在发送给TM,TM会根据TC发送过来的本地事务状态来决定全局事务确认指令 状态如下:

TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。
TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。
TransactionStatus.Unknown: 未知状态,它代表需要检查消息队列来确定状态

最后附上项目源码

rocketmq: 测试rocketmq事务实现

 

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_3538.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Windows安全中心内存完整性无法打开问题的处理方法

Windows11安全中心内存完整性无法打开 今天电脑使用过程中突然看到系统桌面右下角任务栏中 windows安全中心图标出现了警告信息&#xff0c;如下图红框所示&#xff1a; 点击该图标进入windows安全中心的 安全性概览 界面&#xff0c;如下图&#xff1a; 在该界面可以看到出现安…

安卓毕业设计app项目源码基于Uniapp实现的美食餐厅订餐点餐

&#x1f345;文末获取联系&#x1f345; 一、项目介绍 计算机毕业设计安卓App毕设项目之美食APP-IT实战课堂_哔哩哔哩_bilibili计算机毕业设计安卓App毕设项目之美食APP-IT实战课堂共计2条视频&#xff0c;包括&#xff1a;F29 472-美食APP、项目选型成品与定制_ewm等&#…

OIDC 在 Authing 控制台的配置流程 | 认证(二)

01 集成介绍 在《Spring Security 集成 Authing OIDC 认证&#xff08;一&#xff09;》中我们讲解了很多的基础知识和概念。我们讲解了什么是 Spring Security &#xff0c;以及使用 Spring Security 安全管理框架给我们开发工作带来的便利和整合难度的降低&#xff0c;极大…

Git 实战(三) | Github 必会高频基础命令与 IDE 的 Git 集成

GitHub 上我们可以 fork 别人的项目&#xff0c;为了与别人产生一种协作关系&#xff0c;将他人的项目在自己本地创建也一个&#xff0c;这里以霍格沃兹测试学院&#xff08;Hogwarts&#xff09;的演练环境做演示&#xff1a; 1.1) 点击fork按钮对项目进行fork&#xff1a; 1.…

Docker 镜像构建可以分享的快乐

通过上一篇 Dockerfile 语法与指令的学习&#xff0c;本节就开始使用Dockerfile 来制作自己的 Docker 镜像啦。 Docker 镜像构建 新建 app.py 文件 from flask import Flask app Flask(__name__) app.route(/) def hello(): return Hello World! Hogwarts. 本代码主要功能是当…

物业公司如何解决降本增收?快鲸智慧社区系统来帮你

当下物业行业要解决的问题是降本增收&#xff0c;传统物业很难做到这点&#xff0c;想要生存并可持续发展&#xff0c;唯一的一条出路就是发展智慧物业&#xff0c;用技术对传统物业进行改造&#xff0c;实现降本增收的目标&#xff0c;这也是不少物业企业向智慧物业转型的原因…

【老王读Spring Transaction-1】从EnableTransactionManagement顺藤摸瓜,研究@Transactional的实现原理

从EnableTransactionManagement顺藤摸瓜&#xff0c;研究Transactional的实现原理前言版本约定正文EnableTransactionManagementProxyTransactionManagementConfiguration——Spring 事务配置的核心类TransactionInterceptorTransactionAttributeSource小结前言 Spring 对事务…

【Linux】Rocky 9.0 Podman服务无法正常启动

Rocky Linux 9.0发布后&#xff0c;我在本地虚拟机对该版本进行了安装和测试&#xff0c;发现Podman服务在某些情况下&#xff0c;无法正常启动。 当/etc/selinux/config配置中&#xff0c;SELINUXenforcing为默认配置的时候&#xff0c;启动Podman服务&#xff0c;会出现下面的…

Vue3.0中使用路由进行跳转和传参以及取值

1、在vue2.0中的路由跳转 2、在vue2.0中取出路由的传值 在vue3.0中取消了vue2.0的部分api&#xff0c;新增的两个API,分别是useRouter和useRoute。 3、vue3.0中路由跳转 1&#xff09;第一步先引入import {useRouter} from vue-router; 2&#xff09;第二步 const router useR…

使用SSH反向转发服务器上的请求到个人电脑

开启服务器ssh网关功能 修改/etc/ssh/sshd_config文件&#xff0c;将GatewayPorts 修改为yes&#xff0c;并放开AllowAgentForwarding yes和AllowTcpForwarding yes&#xff0c;GatewayPorts默认为no&#xff0c;AllowAgentForwarding、AllowTcpForwarding默认被注释 修改完成…

数字逻辑设计(4)

文章目录数组逻辑设计&#xff08;4&#xff09;1. 组合逻辑电路中的险象一、门延迟二、逻辑冒险三、险象的分类1&#xff09;静态冒险2&#xff09;动态冒险3&#xff09;功能冒险2. 险象的判断及消除险象的判断1&#xff09;代数法2&#xff09; 卡诺图法险象的消除1&#xf…

paddlepaddle

项目用到了paddlespeech2&#xff0c;学了几天paddlepaddle&#xff0c;简单记录一下: 文章目录1 手写数字识别任务2 极简方案构建手写数字识别模型模型设计训练配置训练过程模型测试3【手写数字识别】之数据处理4【手写数字识别】网络结构4.1 经典的全连接神经网络4.2 卷积神经…

14天刷爆LeetCode算法学习计划——Day02双指针(2)

Day02——双指针一、前言二、知识点三、LeetCode189. 轮转数组1.题目2.解题示意图3.解题思路4.代码实现5.验证代码6.注意点四、结语一、前言 盲目刷题只会让自己心态爆炸&#xff0c;所以本期14天算法学习计划&#xff0c;也是LeetCode上的 [算法] 学习计划&#xff0c;在本专栏…

【LeetCode】统计全 1 子矩形(单调栈)

1504. 统计全 1 子矩形 - 力扣&#xff08;LeetCode&#xff09; 一、题目 给你一个 m x n 的二进制矩阵 mat &#xff0c;请你返回有多少个 子矩形 的元素全部都是 1 。 示例 1&#xff1a; 输入&#xff1a;mat [[1,0,1],[1,1,0],[1,1,0]] 输出&#xff1a;13 解释&#x…

TCL基础学习 字符串

基本指令 Tcl将所有的变量值视作字符串&#xff0c;并将他们作为字符串来保存。下标列出了比较有用的字符串操作命令&#xff1a; append将值追加到字符串尾binary二进制字符串操作format字符串格式化regexp正则表达式regsub用字符串模式进行字符串模拟匹配和替换scan字符串分…

计算机网络面试(一)网络分层结构

文章目录为什么使用分层结构OSI参考模型分层结构——OSI参考模型ISO各个分层解析TCP/IP各个分层解析为什么使用分层结构 对网络分层以后&#xff0c;可以将问题细化&#xff0c;使得问题更加容易分析。把一个大的系统分拆成小的体系后&#xff0c;便于在各个层次上制定标准&am…

《三叶虫与其他故事》我的恐惧如涟漪扩散,荡漾过百万年的时光

《三叶虫与其他故事》我的恐惧如涟漪扩散&#xff0c;荡漾过百万年的时光 布里斯D’J.潘凯克 Breece D‘J Pancake&#xff08;1952-1979&#xff09;&#xff0c;美国作家。二十六岁时自杀身亡&#xff0c;生前仅发表过六篇小说。潘凯克深受美国南方文学传统的影响&#xff0c…

3dmax的Corona的渲染器材质要如何完全转换VRay材质?

经常有伙伴问怎么转化材质&#xff0c;将CR转换成vr或者将VR转换CR~其实这一点需要通过材质转换插件即可转换~ 方法一&#xff1a;cr转vr材质&#xff0c;自带 第一步&#xff1a;确认自己的corona渲染器版本为corona5及以上&#xff1a; ​ 第2步 确认自己的vray渲染器版本…

springboot手机推荐网站毕业设计源码052329

摘 要 随着社会的发展&#xff0c;计算机的优势和普及使得手机推荐网站的开发成为必需。手机推荐网站主要是借助计算机&#xff0c;通过对首页、手机问答、公告消息、手机资讯、手机测评、我的、跳转到后台等信息进行管理。减少管理员的工作&#xff0c;同时也方便广大用户对个…

voip|网络电话,软件实现电信座机

原理 我们办理的宽带一般都含有座机服务&#xff0c;有一个座机号&#xff0c;自己买个座机插到光猫的语音口上就能用。光猫内置语音服务&#xff0c;座机通过电话线接上光猫来打电话&#xff0c;这个语音服务本质上是VOIP&#xff0c;基于IP的语音传输&#xff0c;光猫在VOIP…