SpringBoot整合RabbitMQ的快速使用教程

news/2024/7/20 17:48:39/文章来源:https://blog.csdn.net/weixin_50348837/article/details/139276462

     

目录

一、引入依赖

二、配置rabbitmq的连接信息等

1、生产者配置

2、消费者配置 

三、设置消息转换器

四、生产者代码示例

 1、配置交换机和队列信息

2、生产消息代码

五、消费者代码示例

1、消费层代码

2、业务层代码 


        在分布式系统中,消息队列是一种重要的通信方式,它能够有效地将消息从一个应用程序传递到另一个应用程序。RabbitMQ是一款流行的开源消息队列系统,简单易用且功能强大。本文将介绍如何使用SpringBoot快速整合RabbitMQ,实现消息的发送和接收。

 

交换机: 主要负责接收生产者发送的消息,并根据特定的规则将这些消息路由到一个或多个队列中。交换机的类型有:

  •  Fanout Exchange(扇出交换机)

        Fanout交换机会将接收到的所有消息广播到它知道的所有队列中。这种类型的交换机不考虑路由键,只是简单地将消息复制到所有绑定的队列中。适用于不需要选择性地发送消息给特定队列的情况,例如,广播系统通知或有多个服务需要消费同一份数据的场景。

  • Direct Exchange(直连交换机)

       Direct交换机根据消息的路由键将消息发送到与之匹配的队列中。只有当路由键与绑定关键字完全匹配时,消息才会被路由到相应的队列。适合于精确控制消息投递的场景,如特定的服务或功能模块只关心特定类型的消息。

  • Topic Exchange(主题交换机)

       Topic交换机允许更复杂的匹配规则,通过模式匹配的方式将消息路由到一个或多个队列。路由键和绑定键都使用点分隔的字符串,可以包含特殊字符如“#”和“*”来实现模糊匹配。"*"用于匹配一个单词,而“#”则用于匹配零个或多个单词。适合于需要按内容分类消息的系统,如日志处理系统,可以根据日志等级或来源将日志消息分发到不同的队列。

  • Headers Exchange(头交换机)

        Headers交换机使用消息头的一组键值对来决定消息应该被路由到哪个队列。这种交换机允许更细粒度的路由控制,但配置和使用较为复杂。适合需要基于消息多个属性来动态决定路由的场景,例如某些高级的路由策略或复杂的事件处理系统。

队列:主要用于存储消息,实现先进先出(FIFO)的特性。

一、引入依赖

这里引入了两个依赖。一个是rabbitmq的依赖,另一个是配置json转换器所需要的依赖。生产者和消费者服务都需要引入这两个依赖。

<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
      <groupId>com.fasterxml.jackson.dataformat</groupId>
       <artifactId>jackson-dataformat-xml</artifactId>
 </dependency>

二、配置rabbitmq的连接信息等

1、生产者配置

  rabbitmq:
    host: 170.40.20.16
    port: 5672
    username: zhuoye
    password: zy521
    virtual-host: /

2、消费者配置 

   rabbitmq:
    host: 170.40.20.16
  port: 5672
    username: zhuoye
    password: zy521
    virtual-host: /
    listener:
      simple:
        prefetch: 1 #每次只能处理一个,处理完成才能获取下一个消息

三、设置消息转换器

        默认情况下Spring采用的序列化方式是JDK序列化,而JDK的序列化存在可读性性差、占用内存大、存在安全漏洞等问题。所以,这里我们一般使用Jackson的序列化代替JDk的序列化。

在生产者和消费者的启动类上加上如下代码:  

@SpringBootApplication
@EnableRabbit //开启rabbitmq的使用
public class ConsumerApp {public static void main( String[] args ) {SpringApplication.run(ConsumerApp.class, args);}//使用的是Jackson库中的Jackson2JsonMessageConverter类,代替使用jdk自带的序列化@Beanpublic MessageConverter jacksonMessageConvertor(){Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();jackson2JsonMessageConverter.setCreateMessageIds(true);//开启消息id的自动生成功能return jackson2JsonMessageConverter;}
}

四、生产者代码示例

 1、配置交换机和队列信息
@Configuration
public class RabbitMqConfig {private static String EXCHANGE_NAME="amq.topic";private static String QUEUE_NAME="alarm.data.topic.queue";private static String CONFIRM_ALARM_QUEUE_NAME="alarm.confirm.data.topic.queue";/*** 声明交换机*/@Beanpublic TopicExchange exchange(){// durable:是否持久化,默认是false// autoDelete:是否自动删除,当没有生产者或者消费者使用此交换机,该交换机会自动删除。return new TopicExchange(EXCHANGE_NAME,true,false);}/*** 声明告警队列* @return*/@Bean("alarmQueue")public Queue alarmQueue(){// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。return new Queue(QUEUE_NAME,true,false,false);}/*** 声明确认告警队列* @return*/@Bean("confirmAlarmQueue")public Queue confirmAlarmQueue(){return new Queue(CONFIRM_ALARM_QUEUE_NAME,true,false,false);}/*** 声明告警队列绑定关系* @param queue* @param topicExchange* @return*/@Beanpublic Binding alarmBinding(@Qualifier("alarmQueue") Queue queue, TopicExchange topicExchange){return BindingBuilder.bind(queue).to(topicExchange).with("server.event.#");}/*** 声明确认告警队列绑定关系* @param queue* @param topicExchange* @return*/@Beanpublic Binding confirmAlarmBinding(@Qualifier("confirmAlarmQueue") Queue queue, TopicExchange topicExchange){return BindingBuilder.bind(queue).to(topicExchange).with("server.event_confirm.#");}
2、生产消息代码
    @Autowiredprivate RabbitTemplate rabbitTemplate;private static String EXCHANGE_NAME="amq.topic";private static String CONFIRM_ALARM_QUEUE_NAME="alarm.confirm.data.topic.queue";@Testvoid producerAlarmMsg() {String msg = "发送一条告警消息";rabbitTemplate.convertAndSend(EXCHANGE_NAME, "server.event.#",msg);System.out.println("msg = " + msg);}@Testvoid producerConfirmAlarmMsg() {String msg = "发送一条确认告警消息";rabbitTemplate.convertAndSend(CONFIRM_ALARM_QUEUE_NAME, "server.event_confirm.#",msg);System.out.println("msg = " + msg);}

五、消费者代码示例

1、消费层代码
@Component
public class AlarmConsumer {@Autowiredprivate IAlarmService alarmService;@RabbitListener(queues ="alarm.data.topic.queue",concurrency = "5")public void getAlarmInfo(String data){alarmService.dealAlarmData(data);}@RabbitListener(queues ="alarm.confirm.data.topic.queue",concurrency = "5")public void getConfirmAlarmInfo(String data){alarmService.dealConfirmAlarmData(data);}
}
2、业务层代码 
@Service
public class IAlarmServiceImpl implements IAlarmService {@Overridepublic void dealAlarmData(String data) {EquipAlarmResp equipAlarmResp= JSON.parseObject(result,EquipAlarmResp.class);List<String> alarmIdsOld = dceEquipAlarmMapper.queryAllAlarmIds();DceEquipAlarmDto dceEquipAlarmDto = CopyBeanUtils.copyProperties(equipAlarmResp, DceEquipAlarmDto.class);dceEquipAlarmDto.setCreateTime(new Date());dceEquipAlarmDto.setAlarmTime(dceEquipAlarmDto.getAlarmTime()/1000);//查询出需要新增或者更新的数据Boolean flag=alarmIdsOld.stream().filter(a->a.equals(dceEquipAlarmDto.getAlarmId())).findFirst().isPresent();//开启事务,保证新增、更新、删除的原子性TransactionStatus transaction = transactionManager.getTransaction(transactionDefinition);List<DceEquipAlarmDto> list=new ArrayList<>();list.add(dceEquipAlarmDto);try {//新增if (!flag) {dceEquipAlarmMapper.insertBatch(list);}//更新if (flag) {dceEquipAlarmMapper.updateBatch(list);}//提交事务transactionManager.commit(transaction);} catch (Exception e) {//回滚transactionManager.rollback(transaction);log.error("DynamicEnvironmentServiceImpl.getAlarmInfoByRabbitMq 新华报业动环设备告警信息更新失败!", e);}}@Overridepublic void dealConfirmAlarmData(String data) {EquipConfirmAlarmResp alarmResp = JSON.parseObject(data,EquipConfirmAlarmResp.class);Integer confirmTime = Integer.parseInt(String.valueOf(System.currentTimeMillis() / 1000));alarmResp.setConfirmTime(confirmTime);dceEquipAlarmMapper.updateConfirmAlarmBatch(alarmResp,alarmResp.getAlarmIds());}}

注:以上代码为对接告警信息和对接告警确认消息的示例。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_1053538.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

HarmonyOS interface router scale pageTransition SlideEffect.Left ArkTS ArkUI

&#x1f3ac;️create Component export default struct TitleBar {build(){Row(){Text(transition).fontSize(30fp).fontColor(Color.White)}.width(100%).height(8%).backgroundColor(#4169E1).padding({left:10})}}&#x1f39e;️interface export interface IList{ti…

②单细胞学习-组间及样本细胞比例分析

目录 数据读入 每个样本各细胞比例 两个组间细胞比例 亚组间细胞比例差异分析&#xff08;循环&#xff09; 单个细胞类型亚新间比例差异 ①单细胞学习-数据读取、降维和分群-CSDN博客 比较各个样本间的各类细胞比例或者亚组之间的细胞比例差异 ①数据读入 #各样本细胞…

Android studio sdk 虚拟机无法打开运行

1.确认是否在BIOS开启硬件虚拟化支持,选择Enable 2.win8/win10 Hyper-V冲突。控制面板-》程序与功能-》windows功能-》关闭 Hyper-V 3.sdk 路径非默认路径 复制avd C:\Users\Administrator\.android\avd 到 sdk的安装路径下 D:\Android\sdk 。重启软件重新启动即可

Py之llama-parse:llama-parse(高效解析和表示文件)的简介、安装和使用方法、案例应用之详细攻略

Py之llama-parse&#xff1a;llama-parse(高效解析和表示文件)的简介、安装和使用方法、案例应用之详细攻略 目录 llama-parse的简介 llama-parse的安装和使用方法 1、安装 2、使用方法 第一步&#xff0c;获取API 密钥 第二步&#xff0c;安装LlamaIndex、LlamaParse L…

基于SpringBoot的本科生考研率统计系统

基于SpringBoot的本科生考研率统计系统 一、开发技术二、功能模块三、代码结构四、数据库设计五、运行截图六、源码获取 一、开发技术 技术&#xff1a;SpringBoot、MyBatis-Plus、Redis、MySQL、Thymeleaf、Html、Vue、Element-ui。 框架&#xff1a;基于开源框架easy-admin开…

18 - grace数据处理 - 补充 - 地下水储量计算过程分解 - 地表水储量变化Glads水文数据处理

18 - grace数据处理 - 补充 - 地下水储量计算过程分解 - 地表水储量变化 0 引言1 Grace陆地水储量过程整合0 引言 由水量平衡方程可以将地下水储量的计算过程分解为3个部分,第一部分计算陆地水储量变化、第二部分计算地表水储量变化、第三部分计算地下水储量变化。本篇简单介绍…

PCIe协议之-DLLP详解

✨前言&#xff1a; &#x1f31f;数据链路层的功能 数据链路层将从物理层中获得报文&#xff0c; 并将其传递给事务层&#xff1b; 同时接收事务层的报文&#xff0c; 并将其转发到物理层; 核心的功能有以下三点 1.保证TLP在 PCIe 链路中的正确传递; 2.数据链路层使用了容错…

【全开源】知识库文档系统源码(ThinkPHP+FastAdmin)

知识库文档系统源码&#xff1a;构建智慧知识库的基石 引言 在当今信息爆炸的时代&#xff0c;知识的有效管理和利用对于企业和个人来说至关重要。知识库文档系统源码正是为了满足这一需求而诞生的&#xff0c;它提供了一个高效、便捷的平台&#xff0c;帮助用户构建、管理、…

【ARM+Codesys案例】T3/RK3568/树莓派+Codesys枕式包装机运动控制器

枕式包装机是一种包装能力非常强&#xff0c;且能适合多种规格用于食品和非食品包装的连续式包装机。它不但能用于无商标包装材料的包装&#xff0c;而且能够使用预先印有商标图案的卷筒材料进行高速包装。同时&#xff0c;具有稳定性高、生产效率高&#xff0c;适合连续包装、…

arcgisPro将一个图层的要素复制到另一个图层

1、打开两个图层&#xff0c;如下&#xff0c;其中一个图层中有两个要素&#xff0c;需要将其中一个要素复制到另一个图层中&#xff0c;展示如下&#xff1a; 2、选中待复制要素&#xff0c;点击复制按钮&#xff0c;如下&#xff1a; 3、下拉粘贴按钮列表&#xff0c;选择【选…

922. 按奇偶排序数组 II - 力扣

1. 题目 给定一个非负整数数组 nums&#xff0c; nums 中一半整数是 奇数 &#xff0c;一半整数是 偶数 。 对数组进行排序&#xff0c;以便当 nums[i] 为奇数时&#xff0c;i 也是 奇数 &#xff1b;当 nums[i] 为偶数时&#xff0c; i 也是 偶数 。 你可以返回 任何满足上述…

推荐一个娱乐网站poki

今天&#xff0c;我要向您介绍一个充满乐趣的娱乐网站——Poki。这是一个集合了众多在线小游戏的平台&#xff0c;适合所有年龄段的玩家。无论您是想在工作间隙放松一下&#xff0c;还是寻找适合家庭聚会时的娱乐活动&#xff0c;Poki都能满足您的需求。所有游戏都无需下载或安…

Oracle递归查询笔记

目录 一、创建表结构和插入数据 二、查询所有子节点 三、查询所有父节点 四、查询指定节点的根节点 五、查询指定节点的递归路径 六、递归子类 七、递归父类 一、创建表结构和插入数据 CREATE TABLE "REGION" ( "ID" VARCHAR2(36) DEFAULT SYS_GUI…

SQL试题使得每个学生 按照姓名的字⺟顺序依次排列 在对应的⼤洲下⾯

学⽣地理信息报告 学校有来⾃亚洲、欧洲和美洲的学⽣。 表countries 数据如下&#xff1a; namecontinentJaneAmericaPascalEuropeXiAsiaJackAmerica 1、编写解决⽅案实现对⼤洲&#xff08;continent&#xff09;列的 透视表 操作&#xff0c;使得每个学生 按照姓名的字⺟顺…

【全开源】点餐小程序系统源码(ThinkPHP+FastAdmin+UniApp)

基于ThinkPHPFastAdminUniApp开发的点餐微信小程序&#xff0c;类似肯德基&#xff0c;麦当劳&#xff0c;喜茶等小程序多店铺模式&#xff0c;支持子商户模式&#xff0c;提供全部前后台无加密源代码和数据库&#xff0c;支持私有化部署。 革新餐饮行业的智慧点餐解决方案 一…

有些错误,常犯常新、常新常犯:记录一个使用element-plus的tooltip组件的错误

使用element-plus的tooltip组件&#xff0c;最开始的写法是这样的&#xff1a; <el-tooltipclass"box-item"effect"dark"content"tooltip content" ><el-button v-if"isDisabled" :underline"false" type"pr…

JavaSE--基础语法(第一期)

Java是一种优秀的程序设计语言&#xff0c;它具有令人赏心悦目的语法和易于理解的语义。不仅如此&#xff0c;Java还是一个有一系列计算机软件和规范形成的技术体系&#xff0c;这个技术体系提供了完整的用于软件开发和 跨平台部署的支持环境&#xff0c;并广泛应用于嵌入式系统…

Jeecg | 如何解决 ERR Client sent AUTH, but no password is set 问题

最近在尝试Jeecg低代码开发&#xff0c;但是碰到了超级多的问题&#xff0c;不过总归是成功运行起来了。 下面说说碰到的最后一个配置问题&#xff1a;连接redis失败 Error starting ApplicationContext. To display the conditions report re-run your application with deb…

ELK 日志监控平台(一)- 快速搭建

文章目录 ELK 日志监控平台&#xff08;一&#xff09;- 快速搭建1.ELK 简介2.Elasticsearch安装部署3.Logstash安装部署4.Kibana安装部署5.日志收集DEMO5.1.创建SpringBoot应用依赖导入日志配置文件 logback.xml启动类目录结构启动项目 5.2.创建Logstash配置文件5.3.重新启动L…

解读makefile中的.PHONY

在 Makefile 中&#xff0c;.PHONY 是一个特殊的目标&#xff0c;用于声明伪目标&#xff08;phony target&#xff09;。伪目标是指并不代表实际构建结果的目标&#xff0c;而是用来触发特定动作或命令的标识。通常情况下&#xff0c;.PHONY 会被用来声明一组需要执行的动作&a…