如何保证消息不丢失?——使用rabbitmq的死信队列!

news/2024/5/3 19:36:14/文章来源:https://blog.csdn.net/qq_62124267/article/details/137612737

如何保证消息不丢失?——使用rabbitmq的死信队列!

1、什么是死信

在 RabbitMQ 中充当主角的就是消息,在不同场景下,消息会有不同地表现。
死信就是消息在特定场景下的一种表现形式,这些场景包括:

    1. 消息被拒绝访问,即 RabbitMQ返回 basicNack 的信号时 或者拒绝basicReject
    1. 消费者发生异常,超过重试次数 。 其实spring框架调用的就是 basicNack
    1. 消息的Expiration 过期时长或队列TTL过期时间。
    1. 消息队列达到最大容量

上述场景经常产生死信,即消息在这些场景中时,被称为死信。

2、什么是死信队列

死信队列就是用于储存死信的消息队列,在死信队列中,有且只有死信构成,不会存在其余类型的消息。
死信队列在 RabbitMQ 中并不会单独存在,往往死信队列都会绑定这一个普通的业务消息队列,当所绑定的消息队列中,有消息变成死信了,那么这个消息就会重新被交换机路由到指定的死信队列中去,我们可以通过对这个死信队列进行监听,从而手动的去对这一消息进行补偿。 人工干预
在这里插入图片描述

3、那么,我们到底如何来使用死信队列呢?

死信队列基本使用,只需要在声明业务队列的时候,绑定指定的死信交换机和RoutingKey即可。

生产者

/** Copyright (c) 2020, 2024, fpl1116.cn All rights reserved.**/
package com.fpl.provider;import com.fpl.model.OrderingOk;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;/*** <p>Project: spring-rabbitmq - DeadProvider</p>* <p>Powered by fpl1116 On 2024-04-09 11:35:12</p>* <p>描述:<p>** @author penglei* @version 1.0* @since 1.8*/
@Service
public class DeadProvider {@Autowiredprivate RabbitTemplate rabbitTemplate;public void send(OrderingOk orderingOk) {rabbitTemplate.convertAndSend("Direct_E01", "RK01", orderingOk,new MessagePostProcessor(){@Overridepublic Message postProcessMessage(Message message) throws AmqpException {int id  = orderingOk.getId();int expiration = 0;if(id == 1){expiration = 50*1000;}else if(id == 2){expiration = 40*1000;}else if(id ==3){expiration = 30*1000;}else if(id ==4){expiration = 20*1000;}else if(id ==5){expiration = 10*1000;}//为每个消息设置过期时长,但是有可能造成最前面的一个消息未过期一直阻塞后面的消息不能被消费message.getMessageProperties().setExpiration(String.valueOf(expiration));return message;}});}
}

消费者

/** Copyright (c) 2020, 2024, fpl1116.cn All rights reserved.**/
package com.fpl.consumers;import com.fpl.model.OrderingOk;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;/*** <p>Project: spring-rabbitmq - DeadConsumer</p>* <p>Powered by fpl1116 On 2024-04-09 11:32:59</p>* <p>描述:<p>** @author penglei* @version 1.0* @since 1.8*/
//@Configuration
@Slf4j
public class DeadConsumer {//死信交换机@Beanpublic DirectExchange deadExchange(){return  ExchangeBuilder.directExchange("Dead_E01").build();}//死信队列@Beanpublic Queue deadQueue1(){return   QueueBuilder.durable("Dead_Q01").build();}//死信交换机与死信队列的绑定@Beanpublic Binding deadBinding1(Queue deadQueue1,DirectExchange deadExchange){return BindingBuilder.bind(deadQueue1).to(deadExchange).with("RK_DEAD");}//业务队列@Beanpublic Queue queue1(){return   QueueBuilder.durable("Direct_Q01").deadLetterExchange("Dead_E01").deadLetterRoutingKey("RK_DEAD")//.ttl(10*1000) //该属性是队列的属性,设置消息的过期时间,消息在队列里面停留时间n毫秒后,就会把这个消息投递到死信交换机,针对的是所有的消息//.maxLength(20) //设置队列存放消息的最大个数,x-max-length属性值,当队列里面消息超过20,会把队列之前的消息依次放进死信队列.build();}//业务交换机@Beanpublic DirectExchange exchange(){return  ExchangeBuilder.directExchange("Direct_E01").build();}//业务交换机与队列的绑定@Beanpublic Binding binding1(Queue queue1,DirectExchange exchange){return BindingBuilder.bind(queue1).to(exchange).with("RK01");}//    @RabbitListener(queues = "Direct_Q01")
//    public void receiveMessage(OrderingOk msg,Message message, Channel channel) throws IOException {
//
//        long deliveryTag = message.getMessageProperties().getDeliveryTag();
//
//        System.out.println("消费者1 收到消息:"+ msg +" tag:"+deliveryTag);
//
//        channel.basicReject(deliveryTag, false);
//        try {
//            // 处理消息...
//            int  i= 5/0;
//            // 如果处理成功,手动发送ack确认 ,Yes
//            channel.basicAck(deliveryTag, false);
//        } catch (Exception e) {
//            // 处理失败,可以选择重试或拒绝消息(basicNack或basicReject)  NO
//            channel.basicNack(deliveryTag, false, false); // 并重新入队
//
//        }
}//}

测试

@Testvoid test4() throws IOException {for (int i = 1; i <=5;i++){OrderingOk orderingOk = OrderingOk.builder().id(i).name("fpl " + i).build();deadProvider.send(orderingOk);System.out.println("发送成功:"+i);}System.in.read();}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_1045751.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

全国水科技大会 免费征集《水环境治理减污降碳协同增效示范案例》

申报时间截止到2024年4月15日&#xff0c;请各单位抓紧申报&#xff0c;申报条件及申报表请联系&#xff1a;13718793867 围绕水环境治理减污降碳协同增效领域&#xff0c;以资源化、生态化和可持续化为导向&#xff0c;面向生态、流城、城市、农村、工业园区、电力、石化、钢…

前端mock数据——使用mockjs进行mock数据

前端mock数据——使用mockjs进行mock数据 一、安装二、mockjs的具体使用 一、安装 首选需要有nodejs环境安装mockjs&#xff1a;npm install mockjs 若出现像上图这样的错&#xff0c;则只需npm install mockjs --legacy-peer-deps即可 src下新建mock文件夹&#xff1a; mo…

基于Java SpringBoot+Vue的体育用品库存管理系统

博主介绍&#xff1a;✌IT徐师兄、7年大厂程序员经历。全网粉丝15W、csdn博客专家、掘金/华为云//InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专栏推荐订阅&#x1f447;&#x1f3…

LeetCode | 数组 | 二分查找 | 35.搜索插入位置【C++】

题目链接 题目描述 给定一个排序数组和一个目标值&#xff0c;在数组中找到目标值&#xff0c;并返回其索引。如果目标值不存在于数组中&#xff0c;返回它将会被按顺序插入的位置。 请必须使用时间复杂度为 O(log n) 的算法。 示例 1: 输入: nums [1,3,5,6], target 5 输出…

C++ 线程库(thread)与锁(mutex)

一.线程库(thread) 1.1 线程类的简单介绍 thread类文档介绍 在C11之前&#xff0c;涉及到多线程问题&#xff0c;都是和平台相关的&#xff0c;比如windows和linux下各有自己的接口&#xff0c;这使得代码的可移植性比较差。C11中最重要的特性就是对线程进行支持了&#xff…

eNSP-抓包解析TCP三次握手和四次挥手的过程

一、环境搭建 1.设备连接 并 启动所有设备 2.服务器配置 3.客服端配置 二、抓包测试 1.打开抓包软件 2.客户端获取数据 三、抓包结果

HEC-HMS水文模型

HEC-HMS是美国陆军工程兵团水文工程中心开发的一款水文模型。HMS能够模拟各种类型的降雨事件对流域水文&#xff0c;河道水动力以及水利设施的影响&#xff0c;在世界范围内得到了广泛的应用。它有着完善的前后处理软件&#xff0c;能有效减轻建模的负担&#xff1b;能够与HEC开…

如何用Vue实现实时网络状态监控:一篇让你轻松掌握前端网络连通性管理的指南

1、演示 2、网络监控目的 网络性能优化&#xff1a; 通过监控用户的网络状态&#xff0c;可以了解网络延迟、带宽利用率、丢包率等信息&#xff0c;从而优化网络性能&#xff0c;提升用户体验。 故障排除&#xff1a; 可以监控网络状态以及网络设备的运行情况&#xff0c;及时…

【现代C++】线程支持库

现代C&#xff08;C11及其之后的版本&#xff09;引入了标准的线程支持库&#xff0c;使得多线程编程变得更加简单和可移植。这个库提供了线程管理、互斥量、条件变量和其他同步原语。 1. std::thread - 基本线程 std::thread允许创建执行特定任务的线程。 #include <ios…

蓝桥杯-油漆面积

代码及其解析:(AC80%&#xff09; 思路:是把平面划成单位边长为1&#xff08;面积也是1&#xff09;的方格。每读入一个矩形&#xff0c;就把它覆盖的方格标注为已覆盖&#xff1b;对所有矩形都这样处理&#xff0c;最后统计被覆盖的方格数量即可。编码极其简单&#xff0c;但…

gradio简单搭建——关键词匹配筛选【进一步优化】

gradio简单搭建——关键词匹配筛选[进一步优化] 任务回顾新的想法&#xff1a;无效元素筛选界面搭建数据处理与生成过程交互界面展示 任务回顾 在 apply \text{apply} apply方法的使用一节中&#xff0c;简单提到了任务目标&#xff1a;通过关键词的形式&#xff0c;在文本数据…

三维点云:对原始点云数据进行体素化

文章目录 一、原始点云二、对原始点云进行体素化三、结果展示 一、原始点云 &#x1f349;原始点云为.pts文件&#xff0c;内容为x, y, z的坐标 原始点云展示 二、对原始点云进行体素化 使用open3d库实现&#xff0c;如果没有需要在命令行执行pip install open3d import o…

【STL】vector

目录 1. vector的使用 1.1 vector的定义 1.2 vector iterator 的使用 1.3 vector 空间增长问题 1.4 vector 增删查改 1.5 vector 迭代器失效问题&#xff08;重点&#xff09; 2.vector模拟实现 1. vector的使用 1.1 vector的定义 1.2 vector iterator 的使用 1.3 vecto…

【PDF-XSS攻击】Java项目-上传文件-解决PDF文件XSS攻击

文章目录 背景解决pdfbox依赖控制器代码PdfUtils工具类 验证最后源码参考 背景 上传xss-pdf造成存储型xss因为在浏览器直接预览的PDF&#xff0c;而不是预览&#xff0c;所以安全部门认为会有XSS漏洞 解决 安全部门修复建议 1、根据白名单的标签和属性对数据进行过滤&#…

linux大文件IO

在Linux中处理大文件&#xff08;通常指大小超过2GB的文件&#xff09;时&#xff0c;需要使用特定的系统调用和标志&#xff0c;以确保程序能够正确地处理大文件的读写。这主要是因为在32位系统上&#xff0c;传统的文件偏移量和文件大小使用off_t类型表示&#xff0c;它通常是…

揭秘ChatGPT预训练数据集

自大语言模型引领新一代的AI浪潮之后&#xff0c;对于Open AI发布的GPT系列LLM使用的数据集一直是行业内的谜&#xff0c;我们都知道&#xff0c;随着模型的参数量提升&#xff0c;预训练数据的使用量也同步增加&#xff0c;下面就让我们从相关论文和分析从探索GPT-X大模型的预…

地理信息系统(ArcGIS)在水文水资源、水环境中的应用

刘老师&#xff08;副教授&#xff09;&#xff1a;来自北京重点高校资深专家&#xff0c;长期从事水资源与水环境、流域污染控制与管理、非点源模拟与控制、环境信息系统开发、环境遥感与GIS应用等领域的研究&#xff0c;发表多篇Sci论文、具有资深的技术底蕴和专业背景。 1、…

wps可以打钩的框框

方法一&#xff1a; 输入2611&#xff0c;按下altx 方法二&#xff1a; R 选中后->开始->字体wingdings字体

自动驾驶硬件系统-激光雷达(Lidar)测量模型

自动驾驶硬件系统-激光雷达(Lidar)测量模型 激光雷达(Lidar, Light Detection And Ranging)是Google系自动驾驶技术路线广泛应用的硬件传感器。 附赠自动驾驶学习资料和量产经验&#xff1a;链接 1、激光雷达(Lidar)的工作原理 通过持续不断的发射激光束&#xff0c;激光束遇…

winform入门篇3 -- 手工创建窗口

手工创建窗口 Form, 窗口 可以手工创建一个窗口类 class MyFrom : Form { } 1.创建一个windows 窗体应用 这样就自动创建了一个窗体应用Form1 现在不使用这个自动创建的&#xff0c;手工写一个 2.手动创建 1.删除Form1.cs 2.添加 新建MyForm 类 让该类继承Form 在构造…