# 【博学谷学习记录】超强总结,用心分享 | RabbitMQ消息的可靠性

news/2024/5/20 14:21:45/文章来源:https://www.cnblogs.com/Azureblue/p/16612531.html

消息队列在使用过程中,如何确保RabbitMQ消息的可靠性,如何确保发送的消息至少被消费一次?

1.生产者消息确认

RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。这种机制必须给每个消息指定一个唯一ID。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。

返回结果有两种方式:

  • publisher-confirm,发送者确认
    • 消息成功投递到交换机,返回ack
    • 消息未投递到交换机,返回nack
  • publisher-return,发送者回执
    • 消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。

注意:

确定机制发送消息时,需要给每条消息设置一个全局唯一的一个ID,以区分不同的消息,防止ACK发生冲突。

1.1 配置文件

首先,修改生产者服务中的application.yml文件,添加下面的内容:

spring:rabbitmq:publisher-confirm-type: correlatedpublisher-returns: truetemplate:mandatory: true

完整的application.yml文件:
spring:rabbitmq:host: 192.168.186.100 # rabbitMQ的ip地址port: 5672 # 端口username: rabbitpassword: 123456virtual-host: / # 虚拟主机publisher-confirm-type: correlatedpublisher-returns: truetemplate:mandatory: true

说明:
  • publish-confirm-type:开启publisher-confirm,这里支持两种类型:
    • simple:同步等待confirm结果,直到超时
    • correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
  • publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback
  • template.mandatory:定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息

1.2 定义Return回调

每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目加载时配置:

修改生产者服务,添加一个:

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {// 获取RabbitTemplateRabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 设置ReturnCallbackrabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {// 投递失败,记录日志log.info("消息发送到队列失败,应答码{},原因{},交换机{},路由键{},消息{}",replyCode, replyText, exchange, routingKey, message.toString());// 如果有业务需要,可以重发消息});}
}


1.3 定义ConfirmCallback

ConfirmCallback可以在发送消息时指定,因为每个业务处理confirm成功或失败的逻辑不一定相同。

在生产者服务的SpringAmqpTest类中,定义一个单元测试方法:

public void testSendMessageSimpleQueue() throws InterruptedException {// 1.消息体String message = "hello, spring amqp!";// 2.全局唯一的消息ID,需要封装到CorrelationData中CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());// 3.添加callbackcorrelationData.getFuture().addCallback(result -> {if(result.isAck()){// 3.1.ack,消息成功log.debug("消息发送到交换机成功, ID:{}", correlationData.getId());}else{// 3.2.nack,消息失败log.error("消息发送到交换机失败, ID:{}, 原因{}",correlationData.getId(), result.getReason());}},ex -> log.error("消息发送异常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage()));// 4.发送消息rabbitTemplate.convertAndSend("amq.topic", "topic.queue1", message, correlationData);// 休眠一会儿,等待ack回执Thread.sleep(2000);
}

1.4 测试

手动绑定交换机和队列:


测试一:运行单元测试

测试通过!


测试二:修改routingKey,再次进行测试

结果:

消息投递到交换机成功,但消息未能发送到队列。


2.消息持久化

生产者确认可以确保消息投递到RabbitMQ的队列中,但是消息发送到RabbitMQ以后,如果突然宕机,也可能导致消息丢失。

要想确保消息在RabbitMQ中安全保存,必须开启消息持久化机制。

  • 交换机持久化
  • 队列持久化
  • 消息持久化

2.1 交换机持久化

RabbitMQ中交换机默认是非持久化的,mq重启后就丢失。

SpringAMQP中可以通过代码指定交换机持久化:

@Bean
public DirectExchange simpleExchange(){// 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除return new DirectExchange("amq.direct", true, false);
}

事实上,默认情况下,由SpringAMQP声明的交换机都是持久化的


可以在RabbitMQ控制台看到持久化的交换机都会带上D的标示:


2.2 队列持久化

RabbitMQ中队列默认是非持久化的,mq重启后就丢失。

SpringAMQP中可以通过代码指定交换机持久化:

@Bean
public Queue simpleQueue(){// 使用QueueBuilder构建队列,durable就是持久化的return QueueBuilder.durable("simple.queue").build();
}

事实上,默认情况下,由SpringAMQP声明的队列都是持久化的

可以在RabbitMQ控制台看到持久化的队列都会带上D的标示:


2.3 消息持久化

利用SpringAMQP发送消息时,可以设置消息的属性(MessageProperties),指定delivery-mode:

  • 1:非持久化
  • 2:持久化

用java代码指定:

Message message = MessageBuilder.withBody("hello".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();

默认情况下SpringAMQP发出的任何消息都是持久化的,不用特意指定。


3.消费者消息确认

RabbitMQ是阅后即焚机制,RabbitMQ确认消息被消费者消费后会立刻删除。

而RabbitMQ是通过消费者回执来确认消费者是否成功处理消息的:消费者获取消息后,应该向RabbitMQ发送ACK回执,表明自己已经处理消息。


设想这样的场景:

  • 1)RabbitMQ投递消息给消费者
  • 2)消费者获取消息后,返回ACK给RabbitMQ
  • 3)RabbitMQ删除消息
  • 4)消费者宕机,消息尚未处理

这样,消息就丢失了。因此消费者返回ACK的时机非常重要。


而SpringAMQP则允许配置三种确认模式:

•manual:手动ack,需要在业务代码结束后,调用api发送ack。

•auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack

•none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除


由此可知:

  • none模式下,消息投递是不可靠的,可能丢失
  • auto模式类似事务机制,出现异常时返回nack,消息回滚到mq;没有异常,返回ack
  • manual:自己根据业务情况,判断什么时候该ack

一般,我们都是使用默认的auto即可。


3.1 none模式

修改消费者服务的application.yml文件,添加下面内容:

spring:rabbitmq:listener:simple:acknowledge-mode: none # 关闭ack

修改消费者服务的SpringRabbitListener类中的方法,模拟一个消息处理异常:

@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {log.info("消费者接收到simple.queue的消息:【{}】", msg);// 模拟异常System.out.println(1 / 0);log.debug("消息处理完成!");
}

测试可以发现,当消息处理抛异常时,消息依然被RabbitMQ删除了。

3.2 auto模式

再次把确认机制修改为auto:

spring:rabbitmq:listener:simple:acknowledge-mode: auto # 自动ack

在异常位置打断点,再次发送消息,程序卡在断点时,可以发现此时消息状态为unack(未确定状态):


抛出异常后,因为Spring会自动返回nack,所以消息恢复至Ready状态,并且没有被RabbitMQ删除:



4.消费失败重试机制

当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力。

4.1 本地重试

我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。

修改消费者服务的application.yml文件,添加内容:

spring:rabbitmq:listener:simple:retry:enabled: true # 开启消费者失败重试initial-interval: 1000 # 初识的失败等待时长为1秒multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

重启消费者服务,重复之前的测试。可以发现:

  • 在重试3次后,SpringAMQP会抛出异常AmqpRejectAndDontRequeueException,说明本地重试触发了
  • 查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是ack,mq删除消息了

结论:

  • 开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试
  • 重试达到最大次数后,Spring会返回ack,消息会被丢弃

4.2 失败策略

在之前的测试中,达到最大重试次数后,消息会被丢弃,这是由Spring内部机制决定的。

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecovery接口来处理,它包含三种不同的实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。


RepublishMessageRecoverer处理方案:

1)在消费者服务中定义处理失败消息的交换机和队列

2)定义一个RepublishMessageRecoverer,关联队列和交换机

完整代码:

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;@Configuration
public class ErrorMessageConfig {//定义处理失败消息的交换机和队列@Beanpublic DirectExchange errorMessageExchange(){return new DirectExchange("error.direct");}@Beanpublic Queue errorQueue(){return new Queue("error.queue", true);}@Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");}//定义RepublishMessageRecoverer,关联队列和交换机@Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");}
}

重启消费者服务,重复之前的测试:

  • 控制台中不再报错,显示重新发布失败的消息到"error.direct"交换机中,routingKey为"error"。
  • 查看队列内信息可以发现,不但mq消息在,而且还有异常报错信息。

5.总结

如何确保RabbitMQ消息的可靠性?

  • 开启生产者确认机制,确保生产者的消息能到达队列
  • 开启持久化功能,确保消息未消费前在队列中不会丢失
  • 开启消费者确认机制为auto,由spring确认消息处理成功后完成ack
    • 开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_2480.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Learn Dijkstra For The Last Time

博客链接:https://www.codein.icu/learn-dijkstra/ Introduction Dijkstra 算法是用于求解非负权图单源最短路的经典算法。 市面上的大部分教程都仅仅停留在「如何实现 Dijkstra 算法」的层面。从应用角度,这当然无可厚非。但理解算法本身,也不失为一件乐事。 问自己这样几个…

自己de搭建博客记录

鸽子啊鸽子一去不复返自己de搭建博客记录因为奇奇怪怪的原因所以开始学着自己搭建一个博客了 但是估计搭好了也不会常更新,连博客园都咕了一个月了 先水水免得自己忘记了,要学的还有挺多 突然发现博客阅读量猛涨,看了下貌似是N2的插件文章被爬到各种奇怪网站了-1 参考资料 参…

The forked VM terminated without properly saying goodbye. VM crash or System.exit called?

1、maven构建报错: [INFO] BUILD FAILURE[INFO] ------------------------------------------------------------------------[INFO] Total time: 47.849 s[INFO] Finished at: 2022-08-19T08:05:22+08:00[INFO] ----------------------------------------------------------…

阿里云 EMAS Serverless 重磅发布

EMAS Serverless重磅发布, 为应用开发者特别是多端开发者实现一站式应用开发提供了全新的开发体验。为了更好的布道推广 Serverless 开发生态,现面向开发者/学生提供免费套餐(不是只1个月哦,你懂的),针对业务发展的不同阶段,提供多种套餐和按量付费模式,请来阿里云官网…

Windows安全加固

实验环境 操作系统:Windows Server 2012 系统密码: 安全加固项 1、用户系统 1.1 加固项名称: Administrator账户停用 加固说明: 防止 Administrator 账户被黑客爆破出密码,避免Administrator账户被黑客利用获取计算机系统权限。只有一个管理员账户时无法禁用,需要创建另一…

WPF实现一个简单自定义管道

先看效果 xaml代码 <UserControl x:Class="WPF控件测试.Control.Pipeline" xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" …

获取数的全部因子 单次查询/预处理

对于单次查询,可以直接用sqrt(n)遍历。 对于多次查询,每次都遍历会遍历多个无用的数。 可以采用打表法,直接获取数据范围内的全部数据的因子。 代码如下:int N = 100010; vector<int> factor[100010]; for (int i = 1; i <= N; i++) {for(int j=i;j<=N;j+=i)fa…

漫谈测试成长之探索——缺陷分析

​ 回顾校园生活中,我们参加每一场考试后都会对错题进行分析总结并补缺补漏,以便能更好地去应对更重要的考试。回到软件系统开发中,我们记录和跟踪缺陷的目的是什么,仅仅是为了在软件系统开发过程中跟踪Bug直至修复么?应该不止于此。我们也可以对项目缺陷进行分析,分析其…

SpringBoot读取.yml配置文件最常见的两种方式-源码及其在nacos的应用

一、前言 我们在开发中会经常遇到一些可能会变的值,比如数据库的密码,一些关键链接的配置等等。 都需要我们写在配置文件中,这样可以把这些配置文件放到nacos上进行管理,修改nacos的配置,咱们发布的包就可以动态的进行更新了,不需要重新修改打包在重新发包! 咱们今天就来…

数据结构开门篇

数据结构 1、什么是数据结构 数据结构是数据组织、管理和存储格式,其使用目的是为了高效地访问和修改数据2、时间复杂度和空间复杂度 什么是时间复杂度 时间复杂度是对一个算法运行时间长短的度量,用大O表示,记作 T(n)=O(f(n))如果运行时间是常数量级,则用常数1表示 只保留…

小红书数据 小红书爬虫 小红书接口 xhs

小红书数据 小红书接口 小红书api(小红书爬虫 xhs xiaohongshu 红书) 最新小红书APP接口稳定运行,主流接口都已部署,支持并发请求! 只抓取公开数据供作学习用途,不做引流上赞上粉业务,如有侵权,联系删除,谢谢! 只提供数据支持,不交流方案! 联系加q 2126851589! Git…

记一次有意思的 SQL 实现 → 分组后取每组的第一条记录

开心一刻今天,朋友气冲冲的走到我面前朋友:我不是谈了个女朋友,谈了三个月嘛,昨天我偷看她手机,你猜她给我备注什么我:备注什么?朋友:舔狗 2 号!我一听,气就上来了,说道:走,找她去,这婆娘确实该骂,臭不要脸的朋友拉住我,劝到:哎哎,不是去骂她,是找她理论,叫…

JavaIO流

JavaIO流 一、流的概念 内存与存储设备之间传输数据的通道。例如在内存中的程序,想要读取硬盘中的文件,必须借助流;在水库中的水流入家庭中(存储设备到内存) 二、流的分类 按方向: 输入流:将存储设备中的内容读入到内存中 输出流:将内存中的内容写入到存储设备中 流入程…

Oracle 序列学习与使用总结

Oracle序列学习与使用总结 by:授客 QQ:1033553122 简述 序列是oracle提供的用于生成一系列数字的数据库对象,序列会自动生成顺序递增的序列号,可用于提供唯一的自动递增主键。序列和视图一样,并不占用实际的存储空间,只是在数据字典中保存他的定义信息。 创建序列 当创建序…

JavaScript快速入门-04-运算符

4 运算符 4.1 算术运算符 4.1.1 概述JavaScript 提供的算术运算符如下所示:类型 符号 示例加法运算符 + a+b减法运算符 - a-b乘法运算符 * a*b除法运算符 / a/b余数运算符 % a%b自增运算符 ++ ++a/a++自减运算符 -- --a/a--指数运算符 ** a**b4.1.2 加法运算符加法运算符是最常…

电脑棒没有显示器解决方案

因为我的工作电脑是一个电脑棒(见下图),没有自带的显示屏,所以只能通过远程控制。但是据说TeamViewer可能会误判为商业使用(之后就不让你免费了),于是我就打算转ToDesk。刚开始,这个东西开机能够自启动,但是就是无法连接到服务器😓 (上图这玩意儿就是电脑棒) 之后…

select总结

select总结 我的一生 第1章-废物的一生 第50章-糟糕的婴儿 第300章-莫欺少年穷第600章-莫欺中年穷 第1000章-莫欺老年穷第1100章-不详的离去第1101章-棺材板的震动 第1150章-盗墓贼的眼泪 第1200章-死者为大

MICROSOFT SQL SERVER TO POSTGRESQL MIGRATION USING PGLOADER

To continue our migration series, today’s post will focus on pgloader. Pgloader is another Open Source data migration utility for PostgreSQL from MySQL and SQL Server. Today’s demo will migrate a sample database (StackOverflow) from MS SQL Server 2019 to…

阅读笔记: Robust Vehicle Localization in Urban Environments Using Probabilistic Maps

摘要 此篇是对Map-Based Precision Vehicle Localization in Urban Environments[4]工作的改进,在精度、地图更新、对环境改变和动态障碍物的鲁棒性方面都有所提升。具体而言,环境没有被建模为一个固定的反射强度网格,而是被建模成概率网格,每个网格都独立表达为一个对反射…