1.概述
RabbityMQ整体上是一个生产者和消费者模式。生产者生产消息到消息中间件的服务节点(Broker),服务节点中包含交换器(Exchange)和队列(Queue),生产的消息首先经过交换器,再由交换器转发到对应的队列,之后消费者从队列消费消息。
2.生产者与消费者
Producer:生产者,生产消息。
生产者创建消息发布到RabbitMQ。消息一般包含两个部分:消息体(payload)和标签(Label),消息体一般是一个带有业务逻辑结构的数据,如一个json字符串。消息的标签用来表述这条消息,比如衣蛾交换器的名称和一个路由键。生产者将消息交给RabbitMQ,RabbitMQ会根据标签将消息发送给感兴趣的消费者
Consumer:消费者,消费消息。
消费者连接到RabbitMQ服务器,并订阅到队列上。消费者消费一条消息时,只是消费消息的消息体,,消息路由的过程中,消息的标签会被丢弃,存入队列的消息只有消息体,不需要知道生产者是谁。
Broker:消息中间件的服务节点
一个RabbitMQ Broker可以看做一个RabbitMQ的服务节点或者RabbitMQ实例。
生产者将业务数据包装为消息,发送到Broker中。消费者订阅并接受消息经过解包得到完整的数据,之后进行业务处理。业务逻辑处理不一定需要和接受的逻辑使用同一个线程,提高处理效率。
Queue:队列,存储消息。
Queue是RabbityMQ的内部对象,用于存储消息。
RabbityMQ的消息只能存在队列里,这点和Kafka相反,Kafka将消息存储在topic(主题)这个逻辑层面,对应的队列逻辑只是topic实际存储文件中的位移标识。多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊给多个消费者处理,而不是每个消费者都收到消息处理。RabbityMQ不支持队列层面的广播消费,如果需要广播消费,需要在其上进行二次开发处理逻辑会变复杂,不建议这么做。
Exchange:交换器
生产者将消息发送到Exchange(交换器),交换器将消息路由到一个队列或多个队列。如果路由不到会返回生产者,或者丢弃。
交换器,路由,绑定
RabbityMQ交换器一般有4种类型,不同类型的交换器有着不同的路由策略。
RoutingKey:路由键,生产者将消息发给交换器会指定一个RoutingKey,用来指定这个消息的路由规则,RoutingKey需要与交换器类型和绑定键(BindingKey)联合使用才能最终生效。
Binding:绑定。RabbityMQ通过绑定将交换器和队列关联起来,绑定的时候指定一个绑定key,RabbityMQ通过绑定键将消息路由到对应的队列。
生产者将消息发到交换器需要一个RoutingKey(路由键),RabbityMQ根据RoutingKey(路由键)规则找到对应队列,像channel.exchangBind、channel.queueBind,在使用绑定的时候使用的BindingKey(绑定键),像channel.basicPublish,根据绑定键将消息路由到对应的队列。
交换器类型
RabbityMQ常用的交换器类型有fanout,direct,topic,headers这四种。amqp协议中还提到了另外两种类型System和自定义。
fanout
它会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中
direct
它会把消息路由到那些RoutingKey和BindingKey完全匹配的对垒中
交换器类型是direct,发送消息的时候设置路由键为“warning”,该消息会路由到Queue1和Queue2,发送消息设置路由键为“info”或者“debug”,消息只会路由到Queue2
topic
前面direct类型的交换器路由规则是完全匹配RoutingKey和BindingKey,这种严格的匹配方式很多情况下不能满足实际业务的需求,topic和direct类型类似,但是匹配规则有些不同,
RoutingKey为一个点号分割的字符串,如:“com.rabbitmq.client”
BindingKey和RoutingKey一样是点号分割的字符串
BindingKey中可以存在两种特殊字符串 * 和 # 用于做模糊匹配,其中 # 用于匹配一个单词,* 用于匹配多个单词
headers
headers类型的交换器不依赖于路由键的匹配规则,发消息时指定一个headers属性,绑定队列和交换器时制定一组键值对,发消息到交换器,RabbityMQ会获取该消息的headers,完全匹配则消息会路由到该队列。headers类型的交换器性能很差,而且不实用,基本上不会看到它的存在。
RabbitMQ运作流程
生产者生产消息流程
(1)生产者连接到RabbiyMQ Broker建立一个连接(Connection),开启一个信道(Channel)
(2)生产者声明一个交换器,并设置相关属性,比如交换器类型,是否持久化
(3)生产者声明一个队列,并设置相关属性,比如是否排他,是否持久化,是否自动删除等
(4)生产者通过路由键将交换器和队列绑定起来
(5)生产者发送消息至RabbitMQ Broker,其中包含路由键,交换器等信息
(6)相应的交换器根据接收到的路由键查找相匹配的队列
(7)如果找到,则将从生产者发送过来的消息存入相应的队列
(8)如果没有找到,则根据生产者配置的属性选择丢弃还是回退到生产者
(9)关闭信道
(10)关闭连接
消费者消费消息流程
(1)消费者连接到RabbiyMQ Broker建立一个连接(Connection),开启一个信道(Channel)
(2)消费者向RabbiyMQ Broker请求消费相应队列中的消息,可能会设置相应的回调函数,以及一些准备工作。
(3)等待RabbiyMQ Broker回应并投递相应队列中的消息,消费者接受消息
(4)消费者确认(ack)接收到的消息
(5)RabbiyMQ从队列中删除相应已被确认的消息
(6)关闭信道
(7)关闭连接
这里引入了两个新的概念:Connection和Channel.生产者和消费者都需要和RabbiyMQ Broker连接,这个连接诶就是一个TCP连接,也就是Connection。建立好连接,客户端紧接着创建一个AMQP信道(Channel),每个信道都会被指派一个虚拟的id.信道是建立在Connection之上的虚拟连接,RabbitMQ处理每条AMQP都是通过信道完成的。
我们完全可以直接使用Connection就能完成信道的工作,为什么要引入信道。
一个应用程序中有很多的线程消费消息,或生产消息,name必然要建立很多个Connection,也就是很多个TCP连接。对于操作系统而言,建立和销毁TCP连接是非常昂贵的开销,如果遇到高峰,性能脖颈随着而现。RabbiyMQ采用nio的做法,选择TCP连接复用,不仅减少性能开销,同时也便于管理。但是当信道本身的流量很大时,多个信道复用一个Connection就会产生性能瓶颈,进而使整体的流量被限制了。此时就要开辟多个Connection,将信道分摊到Connection中,这些调优需要根据业务自身的实际情况进行调节
AMQP协议
RabbitMQ是基于AMQP协议的,RabbitMQ也就是AMQP协议的Erlang的实现(还支持STOMP²、MQTT³等协议)。AMQP的模型架构和RabbitMQ是一样的,生产者将消息发送给交换器,交换器和队列绑定。生产者发送消息时所携带的RoutingKey与绑定的BingdingKey想匹配时,消息即被存入相应的队列中,消费者可以订阅对应的队列获取消息。当前的协议是AMQP 0-9-1
AMQP协议包含三层
Module Layer:位于协议的最高层,主要定义了一些客户端调用的命令,客户端可以利用这些命令实现自己的业务逻辑。例如,客户端可以使用Queue.Declare命令声明一个队列或者使用Basic.Consume订阅消费一个队列中的消息。
Session Layer:位于中间层,主要负责将客户端的命令发给服务端,再将服务端的应答返回给客户端,主要为了客户端和服务端之间的通信提供可靠性同步机制和错误处理。
Transport Layer:位于最底层,主要传输二进制数据流,提供帧的处理、信道复用、错误检测和数据表示等
AMQP协议是通过协议命令交互的,可以看错一系列结构化命令的集合,这里的命令类似HTTP中的方法(GET、POST、PUT、DELETE)
部分生产者代码:
//创建连接Connection connection = factory.newConnection();//创建信道Channel channel = connection.createChannel();//发送一条持久化的消息:Hello World!String message = "Hello World!";channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());channel.close();connection.close();
客户端调用factory.newConnection()方法,这个方法会封装为Protocol Header 0-9-1的报文头发给Broker,通知Broker使用AMQP 0-9-1协议,紧接着Broker返回Connection.start建立连接,建立连接涉及到Connection.start/.Start-OK、Connection.Tune/.Tune-Ok、Connection.Open/.Open-Ok这6个命令的交互。
客户端调用connection.createChannel方法准备开启信道时,包装的Channel.Open命令就会发给Broker,等待Channel.Open-Ok命令。
客户端发送消息的时候,需要调用channel.basicPublish方法,对应的命令是Basic.Publish,注意这个命令和前面的命令略有不同,还包含了Content Header和Content Body。Content Header包含消息体的属性,例如,投递模式,优先级等,Content Body包含消息体本身。
发送完消息需要关闭资源时,涉及Channel.Clost/.Close-Ok与Connection.Close/.Clost-Ok的命令交互
x
消费者
//创建连接Connection connection = factory.newConnection(addresses);//创建信道Channel channel = connection.createChannel();//设置客户端最多接受未被ack的消息的个数channel.basicQos(64);Consumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("recv message:" + new String(body));try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}channel.basicAck(envelope.getDeliveryTag(), false);}};channel.basicConsume(QUEUE_NAME, consumer);//等待回调行数执行完毕之后,关闭资源TimeUnit.SECONDS.sleep(5);channel.close();connection.close();
消费者客户端同样需要与Broker建立连接,与生产客户端一样,协议交互同样涉及Connection.start/.Start-OK、Connection.Tune/.Tune-Ok、Connection.Open/.Open-Ok、Channel.Open、Channel.Open-Ok等。
在消费之前调用了channel.basicQos(int prefetchCount)的方法来设置消费者客户端最大能保持的未确认的消息数,那么协议流转会涉及Basic.Qos/.Qos-Ok这两个AMQP命令。
在真正消费前,消费者客户端需要向Broker发送Basic.Consume命令(即调用channel.basicConsume方法)将Channel置为接受模式,之后Broker回执Basic.Consume-Ok告诉消费者客户端准备好消费消息,紧接着Broker向消费者客户端推送消息,即Basic.Deliver命令,这个和Basic.Publish命令一样会携带Content Header和Content Body.
消费者接受消息正确消费后,向Broker发送确认,即Basic.Ack命令。
消费者停止消费主动关闭连接时,涉及Channel.Clost/.Close-Ok与Connection.Close/.Clost-Ok的命令交互
AMQP命令
名称 | 是否包含内容体 | 对应客户端中的方法 | 简要描述 |
Connection.Start | 否 | factory.newConnection | 建立连接相关 |
Connection.Start-Ok | 否 | 同上 | 同上 |
Connection.Tune | 否 | 同上 | 同上 |
Connection.Tune-Ok | 否 | 同上 | 同上 |
Connection.Open | 否 | 同上 | 同上 |
Connection.Open-Ok | 否 | 同上 | 同上 |
Connection.Close | 否 | connection.close() | 关闭连接 |
Connection.Close-Ok | 否 | 同上 | 同上 |
Channel.Open | 否 | connection.OpenChannel() | 开启信道 |
Channel.Open-Ok | 否 | 同上 | 同上 |
Channel.Close | 否 | channel.close() | 关闭信道 |
Channel.Close-Ok | 否 | 同上 | 同上 |
Exchange.Declare | 否 | channel.exchaneDeclare | 声明交换器 |
Exchange.Declare-Ok | 否 | 同上 | 同上 |
Exchange.Delete | 否 | channel.exchangeDelete | 删除交换器 |
Exchange.Delete-Ok | 否 | 同上 | 同上 |
Exchange.Bind | 否 | channel.exchangeBind | 交换器与交换器绑定 |
Exchange.Bind-Ok | 否 | 同上 | 同上 |
Exchange.Unbind | 否 | channel.exchangeUnBind | 交换器与交换器解绑 |
Exchange.Unbind-Ok | 否 | 同上 | 同上 |
Queue.Declare | 否 | channel.queueDeclare | 声明队列 |
Queue.Declare-Ok | 否 | 同上 | 同上 |
Queue.Bind | 否 | channel.queueBind | 队列和交换器绑定 |
Queue.Bind-Ok | 否 | 同上 | 同上 |
Queue.Purge | 否 | channel.queuePurge | 清除队列的内容 |
Queue.Purge-Ok | 否 | 同上 | 同上 |
Queue.Delete | 否 | channel.queueDelete | 删除队列 |
Queue.Delete-Ok | 否 | 同上 | 同上 |
Queue.Unbind | 否 | channel.queueUnbind | 队列与交换器解绑 |
Queue.Unbind-Ok | 否 | 同上 | 同上 |
Basic.Qos | 否 | channel.basicQos | 设置未被确认消费的个数 |
Basic.Qos-Ok | 否 | 同上 | 同上 |
Basic.Consume | 否 | channel.basicConsume | 消费消息 |
Basic.Consume-Ok | 否 | 同上 | 同上 |
Basic.Cancel | 否 | channel.basicCancel | 取消 |
Basic.Cancel-Ok | 否 | 同上 | 同上 |
Basic.Publish | 是 | channel.basicPublish | 发送消息 |
Basic.Return | 是 | 无 | 未能成功路由的消息返回 |
Basic.Deliver | 是 | 无 | Broker推送消息 |
Basic.Get | 否 | channel.basicGet | 消费模式 |
Basic.Get-Ok | 否 | 同上 | 同上 |
Basic.Ack | 否 | channel.basicAck | 确认 |
Basic.Reject | 否 | channel.basicReject | 拒绝(单条拒绝) |
Basic.Recover | 否 | channel.basicRecover | 请求Broker重新发送未被确认的消息 |
Basic.Recover-Ok | 否 | 同上 | 同上 |
Basic.Nack | 否 | channel.basicNack | 拒绝(可批量拒绝) |
Tx.Select | 否 | channel.txSelect | 开启事务 |
Tx.Select-Ok | 否 | 同上 | 同上 |
Tx.Commit | 否 | channel.txCommit | 事务提交 |
Tx.Commit-Ok | 否 | 同上 | 同上 |
Tx.Rollback | 否 | channel.txRollback | 事务回滚 |
Tx.Rollback-Ok | 否 | 同上 | 同上 |
Confirm Select | 否 | channel.confirmSelect | 开启发送端确认模式 |
Confirm Select-Ok | 否 | 同上 | 同上 |
出自RabbitMQ实战指南