四、发布确认

news/2024/5/7 2:18:40/文章来源:https://blog.csdn.net/hc1285653662/article/details/129308915

1、发布确认原理

生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了;

如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理kao

confirm 模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,生产者应用程序同样可以在回调方法中处理该 nack 消息

2、发布确认策略

2.1 开启发布确认的方法

发布确认模式时没有开启的,如果需要开启,则需要在channel上调用 confirmSelect() 方法

//开启发布确认
channel.confirmSelect();

2.2 单个发布确认

这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布。

waitForConfirmsOrDie(long)这个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。
(也就是说调用 channel.waitForConfirms() 后,信道会一直等待消息进行确认后才会返回true,否则一直阻塞,直到超时发生异常)

这种确认方式有一个最大的缺点就是:发布速度特别的慢,因为如果没有确认发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。当然对于某些应用程序来说这可能已经足够了。

//1、单个确认模式public static void pulishMessageSingleConfirm() throws Exception {try (Channel channel = RabbitMqUtils.getChannel();) {channel.queueDeclare(QUEUE_NAME, false, false, false, null);//开启发布确认模式channel.confirmSelect();long begin = System.currentTimeMillis();for (int i = 0; i < MESSAGE_COUNT; i++) {String message = i + "";channel.basicPublish("", QUEUE_NAME, null, message.getBytes());boolean flag = channel.waitForConfirms();if (flag) {System.out.println("消息发送成功");}}long end = System.currentTimeMillis();System.out.println("发布" + MESSAGE_COUNT + "个单独确认消息,耗时" + (end - begin) + "ms");}}

2.3 批量发布确认

上面那种方式非常慢,与单个等待确认消息相比,先发布一批消息然后一起确认可以极大地提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种方案仍然是同步的,也一样阻塞消息的发布

思路:攒100个消息一起进行发布,当发布完第一百个消息时,监听消息是否被确认

 //2、批量发布确认模式public static void pulishMessageBatchConfirm() throws Exception {try (Channel channel = RabbitMqUtils.getChannel();) {channel.queueDeclare(QUEUE_NAME, false, false, false, null);//开启发布确认模式channel.confirmSelect();long begin = System.currentTimeMillis();//生产者每次发布100个消息,确认一次int batchConfirmSize = 0;for (int i = 0; i < MESSAGE_COUNT; i++) {String message = i + "";channel.basicPublish("", QUEUE_NAME, null, message.getBytes());batchConfirmSize++;if (batchConfirmSize % 100 == 0) {boolean flag = channel.waitForConfirms();if (flag) {System.out.println("消息发送成功");}batchConfirmSize = 0;}}long end = System.currentTimeMillis();System.out.println("发布" + MESSAGE_COUNT + "个批量确认消息,耗时" + (end - begin) + "ms");}}

2.4 异步发布确认

维护一个所有已发布消息的map,通过回调函数传递回来当前确认的消息,然后从map中移除掉已经确认的消息,剩下的就是已经发布但是没有确认的消息
在这里插入图片描述

//3、异步发布确认模式public static void pulishMessageAsyncConfirm() throws Exception {try (Channel channel = RabbitMqUtils.getChannel();) {//创建一个队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);//开启发布确认channel.confirmSelect();/*** 线程安全有序的一个哈希表,适用于高并发的情况* 1、轻松的将序号和消息进行关联* 2、轻松的批量删除条目  只需要给到序列号* 3、支持并发访问* */ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();/*** 确认收到消息的一个回调* 1、参数1:当前收到的消息的序列号* 2、参数2:是否批量确认*/ConfirmCallback ackCallBack = (sequenceNumber, mutiple) -> {if (mutiple) {System.out.println("生产者发布的消息" + outstandingConfirms.get(sequenceNumber) + "被确认,序列号" + sequenceNumber);//返回的是小于等于当前序列号的未确认消息,是一个mapConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(sequenceNumber, true);//消除该部分未确认消息confirmed.clear();} else {System.out.println("生产者发布的消息" + outstandingConfirms.get(sequenceNumber) + "被确认,序列号" + sequenceNumber);//只消除当前序号的消息outstandingConfirms.remove(sequenceNumber);}};//消息未确认的回调ConfirmCallback nackCallBack = (sequenceNumber, mutiple) -> {String message = outstandingConfirms.get(sequenceNumber);System.out.println("生产者发布的消息" + message + "未被确认,序列号" + sequenceNumber);};/*添加一个异步确认的监听器1、确认收到消息的回调2、未收到消息的回调*/channel.addConfirmListener(ackCallBack, nackCallBack);long begin = System.currentTimeMillis();for (int i = 0; i < MESSAGE_COUNT; i++) {String message = i + "";outstandingConfirms.put(channel.getNextPublishSeqNo(), message);channel.basicPublish("", QUEUE_NAME, null, message.getBytes());}long end = System.currentTimeMillis();System.out.println("发布" + MESSAGE_COUNT + "个异步确认消息,耗时" + (end - begin) + "ms");}}

2.5 如何处理异步未确认消息

最好的解决的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列

2.6 上述三种发布确认模式比较

  • 单个发布确认:同步等待确认,简单,但吞吐量非常有限。
  • 批量发布确认:简单,合理的吞吐量,一旦出现问题但很难推断出是那条消息出现了问题
  • 异步发布确认:最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现起来稍微难些

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_77105.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SpringCloud之认识微服务

文章目录一、传统项目转型二、走进 SpringCloud三、微服务项目搭建3.1 创建一个 SpringBoot 项目3.2 创建三个 Maven 子工程3.3 为子工程创建 application.yml3.4 引入依赖3.5 数据库 建库建表3.6 编写业务提示&#xff1a;以下是本篇文章正文内容&#xff0c;SpringCloud系列学…

如何校招进BAT做产品经理

嗨&#xff0c;很高兴&#xff0c;以文字的形式和你见面。在校招中&#xff0c;我拿到了百度、京东、爱奇艺、新浪和去哪儿的产品经理校招offer&#xff0c;其中百度是special offer。在找实习的过程中&#xff0c;也拿到了爱奇艺、微信电影票、搜狐畅游、艺龙等公司的产品经理…

欢迎来到 BharatBox,这是一个以来自印度的知名艺术家和品牌为特色的文化元宇宙中心

通过 Brinc 的客户 Heftyverse 娱乐公司&#xff0c;将印度艺术家、电影制片厂、体育品牌和音乐公司聚集在这个全新虚拟中心。 The Sandbox 与 Brinc 的联营公司推出 BharatBox&#xff0c;这是一个全新的文化中心&#xff0c;由来自印度的娱乐、艺术和体育范畴的主要合作伙伴组…

知识图谱的介绍

知识图谱的由来 谷歌在2012年提出了知识图谱的概念&#xff0c;当时目的在于优化搜索引擎的返回结构&#xff0c;为用户提供更精确的结果。 知识图谱的定义 为了理解知识图谱&#xff0c;我们首先要明白信息与知识的概念。首先&#xff0c;信息表示的是外部的客观事实&#…

hadoop调优

hadoop调优 1 HDFS核心参数 1.1 NameNode内存生产配置 1.1.1 NameNode内存计算 每个文件块大概占用150byte&#xff0c;如果一台服务器128G&#xff0c;能存储的文件块如下 128 (G)* 1024(MB) * 1024(KB) * 1024(Byte) / 150 Byte 9.1 亿 1.1.2 Hadoop2.x 在Hadoop2.x中…

MVVM模式下如何正确【视图绑定+数据】

概述 我如何&#xff08;不在后面的代码中使用代码&#xff09;自动绑定到我想要的视图&#xff1f;据我了解&#xff0c;如果正确完成&#xff0c;这就是模式应该如何工作。我可以使用主窗口 xaml 中的代码实现这一切&#xff0c;我甚至正确创建了一个资源字典&#xff08;因…

linux下nm,objdump和ldd三大工具使用

linux下进行C/C开发时经常需要使用nm&#xff0c;objdump&#xff0c;ldd工具来分析定位问题&#xff0c;本篇文章就对其做个总结&#xff1a; 1.测试程序 TestSo.h #pragma once #include <iostream>extern "C" int CTypeAdd(int x, int y); extern "…

Python 之网络式编程

一 客户端/服务器架构 即C/S架构&#xff0c;包括 1、硬件C/S架构&#xff08;打印机&#xff09; 2、软件B/S架构&#xff08;web服务&#xff09; C/S架构与Socket的关系&#xff1a; 我们学习Socket就是为了完成C/S的开发 二 OSI七层 引子&#xff1a;   计算机组成…

云HIS系统源码 医院his源码 云his源码

大型医院his系统源码 SaaS运维平台多医院入驻强大的电子病历完整文档 &#xff0c;有演示 一、系统概述&#xff1a; 基层卫生健康云是一款满足基层医疗机构各类业务需要的健康云产品。该产品能帮助基层医疗机构完成日常各类业务&#xff0c;提供病患挂号支持、病患问诊、电子…

python学习——【第一弹】

前言 Python是一种跨平台的计算机程序设计语言&#xff0c;是ABC语言的替代品&#xff0c;属于面向对象的动态类型语言&#xff0c;最初被设计用于编写自动化脚本&#xff0c;随着版本的不断更新和语言新功能的添加&#xff0c;越来越多被用于独立的、大型项目的开发。 从这篇…

轮盘赌选择法

轮盘赌选择原理 轮盘赌选择法&#xff08;roulette wheel selection&#xff09;是最简单也是最常用的选择方法&#xff0c;在该方法中&#xff0c;各个个体的选择概率和其适应度值成比例&#xff0c;适应度越大&#xff0c;选中概率也越大。 从图中可以看出一等奖、二等奖、…

【Java集合框架】篇六:Collections工具类

Collections 是一个操作 Set、List 和 Map 等集合的工具类。 1。 常用方法 Collections 中提供了一系列静态的方法对集合元素进行排序、查询和修改等操作&#xff0c;还提供了对集合对象设置不可变、对集合对象实现同步控制等方法&#xff08;均为static方法&#xff09;&…

【Python学习笔记】第二十六节 Python PyMySQL

一、什么是 PyMySQL&#xff1f;PyMySQL 是在 Python3.x 版本中用于连接 MySQL 服务器的一个库。可以用它来连接Python和MySQL。如果你追求速度&#xff0c;这是一个很好的选择&#xff0c;因为它比mysql-connector-python快。PyMySQL 遵循 Python 数据库 API v2.0 规范&#x…

AntDB“超融合+流式实时数仓”,谈传统数据库与流计算的有机融合

&#xff08;一&#xff09; 前言 据统计&#xff0c;在信息化时代的今天&#xff0c;人们一天所接触到的信息量&#xff0c;是古人一辈子所能接收到的信息量的总和。当今社会中除了信息量“多”以外&#xff0c;人们对信息处理的“效率”和“速度”的要求也越来越高。譬如&am…

浅谈一下mysql8.0与5.7的字符集

修改字符集 修改步骤 在MySQL8.0版本之前&#xff0c;默认字符集为1atin1,utf8字符集指向的是utf8mb3。网站开发人员在数据库设计的时候往往会将编码修改为ut8字符集。如果遗忘修改默认的编码&#xff0c;就会出现乱码的问题。从MySQL8.0开始&#xff0c;数据库的默认编码将改…

王道C语言督学营OJ练习全解【24考研最新版】

前言 本篇博客是在博主参加王道408专业课前置课程-----C语言督学营的学习笔记&#xff0c;包含了从第一节课到最后一节课的所有OJ习题题解&#xff0c;文章中每一题都给出了详尽的代码&#xff0c;并在每一题的关键部位加上了注释&#xff0c;记录下来的目的是方便自己以后进行…

maven镜像源及代理配置

在公司使用网络一般需要设置代理&#xff0c; 我在idea中创建springboot工程时&#xff0c;发现依赖下载不了&#xff0c;原以为只要浏览器设置代理&#xff0c;其他的网络访问都会走代理&#xff0c;经过查资料设置了以下几个地方后工程创建正常&#xff0c;在此记录给大家参考…

Python中Opencv和PIL.Image读取图片的差异对比

近日&#xff0c;在进行深度学习进行推理的时候&#xff0c;发现不管怎么样都得不出正确的结果&#xff0c;再仔细和正确的代码进行对比了后发现原来是Python中不同的库读取的图片数组是有差异的。 image np.array(Image.open(image_file).convert(RGB)) image cv2.imread(…

SpringBoot实现Excel导入导出,简单好用

EasyPoi简介 POI是Java操作MicroOffice&#xff08;如对Excel的导入导出&#xff09;的一个插件。POI的全称是&#xff08;Poor Obfuscation Implementation&#xff09;&#xff0c;POI官网地址是 http://poi.achache.org/index.html 。 EasyPoi对POI进行了优化&#xff0c;…

Navicat 现已支持 OceanBase 全线数据库产品

Navicat 作为 OceanBase 生态工具的合作伙伴&#xff0c;这是双方产品适配第三个里程碑。2022 年 7 月的首个里程碑&#xff0c;Navicat 实现了 OceanBase 社区版的功能性兼容。同年10 月&#xff0c;进一步实现了针对 OceanBase 企业版&#xff08; 兼容 MySQL 模式&#xff0…