549、RocketMQ详细入门教程系列 -【消息队列之 RocketMQ(三)】 2023.02.28

news/2024/4/27 10:01:22/文章来源:https://blog.csdn.net/youyouwuxin1234/article/details/129265621

目录

    • 一、Spring 整合 RocketMQ
      • 1.1 消息生产者
      • 1.2 消息消费者
      • 1.3 Spring 配置文件
      • 1.4 运行实例程序
    • 二、参考链接

一、Spring 整合 RocketMQ

不同于 RabbitMQ、ActiveMQ、Kafka 等消息中间件,Spring 社区已经通过多种方式提供了对这些中间件产品集成,例如通过 spring-jms 整合 ActiveMQ、通过 Spring AMQP 项目下的 spring-rabbit 整合 RabbitMQ、通过 spring-kafka 整合 kafka ,通过他们可以在 Spring 项目中更方便使用其 API 。目前在 Spring 框架中集成 RocketMQ 有三种方式,一是将消息生产者和消费者定义成 bean 对象交由 Spring 容器管理,二是使用 RocketMQ 社区的外部项目 rocketmq-jms(https://github.com/apache/rocketmq-externals/tree/master/rocketmq-jms)然后通过 spring-jms 方式集成使用,三是如果你的应用是基于 spring-boot 的,可以使用 RocketMQ 的外部项目 rocketmq-spring-boot-starter(https://github.com/apache/rocketmq-externals/tree/master/rocketmq-spring-boot-starter)比较方便的收发消息。
总的来讲 rocketmq-jms 项目实现了 JMS 1.1 规范的部分内容,目前支持 JMS 中的发布/订阅模型收发消息。rocketmq-spring-boot-starter 项目目前已经支持同步发送、异步发送、单向发送、顺序消费、并行消费、集群消费、广播消费等特性,如果比较喜欢 Spring Boot 这种全家桶的快速开发框架并且现有特性已满足业务要求可以使用该项目。当然从 API 使用上最灵活的还是第一种方式,下面以第一种方式为例简单看下Spring 如何集成 RocketMQ 的。

1.1 消息生产者

package org.study.mq.rocketMQ.spring;import org.apache.log4j.Logger;
import org.apache.rocketmq.client.producer.DefaultMQProducer;public class SpringProducer {private Logger logger = Logger.getLogger(getClass());private String producerGroupName;private String nameServerAddr;private DefaultMQProducer producer;public SpringProducer(String producerGroupName, String nameServerAddr) {this.producerGroupName = producerGroupName;this.nameServerAddr = nameServerAddr;}public void init() throws Exception {logger.info("开始启动消息生产者服务...");//创建一个消息生产者,并设置一个消息生产者组producer = new DefaultMQProducer(producerGroupName);//指定 NameServer 地址producer.setNamesrvAddr(nameServerAddr);//初始化 SpringProducer,整个应用生命周期内只需要初始化一次producer.start();logger.info("消息生产者服务启动成功.");}public void destroy() {logger.info("开始关闭消息生产者服务...");producer.shutdown();logger.info("消息生产者服务已关闭.");}public DefaultMQProducer getProducer() {return producer;}
}

消息生产者就是把生产者 DefaultMQProducer 对象的生命周期分成构造函数、init、destroy 三个方法,构造函数中将生产者组名、NameServer 地址作为变量由 Spring 容器在配置时提供,init 方法中实例化 DefaultMQProducer 对象、设置 NameServer 地址、初始化生产者对象,destroy 方法用于生产者对象销毁时清理资源。

1.2 消息消费者

package org.study.mq.rocketMQ.spring;import org.apache.log4j.Logger;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;public class SpringConsumer {private Logger logger = Logger.getLogger(getClass());private String consumerGroupName;private String nameServerAddr;private String topicName;private DefaultMQPushConsumer consumer;private MessageListenerConcurrently messageListener;public SpringConsumer(String consumerGroupName, String nameServerAddr, String topicName, MessageListenerConcurrently messageListener) {this.consumerGroupName = consumerGroupName;this.nameServerAddr = nameServerAddr;this.topicName = topicName;this.messageListener = messageListener;}public void init() throws Exception {logger.info("开始启动消息消费者服务...");//创建一个消息消费者,并设置一个消息消费者组consumer = new DefaultMQPushConsumer(consumerGroupName);//指定 NameServer 地址consumer.setNamesrvAddr(nameServerAddr);//设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);//订阅指定 Topic 下的所有消息consumer.subscribe(topicName, "*");//注册消息监听器consumer.registerMessageListener(messageListener);// 消费者对象在使用之前必须要调用 start 初始化consumer.start();logger.info("消息消费者服务启动成功.");}public void destroy(){logger.info("开始关闭消息消费者服务...");consumer.shutdown();logger.info("消息消费者服务已关闭.");}public DefaultMQPushConsumer getConsumer() {return consumer;}}

同消息生产者类似,消息消费者是把生产者 DefaultMQPushConsumer 对象的生命周期分成构造函数、init、destroy 三个方法,具体含义在介绍 Java 访问 RocketMQ 实例时已经介绍过了,不再赘述。当然,有了消费者对象还需要消息监听器在接收到消息后执行具体的处理逻辑。

package org.study.mq.rocketMQ.spring;import org.apache.log4j.Logger;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;import java.io.UnsupportedEncodingException;
import java.util.List;public class MessageListener implements MessageListenerConcurrently {private Logger logger = Logger.getLogger(getClass());public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {if (list != null) {for (MessageExt ext : list) {try {logger.info("监听到消息 : " + new String(ext.getBody(), "UTF-8"));} catch (UnsupportedEncodingException e) {e.printStackTrace();}}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}}

消息监听器类就是把前面 Java 示例中注册消息监听器时声明的匿名内部类代码抽取出来定义成单独一个类而已。

1.3 Spring 配置文件

因为只使用 Spring 框架集成,所以除了 Sping 框架核心 jar 包外不需要额外添加依赖包了。本例中将消息生产者和消息消费者分成两个配置文件,这样能更好的演示收发消息的效果。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans-4.3.xsd"><bean id="producer" class="org.study.mq.rocketMQ.spring.SpringProducer" init-method="init" destroy-method="destroy"><constructor-arg name="nameServerAddr" value="localhost:9876"/><constructor-arg name="producerGroupName" value="spring_producer_group"/></bean>
</beans>

消息生产者配置很简单,定义了一个消息生产者对象,该对象初始化时调用 init 方法,对象销毁前执行 destroy 方法,将 Name Server 地址和生产者组配置好。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans-4.3.xsd"><bean id="messageListener" class="org.study.mq.rocketMQ.spring.MessageListener" /><bean id="consumer" class="org.study.mq.rocketMQ.spring.SpringConsumer" init-method="init" destroy-method="destroy"><constructor-arg name="nameServerAddr" value="localhost:9876"/><constructor-arg name="consumerGroupName" value="spring_consumer_group"/><constructor-arg name="topicName" value="spring-rocketMQ-topic" /><constructor-arg name="messageListener" ref="messageListener" /></bean></beans>

消息消费者同消息生产者配置类似,多了一个消息监听器对象的定义和绑定。

1.4 运行实例程序

按前述步骤 启动 Name Server 和 Broker,接着运行消息生产者和消息消费者程序,简化起见我们用两个单元测试类模拟这两个程序:

package org.study.mq.rocketMQ.spring;import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.junit.Before;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;public class SpringProducerTest {private ApplicationContext container;@Beforepublic void setup() {container = new ClassPathXmlApplicationContext("classpath:spring-producer.xml");}@Testpublic void sendMessage() throws Exception {SpringProducer producer = container.getBean(SpringProducer.class);for (int i = 0; i < 20; i++) {//创建一条消息对象,指定其主题、标签和消息内容Message msg = new Message("spring-rocketMQ-topic",null,("Spring RocketMQ demo " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* 消息内容 */);//发送消息并返回结果SendResult sendResult = producer.getProducer().send(msg);System.out.printf("%s%n", sendResult);}}
}

SpringProducerTest 类模拟消息生产者发送消息。

package org.study.mq.rocketMQ.spring;import org.junit.Before;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;public class SpringConsumerTest {private ApplicationContext container;@Beforepublic void setup() {container = new ClassPathXmlApplicationContext("classpath:spring-consumer.xml");}@Testpublic void consume() throws Exception {SpringConsumer consumer = container.getBean(SpringConsumer.class);Thread.sleep(200 * 1000);consumer.destroy();}
}

SpringConsumerTest 类模拟消息消费者者接收消息,在 consume 方法返回之前需要让当前线程睡眠一段时间,使消费者程序继续存活才能监听到生产者发送的消息。

分别运行 SpringProducerTest 类 和 SpringConsumerTest 类,在 SpringConsumerTest 的控制台能看到接收的消息:
在这里插入图片描述
假如启动两个 SpringConsumerTest 类进程,因为它们属于同一消费者组,在 SpringConsumerTest 的控制台能看到它们均摊到了消息:
在这里插入图片描述
在这里插入图片描述

二、参考链接

[01] 消息队列之 RocketMQ

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_75426.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux | 2. 用户管理

如有错误&#xff0c;恳请指出。 1. 设置文件权限 权限设置如下&#xff1a; root表示文件所有者&#xff0c;stud1表示文件所属组。其他用户无法访问。更改指令是chown。 更改目录文件所属组&#xff1a;chown .lab lossfound/更改目录文件所有者&#xff1a;chown lab loss…

html实现浪漫的爱情日记(附源码)

文章目录1.设计来源1.1 主界面1.2 遇见1.3 相熟1.4 相知1.5 相念2.效果和源码2.1 动态效果2.2 源代码2.3 代码结构源码下载更多爱情表白源码作者&#xff1a;xcLeigh 文章地址&#xff1a;https://blog.csdn.net/weixin_43151418/article/details/129264757 html实现浪漫的爱情…

Javaweb复习之HTTPTomcatServelet

1.Web概述 1.1 Web和JavaWeb的概念 Web是全球广域网&#xff0c;也称为万维网(www)&#xff0c;能够通过浏览器访问的网站。 JavaWeb就是用Java技术来解决相关web互联网领域的技术栈 1.2 JavaWeb技术栈 B/S 架构&#xff1a;Browser/Server&#xff0c;浏览器/服务器 架构模…

Linux: malloc()的指向指针发生指向偏移后,释放前需要将指针指向复原。

Linux: malloc()的指向指针发生指向偏移后&#xff0c;释放前需要将指针指向复原。 #include <stdlib.h> #include <stdio.h> #include <unistd.h> #include <sys/types.h> #include <sys/stat.h> #include <string.h> #include <time…

MIT:只需一层RF传感器,就能为AR头显赋予“X光”穿透视力

近年来&#xff0c;AR在仓库、工厂等场景得到应用&#xff0c;比如GlobalFoundries、亚马逊、菜鸟裹裹就使用摄像头扫描定位货品&#xff0c;并使用AR来导航和标记。目前&#xff0c;这种方案主要基于视觉算法&#xff0c;因此仅能定位视线范围内的目标。然而&#xff0c;在一些…

C++ string类(二)及深浅拷贝

一、string类方法使用举例1.迭代器迭代器本质&#xff1a;指针&#xff08;理解&#xff09;迭代器&#xff1a;正向迭代器&#xff1a; begin() | end() 反向迭代器&#xff1a; rbegin() | rend()2.find使用//找到s中某个字符 void TestString3() {string s("AAADEFNUIE…

携程面经1

面经 HDFS读写流程 1.读流程 客户端向NameNode发起读请求&#xff08;如果存在&#xff09;NameNode返回一批block地址客户端与第一个block的拓扑距离最近的节点建立连接以packet&#xff08;64kb&#xff09;的单位读取数据块。一个block读取完成后客户端会断开与该DataNod…

5个开源的Java项目快速开发脚手架

概览 &#xff1a; GunspigRuoYiJeecg-bootiBase4J 一、Guns 推荐指数 &#xff1a;⭐⭐⭐⭐⭐ 简介 采用主流框架 &#xff1a; 基于 Spring Boot2.0版本开发&#xff0c;并且支持 Spring Cloud Alibaba 微服务。功能齐全 &#xff1a;包含系统管理&#xff0c;代码生成&a…

这么强才给我28k,我头都不回,转身拿下40k~

时间真的过得很快&#xff0c;眨眼就从校园刚出来的帅气小伙变成了油腻大叔&#xff0c;给各位刚入道的测试朋友一点小建议&#xff0c;希望你们直通罗马吧&#xff01; 如何选择自己合适的方向 关于选择测试管理&#xff1a; 第一&#xff0c;你一定不会是一个喜欢技术&…

Vue的组件(注册、局部、组件复用、props、emit、生命周期)全解

文章目录前言知识点组件注册局部组件组件复用组件间通信props 类型检测子父组件通信之 emit动态组件生命周期函数前言 Vue 支持模块化和组件化开发&#xff0c;可以将整个页面进行模块化分割&#xff0c;低耦合高内聚&#xff0c;使得代码可以在各个地方使用。 知识点 组件注册…

accent-color一行代码,让你的表单组件变好看

不做切图仔,从关注本专栏开始 文章目录 不做切图仔,从关注本专栏开始前言兼容性语法继承性智能前言 在之前的网站开发中,我们是很难去更改的你某些控件的颜色。我们可能要使用各种技巧来自定义我们的控件。好消息是,今天如果我们想要去改变控件的颜色,css为我们提供了一些…

心系区域发展,高德用一体化出行服务平台“聚”力区域未来

交通&#xff0c;是城市的血脉。通过对人、资源、产业的连接&#xff0c;交通建设往往是城市和区域经济发展的前提。不过&#xff0c;在度过了“要想富&#xff0c;先修路”的初级建设阶段后&#xff0c;交通产业内部也出现了挑战&#xff0c;诸如城市秩序、发展成本、用户使用…

【目标检测】Dynamic Head Unifying Object Detection Heads with Attentions

文章目录一、背景二、方法2.1 scale-aware attention2.2 spatial-aware attention2.3 task-aware attention2.4 总体过程2.5 和现有的检测器适配2.6 和其他注意力机制的关联三、效果四、代码论文链接&#xff1a; https://arxiv.org/pdf/2106.08322.pdf代码链接&#xff1a;htt…

Windows 安装RocketMQ

文章目录一、RocketMQ是什么&#xff1f;二、准备工作1.环境要求2.下载与解压3.启动MQ4. 测试是否成功启动三、安装管理端1. 代码下载2. 修改配置文件3. 启动MQ客户端jar包四、rocketMQ代码的使用入门五、问题记录1. 启动mqbroker.cmd没有反应2.消费者重复消费消息一、RocketMQ…

NCRE计算机等级考试Python真题(六)

第六套试题1、算法的时间复杂度是指A.执行算法程序所需要的时间B.算法程序的长度C.算法程序中的指令条数D.算法执行过程中所需要的基本运算次数正确答案&#xff1a; D2、下列关于栈的叙述中正确的是A.在栈中只能插入数据B.在栈中只能删除数据C.栈是先进先出的线性表D.栈是先进…

【Django功能开发】如何正确使用定时任务(启动、停止)

系列文章目录 【Django开发入门】ORM的增删改查和批量操作 【Django功能开发】编写自定义manage命令 文章目录系列文章目录前言一、django定时任务二、django-apscheduler基本使用1.安装django-apscheduler2.配置settings.py的INSTALLED_APPS3.通过命令生成定时记录表3.如何创…

嵌入式 linux 系统开发网络的设置

目录 一、前言 二、linux网络静态地址设置 前言 为什么要对linux系统下的ubuntu进行网络设置呢&#xff1f; 因为我们在嵌入式开发中&#xff0c;我们要保证windows系统、linux系统、开发板的ip要处于同一个网段&#xff0c;而默认ubuntu下的linux系统的ip是动态分配的&#…

如何彻底删除SQL Server 2008中的登录账号

我个人遇到的最烦人的事情之一是 SQL Server Management Studio中“服务器名称和登录名”对话框的下拉列表。 以下是我想从 SSMS 连接屏幕中删除某些内容的两种情况: 键入的服务器名称不正确 服务器将来不需要。当我看到服务器的名称,它已经存在了很长一段时间,我知道我不会…

图像处理实战--Opencv实现人像迁移

前言&#xff1a; Hello大家好&#xff0c;我是Dream。 今天来学习一下如何使用Opencv实现人像迁移&#xff0c;欢迎大家一起参与探讨交流~ 本文目录&#xff1a;一、实验要求二、实验环境三、实验原理及操作1.照片准备2.图像增强3.实现美颜功能4.背景虚化5.图像二值化处理6.人…

Day21【元宇宙的实践构想07】—— 元宇宙与人工智能

&#x1f483;&#x1f3fc; 本人简介&#xff1a;男 &#x1f476;&#x1f3fc; 年龄&#xff1a;18 &#x1f91e; 作者&#xff1a;那就叫我亮亮叭 &#x1f4d5; 专栏&#xff1a;元宇宙 0.0 写在前面 “元宇宙”在2021年成为时髦的概念。元宇宙到底是什么&#xff1f;元宇…