Docker启动RabbitMQ,实现生产者与消费者

news/2024/4/27 2:40:40/文章来源:https://blog.csdn.net/m0_62946761/article/details/129168455

目录

一、Docker拉取镜像并启动RabbitMQ

二、Hello World

(一)依赖导入

(二)消息生产者

(三)消息消费者

三、实现轮训分发消息

(一)抽取工具类

(二)启动两个工作线程

(三)启动发送线程

四、实现手动应答

(一)消息应答概念

(二)消息应答的方法

(三)消息自动重新入队 

(四)消息手动应答代码 

1、生产者

2、睡眠工具类模拟业务执行

3、消费者


一、Docker拉取镜像并启动RabbitMQ

拉取镜像

docker pull rabbitmq:3.8.8-management

查看镜像

docker images rabbitmq

 启动镜像

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.8.8-management

Linux虚拟机记得开放5672端口或者关闭防火墙,在window通过 主机ip:15672 访问rabbitmq控制台

 用户名密码默认为guest

 

 

二、Hello World

(一)依赖导入

<!--指定 jdk 编译版本--><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>8</source><target>8</target></configuration></plugin></plugins></build><dependencies><!--rabbitmq 依赖客户端--><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.8.0</version></dependency><!--操作文件流的一个依赖--><dependency><groupId>commons-io</groupId><artifactId>commons-io</artifactId><version>2.6</version></dependency></dependencies>

(二)消息生产者

工作原理

  • Broker:接收和分发消息的应用,RabbitMQ Server 就是 Message Broker
  • Connectionpublisherconsumer broker 之间的 TCP 连接
  • Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销
  • Exchangemessage 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
  • Queue消息最终被送到这里等待 consumer 取走

我们需要先获取连接(Connection),然后通过连接获取信道(Channel),这里我们演示简单例子,可以直接跳过交换机(Exchange)发送队列(Queue)

public class Producer {private final static String QUEUE_NAME = "hello";public static void main(String[] args) throws Exception {//创建一个连接工厂ConnectionFactory factory = new ConnectionFactory();// 设置主机ipfactory.setHost("182.92.234.71");// 设置用户名factory.setUsername("guest");// 设置密码factory.setPassword("guest");//channel 实现了自动 close 接口 自动关闭 不需要显示关闭Connection connection = factory.newConnection();// 获取信道Channel channel = connection.createChannel();/** 生成一个队列* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments)* 1.队列名称* 2.队列里面的消息是否持久化 默认消息存储在内存中* 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费* 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除* 5.其他参数**/channel.queueDeclare(QUEUE_NAME, false, false, false, null);String message = "hello rabbitmq";/** 发送一个消息* basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)* 1.发送到哪个交换机* 2.路由的key是哪个* 3.其他的参数信息* 4.发送消息的消息体***/channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("发送成功");}
}

(三)消息消费者

public class Consumer {private static final String QUEUE_NAME = "hello";public static void main(String[] args) throws Exception {//创建一个连接工厂ConnectionFactory factory = new ConnectionFactory();// 设置主机ipfactory.setHost("182.92.234.71");// 设置用户名factory.setUsername("guest");// 设置密码factory.setPassword("guest");//channel 实现了自动 close 接口 自动关闭 不需要显示关闭Connection connection = factory.newConnection();// 获取信道Channel channel = connection.createChannel();// 推送的消息如何进行消费的回调接口DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println(new String(message.getBody()));};// 取消消费的一个回调接口,如在消费的时候队列被删除了CancelCallback cancelCallback = (consumerTag) -> {System.out.println("消息消费被中断");};/** 消费者消费消息* basicConsume(String queue, boolean autoAck, * DeliverCallback deliverCallback, CancelCallback cancelCallback)* 1.消费哪个队列* 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答* 3.消费者未成功消费的回调**/channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);}
}

 

三、实现轮训分发消息

(一)抽取工具类

可以发现,上面获取连接工厂,然后获取连接,再获取信道的步骤是一致的,我们可以抽取成一个工具类来调用,并使用单例模式-饿汉式完成信道的初始化

public class RabbitMqUtils {private static Channel channel;static {ConnectionFactory factory = new ConnectionFactory();// 设置ip地址factory.setHost("192.168.23.100");// 设置用户名factory.setUsername("guest");// 设置密码factory.setPassword("guest");try {// 创建连接Connection connection = factory.newConnection();// 获取信道channel = connection.createChannel();} catch (Exception e) {System.out.println("创建信道失败,错误信息:" + e.getMessage());}}public static Channel getChannel() {return channel;}
}

(二)启动两个工作线程

相当于前面的消费者,我们只需要写一个类,通过ideal实现多线程启动即可模拟两个线程

public class Worker01 {private final static String QUEUE_NAME = "hello";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();DeliverCallback deliverCallback = ( consumerTag,  message) -> {System.out.println("接受到消息:" + new String(message.getBody()));};CancelCallback cancelCallback = (cunsumerTag) -> {System.out.println("消费者取消消费接口回调逻辑");};// 启动两次,第一次为C1, 第二次为C2System.out.println("C2消费者等待消费消息");channel.basicConsume(QUEUE_NAME, true, deliverCallback,cancelCallback);}
}

(三)启动发送线程

public class Test01 {private final static String QUEUE_NAME = "hello";public static void main(String[] args) throws IOException {Channel channel = RabbitMqUtils.getChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 通过控制台输入充当消息,使轮训演示更明显Scanner scanner = new Scanner(System.in);while(scanner.hasNext()) {String message = scanner.next();channel.basicPublish("", QUEUE_NAME,null, message.getBytes() );System.out.println("消息发送完成:" + message);}}
}

结果 

四、实现手动应答

(一)消息应答概念

消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成
了部分突然它挂掉了,会发生什么情况。RabbitMQ 一旦向消费者传递了一条消息,便立即将该消
息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续
发送给该消费这的消息,因为它无法接收到。
为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制,消息应答就是:消费者在接
收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了。

自动应答:消费者发送后立即被认为已经传送成功。这种模式需要在高吞吐量和数据传输安全性方面做权,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢失了。

当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制
当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终 使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用

手动应答:消费者接受到消息并顺利完成业务后再调用方法进行确认,rabbitmq 才可以把该消息删除

(二)消息应答的方法

 

  • Channel.basicAck(用于肯定确认)
    • RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了
  • Channel.basicNack(用于否定确认)
  • Channel.basicReject(用于否定确认)
    • 与 Channel.basicNack 相比少一个参数Multiple
      • ​​​​​​​multiple 的 true 和 false 代表不同意思
                true 代表批量应答 channel 上未应答的消息
                比如说 channel 上有传送 tag 的消息 5,6,7,8 当前 tag 是 8 那么此时
                5-8 的这些还未应答的消息都会被确认收到消息应答
                false 同上面相比
                只会应答 tag=8 的消息 5,6,7 这三个消息依然不会被确认收到消息应答
    • 不处理该消息了直接拒绝,可以将其丢弃了

 

(三)消息自动重新入队 

如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确
保不会丢失任何消息。

(四)消息手动应答代码 

1、生产者

public class Test01 {private final static String QUEUE_NAME = "ack";public static void main(String[] args) throws IOException {Channel channel = RabbitMqUtils.getChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);Scanner scanner = new Scanner(System.in);while(scanner.hasNext()) {String message = scanner.next();channel.basicPublish("", QUEUE_NAME,null, message.getBytes() );System.out.println("消息发送完成:" + message);}}
}

2、睡眠工具类模拟业务执行

public class SleepUtils {public static void sleep(int second) {try {Thread.sleep(1000 * second);} catch (InterruptedException _ignored) {Thread.currentThread().interrupt();}}
}

3、消费者

public class Worker01 {private final static String QUEUE_NAME = "ack";public static void main(String[] args) throws Exception {System.out.println("C1,业务时间短");Channel channel = RabbitMqUtils.getChannel();DeliverCallback deliverCallback = ( consumerTag,  message) -> {SleepUtils.sleep(1);  // 模拟业务执行1秒System.out.println("接受到消息:" + new String(message.getBody()));/** 1、消息标识* 2、是否启动批量确认,false:否。*    启用批量有可能造成消息丢失,比如未消费的消息提前被确然删除,后面业务消费该消息*    时出现异常会导致该消息的丢失*/channel.basicAck(message.getEnvelope().getDeliveryTag(), false);};CancelCallback cancelCallback = (cunsumerTag) -> {System.out.println("消费者取消消费接口回调逻辑");};boolean autoAck = false;channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback,cancelCallback);}
}==============================================================================
public class Worker02 {private final static String QUEUE_NAME = "ack";public static void main(String[] args) throws Exception {System.out.println("C2,业务时间长");Channel channel = RabbitMqUtils.getChannel();DeliverCallback deliverCallback = ( consumerTag,  message) -> {SleepUtils.sleep(15);  // 模拟业务执行15秒System.out.println("接受到消息:" + new String(message.getBody()));/** 1、消息标识* 2、是否启动批量确认,false:否。*    启用批量有可能造成消息丢失,比如未消费的消息提前被确然删除,后面业务消费该消息*    时出现异常会导致该消息的丢失*/channel.basicAck(message.getEnvelope().getDeliveryTag(), false);};CancelCallback cancelCallback = (cunsumerTag) -> {System.out.println("消费者取消消费接口回调逻辑");};boolean autoAck = false;channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback,cancelCallback);}
}

worker01业务时间短,worker02业务时间长,我们提前终止worker02模拟出异常,可以看到消息dd会被放回队列由worker01接收处理。

注意:这里需要先启动生产者声明队列ack,不然启动消费者会报错

 

最后一个案例我们可以看到消息轮训+消息自动重新入队+手动应答。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_72675.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

零基础机器学习做游戏辅助第十四课--原神自动钓鱼(四)yolov5目标检测

一、yolo介绍 目标检测有两种实现,一种是one-stage,另一种是two-stage,它们的区别如名称所体现的,two-stage有一个region proposal过程,可以理解为网络会先生成目标候选区域,然后把所有的区域放进分类器分类,而one-stage会先把图片分割成一个个的image patch,然后每个im…

【微信小程序】--JSON 配置文件作用(三)

&#x1f48c; 所属专栏&#xff1a;【微信小程序开发教程】 &#x1f600; 作  者&#xff1a;我是夜阑的狗&#x1f436; &#x1f680; 个人简介&#xff1a;一个正在努力学技术的CV工程师&#xff0c;专注基础和实战分享 &#xff0c;欢迎咨询&#xff01; &#…

二叉树、二叉搜索树、二叉树的最近祖先、二叉树的层序遍历【零神基础精讲】

来源0x3f&#xff1a;https://space.bilibili.com/206214 文章目录二叉树[104. 二叉树的最大深度](https://leetcode.cn/problems/maximum-depth-of-binary-tree/)[111. 二叉树的最小深度](https://leetcode.cn/problems/minimum-depth-of-binary-tree/)[129. 求根节点到叶节点…

黑马 Vue 快速入门 笔记

黑马 Vue 快速入门 笔记0 VUE相关了解0.1 概述0.2 MVVM0.3 JavaScript框架0.4 七大属性0.5 el:挂载点1 VUE基础1.0 第一个vue代码&#xff1a;Hello&#xff0c;vue1.1 v-bind 设置元素的属性 简写 &#xff1a;1.2 v-if &#xff0c; v-else &#xff0c; v-else-ifv-if , v-e…

XC7K70T-1FBG676C应用XC7K70T-L2FBG484E Kintex-7, FPGA 规格参数

概述Kintex-7 FPGA为快速增长应用和无线通信提供最优性价比和低功耗。Kintex-7 FPGA允许设计人员构建卓越带宽和12位数字可编程模拟&#xff0c;同时满足成本和功耗要求。144GMACS数字信号处理器 (DSP) 的独特功耗使得多功能Kintex-7器件成为便携式超声波设备和下一代通信等应用…

文案女王彭芳如何转变为“百万发售系统”创始人?我们来探个究竟!

智多星老师 她的输出跟智多星老师几乎毫无二致&#xff0c;是抄袭还是纯属巧合呢&#xff1f; 你们问的这个问题我也想知道&#xff0c;为了了解真相&#xff0c;我让我的一个学生把那个叫“彭芳老师”的视频给我看&#xff0c;当看到她的简介时&#xff0c;我非常震惊&#…

Elasticsearch在Linux中的单节点部署和集群部署

目录一、Elasticsearch简介二、Linux单节点部署1、软件下载解压2、创建用户3、修改配置文件4、切换到刚刚创建的用户启动软件5、测试三、Linux集群配置1、拷贝文件2、修改配置文件3、分别修改文件所有者4、启动三个软件5、测试四、问题总结1、在elasticsearch启动时如果报错内存…

numpy的常见数据类型

常见数据类型介绍Python 原生的数据类型相对较少&#xff0c; bool、int、float、str等。这在不需要关心数据在计算机中表示的所有方式的应用中是方便的。然而&#xff0c;对于科学计算&#xff0c;通常需要更多的控制。为了加以区分 numpy 在这些类型名称末尾都加了“_”。类型…

ES mapping 详解

nested 类型&#xff1f;&#xff1f;&#xff1f; _all _routing; ES-mapping Elasticsearch根据业务创建映射mapping结构分析&#xff1a;keyword和text&#xff08;一&#xff09;_elasticsearch keyword mapping_周全全的博客-CSDN博客 0.Mapping样例 {"mapping…

【MySQL进阶】 锁

&#x1f60a;&#x1f60a;作者简介&#x1f60a;&#x1f60a; &#xff1a; 大家好&#xff0c;我是南瓜籽&#xff0c;一个在校大二学生&#xff0c;我将会持续分享Java相关知识。 &#x1f389;&#x1f389;个人主页&#x1f389;&#x1f389; &#xff1a; 南瓜籽的主页…

[手把手教你]实现简单的登录跳转以及tab栏的动态渲染

需求:实现login登录页输入用户名和密码, 匹配失败显示提示信息, 成功则跳转到index主页index页面中各组件通过嵌套路由实现,点击一级菜单可以动态显示二级菜单1.使用vite搭建项目安装初始化npm init vuelatest选择配置进入项目目录,安装模块npm i, 启动项目npm run dev2.项目目…

S5P6818_系统篇(2)源码编译及烧录

源码获取 源码获取和操作流程 1.下载liunux下的系统制作脚本&#xff0c;可以烧录系统和构建镜像 git clone https://github.com/friendlyarm/sd-fuse_s5p6818.git 如果出现git错误可使用如下方法&#xff1a; git config --global http.sslverify false 2.阅读该工具rea…

软件测试5年,一路走来的艰辛路程

前言 不论你是什么时候开始接触测试这个行业的&#xff0c;你首先听说的应该是功能测试。通过一些测试手段来验证开发做出的代码是否符合产品的需求&#xff1f;当然你也有自己对功能测试的理解&#xff0c;但是最近两年感觉功能测试好像不太受欢迎&#xff0c;同时不少同学真的…

Linux 基础知识之文件系统

目录一、文件系统1.文件种类2.Linux和Windows文件后缀的不同3.查看文件类型3.绝对路径与相对路径二、系统分区三、目录结构一、文件系统 1.文件种类 Linux中一切皆文件。目光所及&#xff0c;皆是文件。文件的种类共有七种&#xff0c;每种文件都有自己的独特标识&#xff1a;…

SCADA-1-组态前期需求调研篇

近期有朋友找到我&#xff0c;说scada组态系统开源的很少&#xff0c;不少开发者借此售卖这种软件&#xff0c;我回了句&#xff1a;这有什么难的&#xff0c;不就是拖拖拽拽&#xff0c;再绑定上数据源&#xff0c;实现动态效果嘛。。。&#xff08;先装了个X&#xff09;一、…

【C++】类和对象入门必知

面向过程和面向对象的初步认识类的引入类的定义类的访问限定符封装类的作用域类的实例化类对象模型this指针C语言和C实现Stack的对比面向过程和面向对象的初步认识 C语言是面向过程的&#xff0c;关注的是过程&#xff0c;分析出求解问题的步骤&#xff0c;通过函数调用逐步解…

3717: yuyu学数数

描述yuyu开始学数数了&#xff0c;她要爸爸给他一些火柴棍&#xff0c;她要拼出很多数来。yuyu每次说要拼什么数字&#xff0c;爸爸就得想想要给她几根&#xff0c;好累啊&#xff0c;于是就只好写程序了。输入输入数据有多组&#xff0c;每组占一行&#xff0c;每行一个非负整…

版本控制软件SVN

SVN学习 1 版本控制软件定义及用途 版本控制软件是为适应软件配置管理的需要&#xff0c;控制软件的修改&#xff0c;减少混乱&#xff0c;提高软件生产效率&#xff0c;其是软件质量保证的重要环节软件配置管理是对软件修改进行标识、组织和控制的技术&#xff0c;用来协调和…

数据结构:循环队列的实现(leetcode622.设计循环队列)

目录 一.循环队列简单介绍 二.用静态数组实现循环队列 1.数组循环队列结构设计 2.数组循环队列的堆区内存申请接口 3.数据出队和入队的接口实现 4.其他操作接口 5.数组循环队列的实现代码总览 三.静态单向循环链表实现循环队列 1.链表循环队列的结构设计 2.创建静…

Linux服务:Nginx服务配置及相关模块

目录 一、Nginx配置文件 1、主配置文件解析 2、子配置文件启用 二、子配置文件使用 1、创建虚拟主机实验 2、基于端口虚拟主机实验 三、Nginx模块 1、access模块 2、自定义错误页面 3、状态页开启 一、Nginx配置文件 1、主配置文件解析 ①yum安装主配置文件位置&…