SpringBoot 整合 RabbitMQ 实现消息回调、手动确认 (二) 有图 有源码

news/2024/5/4 13:11:39/文章来源:https://blog.csdn.net/qq_39550368/article/details/126661153

创建时间 2022年8月29日

标签:Java、SpringBoot、RabbitMQ、队列

注释:新建SpringBoot项目实操RabbitMQ实现消息回调、手动确认

来源:CSDN博主:小目标青年

文章目录

  • SpringBoot 整合 RabbitMQ 回调确认模式
    • 生产者推送消息回调
      • 1、消息推送到server,但是server里找不到交换机
      • 2、下次推送到 server,找到交换机,但是没有找到队列
      • 3、消息推送到server,交换机和队列啥都没找到
      • 4、消息推送成功
    • 消费者收到消息确认

本文涉及的所有代码均已上传至 Gitee 链接:https://gitee.com/Array_Xiang/spring-boot-rabbit-mq.git

SpringBoot 整合 RabbitMQ 回调确认模式

上一期我们讲过了 direct、topic、fanout 三种模式,这一期我们来探究一下RabbitMQ 的消息回调和手动确认,在看文章之前,首先要确保您对 SpringBoot 和 RabbitMQ 有一定的理解能力,如果两者只熟悉其中的一种,或者都不熟悉,还请您不要过早阅读此文章。有名人云:浪费别人的时间就是在谋财害命

前期准备工作:IDEA、Jdk8、RabbitMQ

生产者推送消息回调

老样子,直接上代码

server:port: 8093spring:application:name: back-providerrabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guestvirtual-host: XiangHost#确认消息已发送到交换机(Exchange)publisher-confirm-type: correlated#确认消息已发送到队列(Queue)publisher-returns: true

如果你们在配置确认回调,测试发现无法触发回调函数,那么存在原因也许是因为SpringBoot版本导致的配置项不起效,可以把 publisher-confirms: true 替换为 publisher-confirm-type: correlated

配置消息确认的回调函数:RabbitConfig

package com.liuyuncen.config;import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @belongsProject: rabbitmq_springboot* @belongsPackage: com.liuyuncen.config* @author: Xiang想* @createTime: 2022-08-31  13:38* @description: TODO* @version: 1.0*/
@Configuration
public class RabbitConfig {@Beanpublic RabbitTemplate createRabbitTemplate(ConnectionFactory factory){RabbitTemplate rabbitTemplate = new RabbitTemplate();rabbitTemplate.setConnectionFactory(factory);// 只有开启了 Mandatory 才能出发回调函数,无论消息吐送结果怎样都强制调用回调函数rabbitTemplate.setMandatory(true);rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {System.out.println("ConfirmCallback 相关数据 = " + correlationData);System.out.println("ConfirmCallback 确认情况 = " + b);System.out.println("ConfirmCallback 原因 = " + s);}});rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {@Overridepublic void returnedMessage(Message message, int i, String s, String s1, String s2) {System.out.println("ReturnCallback 消息 = " + message);System.out.println("ReturnCallback 回应码 = " + i);System.out.println("ReturnCallback 回应信息 = " + s);System.out.println("ReturnCallback 交换机 = " + s1);System.out.println("ReturnCallback 路由键 = " + s2);}});return rabbitTemplate;}
}

可以看到我们配置了两个回调函数 ConfirmCallback 和 ReturnCallback,他们在什么情况下会触发呢?

我们大致模拟这几种情况

  1. 消息推送到 server ,但是 server 里找不到交换机
  2. 消息推送到 server,找到交换机,没有找到队列
  3. 消息推送到 server,交换机和队列里啥都没找到
  4. 消息推送成功

我们分别根据这几种情况来写测试案例

1、消息推送到server,但是server里找不到交换机

写个测试接口,把消息推送到 not-existent-exchange

package com.liuyuncen.collection;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;/*** @belongsProject: rabbitmq_springboot* @belongsPackage: com.liuyuncen.collection* @author: Xiang想* @createTime: 2022-08-31  13:48* @description: TODO* @version: 1.0*/
@RestController
public class NotExistentExchange {@AutowiredRabbitTemplate rabbitTemplate;@GetMapping("/notExistentMessage")public String notExistentMessage(){Map<String,Object> map = new HashMap<>();map.put("code",String.valueOf(UUID.randomUUID()));map.put("time", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));map.put("message","hi 你肯定找不到这个交换机");// 指向交换机和路由键rabbitTemplate.convertAndSend("not-existent-exchange","routingKey",map);return "ok";}
}

调用接口,查看控制台,这里说没有找到交换机

在这里插入图片描述

结论:这种情况触发 ConfirmCallback 回调函数

2、下次推送到 server,找到交换机,但是没有找到队列

这种情况就需要新增一个交换机了,但是不给交换机绑定队列,在 RabbitConfig 配置类添加如下

    @BeanDirectExchange callBackDirectExchange(){return new DirectExchange("callBackDirectExchange");}

添加了一个 callBackDirectExchange 交换机,但是没有绑定队列

package com.liuyuncen.collection;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;/*** @belongsProject: rabbitmq_springboot* @belongsPackage: com.liuyuncen.collection* @author: Xiang想* @createTime: 2022-08-31  13:56* @description: TODO* @version: 1.0*/
@RestController
public class NotQueueController {@AutowiredRabbitTemplate rabbitTemplate;@GetMapping("/notQueue")public String notQueue(){Map<String,Object> map = new HashMap<>();map.put("code",String.valueOf(UUID.randomUUID()));map.put("time", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));map.put("message","hi 你肯定找不到队列");// 指向交换机和路由键rabbitTemplate.convertAndSend("callBackDirectExchange","routingKey",map);return "ok";}
}

调用接口,查看控制台情况

可以看到这种情况,两个函数都被调用了,这种情况下,消息是推送到服务器的,所以 ConfirmCallback 确认了成功,而在 ReturnCallback 回调函数的打印参数里看到,消息到交换机成功了,但是分发给队列时候,找不到队列,报 NO_ROUTE 错误

结论:这种情况出发 ConfirmCallback 和 ReturnCallback 两个回调函数

3、消息推送到server,交换机和队列啥都没找到

这种情况就和案例1很像了,3和1 的回调是一致的,这里不做说明了

结论:这种情况触发 ConfirmCallabck 回调函数

4、消息推送成功

那么测试下,按正常调用之前的消息推送接口就可以了

在 rabbitConfig 配置类中补充配置

    @Beanpublic Queue callBackQueue(){// 队列名叫 DirectQueuereturn new Queue("callBackQueue",true);}@BeanBinding bindingDirect(){// 绑定并且指定 路由键return BindingBuilder.bind(callBackQueue()).to(callBackDirectExchange()).with("routingKey");}
package com.liuyuncen.collection;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;/*** @belongsProject: rabbitmq_springboot* @belongsPackage: com.liuyuncen.collection* @author: Xiang想* @createTime: 2022-08-31  14:03* @description: TODO* @version: 1.0*/
@RestController
public class NormalController {@AutowiredRabbitTemplate rabbitTemplate;@GetMapping("/normal")public String normal(){Map<String,Object> map = new HashMap<>();map.put("code",String.valueOf(UUID.randomUUID()));map.put("time", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));map.put("message","hi 你又能看见我了");// 指向交换机和路由键rabbitTemplate.convertAndSend("callBackDirectExchange","routingKey",map);return "ok";}
}

在这里插入图片描述

结论:这种情况触发 ConfirmCallback 回调函数

消费者收到消息确认

和生产的消息确认机制不同,因为消息接收本来就在监听信息,符合条件的消息就消费下来,所以,消息接收确认机制主要三种模式

  1. 自动确认,这也是默认的消息确认情况 AcknowledgeMode.NONE

    RabbitMQ 成功将消息发出立即认为本次投递已经被正确处理,不管消费者端是否成功处理本次投递,所以这种情况如果消费者抛出异常,也就是消费者没有成功处理这条消息,那么就相当于消息丢失,一般这种情况我们都是使用 try-catch 捕捉异常,打印日志用于追踪数据,这样找出对应数据后在做处理

  2. 根据情况确认,这个不做介绍

  3. 手动确认,这个比较关键,也就是我们配置接收消息确认机制时,多数选择的模式,消费者在收到消息后,手动调用 basic.ack、basic.nack、basic.reject 后,rabbitMQ收到这些消息,才认为本次投递成功

    1. basic.ack 用于确认
    2. basic.nack 用于否确认,(这是AMQP 0-9-1 的 RabbitMQ扩展)
    3. basic.reject 用于否确认,但是和 basic.nack 不同的是,一次只能拒绝一条消息

这里重点说一下 reject,因为有时候一些场景是需要重新入列的

channel.basicReject(deliveryTag,true);拒绝消费当前消息,如果第二个参数传入 true,就是将数据重新丢回队列里,那么下次还会消费这个消息。如果第二个参数设置 false,那就告诉服务器,他已经知道这个数据是因为什么原因被拒绝的,下次一次就不会再消费这个消息了

这样就出现了一个问题,如果设置为 False,被拒绝的数据就一直在队列里,如果被拒绝的越来越多,就会导致消息积压,一定要及时清理出去

再捎带脚说一下 nack 吧,这个也是相当于设置不消费某消息

channel.basicNack(deliveryTag,false,true);第一个参数依然是当前小消息到数据唯一id,第二个参数是指是否针对多条消息,如果是true,也就是说一次性针对当前通道的消息tagId小于这个消息的都拒绝,第三个参数是指是否重新入列(这里要注意,你这一次被拒绝了,下一次还是会被拒绝,所以还是会导致消息积压)

罗里吧嗦说了一大堆,上手吧

在 rabbitmq-callback 服务下的 consumer 中添加接收类

package com.liuyuncen.receiver;import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.util.Map;/*** @belongsProject: rabbitmq_springboot* @belongsPackage: com.liuyuncen* @author: Xiang想* @createTime: 2022-08-31  14:35* @description: TODO* @version: 1.0*/
@Component
public class HandAffirmReceiver implements ChannelAwareMessageListener {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.println("消息唯一ID值 = " + deliveryTag);try{byte[] body = message.getBody();ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(body));Map<String ,String> map = (Map<String,String>)ois.readObject();String code = map.get("code");String time = map.get("time");String msg = map.get("message");ois.close();System.out.println("手动确认消息:code:"+code+" time:"+time+", msg:"+msg);System.out.println("消费的主题消息来自:"+message.getMessageProperties().getConsumerQueue());//  第二个参数,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息channel.basicAck(deliveryTag,true);}catch (Exception e){channel.basicReject(deliveryTag,false);e.printStackTrace();}}
}

启动好消费者后,我们还是用刚刚的 NormalController.normal()接口再推一条消息

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ohahF6ay-1662097329222)(images/image-20220831144920523.png)]

成功消费下来,我们也可以把 HandAffirmReceiver 低 38行的代码改为

channel.basicReject(deliveryTag, true);

强行把所有的消息从新放回队列里

印证了我们在上诉的说法,当值设置为 true 时候,消息重新丢回队列,但是还是会重新进行消费,再被拒绝再被消费,一直被循环

我们也把 HandAffirmReceiver 低 38行的代码改为

channel.basicReject(deliveryTag, false);

拒绝消费,rabbitmq会把这条消息丢掉,可以看到消息打印之后,rabbitmq 的队列里,也没有多余的消息

到这里,我们已经掌握了怎么去使用消息消费的手动确认了

但是这个场景往往是不够的,他们需要这个消费者项目里面,监听了好几个队列都变成手动确认模式,而且处理消息业务逻辑也不一样了。

那我们就添加队列

然后我们的手动消费确认确认监听类就可以把设置的队列都消费下来

随后在不同的业务逻辑里处理不同的分区即可

package com.liuyuncen.receiver;import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.util.Map;/*** @belongsProject: rabbitmq_springboot* @belongsPackage: com.liuyuncen* @author: Xiang想* @createTime: 2022-08-31  14:35* @description: TODO* @version: 1.0*/
@Component
public class HandAffirmReceiver implements ChannelAwareMessageListener {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.println("消息唯一ID值 = " + deliveryTag);try{byte[] body = message.getBody();ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(body));Map<String ,String> map = (Map<String,String>)ois.readObject();String code = map.get("code");String time = map.get("time");String msg = map.get("message");ois.close();System.out.println("手动确认消息:code:"+code+" time:"+time+", msg:"+msg);System.out.println("消费的主题消息来自:"+message.getMessageProperties().getConsumerQueue());String consumerQueue = message.getMessageProperties().getConsumerQueue();if ("fanout.A".equals(consumerQueue)) {System.out.println("执行 fanout.A 中的消息的业务处理流程......");}if ("callBackQueue".equals(consumerQueue)){System.out.println("执行 callBackQueue 中的消息的业务处理流程......");}//  第二个参数,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息channel.basicAck(deliveryTag,true);}catch (Exception e){channel.basicReject(deliveryTag,false);e.printStackTrace();}}
}

我们在根据不同的消息队列来判断执行逻辑,

这时再调用 /fanoutMessage 和 /normal 接口

获得更多相关面试题 关注公众号「Xiang想」

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_4858.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

3天精通Postman---动态参数amp;断言amp;CSV数据驱动amp;Mock Server

DAY2课题&#xff1a;Postman接口关联&动态参数&断言&CSV数据驱动目录 一、接口关联&#xff0c;接口依赖&#xff0c;多接口串联&#xff0c;组合API 二、Postman的动态参数&#xff08;随机数&#xff09; 三、Postman的环境变量和全局变量 四、Postman断言 五、…

极端气候肆虐催化,碳中和带出了一个“再生时代”

江南一带的高温结束了&#xff0c;今年这场轰轰烈烈的高温&#xff0c;也画上了最后的句号。各地骤降的温度让人仿佛忘却了“热到爆表”的经历&#xff0c;但过去已经成为历史&#xff0c;历史充满痕迹。 格陵兰岛冰盖加速融化、欧洲莱茵河部分河段干涸、长江流域汛期反枯、重…

Cyclopropene-PEG-MAL Maleimide|环丙烯-聚乙二醇-马来酰亚胺

描述&#xff1a;环丙烯有机化合物。环丙烯是由三个碳原子构成的环烯烃&#xff0c;分子式为C3H4 &#xff0c;由于具有张力&#xff0c;环丙烯具有一些和其他环烯烃不同的性质。 理化性质 环丙烯在常温常压下为无色气体&#xff0c;沸点-36.15 &#xff0c;折射率1.489 。 环…

Git的安装与使用

1、Git的下载 2、git的安装 点击安装软件&#xff0c;一路安装到底&#xff0c;无需做任何选择 ...... 此处省略中间安装步骤 ...... 3、检验是否安装成功 在桌面右键&#xff0c;如果出现此图&#xff0c;表示安装成功 4、配置git 为了方便git客户端操作远程仓储方便&#…

Redis集群搭建(单机集群)

Redis入门篇https://blog.csdn.net/tongxin_tongmeng/article/details/126620333集群配置文件&#xff08;单机集群&#xff09; 1.复制/home/redis/redis-7.0.4/redis.conf到/home/redis/workspace/cluster_one cp /home/redis/redis-7.0.4/redis.conf /home/redis/workspace/…

私有化部署的知识管理平台对企业有什么意义?

随着企业的发展扩大&#xff0c;企业内部沉淀的知识也越来越多。过去很多企业都会将知识存储到云上&#xff0c;云部署模式虽然给企业带来了极大的便利&#xff0c;但在一些性能及数据安全上会存在一定的弊端&#xff0c;隐藏不少的企业会选择将数据存储在本地。下面我们就从企…

数字机器人如何更好的助力智慧政务?这里或许有你想要的答案

“十四五”规划和2035年远景目标纲要中明确提出&#xff0c;迎接数字时代&#xff0c;加快建设数字经济、数字社会、数字政府&#xff0c;以数字化转型整体驱动生产方式、生活方式和治理方式变革。 国务院于6月23日印发的《关于加强数字政府建设的指导意见》&#xff0c; 再一…

22年国家gongwuyuan考试申论题(副省级)

2022年国家公务员考试申论题&#xff08;副省级&#xff09;的问题一&#xff0c;它的题目是&#xff1a;根据“给定资料1”&#xff0c;请你谈谈B公司的案例为企业科技创新提供了哪些启示&#xff1b;要求&#xff1a;分析全面&#xff0c;条理清晰&#xff0c;不超过200字。 …

一个SpringBoot问题就干趴下了?我却凭着这份PDF文档吊打面试官(Spring Boot知识点+详解)

随着 Spring Boot 使用越来越广泛&#xff0c;Spring Boot 已经成为 Java 程序员面试的知识点&#xff0c;很多同学对 Spring Boot 理解不是那么深刻&#xff0c;经常就会被几个连环追问就给干趴下了&#xff01; 给大家整理了 Spring Boot 的35个常见知识点、21道面试必刷题、…

Docker基础-3.本地镜像发布与容器数据卷

我们在上一章中生成了自己的镜像&#xff1a;myubuntu&#xff0c;这章分别将它发布到阿里云和私有仓库 docker imagesREPOSITORY TAG IMAGE ID CREATED SIZE myubuntu 1.0 938b4fc0baf5 20 minutes ago 179MB一、本地镜像发布到阿里云…

视频融合平台EasyCVR视频广场页脚优化为瀑布流式的实现方式

EasyCVR基于云边端一体化架构&#xff0c;兼容性高、拓展性强&#xff0c;可支持多类型设备、多协议方式接入&#xff0c;将复杂多变的底层资源统一管理起来&#xff0c;实现视频资源的统一汇聚与管理、鉴权分发、服务器集群、智能分析、数据共享、集成与调用等视频能力服务。 …

如何使用Postman快速简单的调用快递物流平台快递鸟API接口

前沿 快递鸟是一家聚合类的第三方快递物流平台&#xff0c;目前该平台提供的产品主要以API为主。由于API不能直观的看到产品效果&#xff0c;需要进行API对接联调成功后才能真实的看到产品的实际效果。但是如果一上来就写代码进行对接&#xff0c;耗费的时间长不说&#xff0c…

川渝智慧高速第 4 部分:车路协同系统数据交换

1 范围 本文件规定了智慧高速公路车路协同系统数据交换的架构和内容。 本文件适用于成渝地区双城经济圈智慧高速公路的新建、改&#xff08;扩&#xff09;建工程&#xff0c;以及高速公路既有设施 智慧化提升改造。 2 规范性引用文件 下列文件中的内容通过文中的规范性引用…

自动化情侣微信早安信息定时推送

文章目录一、效果展示二、配置config.txt&#xff08;重点&#xff09;2.1 填写appID和appsecret2.1 创建测试模板填写template_id2.4 填写user2.5 填写weather_key2.6 填写剩下其他框选内容即可三、运行软件3.1 选择config.txt文件并设定时间3.2 运行软件3.3 效果展示一、效果…

湘潭大学新生匿名问答网站——解湘 项目总结

湘潭大学新生匿名问答网站——解湘 项目总结 一.开发进度 解湘 ​ 项目首页 ​ 大一暑假过半,7月29日建立本地工程文件其中项目在github上经历七次push(第八次为修改配置文件,防止数据库泄露),但在本地修改次数远远大于七次。 后端开发均为我一人完成,前端开发由他人负责…

告别BeanUtils,Mapstruct从入门到精通

如果你现在还在使用BeanUtils&#xff0c;看了本文&#xff0c;也会像我一样&#xff0c;从此改用Mapstruct。对象之间的属性拷贝&#xff0c;之前用的是Spring的BeanUtils&#xff0c;有一次&#xff0c;在学习领域驱动设计的时候&#xff0c;看了一位大佬的文章&#xff0c;他…

Redis实现消息队列

一、消息队列 1、什么是消息队列 消息队列&#xff08;Message Queue&#xff09;是一种应用间的通信方式&#xff0c;消息发送后可以立即返回&#xff0c;由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取&#xff0c;消息使用者只管从 MQ 中…

阿里巴巴出品:完美杜绝备战一个月面试 10 分钟,让 Java 面试从此不再难

众所周知&#xff0c;阿里后台一直以 Java 为重&#xff0c;面试官也是做 Java 开发的。但是语言只是工具&#xff0c;对代码的理解才是核心。面试时重点考察的是基础知识&#xff0c;以及解题的思考过程。 小编也是托了很多的关系&#xff0c;要到了这份阿里内容的面试解析。…

【运维心得】如何进行应用日志分析?

目录 为什么要进行分析? 什么是时序数据库&#xff1f; 分析结果这么用可以吗&#xff1f; 部分代码(python) 为什么要进行分析? 时间如梭&#xff0c;转眼炎热的8月份就过去了&#xff0c;全国4亿人都体会到了汗蒸和煎炸&#xff0c;这几天的凉爽&#xff0c;才使得我能…