RabbitMQ
【黑马程序员RabbitMQ全套教程,rabbitmq消息中间件到实战】
文章目录
- RabbitMQ
- 第一天 基础
- 4 RabbitMQ 的工作模式
- 4.4 Topic 通配符模式
- 4.4.1 模式说明
- 4.4.2 代码编写
- 4.4.3 小结
- 4.5 工作模式总结
第一天 基础
4 RabbitMQ 的工作模式
4.4 Topic 通配符模式
4.4.1 模式说明
看看文档
Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型中的 Exchange 可以让队列在绑定 Routing key 的时候使用通配符!
Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
通配符规则:# 匹配一个或多个词,* 匹配不多不少恰好1个词,例如:item.# 能够匹配 item.insert.abc
或者 item.insert,item.* 只能匹配 item.insert
没毛病,很清晰
4.4.2 代码编写
来个小需求:
之前我们已经可以“定向” 实现将error 级别的信息存入数据库了,
现在我希望 将订单相关的日志【不管什么级别】 都走这个 方式,存入数据库。
【生产者】
package com.dingjiaxiong.producer;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** ClassName: Producer_Topics* date: 2022/11/16 10:58* 发送消息** @author DingJiaxiong*/public class Producer_Topics {public static void main(String[] args) throws IOException, TimeoutException {//1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();//2. 设置参数factory.setHost("xxxxxxxxxxxxxxxxxx"); // 服务器IP【默认本机 localhost】factory.setPort(5672); //端口【默认也是 5672】factory.setVirtualHost("/ding"); //虚拟机【 默认是 /】factory.setUsername("dingjiaxiong"); // 用户名【默认 guest】factory.setPassword("12345"); //密码【默认 guest】//3. 创建连接 ConnectionConnection connection = factory.newConnection();//4. 创建ChannelChannel channel = connection.createChannel();//5. 创建交换机/*** exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)* 参数解释:* 1. exchange:交换机名称* 2. type:交换机类型* 【枚举】* DIRECT("direct"):定向* FANOUT("fanout"):扇形【广播】【发送消息到每一个与之绑定队列】* TOPIC("topic"):通配符的方式* HEADERS("headers"):参数匹配* 3. durable:是否持久化* 4. autoDelete:自动删除* 5. internal:内部使用【一般false】* 6. arguments:参数* */String exchangeName = "test_topic";channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true, false, false, null);//6. 创建队列【两个】String queue1Name = "test_topic_queue1";String queue2Name = "test_topic_queue2";channel.queueDeclare(queue1Name, true, false, false, null);channel.queueDeclare(queue2Name, true, false, false, null);//7. 绑定队列和交换机/*** queueBind(String queue, String exchange, String routingKey)* 参数解释:* 1. queue:参数* 2. exchange:交换机名称* 3. routingKey:路由key【绑定规则】[交换机 类型若为fanout, 默认为空]* *///routing key 系统的名称.日志的级别// 需求:所有error 级别的日志存入数据库,所有order 系统的日志存入数据库channel.queueBind(queue1Name, exchangeName, "#.error");channel.queueBind(queue1Name, exchangeName, "order.*");channel.queueBind(queue2Name, exchangeName, "*.*");//8. 发送消息String body = "这是一条日志信息...【order 系统的】";channel.basicPublish(exchangeName,"order.info",null,body.getBytes());//9. 释放资源channel.close();connection.close();}}
OK,直接运行
OK, 查看管控台
交换机创建成功,查看绑定关系
OK,没问题
查看队列
可以看到 两个 队列中都收到了 消息
【消费者】
package com.dingjiaxiong.consumer;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** ClassName: Consumer_Topic1* date: 2022/11/16 12:37** @author DingJiaxiong*/public class Consumer_Topic1 {public static void main(String[] args) throws IOException, TimeoutException {//1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();//2. 设置参数factory.setHost("xxxxxxxxxxxxxxxxx"); // 服务器IP【默认本机 localhost】factory.setPort(5672); //端口【默认也是 5672】factory.setVirtualHost("/ding"); //虚拟机【 默认是 /】factory.setUsername("dingjiaxiong"); // 用户名【默认 guest】factory.setPassword("12345"); //密码【默认 guest】//3. 创建连接 ConnectionConnection connection = factory.newConnection();//4. 创建ChannelChannel channel = connection.createChannel();String queue1Name = "test_topic_queue1";String queue2Name = "test_topic_queue2";// 接收消息【消费消息】/*** basicConsume(String queue, boolean autoAck, Consumer callback)* 参数解释:* 1. queue:队列名称* 2. autoAck:是否自动确认* 3. callback:回调对象* */Consumer consumer = new DefaultConsumer(channel){//回调方法,当收到消息后,会自动执行这个 方法/** 1. consumerTag:消息的标识* 2. envelope:获取一些 信息【交换机、路由key...】* 3. properties: 配置信息* 4. body: 数据* */@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// System.out.println("consumerTag:" + consumerTag);
// System.out.println("Exchange:" + envelope.getExchange());
// System.out.println("RoutingKey:" + envelope.getRoutingKey());
// System.out.println("properties:" + properties);System.out.println("body:" + new String(body));System.out.println("将日志 存入数据库...");}};channel.basicConsume(queue1Name,true,consumer);// 不要关闭资源,让它一直监听}}
OK,消费者 2
OK
直接运行
OK,没毛病,order.info 既打印到 了控制台,也存入了 数据库
现在来条goods 日志
直接发送
再来个goods.error
没毛病。【这就是 Topic 工作模式】
4.4.3 小结
Topic 主题模式可以实现 Pub/Sub 发布与订阅模式和 Routing 路由模式的功能,只是 Topic 在配置routing key 的时候可以使用通配符,显得更加灵活。
4.5 工作模式总结
- 简单模式 HelloWorld
一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)。
- 工作队列模式 Work Queue
一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)。
- 发布订阅模式 Publish/subscribe
需要设置类型为 fanout 的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列。
- 路由模式 Routing
需要设置类型为 direct 的交换机,交换机和队列进行绑定,并且指定 routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列。
- 通配符模式 Topic
需要设置类型为 topic 的交换机,交换机和队列进行绑定,并且指定通配符方式的 routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列。