RabbitMQ入门(二)

news/2024/5/19 23:33:11/文章来源:https://blog.csdn.net/qq_39482039/article/details/126480243

1.概述

RabbityMQ整体上是一个生产者和消费者模式。生产者生产消息到消息中间件的服务节点(Broker),服务节点中包含交换器(Exchange)和队列(Queue),生产的消息首先经过交换器,再由交换器转发到对应的队列,之后消费者从队列消费消息。

2.生产者与消费者

Producer:生产者,生产消息。

生产者创建消息发布到RabbitMQ。消息一般包含两个部分:消息体(payload)和标签(Label),消息体一般是一个带有业务逻辑结构的数据,如一个json字符串。消息的标签用来表述这条消息,比如衣蛾交换器的名称和一个路由键。生产者将消息交给RabbitMQ,RabbitMQ会根据标签将消息发送给感兴趣的消费者

Consumer:消费者,消费消息。

消费者连接到RabbitMQ服务器,并订阅到队列上。消费者消费一条消息时,只是消费消息的消息体,,消息路由的过程中,消息的标签会被丢弃,存入队列的消息只有消息体,不需要知道生产者是谁。

Broker:消息中间件的服务节点

一个RabbitMQ Broker可以看做一个RabbitMQ的服务节点或者RabbitMQ实例。

生产者将业务数据包装为消息,发送到Broker中。消费者订阅并接受消息经过解包得到完整的数据,之后进行业务处理。业务逻辑处理不一定需要和接受的逻辑使用同一个线程,提高处理效率。

Queue:队列,存储消息。

Queue是RabbityMQ的内部对象,用于存储消息。

RabbityMQ的消息只能存在队列里,这点和Kafka相反,Kafka将消息存储在topic(主题)这个逻辑层面,对应的队列逻辑只是topic实际存储文件中的位移标识。多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊给多个消费者处理,而不是每个消费者都收到消息处理。RabbityMQ不支持队列层面的广播消费,如果需要广播消费,需要在其上进行二次开发处理逻辑会变复杂,不建议这么做。

Exchange:交换器

生产者将消息发送到Exchange(交换器),交换器将消息路由到一个队列或多个队列。如果路由不到会返回生产者,或者丢弃。

交换器,路由,绑定

RabbityMQ交换器一般有4种类型,不同类型的交换器有着不同的路由策略。

RoutingKey:路由键,生产者将消息发给交换器会指定一个RoutingKey,用来指定这个消息的路由规则,RoutingKey需要与交换器类型和绑定键(BindingKey)联合使用才能最终生效。

Binding:绑定。RabbityMQ通过绑定将交换器和队列关联起来,绑定的时候指定一个绑定key,RabbityMQ通过绑定键将消息路由到对应的队列。

生产者将消息发到交换器需要一个RoutingKey(路由键),RabbityMQ根据RoutingKey(路由键)规则找到对应队列,像channel.exchangBind、channel.queueBind,在使用绑定的时候使用的BindingKey(绑定键,像channel.basicPublish,根据绑定键将消息路由到对应的队列。

交换器类型

RabbityMQ常用的交换器类型有fanout,direct,topic,headers这四种。amqp协议中还提到了另外两种类型System和自定义。

fanout

它会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中

direct

它会把消息路由到那些RoutingKey和BindingKey完全匹配的对垒中

交换器类型是direct,发送消息的时候设置路由键为“warning”,该消息会路由到Queue1和Queue2,发送消息设置路由键为“info”或者“debug”,消息只会路由到Queue2

topic

前面direct类型的交换器路由规则是完全匹配RoutingKey和BindingKey,这种严格的匹配方式很多情况下不能满足实际业务的需求,topic和direct类型类似,但是匹配规则有些不同,

        RoutingKey为一个点号分割的字符串如:“com.rabbitmq.client”

        BindingKey和RoutingKey一样是点号分割的字符串

        BindingKey中可以存在两种特殊字符串 * 和 #  用于做模糊匹配,其中 # 用于匹配一个单词,* 用于匹配多个单词

headers

headers类型的交换器不依赖于路由键的匹配规则,发消息时指定一个headers属性,绑定队列和交换器时制定一组键值对,发消息到交换器,RabbityMQ会获取该消息的headers,完全匹配则消息会路由到该队列。headers类型的交换器性能很差,而且不实用,基本上不会看到它的存在。

RabbitMQ运作流程

 生产者生产消息流程 

(1)生产者连接到RabbiyMQ Broker建立一个连接(Connection),开启一个信道(Channel)

(2)生产者声明一个交换器,并设置相关属性,比如交换器类型,是否持久化

(3)生产者声明一个队列,并设置相关属性,比如是否排他,是否持久化,是否自动删除等

(4)生产者通过路由键将交换器和队列绑定起来

(5)生产者发送消息至RabbitMQ Broker,其中包含路由键,交换器等信息

(6)相应的交换器根据接收到的路由键查找相匹配的队列

(7)如果找到,则将从生产者发送过来的消息存入相应的队列

(8)如果没有找到,则根据生产者配置的属性选择丢弃还是回退到生产者

(9)关闭信道

(10)关闭连接

  消费者消费消息流程

(1)消费者连接到RabbiyMQ Broker建立一个连接(Connection),开启一个信道(Channel)

(2)消费者向RabbiyMQ Broker请求消费相应队列中的消息,可能会设置相应的回调函数,以及一些准备工作。

(3)等待RabbiyMQ Broker回应并投递相应队列中的消息,消费者接受消息

(4)消费者确认(ack)接收到的消息

(5)RabbiyMQ从队列中删除相应已被确认的消息

(6)关闭信道

(7)关闭连接

这里引入了两个新的概念:Connection和Channel.生产者和消费者都需要和RabbiyMQ Broker连接,这个连接诶就是一个TCP连接,也就是Connection。建立好连接,客户端紧接着创建一个AMQP信道(Channel),每个信道都会被指派一个虚拟的id.信道是建立在Connection之上的虚拟连接,RabbitMQ处理每条AMQP都是通过信道完成的。

我们完全可以直接使用Connection就能完成信道的工作,为什么要引入信道。

一个应用程序中有很多的线程消费消息,或生产消息,name必然要建立很多个Connection,也就是很多个TCP连接。对于操作系统而言,建立和销毁TCP连接是非常昂贵的开销,如果遇到高峰,性能脖颈随着而现。RabbiyMQ采用nio的做法,选择TCP连接复用,不仅减少性能开销,同时也便于管理。但是当信道本身的流量很大时,多个信道复用一个Connection就会产生性能瓶颈,进而使整体的流量被限制了。此时就要开辟多个Connection,将信道分摊到Connection中,这些调优需要根据业务自身的实际情况进行调节

AMQP协议

RabbitMQ是基于AMQP协议的,RabbitMQ也就是AMQP协议的Erlang的实现(还支持STOMP²、MQTT³等协议)。AMQP的模型架构和RabbitMQ是一样的,生产者将消息发送给交换器,交换器和队列绑定。生产者发送消息时所携带的RoutingKey与绑定的BingdingKey想匹配时,消息即被存入相应的队列中,消费者可以订阅对应的队列获取消息。当前的协议是AMQP 0-9-1

AMQP协议包含三层

Module Layer:位于协议的最高层,主要定义了一些客户端调用的命令,客户端可以利用这些命令实现自己的业务逻辑。例如,客户端可以使用Queue.Declare命令声明一个队列或者使用Basic.Consume订阅消费一个队列中的消息。

Session Layer:位于中间层,主要负责将客户端的命令发给服务端,再将服务端的应答返回给客户端,主要为了客户端和服务端之间的通信提供可靠性同步机制和错误处理。

Transport Layer:位于最底层,主要传输二进制数据流,提供帧的处理、信道复用、错误检测和数据表示等

AMQP协议是通过协议命令交互的,可以看错一系列结构化命令的集合,这里的命令类似HTTP中的方法(GET、POST、PUT、DELETE)

部分生产者代码:

        //创建连接Connection connection = factory.newConnection();//创建信道Channel channel = connection.createChannel();//发送一条持久化的消息:Hello World!String message = "Hello World!";channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());channel.close();connection.close();

 客户端调用factory.newConnection()方法,这个方法会封装为Protocol Header 0-9-1的报文头发给Broker,通知Broker使用AMQP 0-9-1协议,紧接着Broker返回Connection.start建立连接,建立连接涉及到Connection.start/.Start-OK、Connection.Tune/.Tune-Ok、Connection.Open/.Open-Ok这6个命令的交互。

客户端调用connection.createChannel方法准备开启信道时,包装的Channel.Open命令就会发给Broker,等待Channel.Open-Ok命令。

客户端发送消息的时候,需要调用channel.basicPublish方法,对应的命令是Basic.Publish,注意这个命令和前面的命令略有不同,还包含了Content Header和Content Body。Content Header包含消息体的属性,例如,投递模式,优先级等,Content Body包含消息体本身。

发送完消息需要关闭资源时,涉及Channel.Clost/.Close-Ok与Connection.Close/.Clost-Ok的命令交互

x

消费者

         //创建连接Connection connection = factory.newConnection(addresses);//创建信道Channel channel = connection.createChannel();//设置客户端最多接受未被ack的消息的个数channel.basicQos(64);Consumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("recv message:" + new String(body));try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}channel.basicAck(envelope.getDeliveryTag(), false);}};channel.basicConsume(QUEUE_NAME, consumer);//等待回调行数执行完毕之后,关闭资源TimeUnit.SECONDS.sleep(5);channel.close();connection.close();

消费者客户端同样需要与Broker建立连接,与生产客户端一样,协议交互同样涉及Connection.start/.Start-OK、Connection.Tune/.Tune-Ok、Connection.Open/.Open-Ok、Channel.Open、Channel.Open-Ok等。

在消费之前调用了channel.basicQos(int prefetchCount)的方法来设置消费者客户端最大能保持的未确认的消息数,那么协议流转会涉及Basic.Qos/.Qos-Ok这两个AMQP命令。

在真正消费前,消费者客户端需要向Broker发送Basic.Consume命令(即调用channel.basicConsume方法)将Channel置为接受模式,之后Broker回执Basic.Consume-Ok告诉消费者客户端准备好消费消息,紧接着Broker向消费者客户端推送消息,即Basic.Deliver命令,这个和Basic.Publish命令一样会携带Content Header和Content Body.

消费者接受消息正确消费后,向Broker发送确认,即Basic.Ack命令。

消费者停止消费主动关闭连接时,涉及Channel.Clost/.Close-Ok与Connection.Close/.Clost-Ok的命令交互

 AMQP命令

名称是否包含内容体对应客户端中的方法简要描述
Connection.Startfactory.newConnection建立连接相关
Connection.Start-Ok同上同上
Connection.Tune同上同上
Connection.Tune-Ok同上同上
Connection.Open同上同上
Connection.Open-Ok同上同上
Connection.Closeconnection.close()关闭连接
Connection.Close-Ok同上同上
Channel.Openconnection.OpenChannel()开启信道
Channel.Open-Ok同上同上
Channel.Closechannel.close()关闭信道
Channel.Close-Ok同上同上
Exchange.Declarechannel.exchaneDeclare声明交换器
Exchange.Declare-Ok同上同上
Exchange.Deletechannel.exchangeDelete删除交换器
Exchange.Delete-Ok同上同上
Exchange.Bindchannel.exchangeBind交换器与交换器绑定
Exchange.Bind-Ok同上同上
Exchange.Unbindchannel.exchangeUnBind交换器与交换器解绑
Exchange.Unbind-Ok同上同上
Queue.Declarechannel.queueDeclare声明队列
Queue.Declare-Ok同上同上
Queue.Bindchannel.queueBind队列和交换器绑定
Queue.Bind-Ok同上同上
Queue.Purgechannel.queuePurge清除队列的内容
Queue.Purge-Ok同上同上
Queue.Deletechannel.queueDelete删除队列
Queue.Delete-Ok同上同上
Queue.Unbindchannel.queueUnbind队列与交换器解绑
Queue.Unbind-Ok同上同上
Basic.Qoschannel.basicQos设置未被确认消费的个数
Basic.Qos-Ok同上同上
Basic.Consumechannel.basicConsume消费消息
Basic.Consume-Ok同上同上
Basic.Cancelchannel.basicCancel取消
Basic.Cancel-Ok同上同上
Basic.Publishchannel.basicPublish发送消息
Basic.Return未能成功路由的消息返回
Basic.DeliverBroker推送消息
Basic.Getchannel.basicGet消费模式
Basic.Get-Ok同上同上
Basic.Ackchannel.basicAck确认
Basic.Rejectchannel.basicReject拒绝(单条拒绝)
Basic.Recoverchannel.basicRecover请求Broker重新发送未被确认的消息
Basic.Recover-Ok同上同上
Basic.Nackchannel.basicNack拒绝(可批量拒绝)
Tx.Selectchannel.txSelect开启事务
Tx.Select-Ok同上同上
Tx.Commitchannel.txCommit事务提交
Tx.Commit-Ok同上同上
Tx.Rollbackchannel.txRollback事务回滚
Tx.Rollback-Ok同上同上
Confirm Selectchannel.confirmSelect开启发送端确认模式
Confirm Select-Ok同上同上

出自RabbitMQ实战指南

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_381444.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

搭建vue3项目

搭建vue3项目搭建准备创建项目选择所需配置运行项目vue3已经被大众所熟悉,很多公司都在做vue2到vue3的升级。 介绍vue3项目的搭建过程 搭建准备 前端开发环境需要node.js&npm node下载地址:http://nodejs.cn/download/ 根据自己电脑环境下载就行 安装vue-cli3…

2022/08/31 day14:企业级解决方案

文章目录目录缓存预热缓存雪崩缓存击穿缓存穿透性能指标监控总结目录 面试问题 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-EtBtkGNE-1661933471760)(en-resource://database/5507:1)] [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下…

抖音小程序模板全行业整理合集,抖音小程序制作平台分享

小弟我是来自第三方抖音小程序制作平台的打工人,给大家整合了一些我们平台的抖音小程序模板,大家可以根据需要来获取。 步骤就是点击下方的链接,选好自己的抖音小程序模板,在平台注册账号直接套用到自己的抖音小程序上&#xff0…

深入理解蓝牙BLE之“信道管理”

目录 一.BLE的调制解调: 二.BLE的信道: 三.BLE的广播信道: 四.BLE的数据信道: 五.BLE信道管理: 5.1广播信道的随机延时: 5.2数据信道的调频算法: 跳频算法1: 跳频算法2&…

02.Haoop 虚拟机 桥接与NAT之间区别 及桥接设置

首先说 我的硬件准备,1台windows系统,1台mac pro 。 在 物理机上使用了 VMWARE CENTOS 7 的 方式进行配置。 那么我希望能实现把 这2台机器连在一起,做Hadoop 的集群。 网络问题是首先需要解决的事情,主要不通物理主机之间一直…

02:入门及安装(狂神说RabbitMQ)

RabbitMQ入门及安装 https://www.bilibili.com/video/BV1dX4y1V73Gp27 概述 简单概述: RabbitMQ是一个开源的遵循 AMQP协议实现的基于 Erlang语言编写,支持多种客户端(语言),用于在分布式系统中存储消息&#xff0…

Spring Security 入门之自定义表单登录开发实现(三)

文章目录1. 前言2. 自定义认证2.1 自定义登录页面2.2 后端认证逻辑3. 自定义登陆成功处理3.1 登陆成功原理3.2 自定义登陆成功响应处理4. 自定义登陆失败处理4.1 登陆失败原理4.2 自定义登陆失败响应处理5. 注销用户处理5.1 注销原理总结1. 前言 在弄懂HelloWorld案例后&#…

Node.js | 使用内置模块 event 实现发布订阅模式

🖥️ NodeJS专栏:Node.js从入门到精通 🖥️ 蓝桥杯真题解析:蓝桥杯Web国赛真题解析 🧧 加入社区领红包:海底烧烤店ai(从前端到全栈) 🧑‍💼个人简介&#xff…

自动化测试中的验证码问题

做自动化测试的同学在面试的时候经常会遇到这问题,而且我们在实际的工作中也会遇到这个问题,那么这问题到底该怎么处理? 下面给出了面试过程中常见的相关面试题供大家参考: 01 在做自动化登陆的同时,如何绕过验证码&a…

windows下安装docker

下载docker,通过Redirecting…这个下载docker 正在上传…重新上传取消 下载完安装 安装完成后,进入powershell,输入命令docker network ls,查看docker网络,如果没有bridge项目,创建容器会报错(Windows容器就是两…

3D格式转换神器HOOPS Exchange使用教程(一):打印组件结构

HOOPS Exchange是什么? HOOPS Exchange 是一组软件库,可以帮助开发人员在开发应用程序时读取和写入主流的 2D 和 3D 格式。HOOPS Exchange 支持在主流的3D 文件格式中读取 CAD 数据,并支持将 3D 数据转换为 PRC 数据格式,这是一种…

NGINX源码之:event与epoll

在进入正题之前,先来大概了解下epoll: 引入多路复用之前socket建立连接流程: 1、服务端先建立socket(serversocket)占用一个文件描述符fd,然后bind端口,开启监听listen accept事件; 2、客户端请…

有趣的前端项目——一个暴躁萌的大眼仔

有趣的前端项目——一个暴躁萌的大眼仔 众所周知,我是一个摆子前端(真的 ),闲来无事,网上冲浪 遇见了如此蠢萌的大眼 于是我,行也思,坐也思,可算把这个大眼给复刻出来了。 原文出…

01-Flink概述

1. 源起和设计理念https://flink.apache.org/在 Flink 官网主页的顶部可以看到,项目的核心目标,是“数据流上的有状态计算”(Stateful Computations over Data Streams)。 具体定位是:Apache Flink 是一个框架和分布式处理引擎,如下图所示,用于对无界和有界数据流进行有…

利用逻辑分析仪处理CAN协议数据

1.设置逻辑分析仪 设置合适的采样频率和软件定义的波特率500K 2.数据分析 实测SOF的宽度是2us,因此可计算出波特率为500K,与实际程序设置的速率一致。 取图中的一部分数据分析,从左往右看: 0|| 000 0[1]001 0010 0|| 000 1000 …

《RO 仙境传说》Game Jam 游戏创作大赛

浪漫奇幻 MMORPG《RO 仙境传说》迎接 20 周年的到来,同时展开在 The Sandbox 元宇宙的旅程。冒险家们,是时候创建你的 RO 元宇宙新世代! 游戏创作大赛 (Game Jam) 在 The Sandbox 中制作最有代表性的《RO》内容,同时重温 RO 20 年…

使用谷歌浏览器 devtools 调试node项目

使用谷歌浏览器 devtools 调试node项目 当我们写node项目时,可以通过谷歌浏览器自带的devtools帮助我们debug我们的node项目 1.启动调试命令 nodemon --inspect app.js如上图所示,当出现Debugger… 字样时即说明启动成功! 2.在谷歌浏览器…

中国精炼铜行业发展监测及投资战略研究报告

精炼铜行业相关公司:江西铜业(600362)、铜陵有色(000630)、云南铜业(000878)、西部矿业(601168)、紫金矿业(601899)、海亮股份(002203)等 本文核心数据:全球精铜产销规模、全球精铜产销区域分布等。 从铜冶炼工艺来看,目前生产铜主要有两种方式。一种是…

Java线程池参数动态化实践

为什么需要线程池参数动态化? 日常业务开发中,线程池参数是很难计算准确的,往往需要在实践中不断的调整才能得到一个较为合理的取值。在取值不是那么明确之前,每次修改参数都需要重新部署服务才能生效,这显然不是一个…

艺术收藏NFT系统开发:NFT功能搭建

NFT数字藏品是种特殊的基于区块链的代币,可以用来证明数字物品的所有权,如艺术、音乐、礼物地图或游戏中的稀有物品。区块链技术赋予每一个数字作品一个独一无二的识别码,使相对应具体的作品、艺术品产生独一无二的数字凭证。 在传统艺术品收…