深入了解RabbitMQ:构建高效的消息队列系统(二)

news/2024/4/28 1:31:09/文章来源:https://blog.csdn.net/weixin_42506246/article/details/137100613

本系列文章简介:

        本系列文章将深入了解RabbitMQ工作原理、特性和最佳实践。我们将介绍如何安装和配置RabbitMQ服务器,以及如何开发生产者和消费者应用程序。我们还将探讨如何处理消息的持久化、消息路由和消息过滤。除此之外,我们将研究如何使用RabbitMQ进行负载均衡、集群和高可用性配置

        希望通过本系列文章能够帮助大家深入了解RabbitMQ,并能够应用它构建高效的消息队列系统。无论你是一个开发人员、系统管理员还是系统架构师,本系列文章都将为你提供宝贵的指导和实用的技术知识。

        欢迎大家订阅《Java技术栈高级攻略》专栏,一起学习,一起涨分!

目录

1、前言

2、RabbitMQ的使用

2.1 发布和消费消息

2.2 工作队列模式

2.3 发布/订阅模式

2.4 路由模式

2.5 主题模式

2.6 RPC模式

2.7 持久化和消息确认

3、结语


1、前言

        RabbitMQ是一种开源的消息代理软件,它实现了高效、可靠、可扩展的消息传递。它基于AMQP(Advanced Message Queuing Protocol)协议,可以用于在分布式系统中传递消息。RabbitMQ使用队列来存储消息,并通过发布(publish)和订阅(subscribe)模式来传递消息。生产者(producer)将消息发布到队列中,而消费者(consumer)则从队列中订阅并接收消息。

        本文将跟随《深入了解RabbitMQ:构建高效的消息队列系统(一)》的进度,继续介绍RabbitMQ。希望通过本系列文章的学习,您将能够更好地理解RabbitMQ的内部工作原理,掌握RabbitMQ的使用技巧,以及通过合理的设计完成最佳实践,充分发挥RabbitMQ的潜力,为系统的高效运行提供有力保障。

2、RabbitMQ的使用

2.1 发布和消费消息

在RabbitMQ中,发布消息和消费消息是使用不同的API进行操作的。

发布消息:

首先,需要建立与RabbitMQ服务器的连接,可以使用RabbitMQ的官方客户端库的Connection类来实现。连接的参数包括RabbitMQ服务器的地址、端口、用户名、密码等。创建连接后,可以通过Connection对象创建一个Channel对象,所有的消息发布和消费操作都是在Channel对象上进行的。

在RabbitMQ中,消息是通过交换机(Exchange)进行路由的。在发布消息之前,需要先声明一个交换机。RabbitMQ提供了几种常见的交换机类型,例如直连交换机(direct)、主题交换机(topic)、扇形交换机(fanout)等。根据业务需求选择合适的交换机类型,通过Channel对象的exchangeDeclare方法来声明交换机,并指定其名称、类型等参数。

然后,可以使用Channel对象的basicPublish方法来发布消息。方法的参数包括交换机名称、路由键(用于将消息发送到指定的队列)、消息的属性及消息体等。消息体可以是任意格式的数据,通常是一个字符串或者字节流。

消费消息:

首先,同样需要建立与RabbitMQ服务器的连接,并创建一个Channel对象用于消费消息。与发布消息不同的是,在消费消息之前,还需要声明一个队列。队列可以通过Channel对象的queueDeclare方法来声明,并指定其名称、是否为持久化队列等参数。

为了消费消息,需要将队列与交换机进行绑定。可以使用Channel对象的queueBind方法来进行绑定操作,指定队列名称、交换机名称、路由键等参数。

最后,可以通过Channel对象的basicConsume方法来消费消息。方法的参数包括队列名称、是否自动确认消息等参数。消费消息时,需要提供一个回调函数,当有消息到达时,会自动调用该函数进行处理。

以上就是使用RabbitMQ进行发布和消费消息的基本流程。在实际应用中,还可以通过设置消息的持久化、消息的优先级、消息的超时等参数来实现更多的功能。同时,也可以使用多个消费者来共同消费同一个队列中的消息,实现消息的并发处理。

下面是一个使用Java示例来演示RabbitMQ的发布和消费消息的过程:

发布消息:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class Publisher {private final static String QUEUE_NAME = "my_queue";public static void main(String[] args) throws Exception {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");// 创建连接Connection connection = factory.newConnection();// 创建信道Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 发布消息String message = "Hello RabbitMQ!";channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("消息已发布:" + message);// 关闭信道和连接channel.close();connection.close();}
}

消费消息:

import com.rabbitmq.client.*;import java.io.IOException;public class Consumer {private final static String QUEUE_NAME = "my_queue";public static void main(String[] args) throws Exception {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");// 创建连接Connection connection = factory.newConnection();// 创建信道Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 创建消费者并设置消息处理com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println("收到消息:" + message);}};// 开始消费消息channel.basicConsume(QUEUE_NAME, true, consumer);// 等待用户输入,终止程序System.in.read();// 关闭信道和连接channel.close();connection.close();}
}

以上示例中,Publisher类负责发布消息,它首先创建一个连接工厂,然后创建连接和信道,声明一个队列,最后发布一条消息到队列中。

Consumer类负责消费消息,它也创建了连接工厂,连接和信道,然后声明队列。接着创建一个消费者并设置消息处理方法,最后开始消费消息。

2.2 工作队列模式

工作队列模式是RabbitMQ中最简单的消息模式之一。在工作队列模式中,一个生产者发送消息到一个队列,多个消费者从队列中接收并处理消息。每个消息只能被一个消费者处理。

在工作队列模式中,消息发送者(Producer)将消息发送到一个队列(Queue)中,然后多个消息接收者(Consumer)从队列中获取消息并进行处理。每个消息只能被一个消息接收者处理,这样可以保证消息的顺序性和可靠性。

下面是使用RabbitMQ实现工作队列模式的步骤:

  1. 创建一个消息队列(Queue):使用RabbitMQ的管理界面或者代码创建一个队列,消息发送者将消息发送到这个队列中。

  2. 创建消息发送者(Producer):编写一个程序来发送消息到队列中。可以使用RabbitMQ的客户端库来创建一个连接,然后将消息发送到队列中。

  3. 创建消息接收者(Consumer):编写一个程序来接收队列中的消息并进行处理。可以创建多个消息接收者来实现并发处理。

  4. 消息发送者发送消息:将消息发送到队列中。

  5. 消息接收者接收消息:从队列中获取消息并进行处理。

  6. 消息处理完成后,消息接收者可以发送一个确认消息给RabbitMQ,告诉它已经处理完这条消息。然后,RabbitMQ会从队列中删除这条消息。

  7. 可以使用RabbitMQ的监控工具来查看队列的状态和消息的处理情况。

工作队列模式的优点是可以实现任务的异步处理和负载均衡,多个消息接收者可以同时处理消息,提高系统的并发性能和可扩展性。同时,消息接收者可以按照自己的能力来调整处理消息的速度,避免消息的积压和系统资源的浪费。

总结来说,工作队列模式是一种简单而有效的消息队列模式,适用于需要异步处理任务的场景。使用RabbitMQ可以很方便地实现工作队列模式,并提供了可靠的消息传输和处理机制。

下面是一个使用工作队列模式的Java示例:

生产者(Producer):

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class Producer {private final static String QUEUE_NAME = "work_queue";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.queueDeclare(QUEUE_NAME, false, false, false, null);String message = "Hello RabbitMQ!";channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));System.out.println("Sent message: " + message);}}
}

消费者(Consumer):

import com.rabbitmq.client.*;public class Consumer {private final static String QUEUE_NAME = "work_queue";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println("Waiting for messages...");int prefetchCount = 1;channel.basicQos(prefetchCount); // 设置每个消费者最多只能处理一个消息DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("Received message: " + message);try {doWork(message);} finally {System.out.println("Done processing message: " + message);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}};boolean autoAck = false; // 关闭自动消息确认channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {});}private static void doWork(String message) {try {Thread.sleep(1000); // 模拟处理消息的耗时} catch (InterruptedException e) {Thread.currentThread().interrupt();}}
}

请确保在运行示例之前已安装并启动RabbitMQ服务器。

在上述示例中,生产者向名为"work_queue"的队列发送了一条消息,消费者通过监听该队列接收并处理消息。每个消费者一次只处理一个消息,处理完后再从队列中获取下一个消息。消费者通过调用channel.basicQos(prefetchCount)方法设置prefetchCount为1,从而确保每个消费者一次只处理一个消息,避免了负载不均衡的问题。

在消费者处理完消息后,调用channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false)方法进行消息确认,告诉RabbitMQ该消息已经处理完毕。这样,RabbitMQ就可以安全地将该消息从队列中删除。

2.3 发布/订阅模式

发布/订阅模式是一种经典的消息传递模式,在该模式中,消息发送者将消息发布到交换机上,而消息的接收者通过创建绑定到该交换机上的队列来订阅消息。所有向该交换机发送消息的消费者都会收到消息。

发布/订阅模式包括两个主要的角色:发布者和订阅者。发布者负责发送消息,而订阅者负责接收消息。

在RabbitMQ中,发布者将消息发送到交换机(exchange),交换机负责将消息广播到所有与之绑定的队列(queue)中。每个订阅者创建一个队列,并将其绑定到交换机上,这样就可以接收到交换机发布的消息。

下面是使用RabbitMQ实现发布/订阅模式的步骤:

  1. 创建一个连接到RabbitMQ服务器的连接
  2. 创建一个通道(channel),用于发送和接收消息
  3. 创建一个交换机(exchange),用于发布消息
  4. 创建一个队列(queue),并将其绑定到交换机上
  5. 订阅者通过监听队列来接收消息
  6. 发布者发送消息到交换机上,交换机将消息广播到所有订阅者的队列中

下面是一个使用 RabbitMQ 的发布/订阅模式的 Java 示例代码:

生产者代码:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class Publisher {private static final String EXCHANGE_NAME = "logs";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "fanout");String message = "Hello World!";channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");channel.close();connection.close();}
}

消费者代码:

import com.rabbitmq.client.*;import java.io.IOException;public class Consumer {private static final String EXCHANGE_NAME = "logs";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "fanout");String queueName = channel.queueDeclare().getQueue();channel.queueBind(queueName, EXCHANGE_NAME, "");System.out.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });}
}

在上述代码中,生产者代码创建了一个名为 "logs" 的交换机,并将消息发送到该交换机上。消费者代码创建了一个与该交换机绑定的队列,并通过 basicConsume 方法监听该队列的消息。

以上示例实现了一个简单的发布/订阅模式,通过运行生产者和消费者的代码,可以在控制台看到消息的发送和接收过程。

2.4 路由模式

路由模式是RabbitMQ中的一种消息传递模式,它通过在消息中指定路由键来选择性地将消息发送到指定的队列中。路由模式主要用于在多个消费者之间进行选择性地接收消息。

在路由模式中,需要使用交换机(Exchange)来进行消息的路由。交换机根据路由键(Routing Key)将消息发送到对应的队列中。在Java中使用RabbitMQ的AMQP客户端库可以实现路由模式。

使用路由模式时,有以下几个主要的组件和概念:

  1. 交换机(Exchange):消息的发送者将消息发送到交换机,交换机根据消息的路由键将消息发送给相应的队列。路由模式中常用的交换机类型有直连交换机(direct)、主题交换机(topic)等。

  2. 绑定(Binding):绑定是交换机和队列之间的关系,它定义了哪些队列可以接收交换机发送的消息。绑定还可以附带一个路由键,用于指定接收特定消息的队列。

  3. 路由键(Routing Key):消息的发送者在发送消息时,可以指定一个路由键,交换机根据这个路由键将消息发送给相应的队列。

使用路由模式时,通常的流程如下:

  1. 创建交换机和队列,并使用绑定将它们关联起来。

  2. 消息的发送者通过指定路由键将消息发送到交换机。

  3. 交换机根据消息的路由键将消息发送给相应的队列。

  4. 队列中的消费者接收消息并进行处理。

使用路由模式可以实现灵活的消息路由和选择性消费,可以根据不同的路由键将消息发送给不同的队列,从而实现不同的消费逻辑。

在RabbitMQ中使用路由模式时,需要注意以下几点:

  1. 交换机和队列需要事先创建,并通过绑定将它们关联起来。

  2. 消息的发送者需要指定路由键,以便交换机将消息发送给相应的队列。

  3. 消费者需要监听相应的队列,并接收交换机发送的消息。

  4. 路由模式中的交换机类型和绑定的路由键需要根据实际需求进行选择和配置。

总结来说,路由模式是一种消息传递的模式,可以根据消息的路由键将消息发送给特定的消费者。使用路由模式时,需要创建交换机、队列和绑定,并指定路由键和监听队列,从而实现灵活的消息路由和选择性消费。

以下是一个简单的Java示例,演示了如何使用路由模式发送和接收消息:

  1. 发送端示例:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class Sender {private static final String EXCHANGE_NAME = "direct_exchange";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 声明交换机,并设置类型为directchannel.exchangeDeclare(EXCHANGE_NAME, "direct");// 设置发送的路由键String routingKey = "info";String message = "Hello, RabbitMQ!";// 发送消息到交换机,并指定路由键channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));System.out.println("Sent message: " + message);channel.close();connection.close();}
}

        2. 接收端示例:

import com.rabbitmq.client.*;public class Receiver {private static final String EXCHANGE_NAME = "direct_exchange";private static final String QUEUE_NAME = "direct_queue";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 声明交换机,并设置类型为directchannel.exchangeDeclare(EXCHANGE_NAME, "direct");// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机,并指定路由键String routingKey = "info";channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKey);System.out.println("Waiting for messages...");// 创建消费者并设置回调函数,用于接收消息Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println("Received message: " + message);}};// 消费消息channel.basicConsume(QUEUE_NAME, true, consumer);}
}

在上述示例中,发送端将消息发送到名为"direct_exchange"的交换机,并指定"info"作为路由键。接收端将队列"direct_queue"绑定到该交换机,并指定接收路由键为"info"的消息。最后,接收端通过回调函数处理接收到的消息。

运行发送端和接收端的代码,就可以实现路由模式中的消息发送和接收。

2.5 主题模式

RabbitMQ是一个消息队列中间件,它支持多种消息模式,包括发布/订阅模式、点对点模式和主题模式。

在主题模式中,消息发送者将消息发送到特定的主题(Topic),而接收者可以选择订阅哪些主题的消息。主题由一个或多个单词组成,用“.”分割。接收者可以使用通配符来匹配特定的主题。

下面是一个使用Java实现的主题模式示例:

发送者:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;public class TopicSender {private static final String EXCHANGE_NAME = "topic_exchange";public static void main(String[] args) throws IOException, TimeoutException {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");// 创建连接Connection connection = factory.newConnection();// 创建信道Channel channel = connection.createChannel();// 声明交换机channel.exchangeDeclare(EXCHANGE_NAME, "topic");// 发送消息String routingKey = "com.rabbitmq.example.topic.key1";String message = "Hello RabbitMQ!";channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());System.out.println("Sent message: " + message);// 关闭连接和信道channel.close();connection.close();}
}

接收者:

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;public class TopicReceiver {private static final String EXCHANGE_NAME = "topic_exchange";public static void main(String[] args) throws IOException, TimeoutException {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");// 创建连接Connection connection = factory.newConnection();// 创建信道Channel channel = connection.createChannel();// 声明交换机channel.exchangeDeclare(EXCHANGE_NAME, "topic");// 创建一个随机的队列,用于接收消息String queueName = channel.queueDeclare().getQueue();// 设置接收者只接收指定的主题消息String[] routingKeys = {"com.rabbitmq.example.topic.key1", "com.rabbitmq.#"};for (String routingKey : routingKeys) {channel.queueBind(queueName, EXCHANGE_NAME, routingKey);}System.out.println("Waiting for messages...");// 创建消息处理器DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("Received message: " + message);};// 开始接收消息channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});}
}

在上述示例中,发送者将消息发送到名为“topic_exchange”的主题交换机,接收者创建一个队列并通过绑定指定的主题来接收消息。发送者发送的消息会被接收者接收并打印出来。

注意,需要提前安装RabbitMQ并启动RabbitMQ服务器。

2.6 RPC模式

RPC(Remote Procedure Call)远程过程调用是一种通信机制,用于在分布式系统中调用远程服务。RabbitMQ可以用于实现RPC模式,其中客户端发送请求消息给RabbitMQ服务器,服务器处理请求并返回响应消息给客户端。

在RPC模式中,通常有一个客户端应用程序和一个服务器应用程序。客户端应用程序发送请求消息给服务器应用程序,并等待服务器应用程序返回结果。服务器应用程序接收到请求消息后,执行相应的处理逻辑,并将结果返回给客户端应用程序。

在RabbitMQ中实现RPC模式,需要使用到两个队列,一个用于发送请求消息,一个用于接收返回结果。客户端应用程序创建一个唯一的回调队列,并将其作为reply_to属性附加在请求消息中,并设置correlation_id属性。服务器应用程序收到请求消息后,执行处理逻辑,并将结果发送到客户端指定的回调队列,并设置correlation_id属性与请求消息相同。

以下是使用RabbitMQ实现RPC模式的几个步骤:

  1. 客户端应用程序:

    • 创建一个唯一的回调队列,并订阅该队列。
    • 发送请求消息到服务器的请求队列,并设置reply_to属性为回调队列,correlation_id属性为一个唯一的标识符。
    • 等待回调队列接收到结果消息,并获取相应的结果。
  2. 服务器应用程序:

    • 接收客户端发送的请求消息。
    • 执行处理逻辑,并得到结果。
    • 将结果发送到客户端指定的回调队列,并设置correlation_id属性与请求消息相同。

通过RPC模式,可以实现应用程序之间的远程调用,并获取结果。这种模式可以提高系统的灵活性和可扩展性,同时也可以实现请求和响应的解耦。

下面是一个使用Java语言实现RPC模式的示例:

  1. 定义一个接口,包含远程调用的方法
public interface HelloService {String sayHello(String name);
}

        2. 实现HelloService接口的服务端

public class HelloServiceImpl implements HelloService {@Overridepublic String sayHello(String name) {return "Hello, " + name + "!";}
}

        3. 编写RPC客户端

public class RpcClient {private final static String RPC_QUEUE_NAME = "rpc_queue";public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {String replyQueueName = channel.queueDeclare().getQueue();final String corrId = UUID.randomUUID().toString();// 发送请求消息AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build();String message = "Alice";channel.basicPublish("", RPC_QUEUE_NAME, props, message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + message + "'");// 接收响应消息final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {if (delivery.getProperties().getCorrelationId().equals(corrId)) {response.offer(new String(delivery.getBody(), "UTF-8"));}}, consumerTag -> {});String result = response.take();System.out.println(" [.] Got '" + result + "'");channel.basicCancel(ctag);}}
}

        4. 编写RPC服务端

public class RpcServer {private final static String RPC_QUEUE_NAME = "rpc_queue";public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);channel.queuePurge(RPC_QUEUE_NAME);channel.basicQos(1);System.out.println(" [x] Awaiting RPC requests");// 处理请求消息Object monitor = new Object();DeliverCallback deliverCallback = (consumerTag, delivery) -> {AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder().correlationId(delivery.getProperties().getCorrelationId()).build();String response = "";try {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [.] Received '" + message + "'");response += "Hello, " + message + "!";} catch (RuntimeException e) {System.out.println(" [.] " + e.toString());} finally {channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);// 唤醒等待的线程synchronized (monitor) {monitor.notify();}}};channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> {}));// 等待处理请求的线程结束while (true) {synchronized (monitor) {try {monitor.wait();} catch (InterruptedException e) {e.printStackTrace();}}}}}
}

在这个示例中,客户端发送请求消息给名为"rpc_queue"的队列,服务器接收并处理请求消息,并将响应消息发送到客户端指定的回调队列中。

注意:在实际使用中,可能需要更复杂的异常处理、超时机制等。此示例仅展示了基本的RPC模式的实现。

2.7 持久化和消息确认

在RabbitMQ中,持久化和消息确认是保证消息不丢失的重要机制。持久化可以将消息存储到磁盘上,即使RabbitMQ服务重启或崩溃,消息也不会丢失。消息确认机制可以确保消息被成功接收和处理。

在RabbitMQ中,持久化和消息确认是保证消息的可靠性传输的两个重要机制。

持久化:在RabbitMQ中,消息默认是非持久化的,也就是说,一旦RabbitMQ服务器宕机或重启,那些未被消费的消息将会丢失。为了解决这个问题,我们可以将消息设置为持久化。这样,消息会被保存在磁盘上,在RabbitMQ服务器重启后可以被重新加载。我们可以通过设置消息的delivery mode为2来将消息设置为持久化:

channel.basicPublish(exchange, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

消息确认:消息确认机制是指生产者发送消息后,等待RabbitMQ服务器返回确认信息,以保证消息成功发送到Broker。消息确认可以分为两种模式:确认模式和事务模式。

  1. 确认模式:确认模式为生产者提供了一种异步确认消息是否发送成功的机制。使用确认模式时,生产者发送消息后,会立即继续发送下一条消息,而不需要等待RabbitMQ服务器的确认。当RabbitMQ服务器成功将消息写入到磁盘后,会返回一个确认信息给生产者。生产者可以通过设置channel的confirmSelect()方法开启确认模式,并通过添加ConfirmListener监听器来处理确认信息。
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {// 处理确认消息}@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {// 处理未确认消息}
});

        2. 事务模式:事务模式是一种更为保守的消息确认机制,生产者在发送消息后,需要等待RabbitMQ服务器的确认信息。只有在接收到确认信息后,生产者才能继续发送下一条消息。如果RabbitMQ服务器接收消息失败,生产者会将之前发送的所有消息回滚。这种模式会降低消息的吞吐量,因此一般情况下不推荐使用。

channel.txSelect();
// 发送消息
channel.txCommit();
// 提交事务

通过使用持久化和消息确认机制,我们可以确保消息在传输过程中的可靠性,并且避免消息丢失的风险。

下面是一个使用RabbitMQ的Java示例,演示了如何实现持久化和消息确认:

import com.rabbitmq.client.*;import java.io.IOException;
import java.nio.charset.StandardCharsets;public class RabbitMQExample {private final static String QUEUE_NAME = "test_queue";private final static String EXCHANGE_NAME = "test_exchange";private final static String ROUTING_KEY = "test_routing_key";private final static String MESSAGE = "Hello, RabbitMQ!";public static void main(String[] args) throws Exception {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");// 创建连接Connection connection = factory.newConnection();// 创建信道Channel channel = connection.createChannel();// 声明交换机channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);// 声明队列channel.queueDeclare(QUEUE_NAME, true, false, false, null);// 绑定队列和交换机channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);// 创建消息属性AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().deliveryMode(2) // 持久化消息.build();// 发布消息channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, properties, MESSAGE.getBytes(StandardCharsets.UTF_8));// 消费消息channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, StandardCharsets.UTF_8);System.out.println("Received message: " + message);// 手动确认消息channel.basicAck(envelope.getDeliveryTag(), false);}});// 关闭连接// connection.close();}
}

这个示例中,我们首先创建了一个连接工厂,并设置连接的主机、端口、用户名和密码。然后创建连接和信道。

我们使用了一个直连交换机,并创建了一个持久化队列。然后绑定队列和交换机。

接下来,我们创建了一个消息属性对象,设置了消息的持久化属性。

然后通过调用basicPublish方法发布消息到交换机,指定了交换机、路由键和消息属性。

最后,我们创建了一个消费者对象,通过调用basicConsume方法开始消费消息。在消费者的回调函数中,我们手动确认了消息的接收和处理,通过调用basicAck方法。

请注意,在实际使用中,需要根据具体的需求,结合业务逻辑来进行配置和使用。

3、结语

        通过本文,我们对RabbitMQ这款强大的消息队列系统有了更深入的了解。RabbitMQ提供了许多有用的功能和灵活的配置选项,使得它成为构建高效的消息传递系统的首选工具之一。

        总结来说,RabbitMQ是一个功能强大且灵活的消息队列系统,它可以帮助我们构建高效的分布式系统。通过合理地使用RabbitMQ,我们可以实现可靠的消息传递,提高系统的可伸缩性和可靠性。我希望本文对你深入了解RabbitMQ有所帮助,能够在你的项目中发挥作用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_1027247.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

北斗短报文+4G应急广播系统:实时监控 自动预警 保护校园安全的新力量

安全无小事&#xff0c;生命重如山。学生是祖国的未来&#xff0c;校园安全是全社会安全工作的一个重要的组成部分。它直接关系到青少年学生能否安健康地成长&#xff0c;关系到千千万万个家庭的幸福安宁和社会稳定。 灾害事故和突发事件频频发生&#xff0c;给学生、教职员工…

XSS学习(cookie远程登录演示)

1.HTTP特点&#xff1a; 1.请求应答模式。 2.灵活可扩展 3.可靠传输 4.无状态。 这里给大家举一个例子&#xff1a; HTTP是无状态的&#xff0c;所按理来说我每进行一次会话&#xff0c;比如我在CSDN发一个帖子&#xff0c;好像按理来以说我都要进行一次重新登陆&#xff0…

Vue 04 Vue 中的 Ajax、slot 插槽

Vue学习 Vue 0401 Vue中的Ajax服务器准备axios使用跨域问题解决Vue-CLI 配置代理1Vue-CLI 配置代理2案例: 用户搜索vue-resource 02 slot插槽默认插槽具名插槽作用域插槽slot总结 Vue 04 B站 Vue全家桶&#xff08;BV1Zy4y1K7SH&#xff09; 学习笔记 Vue 中的 ajax 01 Vue中的…

uniApp使用XR-Frame创建3D场景(8)粒子系统

上篇文章讲述了如何将XR-Frame作为子组件集成到uniApp中使用 本片我们详细讲解一下xr-frame的粒子系统 先看源码 <xr-scene render-system"alpha:true" bind:ready"handleReady"> <xr-node visible"{{sec8}}"><xr-asset-load t…

小程序利用WebService跟asp.net交互过程发现的问题并处理

最近在研究一个项目&#xff0c;用到asp.net跟小程序交互&#xff0c;简单的说就是小程序端利用wx.request发起请求。获取asp.net 响应回来的数据。但经常会报错。点击下图的测试按钮 出现如下错误&#xff1a; 百思不得其解&#xff0c;试了若干方法&#xff0c;都不行。 因为…

京东云搭建幻兽帕鲁Palworld多人游戏联机服务器教程,1分钟开服

使用京东云服务器搭建幻兽帕鲁Palworld游戏联机服务器教程&#xff0c;非常简单&#xff0c;京东云推出幻兽帕鲁镜像系统&#xff0c;镜像直接选择幻兽帕鲁镜像即可一键自动部署&#xff0c;不需要手动操作&#xff0c;真正的新手0基础部署幻兽帕鲁&#xff0c;阿腾云atengyun.…

Machine Learning机器学习之统计分析

目录 前言 机器学习之统计分析 统计学的主要目标包括&#xff1a; 统计学核心概念&#xff1a; 统计基础&#xff1a; 训练误差&#xff1a; 常见的损失函数&#xff1a; 正则化和交叉验证 博主介绍&#xff1a;✌专注于前后端、机器学习、人工智能应用领域开发的优质创作者、秉…

网络工程师之路由交换技术篇

网络工程师之路由交换技术篇 路由交换之技术篇ARPICMPBPDUIPv6IP编址MAC其他技术点参考 以下均为个人笔记&#xff0c;摘录到csdn做备份 路由交换之技术篇 ARP Operation Code指定了ARP报文的类型&#xff0c; 包括ARP request 和ARP reply&#xff1b;取值为1或者2 &#x…

uniapp输入框事件(防抖)

一、描述 在输入框输入内容或者说输入关键词的时候&#xff0c;往往都要进行做防抖处理。如果不做防抖&#xff0c;你输入什么&#xff0c;动态绑定的数据就会保持一致。这样不好吗&#xff0c;同步获取。有个业务场景&#xff0c;如果是搜索框&#xff0c;你每次一个字符&…

Java中读取html文件转成String,展示在浏览器

这里写目录标题 第一章1.1&#xff09;pom中引入依赖和html文件示例1.2&#xff09;使用hutool工具包读取html文件转为string1.3&#xff09;页面显示 第一章 1.1&#xff09;pom中引入依赖和html文件示例 引入hutool工具包依赖 <dependency><groupId>cn.hutool&…

【Linux】 gcc(linux下的编译器)程序的编译和链接详解

目录 前言&#xff1a;快速认识gcc 1. 程序的翻译环境和执行环境 2.编译和链接 2.1翻译环境 2.2编译环境 1. 预处理 gcc -E指令 test.c&#xff08;源文件&#xff09; -o test.i&#xff08;生成在一个文件中&#xff0c;可以自己指定&#xff09; 预处理完成之后就停下来&am…

贪心算法--最大数

个人主页&#xff1a;Lei宝啊 愿所有美好如期而遇 本题链接https://leetcode.cn/problems/largest-number/description/ class Solution { public:bool static compare(int a, int b){return (to_string(a) to_string(b)) > (to_string(b) to_string(a));}bool operato…

MySQL创建表:练习题

练习题&#xff1a; 创建一个名为"students"的数据库&#xff0c;并切换到该数据库。 在"students"数据库中创建一个名为"grades"的表&#xff0c;包含以下字段&#xff1a; id: 整数类型 name: 字符串类型&#xff0c;学生姓名 subject: 字符串…

最小可行产品需要最小可行架构——可持续架构(三)

前言 最小可行产品&#xff08;MVP&#xff09;的概念可以帮助团队专注于尽快交付他们认为对客户最有价值的东西&#xff0c;以便在投入大量时间和资源之前迅速、廉价地评估产品的市场规模。MVP不仅需要考虑产品的市场可行性&#xff0c;还需要考虑其技术可行性&#xff0c;以…

计算机专业学习单片机有什么意义吗?

玩单片机跟玩计算机区别还是很大的, 单片机有众多的种类,每一种又可能有很多个系列.可以说单片机就是为了专款专用而生的.这样来达到产品成本的降低,这就是现在身边的很多的电子产品价格一降再降的原因之一.在开始前我有一些资料&#xff0c;是我根据网友给的问题精心整理了一…

安装paddle detection心得

一、安装PaddlePaddle conda create -n mypaddle python3.8 conda activate mypaddle python -m pip install paddlepaddle-gpu2.6.0 -i https://mirror.baidu.com/pypi/simple 请确保您的PaddlePaddle安装成功并且版本不低于需求版本。使用以下命令进行验证。 这是CUDA1…

SpringBoot项目启动成功,但是调用接口直接报NOT FOUND 404

问题描述 SpringBoot项目启动成功&#xff0c;但是调用接口直接报NOT FOUND 404 解决办法 启动类中ComponentScan(basePackages {“com.afclab”})中的扫包路径和项目路径不一样&#xff0c;导致扫不到Controller等组件&#xff0c;修改成和项目路径一样就可以解决&#xf…

8、鸿蒙学习-HAR

HAR&#xff08;Harmony Archive&#xff09;是静态共享包&#xff0c;可以包含代码、C库、资源和配置文件。通过HAR可以实现多个模块或多个工程共享ArkUI组件、资源等相关代码。HAR不同于HAP&#xff0c;不能独立安装运行在设备上。只能作为应用模块的依赖项被引用。 一、创建…

206基于matlab的无人机航迹规划(UAV track plannin)

基于matlab的无人机航迹规划(UAV track plannin&#xff09;。输入输出参数包括 横滚、俯仰、航向角&#xff08;单位&#xff1a;度&#xff09;&#xff1b;横滚速率、俯仰速率、航向角速率&#xff08;单位&#xff1a;度/秒&#xff09;&#xff1b;飞机运动速度——X右翼、…

小美的平衡矩阵(前缀和例题)

2024美团秋招&#xff0c;被这一题给难住了 美团校招笔试真题_Java工程师、C工程师_牛客网 题目&#xff1a; 解答&#xff1a; 这道题的关键点就是要计算出以某一点为矩阵右下角时&#xff0c;1的个数 我一开始是想着遍历&#xff0c;以某一点为起点&#xff08;矩阵左上角&a…