目录
1、RabitMQ工作队列
2、交换机
3、RabbitMQ Fanout 发布订阅--- Fanout exchange(扇型交换机)
3.1、创建连接代码
3.1、生产者代码
3.2、消费者代码
4、Direct路由模式
4.1、生产者代码
4.2、消费者代码
5、Topic主题模式
5.1、生产者代码
5.2、消费者代码
1、RabitMQ工作队列
默认的传统队列是为均摊消费,存在不公平性;如果每个消费者速度不一样的情况下,均摊消费是不公平的,应该是能者多劳。
采用工作队列
在通道中只需要设置basicQos为1即可,表示MQ服务器每次只会给消费者推送1条消息必须手动ack确认之后才会继续发送。
2、交换机
Exchange:在RabbitMQ中,生产者发送消息不会直接将消息投递到队列中,而是先将消息投递到交换机中, 在由交换机转发到具体的队列, 队列再将消息以推送或者拉取方式给消费者进行消费。
生产者将消息发送到Exchange, 由Exchange再路由到一个或多个队列中:
/Virtual Hosts:区分不同的团队
交换机exchabge: 路由消息存放在那个队列中 类似于nginx
队列queue:队列 存放消息
路由key(RoutingKey): 分发规则,生产者将消息发送给交换机的时候, 会指定RoutingKey指定路由规则。
交换机种类:
Direct exchange(直连交换机)
Fanout exchange(扇型交换机)
Topic exchange(主题交换机)
Headers exchange(头交换机)
3、RabbitMQ Fanout 发布订阅--- Fanout exchange(扇型交换机)
生产者发送一条消息,经过交换机转发到多个不同的队列,多个不同的队列就多个不同的消费者。同一条消息,经路由器转发 c1和c2都能收到。
原理:
- 需要创建两个队列 ,每个队列对应一个消费者;
- 队列需要绑定我们交换机;
- 生产者投递消息到交换机中,交换机在将消息分配给两个队列中都存放起来;
- 消费者从队列中获取这个消息;
3.1、创建连接代码
导入依赖:
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>3.6.5</version></dependency>
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 获取连接*/
public class RabbitMQConnection {/*** 创建连接** @return* @throws IOException* @throws TimeoutException*/public static Connection getConnection() throws IOException, TimeoutException {//1.创建connectionFactoryConnectionFactory connectionFactory = new ConnectionFactory();//2.配置HostconnectionFactory.setHost("127.0.0.1");//3.设置PortconnectionFactory.setPort(5672);//4.设置账户和密码connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//5.设置VirtualHostconnectionFactory.setVirtualHost("/rkVirtualHost");return connectionFactory.newConnection();}
}
3.1、生产者代码
import com.mayikt.rabbitmq.RabbitMQConnection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;public class ProducerFanout {/*** 定义交换机的名称*/private static final String EXCHANGE_NAME = "fanout_exchange";public static void main(String[] args) throws IOException, TimeoutException {// 创建ConnectionConnection connection = RabbitMQConnection.getConnection();// 创建ChannelChannel channel = connection.createChannel();// 通道关联交换机 绑定交换机 将消息发送到该交换机channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true);String msg = "rk 学 rabbitmq";//发送消息channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());//关闭连接channel.close();connection.close();}
}
3.2、消费者代码
邮件消费者:
import com.mayikt.rabbitmq.RabbitMQConnection;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;public class MailConsumer {/*** 定义邮件队列*/private static final String QUEUE_NAME = "fanout_email_queue";/*** 定义交换机的名称*/private static final String EXCHANGE_NAME = "fanout_exchange";public static void main(String[] args) throws IOException, TimeoutException {System.out.println("邮件消费者...");// 创建我们的连接Connection connection = RabbitMQConnection.getConnection();// 创建我们通道final Channel channel = connection.createChannel();// 关联队列消费者关联队列 将交换机和队列绑定起来 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {//从队列中获取消息@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, "UTF-8");System.out.println("邮件消费者获取消息:" + msg);}};// 开始监听消息 自动签收channel.basicConsume(QUEUE_NAME, true, defaultConsumer);}
}
短信消费者:
和邮件消费者代码一样,唯一不同的就是 QUEUE_NAME 变成了fanout_sms_queue
4、Direct路由模式
当交换机类型为direct类型时,根据队列绑定的路由建转发到具体的队列中存放消息。
4.1、生产者代码
import com.mayikt.rabbitmq.RabbitMQConnection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;public class ProducerDirect {/*** 定义交换机的名称*/private static final String EXCHANGE_NAME = "direct_exchange";public static void main(String[] args) throws IOException, TimeoutException {// 创建ConnectionConnection connection = RabbitMQConnection.getConnection();// 创建ChannelChannel channel = connection.createChannel();// 通道关联交换机channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);String msg = "rk 6666";//投递的消息的 路由键为 email 所以只有邮件消费者才能收到channel.basicPublish(EXCHANGE_NAME, "email", null, msg.getBytes());channel.close();connection.close();}}
4.2、消费者代码
邮件消费者:
public class MailConsumer {/*** 定义邮件队列*/private static final String QUEUE_NAME = "direct_email_queue";/*** 定义交换机的名称*/private static final String EXCHANGE_NAME = "direct_exchange";public static void main(String[] args) throws IOException, TimeoutException {System.out.println("邮件消费者...");// 创建我们的连接Connection connection = RabbitMQConnection.getConnection();// 创建我们通道final Channel channel = connection.createChannel();// 关联队列消费者关联队列 关联交换机和队列 路由键为emailchannel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "email");DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, "UTF-8");System.out.println("邮件消费者获取消息:" + msg);}};// 开始监听消息 自动签收channel.basicConsume(QUEUE_NAME, true, defaultConsumer);}
}
短信消费者:
只有队列和路由键变了:QUEUE_NAME = "direct_sms_queue"; 路由键为: sms
5、Topic主题模式
当交换机类型为topic类型时,根据队列绑定的路由建模糊转发到具体的队列中存放。
#号表示支持匹配多个词;
*号表示只能匹配一个词
如图所示:生产者的路由键为mayikt.sms发送给交换机的消息会转发给邮件队列。
5.1、生产者代码
import com.mayikt.rabbitmq.RabbitMQConnection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;public class ProducerTopic {/*** 定义交换机的名称*/private static final String EXCHANGE_NAME = "topic_exchange";public static void main(String[] args) throws IOException, TimeoutException {// 创建ConnectionConnection connection = RabbitMQConnection.getConnection();// 创建ChannelChannel channel = connection.createChannel();// 通道关联交换机 设置为topic主题模式channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);String msg = "rk 6666";//发送消息 rk.smschannel.basicPublish(EXCHANGE_NAME, "rk.sms", null, msg.getBytes());//关闭连接channel.close();connection.close();}}
5.2、消费者代码
邮件消费者:
import com.mayikt.rabbitmq.RabbitMQConnection;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;public class MailConsumer {/*** 定义邮件队列*/private static final String QUEUE_NAME = "topic_email_queue";/*** 定义交换机的名称*/private static final String EXCHANGE_NAME = "topic_exchange";public static void main(String[] args) throws IOException, TimeoutException {System.out.println("邮件消费者...");// 创建我们的连接Connection connection = RabbitMQConnection.getConnection();// 创建我们通道final Channel channel = connection.createChannel();// 关联队列消费者关联队列 绑定路由器和队列 设置路由键为rk.*channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "rk.*");DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {//接收队列消息@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, "UTF-8");System.out.println("邮件消费者获取消息:" + msg);}};// 开始监听消息 自动签收channel.basicConsume(QUEUE_NAME, true, defaultConsumer);}
}
短信消费者:
只有队列和路由键变了:QUEUE_NAME = "topic_sms_queue"; 路由键为:luo.*