【RabbitMQ七】——RabbitMQ发布确认模式(Publisher Confirms)

news/2024/4/20 20:56:21/文章来源:https://blog.csdn.net/wangwei021933/article/details/129253863

RabbitMQ发布确认模式

  • 前言
  • 如何实现发布确认
  • 发布确认模式有三种策略
    • 单独发布消息
      • 执行结果
    • 批量发布消息
      • 执行结果
    • 异步处理发布确认
      • 执行结果
      • 思考点
        • 如何追踪未完成的确认?
        • 重新发布丢失的消息
  • 总结
    • 收获

前言

发布确认是解决消息不丢失的重要环节,在设置队列持久化、消息持久化的基础上,设置发布确认,一旦生产者投递消息之后,如果Broker接收到消息,会给生产者一个应答。
生产者进行接收应答,用来确认这条消息是否正常发送到Broker。生产者也可以根据收没有收到这条消息的应答进行相应的处理。

如何实现发布确认

发布者确认在通道级别使用confirmSelect方法启用

Channel channel = connection.createChannel();
channel.confirmSelect();

发布确认模式有三种策略

  1. 单独发布消息,同步等待确认:简单,但吞吐量非常有限。
  2. 批量发布消息,等待批量的同步确认:简单、合理的吞吐量,但是很难判断出什么时候出了问题。
  3. 异步处理:最佳的性能和资源利用,良好的控制情况下的错误,但涉及到正确的实现,相对复杂

单独发布消息

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;/*** @author : [WangWei]* @version : [v1.0]* @className : Producer* @description : [生产者]* @createTime : [2023/2/1 9:38]* @updateUser : [WangWei]* @updateTime : [2023/2/1 9:38]* @updateRemark : [描述说明本次修改内容]*/
public class Producer {private static final String QUEUE_NAME = "Confirm";public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//建立连接RabbitMQUtils.getConnection();//声明通道Channel channel = RabbitMQUtils.getChannel();//开启确认模式channel.confirmSelect();channel.queueDeclare(QUEUE_NAME,true,false,false,null);long start = System.currentTimeMillis();//循环发送2条消息for (int i = 0; i <200 ; i++) {String msg="消息确认模式消息:"+i;/*推送消息*交换机命名,不填写使用默认的交换机* routingKey -路由键-* props:消息的其他属性-路由头等正文* msg消息正文*/channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));// uses a 5 second timeout//如果超时过期,则抛出TimeoutException。如果任何消息被nack(丢失), waitForConfirmsOrDie将抛出IOException。channel.waitForConfirmsOrDie(5_000);}long end = System.currentTimeMillis();System.out.println("发送200条消息使用时间:"+(end-start));}
}

执行结果

在这里插入图片描述

waitForConfirmsOrDie(long)方法等待其确认。该方法在确认消息后立即返回。如果消息在超时内没有得到确认,或者消息被nack-ed(意味着代理由于某种原因无法处理它),该方法将抛出异常。异常的处理通常包括记录错误消息和/或重新尝试发送该消息。

这种技术非常简单,但也有一个主要缺点:它大大减慢了发布速度,因为对消息的确认会阻塞所有后续消息的发布。这种方法交付的吞吐量不会超过每秒发布的几百条消息。

批量发布消息

发布一批消息并等待整个批消息被确认。以100个为例

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;/*** @author : [WangWei]* @version : [v1.0]* @className : Producer* @description : [生产者]* @createTime : [2023/2/1 9:38]* @updateUser : [WangWei]* @updateTime : [2023/2/1 9:38]* @updateRemark : [描述说明本次修改内容]*/
public class Producer2 {private static final String QUEUE_NAME = "Confirm";public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//用于记录每发布100条消息进行一次确认int batchSize = 100;int outstandingMessageCount = 0;//建立连接RabbitMQUtils.getConnection();//声明通道Channel channel = RabbitMQUtils.getChannel();//开启确认模式channel.confirmSelect();//声明持久化队列channel.queueDeclare(QUEUE_NAME,true,false,false,null);long start = System.currentTimeMillis();//循环发送200条消息for (int i = 0; i <200 ; i++) {String msg="消息确认模式消息:"+i;/*推送消息*交换机命名,不填写使用默认的交换机* routingKey -路由键-* props:消息的其他属性-路由头等正文* msg消息正文*/channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));outstandingMessageCount++;//每100次确认一次if (outstandingMessageCount == batchSize) {// uses a 5 second timeout//如果超时过期,则抛出TimeoutException。如果任何消息被nack(丢失), waitForConfirmsOrDie将抛出IOException。channel.waitForConfirmsOrDie(5_000);outstandingMessageCount = 0;}}if (outstandingMessageCount > 0) {channel.waitForConfirmsOrDie(5_000);}long end = System.currentTimeMillis();System.out.println("发送200条消息使用时间:"+(end-start));}
}

执行结果

在这里插入图片描述
与等待单个消息的确认相比,等待一批消息被确认大大提高了吞吐量(对于远程RabbitMQ节点,最多可提高20-30倍)。一个缺点是,在失败的情况下,我们无法确切地知道哪里出了问题,因此我们可能不得不在内存中保留整个批处理,以记录有意义的内容或重新发布消息。而且这种解决方案仍然是同步的,因此它会阻止消息的发布。

异步处理发布确认

异步处理发行商确认通常需要以下步骤:

  1. 提供一种将发布序列号与消息关联起来的方法。
  2. 在通道上注册一个确认侦听器,以便在发布者acks/nacks到达时得到通知,以执行适当的操作,如记录日志或重新发布nack-ed消息。在此步骤中,序列号到消息的关联机制也可能需要进行一些清理。
  3. 在发布消息之前跟踪发布序列号。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.client.MessageProperties;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeoutException;/*** @author : [WangWei]* @version : [v1.0]* @className : Producer* @description : [生产者]* @createTime : [2023/2/1 9:38]* @updateUser : [WangWei]* @updateTime : [2023/2/1 9:38]* @updateRemark : [描述说明本次修改内容]*/
public class Producer3 {private static final String QUEUE_NAME = "Confirm";public static void main(String[] args) throws IOException, TimeoutException {//建立连接RabbitMQUtils.getConnection();//声明通道Channel channel = RabbitMQUtils.getChannel();//开启确认模式channel.confirmSelect();//用于保存序列号与消息之前映射的容器ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();//清理映射关系的回调//multiple 一个布尔值,如果为false则说明只有一条消息被确认或者丢失,如果为true这表示有小于或等于sequenceNumber的消息被确认或者丢失。ConfirmCallback cleanOutstandingConfirms = (sequenceNumber, multiple) -> {if (multiple) {//如果包含值为true则返回该映射的部分视图,其键值小于或等于,sequenceNumberConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(sequenceNumber, true);//移除所有key和valueconfirmed.clear();} else {//移除序列号对象的消息outstandingConfirms.remove(sequenceNumber);}};//两个回调一个用于确认消息,另一个用于nack-ed消息(可以被代理视为丢失的消息)。//sequenceNumber用于标识确认的消息或者丢失的消息//
//        channel.addConfirmListener((sequenceNumber,multiple)->{
//            String body = outstandingConfirms.get(sequenceNumber);
//            System.out.println("ack message:"+body);
//        },(sequenceNumber,multiple)->{
//            //当消息被丢失时。。。。
//            String body = outstandingConfirms.get(sequenceNumber);
//            System.out.println("no ack message:"+body);
//        });//当消息缺失的回调channel.addConfirmListener(cleanOutstandingConfirms, (sequenceNumber, multiple) -> {String body = outstandingConfirms.get(sequenceNumber);System.out.println("没有确认的消息:"+body);//重新使用回调来清理映射cleanOutstandingConfirms.handle(sequenceNumber, multiple);});//声明持久化队列channel.queueDeclare(QUEUE_NAME,true,false,false,null);long start = System.nanoTime();//循环发送2条消息for (int i = 0; i <200 ; i++) {String msg="消息确认模式消息:"+i;//使用map将发布序列号与消息的字符串体关联起来//channel.getNextPublishSeqNo()获取下一个消息的序列号outstandingConfirms.put(channel.getNextPublishSeqNo(), msg);/*推送消息*交换机命名,不填写使用默认的交换机* routingKey -路由键-* props:消息的其他属性-路由头等正文* msg消息正文*/channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));System.out.println(msg+"发布成功");}long end = System.nanoTime();System.out.println("发送200条消息使用时间:"+ Duration.ofNanos(end - start).toMillis());}
}

执行结果

在这里插入图片描述

思考点

如何追踪未完成的确认?

示例使用ConcurrentNavigableMap来跟踪未完成的确认。这种数据结构的方便有以下几个原因。
它允许轻松地将序列号与消息(无论消息数据是什么)关联起来,并轻松地将条目清理到给定的序列id(以处理多个确认/nack)。
最后,它支持并发访问,因为确认回调是在客户端库拥有的线程中调用的,应该与发布线程保持不同。
除了使用复杂的映射实现之外,还有其他方法可以跟踪未完成的确认,比如使用简单的并发散列映射和变量来跟踪发布序列的下界,但它们通常更复杂,

重新发布丢失的消息

从相应的回调中重新发布nack-ed消息可能很诱人,但应该避免这种情况,因为确认回调是在I/O线程中分派的,其中通道不应该执行操作。更好的解决方案是将消息放入由发布线程轮询的内存队列中。像ConcurrentLinkedQueue这样的类可以很好地在确认回调和发布线程之间传输消息。

总结

在某些应用程序中,确保发布的消息到达代理非常重要。发布者确认RabbitMQ的特性可以帮助满足这一要求。发布者确认本质上是异步的,但也可以同步地处理它们。没有确定的方法来实现发布者确认,这通常归结于应用程序和整个系统中的约束。典型的技术有:
单独发布消息,同步等待确认:简单,但吞吐量非常有限。
批量发布消息,等待批量的同步确认:简单、合理的吞吐量,但是很难判断出什么时候出了问题。
异步处理:最佳的性能和资源利用,良好的控制情况下的错误,但是实现较为复杂

收获

1.从官网学习rabbitmq的使用,严格落实先宏观,后微观,结合思维导图和三遍读书法.
2.学习的过程中,锤炼英文阅读,动手查字典和利用工具.
3. 理论和实践结合,理论学习完成之后,及时进行实践包括先代码实现,阶段总结等等.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_75306.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MySQL实战解析底层---基础架构:一条SQL查询语句是如何执行的?

目录 前言 连接器 查询缓存 分析器 优化器 执行器 前言 平时使用数据库&#xff0c;看到的通常都是一个整体比如&#xff0c;有个最简单的表&#xff0c;表里只有一个 ID 字段&#xff0c;在执行下面这个查询语句时&#xff1a; 看到的只是输入一条语句&#xff0c;返回…

Billu靶场黑盒盲打——思路和详解

一、信息收集 1、探测内网主机IP可以使用各种扫描工具比如nmap&#xff0c;我这里用的是自己编写的。 nmap -n 192.168.12.0/24 #扫描IP&#xff0c;发现目标主机 2、先不着急&#xff0c;先收集一波它的端口&#xff08;无果&#xff09; nmap -n 192.168.12.136 -p 1-10000…

华为OD机试题,用 Java 解【靠谱的车】问题

最近更新的博客 华为OD机试题,用 Java 解【停车场车辆统计】问题华为OD机试题,用 Java 解【字符串变换最小字符串】问题华为OD机试题,用 Java 解【计算最大乘积】问题华为OD机试题,用 Java 解【DNA 序列】问题华为OD机试 - 组成最大数(Java) | 机试题算法思路 【2023】使…

字符串反转-课后程序(JAVA基础案例教程-黑马程序员编著-第九章-课后作业)

【案例9-2】 字符串反转 【案例介绍】 1.案例描述 在使用软件或浏览网页时&#xff0c;总会查询一些数据&#xff0c;查询数据的过程其实就是客户端与服务器交互的过程。用户&#xff08;客户端&#xff09;将查询信息发送给服务器&#xff0c;服务器接收到查询消息后进行处…

数据仓库-数仓分层

层级 全拼 职责划分 ODS(源数据层) Operational DataStore ODS层存储最原始的数据&#xff0c; 对数据不做任何加工处理&#xff1b; 源数据主要来自业务数据库和日志&#xff0c;这些数据是用户操作业务系统产生&#xff0c;所以叫操作型数据(Operational Data) 。 DWD(…

MySQL数据库操作

查看数据库语法show databases——列出所有的数据库 show databases [ like wild ];——列出和字符串wild名字相同的数据库 这里可以配合SQl的 "%" 和 "_" 通配符使用来查找多个数据库在SQL语句中"%"代表任意字符出现任意次数,"_"代表…

为什么要学习C++软件调试技术?掌握这类技术都有哪些好处?

目录 1、为什么要学习C软件调试技术&#xff1f; 1.1、IDE调试手段虽必不可少&#xff0c;但还不够 1.2、通过查看日志和代码去排查异常崩溃问题&#xff0c;费时费力&#xff0c;很难定位问题 1.3、有的问题很难复现&#xff0c;可能只在客户的环境才能复现 1.4、为了应对…

短视频美颜sdk人脸编辑技术详解、美颜sdk代码分析

短视频美颜sdk中人脸编辑技术可以将人像风格进行转变&#xff0c;小编认为这也是未来的美颜sdk的一个重要发展方向&#xff0c;下文小编将为大家讲解一下短视频美颜sdk中人脸编辑的关键点。 一、人脸编辑的细分关键点 1、年龄 通过更改人脸的年龄属性&#xff0c;可用于模仿人…

攻不下dfs不参加比赛(七)

标题 为什么练dfs题目总结重点为什么练dfs 相信学过数据结构的朋友都知道dfs(深度优先搜索)是里面相当重要的一种搜索算法,可能直接说大家感受不到有条件的大家可以去看看一些算法比赛。这些比赛中每一届或多或少都会牵扯到dfs,可能提到dfs大家都知道但是我们为了避免眼高手…

AST之path常用属性和方法总结笔记

文章目录1. path常用属性总结1.1 path.node1.2 path.scope1.3 path.parentPath1.4 path.parent1.5 path.container1.6 path.type1.7 path.key2. path常用方法总结2.1 path.toString2.2 path.replaceWith2.3 path.replaceWithMultiple2.4 path.remove2.5 path.insertBefore2.6 p…

Android 蓝牙开发——HCI log 分析(二十)

HCI log 是用来分析蓝牙设备之间的交互行为是否符合预期,是否符合蓝牙规范。对于蓝牙开发者来说,通过 HCI log 可以帮助我们更好地分析问题,理解蓝牙协议。 一、抓取HCI log 1、手机抓取HCI log 在开发者选项中打开启用蓝牙HCI信息收集日志开关,Android系统就开始自动地收…

在中外合作办学硕士领域似乎自己一直在纠结,也许是为了能遇见人大女王金融硕士

2023考研成绩如期而至&#xff0c;还记得考试时的一幕幕吗&#xff1f;在身体被高热侵蚀的情况下&#xff0c;我们似乎很难忘记这次考试所带给我们的经历。如今成绩下来了&#xff0c;可能与我们预期的几乎相同&#xff0c;但是在不断地寻找新的学习途径的过程中我们发现&#…

驾驭云安全:2023年云安全展望

由于其的良好的可扩展性和优质的事件处理效率&#xff0c;云技术已成为现代企业的必备的管理技术之一&#xff0c;目前他已经成为所有行业及企业的热门选择。然而&#xff0c;攻击面积的增加以及不针对云技术衍生出来的多类攻击方式&#xff0c;使许多企业更容易受到威胁和数据…

分层测试(2)单元测试【必备】

1. 什么是单元测试&#xff1f; 对代码中的逻辑隔离的最小代码片段进行测试&#xff0c;验证其逻辑是否符合预期&#xff0c;单元可以是函数&#xff0c;方法&#xff0c;类&#xff0c;功能模块。 2. 单元测试的优点 掌握代码&#xff1a;单元测试允许开发人员了解单元提供…

软件测试之场景法

场景法 1. 概述 1.1 为什么使用场景法设计测试用例 大多数业务软件由后台管理&#xff08;比如&#xff1a;用户管理、角色管理、权限管理等等各种管理&#xff09;和工作流等几个部分组成。终端用户&#xff0c;期望软件能够实现业务需求&#xff0c;而不是简单的功能的组合…

2023湖北土建施工员证报考条件考试时间及报考流程 启程别

2023湖北土建施工员证报考条件考试时间及报考流程 启程别 土建施工员证是建设厅七大员中的施工员证的一种。分为土建、装饰装修、市政、设备安装。土建施工员证怎么报考等一系列相关问题启程别告诉你 施工员证报考条件 其实施工员证的报考条件没有那么复杂&#xff0c;基本上年…

智慧扫码点餐系统源码

智慧餐厅扫码点餐小程序系统源码 1. 开发语言&#xff1a;JAVA 2. 数据库&#xff1a;MySQL 3. 原生小程序 4. Saas 模式 5. 带调试部署视频 6、总后台管理端商家端门店端小程序用户端 智慧扫码点餐系统支持多店铺运营&#xff0c;单店铺运营以及连锁店铺运营。系统功能支…

numpy常用操作

文章目录1 numpy库2 数组对象 ndarray2.1 数组对象的创建2.1.1 利用array函数创建ndarray对象2.1.2 np.ones()和np.zeros()函数2.1.3 np.random.rand()函数2.1.4 np.arange()函数2.1.5 np.linspace()函数2.1.6 np.empty()函数2.2 ndarray对象常用属性2.3 ndarray常用操作2.3.1 …

IAP初探

IAP(In-Application Programming)在应用编程&#xff0c;浅显易懂&#xff0c;按照字面意思即是在程序不关闭情况下&#xff0c;对应用进行再次写入程序&#xff0c;对程序的写入需要传输数据&#xff0c;而传输数据的前提是通信&#xff0c; IAP对代码进行更新可以简要分为以…

为什么需要学习shell、shell的作用

课程基于B站于超课程笔记 03 Shebang的正确玩法_哔哩哔哩_bilibili P1 shell的作用 P2 shell执行命令的流程 P3 Shebang的正确玩法 什么是shell及组成 shell概念 shelll组成 Shebang概念 /bin/sh /bin/bash一样&#xff0c;都是指向一个bash解释器 [rootlocalhost ~]#…