RabbitMQ
【黑马程序员RabbitMQ全套教程,rabbitmq消息中间件到实战】
文章目录
- RabbitMQ
- 第二天 高级
- 7 RabbitMQ 高级特性
- 7.2 Consumer Ack
- 7.2.1 Consumer Ack
- 7.2.2 Consumer Ack 小结
- 7.2.3 消息可靠性总结
第二天 高级
7 RabbitMQ 高级特性
7.2 Consumer Ack
7.2.1 Consumer Ack
ack指Acknowledge,确认。 表示消费端收到消息后的确认方式。
有三种确认方式:
- 自动确认:acknowledge=“none”
- 手动确认:acknowledge=“manual”
- 根据异常情况确认:acknowledge=“auto”,(这种方式使用麻烦,不作讲解)【啊这…】
其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。
但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。
【试试】
【编写消费者】
先来一个 新的模块工程
直接创建
pom 文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.dingjiaxiong</groupId><artifactId>rabbitmq-consumer-spring</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>org.springframework</groupId><artifactId>spring-context</artifactId><version>5.1.7.RELEASE</version></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit</artifactId><version>2.1.8.RELEASE</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-test</artifactId><version>5.1.7.RELEASE</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.8.0</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin></plugins></build></project>
OK
rabbitmq.properties
rabbitmq.host=43.138.50.253
rabbitmq.port=5672
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.virtual-host=/
spring-rabbitmq-consumer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:context="http://www.springframework.org/schema/context"xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/contexthttps://www.springframework.org/schema/context/spring-context.xsdhttp://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit.xsd"><!--加载配置文件--><context:property-placeholder location="classpath:rabbitmq.properties"/><!-- 定义rabbitmq connectionFactory --><rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"port="${rabbitmq.port}"username="${rabbitmq.username}"password="${rabbitmq.password}"virtual-host="${rabbitmq.virtual-host}"/></beans>
OK,这样环境就准备好了
【定义监听器】
<!-- 包扫描 -->
<context:component-scan base-package="com.dingjiaxiong.listener"/>
再来一个 监听器 容器
<!-- 定义监听器容器 --><rabbit:listener-container connection-factory="connectionFactory"><rabbit:listener ref="ackListener" queue-names="test_queue_confirm"></rabbit:listener></rabbit:listener-container>
OK,定义监听器类
package com.dingjiaxiong.listener;import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Component;/*** ClassName: AckListener* date: 2022/11/16 21:16** @author DingJiaxiong*/@Component
public class AckListener implements MessageListener {@Overridepublic void onMessage(Message message) {System.out.println(new String(message.getBody()));}
}
OK,再来一个 死循环的测试类
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;/*** ClassName: ConsumerTest* date: 2022/11/16 21:18** @author DingJiaxiong*/@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml")
public class ConsumerTest {@Testpublic void test(){while (true){}}}
OK, 直接运行这个 测试
OK,队列中的所有消息 都拿出来了
也识别 到了有一个消费者,OK,接下来就 是编写 ACK 的逻辑
package com.dingjiaxiong.listener;import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;import java.io.IOException;/*** ClassName: AckListener* date: 2022/11/16 21:16** @author DingJiaxiong*//*** Consumer ACK机制:* 1. 设置手动签收【默认 就是自动】acknowledge="manual"* 2. 让监听器类实现 ChannelAwareMessageListener 接口* 3. 如果消息成功处理,则调用channel 的basicAck() 签收* 4. 如果消息处理失败,则调用channel 的basicNack() 拒绝签收,让 broker重新发送 consumer*/
@Component
public class AckListener implements ChannelAwareMessageListener {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//1. 接收转换消息System.out.println(new String(message.getBody()));//2. 处理业务逻辑System.out.println("处理业务逻辑...");//3. 手动签收channel.basicAck(deliveryTag, true);} catch (IOException e) {
// e.printStackTrace();// 如果出现异常 4. 拒绝签收// 第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker 会重新发送给该消息给消费端channel.basicNack(deliveryTag,true,true);}}
}
OK,重新启动消费者
OK,在监听了
打开生产者,发送一条消息
OK,消息发送成功, 查看消费者控制台
这是正常的情况,手动 签收
一个消费者,消息都消费光了
【模拟一个错误】
OK,重新启动消费者
发送一条消息进去
效果就是 会一直重发,然后消费者 就一直处理,始终不能签收
而且消息的 状态
一直Unacked ,不能签收
7.2.2 Consumer Ack 小结
- 在rabbit:listener-container标签中设置acknowledge属性,设置ack方式 none:自动确认,manual:手动确认
- 如果在消费端没有出现异常,则调用channel.basicAck(deliveryTag,false);方法确认签收消息
- 如果出现异常,则在catch中调用 basicNack或 basicReject,拒绝消息,让MQ重新发送消息。
OK
7.2.3 消息可靠性总结
- 持久化
- exchange要持久化
- queue要持久化
- message要持久化
- 生产方确认Confirm
- 消费方确认Ack
- Broker高可用