RabbitMQ发布确认模式
- 前言
- 如何实现发布确认
- 发布确认模式有三种策略
- 单独发布消息
- 执行结果
- 批量发布消息
- 执行结果
- 异步处理发布确认
- 执行结果
- 思考点
- 如何追踪未完成的确认?
- 重新发布丢失的消息
- 总结
- 收获
前言
发布确认是解决消息不丢失的重要环节,在设置队列持久化、消息持久化的基础上,设置发布确认,一旦生产者投递消息之后,如果Broker接收到消息,会给生产者一个应答。
生产者进行接收应答,用来确认这条消息是否正常发送到Broker。生产者也可以根据收没有收到这条消息的应答进行相应的处理。
如何实现发布确认
发布者确认在通道级别使用confirmSelect方法启用
Channel channel = connection.createChannel();
channel.confirmSelect();
发布确认模式有三种策略
- 单独发布消息,同步等待确认:简单,但吞吐量非常有限。
- 批量发布消息,等待批量的同步确认:简单、合理的吞吐量,但是很难判断出什么时候出了问题。
- 异步处理:最佳的性能和资源利用,良好的控制情况下的错误,但涉及到正确的实现,相对复杂
单独发布消息
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;/*** @author : [WangWei]* @version : [v1.0]* @className : Producer* @description : [生产者]* @createTime : [2023/2/1 9:38]* @updateUser : [WangWei]* @updateTime : [2023/2/1 9:38]* @updateRemark : [描述说明本次修改内容]*/
public class Producer {private static final String QUEUE_NAME = "Confirm";public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//建立连接RabbitMQUtils.getConnection();//声明通道Channel channel = RabbitMQUtils.getChannel();//开启确认模式channel.confirmSelect();channel.queueDeclare(QUEUE_NAME,true,false,false,null);long start = System.currentTimeMillis();//循环发送2条消息for (int i = 0; i <200 ; i++) {String msg="消息确认模式消息:"+i;/*推送消息*交换机命名,不填写使用默认的交换机* routingKey -路由键-* props:消息的其他属性-路由头等正文* msg消息正文*/channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));// uses a 5 second timeout//如果超时过期,则抛出TimeoutException。如果任何消息被nack(丢失), waitForConfirmsOrDie将抛出IOException。channel.waitForConfirmsOrDie(5_000);}long end = System.currentTimeMillis();System.out.println("发送200条消息使用时间:"+(end-start));}
}
执行结果
waitForConfirmsOrDie(long)方法等待其确认。该方法在确认消息后立即返回。如果消息在超时内没有得到确认,或者消息被nack-ed(意味着代理由于某种原因无法处理它),该方法将抛出异常。异常的处理通常包括记录错误消息和/或重新尝试发送该消息。
这种技术非常简单,但也有一个主要缺点:它大大减慢了发布速度,因为对消息的确认会阻塞所有后续消息的发布。这种方法交付的吞吐量不会超过每秒发布的几百条消息。
批量发布消息
发布一批消息并等待整个批消息被确认。以100个为例
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;/*** @author : [WangWei]* @version : [v1.0]* @className : Producer* @description : [生产者]* @createTime : [2023/2/1 9:38]* @updateUser : [WangWei]* @updateTime : [2023/2/1 9:38]* @updateRemark : [描述说明本次修改内容]*/
public class Producer2 {private static final String QUEUE_NAME = "Confirm";public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//用于记录每发布100条消息进行一次确认int batchSize = 100;int outstandingMessageCount = 0;//建立连接RabbitMQUtils.getConnection();//声明通道Channel channel = RabbitMQUtils.getChannel();//开启确认模式channel.confirmSelect();//声明持久化队列channel.queueDeclare(QUEUE_NAME,true,false,false,null);long start = System.currentTimeMillis();//循环发送200条消息for (int i = 0; i <200 ; i++) {String msg="消息确认模式消息:"+i;/*推送消息*交换机命名,不填写使用默认的交换机* routingKey -路由键-* props:消息的其他属性-路由头等正文* msg消息正文*/channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));outstandingMessageCount++;//每100次确认一次if (outstandingMessageCount == batchSize) {// uses a 5 second timeout//如果超时过期,则抛出TimeoutException。如果任何消息被nack(丢失), waitForConfirmsOrDie将抛出IOException。channel.waitForConfirmsOrDie(5_000);outstandingMessageCount = 0;}}if (outstandingMessageCount > 0) {channel.waitForConfirmsOrDie(5_000);}long end = System.currentTimeMillis();System.out.println("发送200条消息使用时间:"+(end-start));}
}
执行结果
与等待单个消息的确认相比,等待一批消息被确认大大提高了吞吐量(对于远程RabbitMQ节点,最多可提高20-30倍)。一个缺点是,在失败的情况下,我们无法确切地知道哪里出了问题,因此我们可能不得不在内存中保留整个批处理,以记录有意义的内容或重新发布消息。而且这种解决方案仍然是同步的,因此它会阻止消息的发布。
异步处理发布确认
异步处理发行商确认通常需要以下步骤:
- 提供一种将发布序列号与消息关联起来的方法。
- 在通道上注册一个确认侦听器,以便在发布者acks/nacks到达时得到通知,以执行适当的操作,如记录日志或重新发布nack-ed消息。在此步骤中,序列号到消息的关联机制也可能需要进行一些清理。
- 在发布消息之前跟踪发布序列号。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.client.MessageProperties;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeoutException;/*** @author : [WangWei]* @version : [v1.0]* @className : Producer* @description : [生产者]* @createTime : [2023/2/1 9:38]* @updateUser : [WangWei]* @updateTime : [2023/2/1 9:38]* @updateRemark : [描述说明本次修改内容]*/
public class Producer3 {private static final String QUEUE_NAME = "Confirm";public static void main(String[] args) throws IOException, TimeoutException {//建立连接RabbitMQUtils.getConnection();//声明通道Channel channel = RabbitMQUtils.getChannel();//开启确认模式channel.confirmSelect();//用于保存序列号与消息之前映射的容器ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();//清理映射关系的回调//multiple 一个布尔值,如果为false则说明只有一条消息被确认或者丢失,如果为true这表示有小于或等于sequenceNumber的消息被确认或者丢失。ConfirmCallback cleanOutstandingConfirms = (sequenceNumber, multiple) -> {if (multiple) {//如果包含值为true则返回该映射的部分视图,其键值小于或等于,sequenceNumberConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(sequenceNumber, true);//移除所有key和valueconfirmed.clear();} else {//移除序列号对象的消息outstandingConfirms.remove(sequenceNumber);}};//两个回调一个用于确认消息,另一个用于nack-ed消息(可以被代理视为丢失的消息)。//sequenceNumber用于标识确认的消息或者丢失的消息//
// channel.addConfirmListener((sequenceNumber,multiple)->{
// String body = outstandingConfirms.get(sequenceNumber);
// System.out.println("ack message:"+body);
// },(sequenceNumber,multiple)->{
// //当消息被丢失时。。。。
// String body = outstandingConfirms.get(sequenceNumber);
// System.out.println("no ack message:"+body);
// });//当消息缺失的回调channel.addConfirmListener(cleanOutstandingConfirms, (sequenceNumber, multiple) -> {String body = outstandingConfirms.get(sequenceNumber);System.out.println("没有确认的消息:"+body);//重新使用回调来清理映射cleanOutstandingConfirms.handle(sequenceNumber, multiple);});//声明持久化队列channel.queueDeclare(QUEUE_NAME,true,false,false,null);long start = System.nanoTime();//循环发送2条消息for (int i = 0; i <200 ; i++) {String msg="消息确认模式消息:"+i;//使用map将发布序列号与消息的字符串体关联起来//channel.getNextPublishSeqNo()获取下一个消息的序列号outstandingConfirms.put(channel.getNextPublishSeqNo(), msg);/*推送消息*交换机命名,不填写使用默认的交换机* routingKey -路由键-* props:消息的其他属性-路由头等正文* msg消息正文*/channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));System.out.println(msg+"发布成功");}long end = System.nanoTime();System.out.println("发送200条消息使用时间:"+ Duration.ofNanos(end - start).toMillis());}
}
执行结果
思考点
如何追踪未完成的确认?
示例使用ConcurrentNavigableMap来跟踪未完成的确认。这种数据结构的方便有以下几个原因。
它允许轻松地将序列号与消息(无论消息数据是什么)关联起来,并轻松地将条目清理到给定的序列id(以处理多个确认/nack)。
最后,它支持并发访问,因为确认回调是在客户端库拥有的线程中调用的,应该与发布线程保持不同。
除了使用复杂的映射实现之外,还有其他方法可以跟踪未完成的确认,比如使用简单的并发散列映射和变量来跟踪发布序列的下界,但它们通常更复杂,
重新发布丢失的消息
从相应的回调中重新发布nack-ed消息可能很诱人,但应该避免这种情况,因为确认回调是在I/O线程中分派的,其中通道不应该执行操作。更好的解决方案是将消息放入由发布线程轮询的内存队列中。像ConcurrentLinkedQueue这样的类可以很好地在确认回调和发布线程之间传输消息。
总结
在某些应用程序中,确保发布的消息到达代理非常重要。发布者确认RabbitMQ的特性可以帮助满足这一要求。发布者确认本质上是异步的,但也可以同步地处理它们。没有确定的方法来实现发布者确认,这通常归结于应用程序和整个系统中的约束。典型的技术有:
单独发布消息,同步等待确认:简单,但吞吐量非常有限。
批量发布消息,等待批量的同步确认:简单、合理的吞吐量,但是很难判断出什么时候出了问题。
异步处理:最佳的性能和资源利用,良好的控制情况下的错误,但是实现较为复杂
收获
1.从官网学习rabbitmq的使用,严格落实先宏观,后微观,结合思维导图和三遍读书法.
2.学习的过程中,锤炼英文阅读,动手查字典和利用工具.
3. 理论和实践结合,理论学习完成之后,及时进行实践包括先代码实现,阶段总结等等.