RabbitMQ的6种工作模式

news/2024/5/11 13:16:55/文章来源:https://blog.csdn.net/qq_30614345/article/details/132135058

RabbitMQ的6种工作模式

官方文档:

http://www.rabbitmq.com/

https://www.rabbitmq.com/getstarted.html

RabbitMQ 常见的 6 种工作模式:
在这里插入图片描述

1、simple简单模式

在这里插入图片描述

1)、消息产生后将消息放入队列。

2)、消息的消费者监听消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除。

3)、存在的问题:消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失。

4)、应用场景:聊天(中间有一个过度的服务器)。

5)、代码实现:

pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.example</groupId><artifactId>rabbitmq-java</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.10.0</version></dependency></dependencies></project>

工具类

package com.example;import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class ConnectionUtil {// 连接rabbitmq服务,共享一个工厂对象private static ConnectionFactory factory;static {factory=new ConnectionFactory();//设置rabbitmq属性factory.setHost("127.0.0.1");factory.setUsername("zsx242030");factory.setPassword("zsx242030");factory.setVirtualHost("/");factory.setPort(5672);}public static Connection getConnection(){Connection connection=null;try {//获取连接对象connection = factory.newConnection();} catch (Exception e) {e.printStackTrace();}return connection;}
}

消息提供者

package com.example.simple;import com.example.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;public class Provider {public static void main(String[] args) {try {//获取连接对象Connection connection = ConnectionUtil.getConnection();//获取通道对象Channel channel = connection.createChannel();//通过通道创建队列,后续所有的操作都是基于channel实现(队列也可以由消费方创建)channel.queueDeclare("queue1", false, false, false, null);//向队列中发送消息channel.basicPublish("", "queue1", null, "Hello RabbitMQ!!!".getBytes());//断开连接channel.close();connection.close();} catch (Exception e) {e.printStackTrace();}}
}

消息消费者

package com.example.simple;import com.example.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;public class Consumer {public static void main(String[] args) {try {Connection connection = ConnectionUtil.getConnection();//获取通道对象Channel channel = connection.createChannel();//监听队列中的消息(消费的是队列,而不是交换机)channel.basicConsume("queue1", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者获得消息为:" + new String(body, "utf-8"));}});//消费方不需要关闭连接,保持一直监听队列状态channel.close();connection.close();} catch (Exception e) {e.printStackTrace();}}
}
消费者获得消息为:Hello RabbitMQ!!!

2、work工作模式(资源的竞争)

在这里插入图片描述

1)、消息产生者将消息放入队列,消费者可以有多个,消费者1,消费者2,同时监听同一个队列。消息被消费,

C1 和 C2 共同争抢当前的消息队列内容,谁先拿到谁负责消费消息。

2)、存在的问题:高并发情况下,默认会产生某一个消息被多个消费者共同使用,可以设置一个开关

(synchronized,与同步锁的性能不一样),保证一条消息只能被一个消费者使用。

3)、应用场景:红包;大项目中的资源调度(任务分配系统不需知道哪一个任务执行系统在空闲,直接将任务扔到

消息队列中,空闲的系统自动争抢);对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。

4)、代码实现:

消息提供者

package com.example.work;import com.example.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;public class Provider {public static void main(String[] args) {try {//获取连接对象Connection connection = ConnectionUtil.getConnection();//获取通道对象Channel channel = connection.createChannel();//通过通道创建队列channel.queueDeclare("queue1", false, false, false, null);//向队列中发送消息for (int i = 1; i <= 10; i++) {channel.basicPublish("", "queue1", null, ("Hello RabbitMQ!!!" + i).getBytes());}//断开连接channel.close();connection.close();} catch (Exception e) {e.printStackTrace();}}
}

消息消费者1

package com.example.work;import com.example.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;public class Consumer1 {public static void main(String[] args) {try {Connection connection = ConnectionUtil.getConnection();//获取通道对象Channel channel = connection.createChannel();//监听队列中的消息channel.basicConsume("queue1", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者1获得消息为:" + new String(body, "utf-8"));}});//消费方不需要关闭连接,保持一直监听队列状态// channel.close();// connection.close();} catch (Exception e) {e.printStackTrace();}}
}

消息消费者2

package com.example.work;import com.example.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;public class Consumer2 {public static void main(String[] args) {try {Connection connection = ConnectionUtil.getConnection();//获取通道对象Channel channel = connection.createChannel();//监听队列中的消息channel.basicConsume("queue1", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者2获得消息为:" + new String(body, "utf-8"));}});//消费方不需要关闭连接,保持一直监听队列状态// channel.close();// connection.close();} catch (Exception e) {e.printStackTrace();}}
}
消费者1获得消息为:Hello RabbitMQ!!!1
消费者1获得消息为:Hello RabbitMQ!!!3
消费者1获得消息为:Hello RabbitMQ!!!5
消费者1获得消息为:Hello RabbitMQ!!!7
消费者1获得消息为:Hello RabbitMQ!!!9
消费者2获得消息为:Hello RabbitMQ!!!2
消费者2获得消息为:Hello RabbitMQ!!!4
消费者2获得消息为:Hello RabbitMQ!!!6
消费者2获得消息为:Hello RabbitMQ!!!8
消费者2获得消息为:Hello RabbitMQ!!!10

3、publish/subscribe发布订阅(共享资源)

在这里插入图片描述

1)、X代表交换机,rabbitMQ 内部组件,erlang 消息产生者是代码完成,代码的执行效率不高,消息产生者将消

息放入交换机,交换机发布订阅把消息发送到所有消息队列中,对应消息队列的消费者拿到消息进行消费。

Exchange 有常见以下 3 种类型:

  • Fanout:广播,将消息交给所有绑定到交换机的队列。

  • Direct:定向,把消息交给符合指定 routing key 的队列。

  • Topic:通配符,把消息交给符合 routing pattern (路由模式)的队列。

Exchange (交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者

没有符合路由规则的队列,那么消息会丢失。

2)相关场景:邮件群发,群聊天,广播(广告)。

3)、代码实现:

消息提供者

package com.example.publishsubscribe;import com.example.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;// 交换机和队列可以在提供方和消费方某一方创建,在两边同时创建也可以,只要创建的名称一致。
// 保证,哪一方先运行则在哪一方创建
public class Provider {public static void main(String[] args) {try {//获取连接对象Connection connection = ConnectionUtil.getConnection();//获取通道对象Channel channel = connection.createChannel();//创建交换机(交换机没有存储数据的能力,数据存储在队列上,如果有交换机没队列的情况下,数据会丢失)// 1.参数一:交换机名称    参数二:交换机类型channel.exchangeDeclare("fanout_exchange", "fanout");//通过通道创建队列//channel.queueDeclare("queue1",false,false,false,null);//向队列中发送消息for (int i = 1; i <= 10; i++) {channel.basicPublish("fanout_exchange", "", null, ("Hello RabbitMQ!!!" + i).getBytes());}//断开连接channel.close();connection.close();} catch (Exception e) {e.printStackTrace();}}
}

消息消费者1

package com.example.publishsubscribe;import com.example.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;public class Consumer1 {public static void main(String[] args) {try {Connection connection = ConnectionUtil.getConnection();//获取通道对象Channel channel = connection.createChannel();//创建队列channel.queueDeclare("fanout_queue1", false, false, false, null);//给队列绑定交换机channel.queueBind("fanout_queue1", "fanout_exchange", "");//监听队列中的消息channel.basicConsume("fanout_queue1", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者1获得消息为:" + new String(body, "utf-8"));}});//消费方不需要关闭连接,保持一直监听队列状态// channel.close();//connection.close();} catch (Exception e) {e.printStackTrace();}}
}

消息消费者2

package com.example.publishsubscribe;import com.example.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;public class Consumer2 {public static void main(String[] args) {try {Connection connection = ConnectionUtil.getConnection();//获取通道对象Channel channel = connection.createChannel();//创建队列channel.queueDeclare("fanout_queue2", false, false, false, null);//给队列绑定交换机channel.queueBind("fanout_queue2", "fanout_exchange", "");//监听队列中的消息channel.basicConsume("fanout_queue2", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者2获得消息为:" + new String(body, "utf-8"));}});//消费方不需要关闭连接,保持一直监听队列状态// channel.close();//connection.close();} catch (Exception e) {e.printStackTrace();}}
}
消费者1获得消息为:Hello RabbitMQ!!!1
消费者1获得消息为:Hello RabbitMQ!!!2
消费者1获得消息为:Hello RabbitMQ!!!3
消费者1获得消息为:Hello RabbitMQ!!!4
消费者1获得消息为:Hello RabbitMQ!!!5
消费者1获得消息为:Hello RabbitMQ!!!6
消费者1获得消息为:Hello RabbitMQ!!!7
消费者1获得消息为:Hello RabbitMQ!!!8
消费者1获得消息为:Hello RabbitMQ!!!9
消费者1获得消息为:Hello RabbitMQ!!!10
消费者2获得消息为:Hello RabbitMQ!!!1
消费者2获得消息为:Hello RabbitMQ!!!2
消费者2获得消息为:Hello RabbitMQ!!!3
消费者2获得消息为:Hello RabbitMQ!!!4
消费者2获得消息为:Hello RabbitMQ!!!5
消费者2获得消息为:Hello RabbitMQ!!!6
消费者2获得消息为:Hello RabbitMQ!!!7
消费者2获得消息为:Hello RabbitMQ!!!8
消费者2获得消息为:Hello RabbitMQ!!!9
消费者2获得消息为:Hello RabbitMQ!!!10

4、routing路由模式

在这里插入图片描述

1)、消息生产者将消息发送给交换机按照路由判断,路由是字符串,当前产生的消息携带路由字符,交换机根据路

由的 key,只能匹配上路由 key 对应的消息队列,对应的消费者才能消费消息。队列与交换机的绑定,不能是任意

绑定了,而是要指定一个 RoutingKey (路由 key)。消息的发送方在向 Exchange 发送消息时,也必须指定消息的

RoutingKey 。Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列

的 Routingkey 与消息的 Routing key 完全一致,才会接收到消息。

2)、根据业务功能定义路由字符串。

3)、从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中。

4)、业务场景:error 通知;EXCEPTION;错误通知的功能;传统意义的错误通知;客户通知;利用key路由,可

以将程序中的错误封装成消息传入到消息队列中,开发者可以自定义消费者,实时接收错误。

5)、代码实现:

消息提供者

package com.example.souting;import com.example.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;// 交换机和队列可以在提供方和消费方某一方创建,在两边同时创建也可以,只要创建的名称一致。
// 保证,哪一方先运行则在哪一方创建public class Provider {public static void main(String[] args) {try {//获取连接对象Connection connection = ConnectionUtil.getConnection();//获取通道对象Channel channel = connection.createChannel();//创建交换机(交换机没有存储数据的能力,数据存储在队列上,如果有交换机没队列的情况下,数据会丢失)// 1.参数一:交换机名称    参数二:交换机类型channel.exchangeDeclare("direct_exchange", "direct");//向队列中发送消息for (int i = 1; i <= 10; i++) {channel.basicPublish("direct_exchange",//设置路由键,符合路由键的队列,才能拿到消息"insert",null,("Hello RabbitMQ!!!" + i).getBytes());}//断开连接channel.close();connection.close();} catch (Exception e) {e.printStackTrace();}}
}

消息消费者1

package com.example.souting;import com.example.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;public class Consumer1 {public static void main(String[] args) {try {Connection connection = ConnectionUtil.getConnection();//获取通道对象Channel channel = connection.createChannel();//创建队列channel.queueDeclare("direct_queue1", false, false, false, null);//绑定交换机(routingKey:路由键)channel.queueBind("direct_queue1", "direct_exchange", "select");channel.queueBind("direct_queue1", "direct_exchange", "insert");//监听队列中的消息channel.basicConsume("direct_queue1", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者1获得消息为:" + new String(body, "utf-8"));}});//消费方不需要关闭连接,保持一直监听队列状态// channel.close();//connection.close();} catch (Exception e) {e.printStackTrace();}}
}

消息消费者2

package com.example.souting;import com.example.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;public class Consumer2 {public static void main(String[] args) {try {Connection connection = ConnectionUtil.getConnection();//获取通道对象Channel channel = connection.createChannel();//创建队列channel.queueDeclare("direct_queue2", false, false, false, null);//绑定交换机(routingKey:路由键)channel.queueBind("direct_queue2", "direct_exchange", "delete");channel.queueBind("direct_queue2", "direct_exchange", "select");//监听队列中的消息channel.basicConsume("direct_queue2", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者2获得消息为:" + new String(body, "utf-8"));}});//消费方不需要关闭连接,保持一直监听队列状态// channel.close();//connection.close();} catch (Exception e) {e.printStackTrace();}}
}
消费者1获得消息为:Hello RabbitMQ!!!1
消费者1获得消息为:Hello RabbitMQ!!!2
消费者1获得消息为:Hello RabbitMQ!!!3
消费者1获得消息为:Hello RabbitMQ!!!4
消费者1获得消息为:Hello RabbitMQ!!!5
消费者1获得消息为:Hello RabbitMQ!!!6
消费者1获得消息为:Hello RabbitMQ!!!7
消费者1获得消息为:Hello RabbitMQ!!!8
消费者1获得消息为:Hello RabbitMQ!!!9
消费者1获得消息为:Hello RabbitMQ!!!10

5、topic 主题模式(路由模式的一种)

在这里插入图片描述

1)、Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型

Exchange 可以让队列在绑定 Routing key 的时候使用通配符。

2)、Routingkey 一般都是有一个或多个单词组成,多个单词之间以 . 分割,例如:item.insert。

通配符规则:

# :匹配一个或多个词

*:匹配不多不少恰好1个词

举例:

item.# :能够匹配item.insert.abc或者item.insert

item.* :只能匹配 item.insert

usa.# ,因此凡是以 usa. 开头的 routing key 都会被匹配到

#.news ,因此凡是以 .news 结尾的 routing key 都会被匹配

3)、路由功能添加模糊匹配。

4)、消息产生者产生消息,把消息交给交换机。

5)、交换机根据key的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费。

6)、代码实现:

消息提供者

package com.example.topic;import com.example.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;//交换机和队列可以在提供方和消费方某一方创建,在两边同时创建也可以,只要创建的名称一致。
// 保证,哪一方先运行则在哪一方创建public class Provider {public static void main(String[] args) {try {//获取连接对象Connection connection = ConnectionUtil.getConnection();//获取通道对象Channel channel = connection.createChannel();//创建交换机(交换机没有存储数据的能力,数据存储在队列上,如果有交换机没队列的情况下,数据会丢失)   //1.参数一:交换机名称    参数二:交换机类型channel.exchangeDeclare("topic_exchange", "topic");//向队列中发送消息for (int i = 1; i <= 10; i++) {channel.basicPublish("topic_exchange",// #:匹配0-n个单词(之间以.区分,两点之间算一个单词,可以匹配hello world空格的情况)   *(匹配一个单词)"emp.hello world",null,("Hello RabbitMQ!!!" + i).getBytes());}//断开连接channel.close();connection.close();} catch (Exception e) {e.printStackTrace();}}
}

消息消费者1

package com.example.topic;import com.example.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;public class Consumer1 {public static void main(String[] args) {try {Connection connection = ConnectionUtil.getConnection();//获取通道对象Channel channel = connection.createChannel();//创建队列channel.queueDeclare("topic_queue1", false, false, false, null);//绑定交换机(routingKey:路由键)  #:匹配0-n个单词(之间以.区分,两点之间算一个单词)channel.queueBind("topic_queue1", "topic_exchange", "emp.#");//监听队列中的消息channel.basicConsume("topic_queue1", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者1获得消息为:" + new String(body, "utf-8"));}});//消费方不需要关闭连接,保持一直监听队列状态// channel.close();//connection.close();} catch (Exception e) {e.printStackTrace();}}
}

消息消费者2

package com.example.topic;import com.example.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;public class Consumer2 {public static void main(String[] args) {try {Connection connection = ConnectionUtil.getConnection();//获取通道对象Channel channel = connection.createChannel();//创建队列channel.queueDeclare("topic_queue2", false, false, false, null);//绑定交换机(routingKey:路由键)  *:匹配1个单词(之间以.区分,两点之间算一个单词)channel.queueBind("topic_queue2", "topic_exchange", "emp.*");//监听队列中的消息channel.basicConsume("topic_queue2", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者2获得消息为:" + new String(body, "utf-8"));}});//消费方不需要关闭连接,保持一直监听队列状态// channel.close();//connection.close();} catch (Exception e) {e.printStackTrace();}}
}
消费者1获得消息为:Hello RabbitMQ!!!1
消费者1获得消息为:Hello RabbitMQ!!!2
消费者1获得消息为:Hello RabbitMQ!!!3
消费者1获得消息为:Hello RabbitMQ!!!4
消费者1获得消息为:Hello RabbitMQ!!!5
消费者1获得消息为:Hello RabbitMQ!!!6
消费者1获得消息为:Hello RabbitMQ!!!7
消费者1获得消息为:Hello RabbitMQ!!!8
消费者1获得消息为:Hello RabbitMQ!!!9
消费者1获得消息为:Hello RabbitMQ!!!10

6、RPC

在这里插入图片描述

RPC即客户端远程调用服务端的方法 ,使用MQ可以实现RPC的异步调用,基于Direct交换机实现,流程如下:

1)、客户端即是生产者也是消费者,向RPC请求队列发送RPC调用消息,同时监听RPC响应队列。

2)、服务端监听RPC请求队列的消息,收到消息后执行服务端的方法,得到方法返回的结果。

3)、服务端将RPC方法 的结果发送到RPC响应队列。

4)、客户端(RPC调用方)监听RPC响应队列,接收到RPC调用结果。

5)、代码实现:

Client端

package com.example.rpc;import com.example.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;public class Client {public static void main(String[] argv) throws IOException, InterruptedException {String message = "Hello World!!!";// 建立一个连接和一个通道,并为回调声明一个唯一的回调队列Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 定义一个临时变量的接受队列名String replyQueueName = channel.queueDeclare().getQueue();// 生成一个唯一的字符串作为回调队列的编号String corrId = UUID.randomUUID().toString();// 发送请求消息,消息使用了两个属性:replyTo和correlationId// 服务端根据replyTo返回结果,客户端根据correlationId判断响应是不是给自己的AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build();// 发布一个消息,rpc_queue路由规则channel.basicPublish("", "rpc_queue", props, message.getBytes("UTF-8"));// 由于我们的消费者交易处理是在单独的线程中进行的,因此我们需要在响应到达之前暂停主线程。// 这里我们创建的容量为1的阻塞队列ArrayBlockingQueue,因为我们只需要等待一个响应。final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);// String basicConsume(String queue, boolean autoAck, Consumer callback)channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {//检查它的correlationId是否是我们所要找的那个if (properties.getCorrelationId().equals(corrId)) {//如果是,则响应BlockingQueueresponse.offer(new String(body, "UTF-8"));}}});System.out.println(" 客户端请求的结果:" + response.take());}
}

Server端

package com.example.rpc;import com.example.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;public class Server {public static void main(String[] args) {Connection connection = null;try {connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();channel.queueDeclare("rpc_queue", false, false, false, null);channel.basicQos(1);System.out.println("Awaiting RPC requests:");Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder().correlationId(properties.getCorrelationId()).build();String response = "";try {response = new String(body, "UTF-8");System.out.println("response (" + response + ")");} catch (RuntimeException e) {System.out.println("错误信息 " + e.toString());} finally {// 返回处理结果队列channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));// 确认消息,已经收到后面参数 multiple:是否批量.true:将一次性确认所有小于envelope.getDeliveryTag()的消息。channel.basicAck(envelope.getDeliveryTag(), false);// RabbitMq consumer worker thread notifies the RPC// server owner threadsynchronized (this) {this.notify();}}}};// 取消自动确认boolean autoAck = false;channel.basicConsume("rpc_queue", autoAck, consumer);// Wait and be prepared to consume the message from RPC client.while (true) {synchronized (consumer) {try {consumer.wait();} catch (InterruptedException e) {e.printStackTrace();}}}} catch (Exception e) {e.printStackTrace();} finally {try {connection.close();} catch (IOException e) {e.printStackTrace();}}}
}
Awaiting RPC requests:
response (Hello World!!!)
response (Hello World!!!)
response (Hello World!!!)# 客戶端发起3次请求
客户端请求的结果:Hello World!!!
客户端请求的结果:Hello World!!!
客户端请求的结果:Hello World!!!

7、发布订阅模式与工作队列模式的区别

1、工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机。

2、发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使

用默认交换机)。

3、发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将

队列绑定到默认的交换机 。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_152434.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

《MySQL高级篇》十五、其他数据库日志

文章目录 1. MySQL支持的日志1.1 日志类型1.2 日志的弊端 2. 慢查询日志(slow query log)3. 通用查询日志3.1 问题场景3.2 查看当前状态3.3 启动日志3.4 查看日志3.5 停止日志3.6 删除\刷新日志 4. 错误日志(error log)4.1 启动日志4.2 查看日志4.3 删除\刷新日志4.4 MySQL8.0新…

C语言多级指针

#include "stdio.h" #include <stdlib.h>int main() {int a 10;//*p int a int *pint* p &a;int** q &p;//int** q int *(*q) int *(q) a//int**q int*(*q) int*(&a) int*&a aint*** k &q;//分析&#xff1a;首先k是个变量&…

STM32--综述

文章目录 前言STM32简介STM32F103C8T6系统结构Keil软件安装注意事项新建工程操作流程 前言 本专栏将学习B站江协科技的STM32入门教程&#xff0c;通过自身理解和对老师的总结所写的博客专栏。 STM32简介 STM32是意法半导体&#xff08;STMicroelectronics&#xff09;公司推…

【计算机视觉 | 图像分割】arxiv 计算机视觉关于图像分割的学术速递(8 月 4 日论文合集)

文章目录 一、分割|语义相关(6篇)1.1 Point2Mask: Point-supervised Panoptic Segmentation via Optimal Transport1.2 Weakly Supervised 3D Instance Segmentation without Instance-level Annotations1.3 LiDAR-Camera Panoptic Segmentation via Geometry-Consistent and S…

ACL访问控制列表

ACL介绍 acl: 访问控制列表 步骤&#xff1a; 创建一个访问控制规则调用这个规则 ACL的分类和标识 ACL的匹配顺序以及匹配结果 拓扑图 配置 # 首先通过三层交换的实验做一次 ....## 检测ip地址 display ip interface brief## 在交换机2上做配置 [S2]acl name test ?IN…

【神经网络手写数字识别-最全源码(pytorch)】

Torch安装的方法 学习方法 1.边用边学&#xff0c;torch只是一个工具&#xff0c;真正用&#xff0c;查的过程才是学习的过程2.直接就上案例就行&#xff0c;先来跑&#xff0c;遇到什么来解决什么 Mnist分类任务&#xff1a; 网络基本构建与训练方法&#xff0c;常用函数解析…

【Linux命令详解 | cd命令】Linux系统中用于更改当前工作目录的命令

文章标题 简介一&#xff0c;参数列表二&#xff0c;使用介绍1. 使用cd命令切换到特定目录2. 使用cd命令与路径相关的特殊字符3. 使用cd命令切换到包含空格的目录4. 使用cd命令切换到前一个和后一个目录5. 使用cd命令切换到用户的主目录6. 使用cd命令与绝对路径和相对路径 总结…

【项目流程】前端项目的开发流程

1. 项目中涉及的所有角色及其职责 - PM 产品经理 产品经理&#xff08;Product Manager&#xff0c;简称PM&#xff09;负责明确和定义产品的愿景和战略&#xff0c;与客户、用户、业务部门和其他利益相关者进行沟通&#xff0c;收集并分析他们的需求和期望。负责制定产品的详…

TCP三次握手,四次挥手理解

1. 三次握手 *三次握手&#xff08;Three-way Handshake&#xff09;*其实就是指建立一个TCP连接时&#xff0c;需要客户端和服务器总共发送3个包。进行三次握手的主要作用就是为了确认双方的接收能力和发送能力是否正常、指定自己的初始化序列号为后面的可靠性传送做准备。实…

前端学习---vue2--选项/数据--data-computed-watch-methods-props

写在前面&#xff1a; vue提供了很多数据相关的。 文章目录 data 动态绑定介绍使用使用数据 computed 计算属性介绍基础使用计算属性缓存 vs 方法完整使用 watch 监听属性介绍使用 methodspropspropsData data 动态绑定 介绍 简单的说就是进行双向绑定的区域。 vue实例的数…

MPU6050

偏航角&#xff08;Yaw&#xff09; 横滚角&#xff08;ROll&#xff09; 俯仰角&#xff08;Pit&#xff09; 误差 mpu6050里面有一个受力的东西 受重力影响的电容 某个导体就往下一点 根据fma就可以算出当前的加速度值 加速度传感器只输出加速度 知道重力加速度和重力的角度可…

C++入门之stl六大组件--List源码深度剖析及模拟实现

文章目录 前言 一、List源码阅读 二、List常用接口模拟实现 1.定义一个list节点 2.实现一个迭代器 2.2const迭代器 3.定义一个链表&#xff0c;以及实现链表的常用接口 三、List和Vector 总结 前言 本文中出现的模拟实现经过本地vs测试无误&#xff0c;文件已上传gite…

java: 非法字符: ‘\ufeff‘

遇到这种情况是编码转换问题 解决办法&#xff1a; 单个文件&#xff1a;可以先将格式转换为utf-16&#xff0c;然后在转换回utf-8 多个文件&#xff1a;在setting-file encodings将乱码的这个文件夹里的所有Java文件都设置utf-8格式就可以了

小成本大幅度增幅CNN鲁棒性,完美的结合GLCM+CNN

本文以实验为导向&#xff0c;使用vgg16GLCM实现一场精彩的新冠肺炎的分类识别&#xff0c;并且对比不加GLCM后的效果。在这之前&#xff0c;我们需要弄明白一些前缀知识和概念问题&#xff1a; GLCM&#xff08;Gray-Level Co-occurrence Matrix&#xff09;&#xff0c;中文称…

比特鹏哥-数据类型和变量【自用笔记】

这里写目录标题 1.数据类型介绍字符&#xff0c;整型&#xff0c;浮点型&#xff0c;布尔类型 2.signed 和unsigned3.数据类型的取值范围sizeof 展示字节大小--- 计算机中单位&#xff1a;字节 4.变量 常量4.1 变量创建变量&#xff08;数据类型 变量名&#xff09;创建变量的时…

基于react-native的简单消息确认框showModel

基于react-native的简单消息确认框showModel 效果示例图组件代码ShowModel/index.jsx使用案例device.js安装线性渐变色 效果示例图 组件代码ShowModel/index.jsx import React, {forwardRef, useImperativeHandle, useState} from react; import {View,Text,Modal,TouchableOp…

2023,哪些大厂不再值钱?

2023年&#xff0c;摘下口罩的第一年&#xff0c;虽然经济复苏没那么强劲&#xff0c;但对于在资本寒冬中熬了许久的互联网科技股来说&#xff0c;春天的步伐好像越来越近了。今年以来&#xff0c;主要互联网科技公司的股价基本都涨了不少&#xff0c;尤其美国那边&#xff0c;…

ROS添加发布者和订阅者机制实现

一. ROS的节点和包 ✨Node&#xff1a; ROS的基本单位&#xff0c;实现某个功能的节点。比如实现超声波传感器就是一个节点&#xff0c;雷达传感器就可以是一个节点 ✨Package&#xff1a; 多个有联系的节点组成的单位&#xff0c;比如你要控制无人机姿态&#xff0c;可能需要…

【Linux命令详解 | pwd命令】Linux系统中用于显示当前工作目录的命令

文章标题 简介一&#xff0c;参数列表二&#xff0c;使用介绍1. pwd命令的基本使用2. pwd命令中的参数3. pwd命令的工作机制4. pwd命令的实际应用 总结 简介 pwd命令是Linux中的基础命令之一&#xff0c;使用该命令可以快速查看当前工作目录。在掌握Linux命令时&#xff0c;pw…

在Raspberry Pi 4上安装Ubuntu 20.04 + ROS noetic(不带显示器)

在Raspberry Pi 4上安装Ubuntu 20.04 ROS noetic&#xff08;不带显示器&#xff09; 1. 所需设备 所需设备&#xff1a; 树莓派 4 B 型 wifi microSD 卡&#xff1a;最小 32GB MicroSD 转 SD 适配器 &#xff08;可选&#xff09;显示器&#xff0c;鼠标等 2. 树莓派…