RabbitMQ实现死信队列

news/2024/3/29 23:17:42/文章来源:https://blog.csdn.net/lianaozhe/article/details/129236677

目录

  • 死信队列是什么
  • 怎样实现一个死信队列
    • 说明
    • 实现过程
      • 导入依赖
      • 添加配置
      • 编写mq配置类
      • 添加业务队列的消费者
      • 添加死信队列的消费者
      • 添加消息发送者
      • 添加消息测试类
      • 测试
  • 死信队列的应用场景
  • 总结

死信队列是什么

“死信”是RabbitMQ中的一种消息机制,当你在消费消息时,如果队列里的消息出现以下情况:

  • 消息被否定确认,使用 channel.basicNack 或 channel.basicReject ,并且此时requeue 属性被设置为false。
  • 消息在队列的存活时间超过设置的TTL时间。
  • 消息队列的消息数量已经超过最大队列长度。

那么该消息将成为“死信”。

“死信”消息会被RabbitMQ进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。

怎样实现一个死信队列

说明

配置死信队列大概可以分为三个步骤:

1.配置业务队列,绑定到业务交换机上

2.为业务队列配置死信交换机和路由key

3.为死信交换机配置死信队列

注意,并不是直接声明一个公共的死信队列,然后所以死信消息就自己跑到死信队列里去了。而是为每个需要使用死信的业务队列配置一个死信交换机,这里同一个项目的死信交换机可以共用一个,然后为每个业务队列分配一个单独的路由key。

实现过程

导入依赖

        <!--RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

添加配置

spring:      #rabbitmqrabbitmq:host: 83.136.16.134password: guestusername: guestlistener:type: simplesimple:default-requeue-rejected: falseacknowledge-mode: manual

编写mq配置类

代码里面有详细说明,这里不在赘述。

package com.miaosha.study.mq;import com.sun.org.apache.regexp.internal.RE;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** @Author: laz* @CreateTime: 2023-02-27  09:16* @Version: 1.0*/
@Configuration
public class RabbitmqConfig {/*** 业务交换机*/public static final String BUSINESS_EXCHANGE_NAME = "business.exchange";/*** 业务队列a*/public static final String BUSINESS_QUEUEA_NAME = "business.queue.a";/*** 业务交换机b*/public static final String BUSINESS_QUEUEB_NAME = "business.queue.b";/*** 死信交换机*/public static final String DEAD_LETTER_EXCHANGE_NAME = "dead.letter.exchange";/*** 死信队列a*/public static final String DEAD_LETTER_QUEUEA_NAME = "dead.letter.queue.a";/*** 死信队列b*/public static final String DEAD_LETTER_QUEUEB_NAME = "dead.letter.queue.b";/*** 死信队列路由键a*/public static final String DEAD_LETTER_QUEUEA_ROUNTING_KEY_NAME = "dead.letter.queue.a.rounting.key";/*** 死信队列路由键b*/public static final String DEAD_LETTER_QUEUEB_ROUNTING_KEY_NAME = "dead.letter.queue.b.rounting.key";/*** 申明业务交换机* @return*/@Beanpublic FanoutExchange businessExchange(){return new FanoutExchange(BUSINESS_EXCHANGE_NAME);}/*** 申明死信交换机* @return*/@Beanpublic DirectExchange deadletterExchange(){return new DirectExchange(DEAD_LETTER_EXCHANGE_NAME);}/*** 申明业务队列a* @return*/@Beanpublic Queue queuea(){Map<String,Object> map = new HashMap<>();//绑定死信交换机map.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE_NAME);//绑定的死信路由键map.put("x-dead-letter-routing-key",DEAD_LETTER_QUEUEA_ROUNTING_KEY_NAME);return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).withArguments(map).build();}/*** 申明业务队列b* @return*/@Beanpublic Queue queueb(){Map<String,Object> map = new HashMap<>();//绑定死信交换机map.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE_NAME);//绑定的死信路由键map.put("x-dead-letter-routing-key",DEAD_LETTER_QUEUEB_ROUNTING_KEY_NAME);return QueueBuilder.durable(BUSINESS_QUEUEB_NAME).withArguments(map).build();}/*** 申明死信队列a* @return*/@Beanpublic Queue deadletterQueuea(){return new Queue(DEAD_LETTER_QUEUEA_NAME);}/*** 申明死信队列b* @return*/@Beanpublic Queue deadletterQueueb(){return new Queue(DEAD_LETTER_QUEUEB_NAME);}/*** 队列a绑定到业务交换机* @return*/@Beanpublic Binding businessBindinga(){return BindingBuilder.bind(queuea()).to(businessExchange());}/*** 队列b绑定到业务交换机* @return*/@Beanpublic Binding businessBindingb(){return BindingBuilder.bind(queueb()).to(businessExchange());}/*** 死信队列a绑定到死信交换机* @return*/@Beanpublic Binding deadletterBindinga(){return BindingBuilder.bind(deadletterQueuea()).to(deadletterExchange()).with(DEAD_LETTER_QUEUEA_ROUNTING_KEY_NAME);}/*** 死信队列b绑定到死信交换机* @return*/@Beanpublic Binding deadletterBindingB(){return BindingBuilder.bind(deadletterQueueb()).to(deadletterExchange()).with(DEAD_LETTER_QUEUEB_ROUNTING_KEY_NAME);}
}

添加业务队列的消费者

package com.miaosha.study.mq;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;import static com.miaosha.study.mq.RabbitmqConfig.BUSINESS_QUEUEA_NAME;
import static com.miaosha.study.mq.RabbitmqConfig.BUSINESS_QUEUEB_NAME;/*** @Author: laz* @CreateTime: 2023-02-27  09:53* @Version: 1.0*/
@Slf4j
@Component
public class RabbitmqReceiver {/*** 监听业务队列a* @param message*/@RabbitListener(queues = BUSINESS_QUEUEA_NAME)public void queuea(Message message, Channel channel) throws IOException {String msg = new String(message.getBody());log.info("业务队列A接受到消息【{}】",msg);boolean ack = true;Exception exception = null;try {//这里模拟业务逻辑出现异常的情况if (msg.contains("fail")){throw new RuntimeException("dead letter exception");}} catch (Exception e){ack = false;exception = e;}//当ack为false时(业务逻辑出现异常),说明当前消息消费异常,这里直接放入死信队列if (!ack){log.error("业务队列A消费发生异常,error msg:{}", exception.getMessage());/*** void basicNack(long deliveryTag, boolean multiple, boolean requeue)* 参数一:当前消息的唯一id* 参数二:是否针对多条消息* 参数三:是否从新入队列*/channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);} else {channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}}/*** 监听业务队列b* @param msg*/@RabbitListener(queues = BUSINESS_QUEUEB_NAME)public void queueb(Message msg,Channel channel) throws Exception{String str = new String(msg.getBody());log.info("业务队列B接受到消息【{}】",str);channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);}
}

添加死信队列的消费者

package com.miaosha.study.mq;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;import static com.miaosha.study.mq.RabbitmqConfig.*;/*** @Author: laz* @CreateTime: 2023-02-27  09:58* @Version: 1.0*/
@Slf4j
@Component
public class DeadLetterReceiver {/*** 监听业务队列a* @param msg*/@RabbitListener(queues = DEAD_LETTER_QUEUEA_NAME)public void queuea(Message msg, Channel channel) throws IOException {String str = new String(msg.getBody());log.info("死信队列A接受到消息【{}】",str);channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);log.info("死信消息properties:{}", msg.getMessageProperties());}/*** 监听业务队列b* @param msg*/@RabbitListener(queues = DEAD_LETTER_QUEUEB_NAME)public void queueb(Message msg, Channel channel) throws IOException {String str = new String(msg.getBody());log.info("死信队列B接受到消息【{}】",str);channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);log.info("死信消息properties:{}", msg.getMessageProperties());}
}

添加消息发送者

package com.miaosha.study.mq;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import static com.miaosha.study.mq.RabbitmqConfig.BUSINESS_EXCHANGE_NAME;/*** @Author: laz* @CreateTime: 2023-02-27  09:49* @Version: 1.0*/
@Component
public class RabbitmqSender {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMsg(String msg){rabbitTemplate.convertAndSend(BUSINESS_EXCHANGE_NAME,"",msg);}
}

添加消息测试类

package com.miaosha.study.controller;import com.miaosha.study.mq.RabbitmqSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/*** @Author: laz* @CreateTime: 2023-02-27  09:59* @Version: 1.0*/
@RestController
@RequestMapping("mq")
public class TestController {@Autowiredprivate RabbitmqSender rabbitmqSender;@RequestMapping("testDeadLetterQueue/{msg}")public void testDeadLetterQueue(@PathVariable("msg")String msg){rabbitmqSender.sendMsg(msg);}
}

测试

运行项目,访问:http://localhost:8081/mq/testDeadLetterQueue/msg

在这里插入图片描述

可以看到,此时只有业务消费者消费了消息,死信队列并没有消费到消息。

然后根据消费者里面的逻辑,我们发送一条 ‘fail’的消息,再次测试

访问:http://localhost:8081/mq/testDeadLetterQueue/fail

在这里插入图片描述

可以看到,死信队列a已收到消息。到此实现死信队列的流程就通了。

注意:我们的死信消息MessageProperties中的内容比较多,代表的含义分别是:

字段名含义
x-first-death-exchange第一次被抛入的死信交换机的名称
x-first-death-reason第一次成为死信的原因,rejected:消息在重新进入队列时被队列拒绝,由于default-requeue-rejected 参数被设置为false。expired :消息过期。maxlen : 队列内消息数量超过队列最大容量
x-first-death-queue第一次成为死信前所在队列名称
x-death历次被投入死信交换机的信息列表,同一个消息每次进入一个死信交换机,这个数组的信息就会被更新

死信队列的应用场景

一般用在较为重要的业务队列中,确保未被正确消费的消息不被丢弃,一般发生消费异常可能原因主要有由于消息信息本身存在错误导致处理异常,处理过程中参数校验异常,或者因网络波动导致的查询异常等等,当发生异常时,当然不能每次通过日志来获取原消息,然后让运维帮忙重新投递消息。通过配置死信队列,可以让未正确处理的消息暂存到另一个队列中,待后续排查清楚问题后,编写相应的处理代码来处理死信消息,这样比手工恢复数据要好太多了。

总结

死信队列其实并没有什么神秘的地方,不过是绑定在死信交换机上的普通队列,而死信交换机也只是一个普通的交换机,不过是用来专门处理死信的交换机。

死信消息的生命周期:

  1. 业务消息被投入业务队列
  2. 消费者消费业务队列的消息,由于处理过程中发生异常,于是进行了nck或者reject操作
  3. 被nck或reject的消息由RabbitMQ投递到死信交换机中
  4. 死信交换机将消息投入相应的死信队列
  5. 死信队列的消费者消费死信消息

本篇文章到此结束!希望对您有所帮助。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_74803.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

wafw00f 防火墙探测

kali机器自带防火墙探测工具wafw00&#xff0c;它可以通过发送正常以及不正常甚至包含恶意代码的HTTP请求&#xff0c;来探测网站是否存在防火墙&#xff0c;并识别防火墙的厂商及类型。安装&#xff1a;git clone https://github.com/EnableSecurity/wafw00f.git python setup…

Windows如何查看某个端口被占用的情况?

在工作中&#xff0c;有时会发现端口被占用的情况&#xff0c;导致软件报错或者服务无法启动等问题。在不知道具体哪个进程占用该端口号的情况下&#xff0c;我们可以用下面方法来查找。 举例&#xff1a;我现在发现8090端口被占用了&#xff0c;我现在需要找到并杀掉该进程。…

TCP状态转换

欢迎关注博主 Mindtechnist 或加入【Linux C/C/Python社区】一起探讨和分享Linux C/C/Python/Shell编程、机器人技术、机器学习、机器视觉、嵌入式AI相关领域的知识和技术。 TCP状态转换专栏&#xff1a;《Linux从小白到大神》《网络编程》 TCP状态转换示意图如下 针对上面的示…

高并发之读多写少的场景设计(用户中心)

用户中心是一个典型的读多写少系统&#xff0c;可以说我们大部分的系统都属于这种类型&#xff0c;而这类系统通过缓存就能获得很好的性能提升。并且在流量增大后&#xff0c;用户中心通常是系统改造中第一个要优化的模块&#xff0c;因为它常常和多个系统重度耦合&#xff0c;…

消息队列介绍和RabbitMQ的安装

1.消息队列 1.1 MQ的相关概念 1.1.1 什么是MQ MQ(message queue)&#xff0c;从字面意思上看&#xff0c;本质是个队列&#xff0c;FIFO 先入先出&#xff0c;只不过队列中存放的内容是message 而已&#xff0c;还是一种跨进程的通信机制&#xff0c;用于上下游传递消息。在…

高阶人生从在职读研开始——中国社科院与美国杜兰大学金融管理硕士

说到学历&#xff0c;好多人都不太在意&#xff0c;感觉学历没什么用。等升职学历被卡时&#xff0c;等你想考公学历达不到时&#xff0c;当你想跳槽更大的平台时&#xff0c;学历会显得尤其重要。当机会来临时&#xff0c;我们应该做好全足的准备&#xff0c;而不是眼瞅着机会…

SpringBoot相关操作

01-今日内容 Spring概述、快速入门SpringBoot配置SpringBoot整合 02-SpringBoot概述 SpringBoot提供了一种快速使用Spring的方式&#xff0c;基于约定优于配置的思想&#xff0c;可以让开发人员不必在配置与逻辑业务之间进行思维的切换&#xff0c;全身心的投入到逻辑业务的…

金融信创步入快车道,应“需”而生的监控易运维方案为国产化助力

在我国“28N”信创三步走战略中&#xff0c;金融信创赫然名列其中&#xff0c;成为最早践行信创理论与实操的行业之一。截止到目前&#xff0c;金融信创渗透率业已仅次于党政部门&#xff0c;位列“8”大重点行业之首。超快的发展速度&#xff0c;让金融信创较早的步入“买方市…

浅谈模型评估选择及重要性

作者&#xff1a;王同学 来源&#xff1a;投稿 编辑&#xff1a;学姐 模型评估作为机器学习领域一项不可分割的部分&#xff0c;却常常被大家忽略&#xff0c;其实在机器学习领域中重要的不仅仅是模型结构和参数量&#xff0c;对模型的评估也是至关重要的&#xff0c;只有选择那…

前端开发:JS的节流与防抖

前言 在前端实际开发中&#xff0c;有关JS原生的节流和防抖处理也是很重要的点&#xff0c;关于底层和原理的掌握使用&#xff0c;尤其是在性能优化方面甚为重要。作为前端开发的进阶内容&#xff0c;在实际开发过程中节流和防抖通常都是项目优化的必要手段&#xff0c;而且也是…

机器学习笔记之近似推断(二)推断的核心思想

机器学习笔记之近似推断——推断的核心思想引言回顾&#xff1a;推断的目的与困难推断的目的推断的困难推断的核心思想——优化引言 上一节介绍了从深度学习的角度介绍了推断&#xff0c;并介绍了推断的目的和困难。本节将继续介绍推断的核心思想。 回顾&#xff1a;推断的目…

写给交互设计新手的信息架构全方位指南

目录什么是信息架构&#xff1f;通用方法日常工作可以关注的大神常用工具相关书籍什么是信息架构&#xff1f;信息架构是一个比众多其他领域更难定义的领域。内容策划由内容策划师来完成&#xff0c;交互设计由设计师来完成&#xff0c;而信息架构的完成与它们不同&#xff0c;…

达梦数据库(DM8)集成使用 Geotools(27.2)

达梦数据库&#xff08;DM8&#xff09;集成使用 Geotools&#xff08;27.2&#xff09;系统环境版本达梦 8 集成 Geotools 环境安装达梦8&#xff0c;请参照项目 pom.xml 添加 geotools 配置项目 pom.xml 添加达梦数据库驱动包Geotools 使用示例Geotools 连接数据库Geotools 空…

CLion Remote Debug CrossCompile

CLion远程Docker调试ROS(交叉编译)的设置步骤 准备一个好用的docker&#xff0c;运行起来&#xff08;Docker Image一定可以跑cuda和图形界面的&#xff0c;否则启动不了CLion&#xff0c;可以不用浪费时间看本教程了&#xff09; 在docker镜像中配置好ssh和rsync&#xff0c;…

测量 R 代码运行时间的 5 种方法

简介 平常在撰写论文时&#xff0c;会需要比较算法之间的计算时间。本篇文章给出几种测量 R 代码运行时间的方法。本文是小编学习过程中的笔记&#xff0c;主要参考博客1&#xff0c;2。 1. 使用 Sys.time() 小编通常使用 Sys.time() 函数来计算时间。首先记录当前运行时刻&…

数据结构与算法之Huffman tree(赫夫曼树 / 霍夫曼树 / 哈夫曼树 / 最优二叉树)

目录赫夫曼树概述定义构造赫夫曼树步骤代码实现赫夫曼树概述 HuffmanTree因为翻译不同所以有其他的名字&#xff1a;赫夫曼树、霍夫曼树、哈夫曼树 赫夫曼树又称最优二叉树&#xff0c;是一种带权路径长度最短的二叉树。所谓树的带权路径长度&#xff0c;就是树中所有的叶结点…

如何在logback.xml中自定义动态属性

原文地址&#xff1a;http://blog.jboost.cn/trick-logback-prop.html 当使用logback来记录Web应用的日志时&#xff0c;我们通过在logback.xml中配置appender来指定日志输出格式及输出文件路径&#xff0c;这在一台主机或一个文件系统上部署单个实例没有问题&#xff0c;但是…

超店有数分享:2023还有哪些tiktok数据值得关注?

目前&#xff0c;tiktok是全球增长最迅猛的社交媒体软件之一。很多商家瞄准了tiktok的变现转化潜力&#xff0c;纷纷入局tiktok电商赛道。在入局这个赛道之前&#xff0c;我们需要了解一些tiktok的相关数据&#xff0c;这样才能更好的了解大局&#xff0c;及时调整自己的业务情…

Python 简单可变、复杂可变、简单不可变、复杂不可变类型的copy、deepcopy的行为

copy模块&#xff1a;copy&#xff1a;浅拷贝deepcopy&#xff1a;深拷贝简单可变类型、复杂可变的copy()、deepcopy()&#xff1a;简单不可变、复杂不可变类型的copy()、deepcopy()&#xff1a;结论&#xff1a;对于简单类型的可变类型copy是深拷贝&#xff0c;改变了该拷贝变…

TIA博途Wincc中自定义配方画面的具体方法示例

TIA博途Wincc中自定义配方画面的具体方法示例 前面和大家分享了通过TIA博途自带的配方视图组态配方功能的具体方法,具体内容可参考以下链接中的内容: TIA PORTAL wincc中配方recipe组态及配方视图的使用方法 但是,使用配方视图的时候感觉不是很方便,同时一部分使用人员也感…