RabbitMQ
【黑马程序员RabbitMQ全套教程,rabbitmq消息中间件到实战】
文章目录
- RabbitMQ
- 第二天 高级
- 7 RabbitMQ 高级特性
- 7.1 消息的可靠投递
- 7.1.1 confirm【确认模式】
第二天 高级
7 RabbitMQ 高级特性
7.1 消息的可靠投递
7.1.1 confirm【确认模式】
在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。
RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。
- confirm:确认模式
- return:退回模式
【RabbitMQ 整个消息投递的路径】
producer—>rabbitmq broker—>exchange—>queue—>consumer
生产者 → broker → 交换机 → 队列 → 消费者
- 消息从 producer 到 exchange 则会返回一个 confirmCallback 。【确认模式】
- 消息从 exchange–>queue 投递失败则会返回一个 returnCallback 。【退回模式】
我们将利用这两个 callback 控制消息的可靠性投递
【先来看看 confirm 】
先来两个新的 模块工程
【生产者】
直接创建
OK,一个全新的maven 工程模块
pom 文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.dingjiaxiong</groupId><artifactId>rabbitmq-producer-spring</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>org.springframework</groupId><artifactId>spring-context</artifactId><version>5.1.7.RELEASE</version></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit</artifactId><version>2.1.8.RELEASE</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-test</artifactId><version>5.1.7.RELEASE</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.8.0</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin></plugins></build></project>
OK
rabbitmq.properties
rabbitmq.host=xxxxxxxxxxxxxxx # 改成自己的服务器IP
rabbitmq.port=5672
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.virtual-host=/
spring-rabbitmq-producer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:context="http://www.springframework.org/schema/context"xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/contexthttps://www.springframework.org/schema/context/spring-context.xsdhttp://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit.xsd"><!--加载配置文件--><context:property-placeholder location="classpath:rabbitmq.properties"/><!-- 定义rabbitmq connectionFactory --><rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"port="${rabbitmq.port}"username="${rabbitmq.username}"password="${rabbitmq.password}"virtual-host="${rabbitmq.virtual-host}"/><!--定义管理交换机、队列--><rabbit:admin connection-factory="connectionFactory"/><!--定义rabbitTemplate对象操作可以在代码中方便发送消息--><rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
</beans>
OK
现在就开始 消息的可靠性 配置
<!-- 消息可靠性投递【生产端】 -->
<rabbit:queue id="test_queue_confirm" name="test_queue_confirm"></rabbit:queue>
<rabbit:direct-exchange name="test_exchange_confirm"><rabbit:bindings><rabbit:binding queue="test_queue_confirm" key="confirm"></rabbit:binding></rabbit:bindings>
</rabbit:direct-exchange>
OK
编写测试类
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;/*** ClassName: ProducerTest* date: 2022/11/16 19:43** @author DingJiaxiong*/@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class ProducerTest {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 【确认模式】* 步骤:* 1. 确认模式开启 ConnectionFactory 中开启publisher-confirms="true"* 2. 在rabbitTemplate 定义ConfirmCallBack 回调函数** */@Testpublic void testConfirm(){// 2. 定义回调rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {System.out.println("confirm 方法被执行了");}});// 3. 发送消息rabbitTemplate.convertAndSend("test_exchange_confirm","confirm","message confirm...");}}
第一步别忘了,修改一下核心配置文件
OK
直接运行 测试
OK,绿了
查看管控台
交换机创建成功
查看绑定 关系
OK,没问题
查看队列
OK,也创建 了,而且还有一条消息 等待消费,而且confirm 方法也调用到了
下面来看看 confirm 方法的参数
@Test
public void testConfirm() {// 2. 定义回调rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {/*** correlationData : 相关配置信息* ack: 交换机是否成功收到消息【true成功 false 失败】* cause :失败原因* */@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("confirm 方法被执行了");if (ack) {//交换机接收成功System.out.println("消息接收成功" + cause);} else {//交换机接收失败System.out.println("消息接收失败" + cause);}}});// 3. 发送消息rabbitTemplate.convertAndSend("test_exchange_confirm", "confirm", "message confirm...");}
OK,再次运行
说是 接收失败了 ,看看管控台
但是消息确实已经到 队列了,这个问题,之前笔者就已经碰到过了 ,这是一次测试一次会话,关的 太快,导致来不及回调,解决方法,可以在测试方法中让它 睡一秒,再试一次
再次运行测试方法:
OK,这样就成功 了
这下试试 把交换机名字 改错
再次运行
OK,没毛病哈
之后会做一些处理,让消息 再次发送
【这就是confirm 确认模式】