JAVA开发( 腾讯云消息队列 RocketMQ使用总结 )

news/2024/4/29 7:48:58/文章来源:https://blog.csdn.net/dongjing991/article/details/131510421

一、问题背景

      之所以需要不停的总结是因为在java开发过程中使用到中间件实在太多了,久久不用就会慢慢变得生疏,有时候一个中间很久没使用,可能经过了很多版本的迭代,使用起来又有区别。所以还是得不断总结更新。最近博主就是在使用腾讯云RocketMQ中遇到了点问题,排查了很久,也不知道什么原因,最好咨询了了腾讯官方技术支撑,最终解决。现在很多中间件都是各位巨头经过封装,然后卖给中小企业,有时候遇到点问题,还不容易在网上搜索资料排查到,都只能在巨头的生态里摸索,排查,请教。

二、RocketMQ产品概述

消息队列 RocketMQ 版(TDMQ for RocketMQ,简称 TDMQ RocketMQ 版)是腾讯云基于 Apache RocketMQ 构建的分布式消息中间件,完全兼容 Apache RocketMQ 的各个组件与概念,支持开源社区版本的客户端零改造接入。

消息队列 RocketMQ 版具有低延迟、高性能、高可靠、万亿级消息容量和灵活可扩展等特点,可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。

三、RocketMQ的组成部分和基本概念

Producer 集群: 客户侧应用,负责生产并发送消息。

Consumer 集群:客户侧应用,负责订阅和消费处理消息。

Nameserver 集群: 服务端应用,负责路由寻址和 Broker 心跳注册。

心跳注册:NameServer 相当于注册中心的角色,各个角色的机器都要定时向 NameServer 上报自己的状态,如果超时未上报,NameServer 会认为某个机器出现故障不可用了,从而将这个机器从可用列表中删除。

路由寻址:每个 NameServer 中都保存着 Broker 集群的整个路由信息和用于客户端查询的队列信息,生产者和消费者通过 NameServer 去获取整个Broker 集群的路由信息,从而进行消息的投递和消费。

Broker集群:服务端应用,负责接收,存储,投递消息,支持主从多副本模式,从节点可选部署,实际现网公有云上数据高可靠直接依赖云盘三副本。

管控集群: 服务端应用,可视化的管控控制台,负责运维整个集群,例如源数据的收发和管理等。

消息(Message)

消息系统所传输信息的物理载体,生产和消费数据的最小单位。生产者将业务数据的负载和拓展属性包装成消息发送到服务端,服务端按照相关语义将消息投递到消费端进行消费。

主题(Topic)

Topic 表示一类消息的集合,每个主题包含若干消息,是 RocketMQ 进行消息订阅的基本单位。

消息标签(MessageTag)

为消息设置的标签,用于将同一个 Topic 下区分不同类型的消息,可以理解为 Topic 是消息的一级分类,Tag 是消息的二级分类。

消息队列(MessageQueue)

存储消息的物理实体,一个 Topic 可以包含多个 Queue,Queue 也叫消息分区,一个 Queue 中的消息只能被一个消费者组中的一个消费者消费,一个 Queue 中的消息不允许同一个消费者组中的多个消费者同时消费。

消息位点(MessageQueueOffset)​

消息是按到达 RocketMQ 服务端的先后顺序存储在指定主题的多个队列中,每条消息在队列中都有一个唯一的 Long 类型坐标,这个坐标被定义为消息位点。

消费位点(ConsumerOffset)​

一条消息被某个消费者消费完成后不会立即从队列中删除, RocketMQ 会基于每个消费者分组记录消费过的最新一条消息的位点,即消费位点。

消息索引(MessageKey)​

消息索引是 RocketMQ 提供的面向消息的索引属性。通过设置的消息索引可以快速查找到对应的消息内容。

生产者(Producer)​

生产者是 RocketMQ 系统中用来构建并传输消息到服务端的运行实体。生产者通常被集成在业务系统中,将业务消息按照要求封装成消息并发送至服务端。

消费者(Consumer)​

消费者是 RocketMQ 中用来接收并处理消息的运行实体。消费者通常被集成在业务系统中,从服务端获取消息,并将消息转化成业务可理解的信息,供业务逻辑处理。

分组(Group)

可分为生产者组和消费者组:

生产者组:同一类 Producer 的集合,这类 Producer 发送同一类消息且发送逻辑一致。如果发送的是事务消息,且生产者发送后崩溃,则 Broker 服务器会联系同一个生产者组的其他生产者实例以提交或者回溯消费。

消费者组:同一类 Consumer 的集合,这类 Consumer 通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面实现了负载均衡和容错。消费者组的消费者实例必须订阅完全相同的 Topic。

消息类型(MessageType)​

按照消息传输特性的不同而定义的分类,用于类型管理和安全校验。RocketMQ 支持的消息类型有普通消息、顺序消息、事务消息和定时/延时消息。

普通消息

普通消息是一种基础的消息类型,由生产投递到指定 Topic 后,被订阅了该 Topic 的消费者所消费。普通消息的 Topic 中无顺序的概念,可以使用多个分区数来提升消息的生产和消费效率,在吞吐量巨大时其性能最好。

顺序消息

顺序消息是消息队列 RocketMQ 提供的一种高级消息类型,对于一个指定的Topic,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发送的消息先消费,后发送的消息后消费。

重试队列

重试队列是一种为了确保消息被正常消费而设计的队列。当某些消息第一次被消费者消费后,没有得到正常的回应,则会进入重试队列,当重试达到一定次数后,停止重试,投递到死信队列中。

由于实际场景中,可能会存在的一些临时短暂的问题(如网络抖动,服务重启等)导致消息无法及时被处理,但短暂时间过后又恢复正常。这种场景下,重试队列的重试机制就可以很好解决此类问题。

死信队列

死信队列是一种特殊的消息队列,用于集中处理无法被正常消费的消息的队列。当消息在重试队列中达到一定重试次数后仍未能被正常消费,TDMQ 会判定这条消息在当前情况下无法被消费,将其投递至死信队列。

实际场景中,消息可能会由于持续一段时间的服务宕机,网络断连而无法被消费。这种场景下,消息不会被立刻丢弃,死信队列会对这种消息进行较为长期的持久化,用户可以在找到对应解决方案后,创建消费者订阅死信队列来完成对当时无法处理消息的处理。

集群消费

集群消费:当使用集群消费模式时,任意一条消息只需要被集群内的任意一个消费者处理即可。适用于每条消息只需要被处理一次的场景。

广播消费

广播消费:当使用广播消费模式时,每条消息会被推送给集群内所有注册过的消费者,保证消息至少被每个消费者消费一次。适用于每条消息需要被集群下每一个消费者处理的场景。

消息过滤​

消费者可以通过订阅指定消息标签(Tag)对消息进行过滤,确保最终只接收被过滤后的消息合集。过滤规则的计算和匹配在 RocketMQ 的服务端完成。

重置消费位点​

以时间轴为坐标,在消息持久化存储的时间范围内,重新设置消费者分组对已订阅主题的消费进度,设置完成后消费者将接收设定时间点之后,由生产者发送到 RocketMQ 服务端的消息。

消息轨迹​

在一条消息从生产者发出到消费者接收并处理过程中,由各个相关节点的时间、地点等数据汇聚而成的完整链路信息。通过消息轨迹,您能清晰定位消息从生产者发出,经由 RocketMQ 服务端,投递给消费者的完整链路,方便定位排查问题。

消息堆积​

生产者已经将消息发送到 RocketMQ 的服务端,但由于消费者的消费能力有限,未能在短时间内将所有消息正确消费掉,此时在服务端保存着未被消费的消息,该状态即消息堆积。

四、应用场景

异步解耦

交易引擎作为腾讯计费最核心的系统,每笔交易订单数据需要被几十个下游业务系统关注,包括库存系统、仓储系统、促销系统、积分系统等,多个系统对消息的处理逻辑不一致,单个系统不可能去适配每一个关联业务。此时,TDMQ RocketMQ 版可解除多个业务系统之间的耦合度,减少系统之间影响,提升核心业务响应速度和健壮性。

削峰填谷

企业不定时举办的一些营销活动,新品发布上线,节日抢红包等,往往都会带来临时性的流量洪峰,这对后端的各个应用系统考验是十分巨大的,如果直接采用扩容方式应对又会带来一定的资源浪费。RocketMQ 可以应对突发性的流量洪峰,在峰值时堆积消息,而在峰值过去后下游系统慢慢消费消息,解决上下游处理能力不匹配,提升系统可用性。

 还有等等

顺序收发

顺序消息是消息队列 RocketMQ 提供的一种高级消息类型,对于一个指定的Topic,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发送的消息先消费,后发送的消息后消费。顺序消息常用于以下业务场景:

订单创建场景:在一些电商系统中,同一个订单相关的创建订单消息、订单支付消息、订单退款消息、订单物流消息必须严格按照先后顺序来进行生产或者消费,否则消费中传递订单状态会发生紊乱,影响业务的正常进行。因此,该订单的消息必须按照一定的顺序在客户端和消息队列中进行生产和消费,同时消息之间有先后的依赖关系,后一条消息需要依赖于前一条消息的处理结果。

日志同步场景:在有序事件处理或者数据实时增量同步的场景中,顺序消息也能发挥较大的作用,如同步 mysql 的 binlog 日志时,需要保证数据库的操作是有顺序的。

金融场景:在一些撮合交易的场景下,比如某些证券交易,在价格相同的情况下,先出价者优先处理,则需要按照FIFO的方式生产和消费顺序消息。

五、如何使用

 集成到springBoot

mq配置信息项

server:port: 8082#rocketmq配置信息rocketmq:# tdmq-rocketmq服务接入地址name-server: rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:9876# 生产者配置producer:# 生产者组名group: group111# 角色密钥access-key: eyJrZXlJZC....# 已授权的角色名称secret-key: admin# 消费者公共配置consumer:# 角色密钥access-key: eyJrZXlJZC....# 已授权的角色名称secret-key: admin# 用户自定义配置namespace: MQ_INST_rocketmqxxxxxxxxproducer1:topic: testdev1consumer1:group: group111topic: testdev1subExpression: TAG1consumer2:group: group222topic: testdev1subExpression: TAG2

<!-- in your <dependencies> block -->
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.9.3</version>
</dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-acl</artifactId><version>4.9.3</version>
</dependency>

创建生产者 

// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer(namespace, groupName,new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)) // ACL权限
);
// 设置NameServer的地址
producer.setNamesrvAddr(nameserver);
// 启动Producer实例
producer.start();

发送消息

for (int i = 0; i < 10; i++) {// 创建消息实例,设置topic和消息内容Message msg = new Message(topic_name, "TAG", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));// 发送消息SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);
}

异步发送

// 设置发送失败后不重试
producer.setRetryTimesWhenSendAsyncFailed(0);
// 设置发送消息的数量
int messageCount = 10;
final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
for (int i = 0; i < messageCount; i++) {try {final int index = i;// 创建消息实体,设置topic和消息内容Message msg = new Message(topic_name, "TAG", ("Hello rocketMq " + index).getBytes(RemotingHelper.DEFAULT_CHARSET));producer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {// 消息发送成功逻辑countDownLatch.countDown();System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());}@Overridepublic void onException(Throwable e) {// 消息发送失败逻辑countDownLatch.countDown();System.out.printf("%-10d Exception %s %n", index, e);e.printStackTrace();}});} catch (Exception e) {e.printStackTrace();}
}
countDownLatch.await(5, TimeUnit.SECONDS);

创建消费者

// 实例化消费者
DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(namespace,                                                  groupName,                                              new AclClientRPCHook(new SessionCredentials(accessKey, secretKey))); //ACL权限
// 设置NameServer的地址
pushConsumer.setNamesrvAddr(nameserver);

消费信息

// 实例化消费者
DefaultLitePullConsumer pullConsumer = new DefaultLitePullConsumer(namespace,                                               groupName,                                             new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)));
// 设置NameServer的地址
pullConsumer.setNamesrvAddr(nameserver);
// 设置从第一个偏移量开始消费
pullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

发布与订阅模式

发布到订阅

// 订阅topic
pushConsumer.subscribe(topic_name, "*");
// 注册回调实现类来处理从broker拉取回来的消息
pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {// 消息处理逻辑System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);// 标记该消息已经被成功消费, 根据消费情况,返回处理状态return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 启动消费者实例
pushConsumer.start();

订阅信息

// 订阅topic
pullConsumer.subscribe(topic_name, "*");
// 启动消费者实例
pullConsumer.start();
try {System.out.printf("Consumer Started.%n");while (true) {// 拉取消息List<MessageExt> messageExts = pullConsumer.poll();System.out.printf("%s%n", messageExts);}
} finally {pullConsumer.shutdown();
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_325519.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

jenkins的环境搭建

jenkins 环境 安装 我之前使用war安装、安装比较简单、就是jenkins的 对应的插件不能下载下来、后来发现是版本的问题、使用docker-compose 安装、jenkins安装 插件很容易安装下来 1、安装jdk 解压jdk 配置环境变量 #set java environment JAVA_HOME/usr/local/jdk1.8.0_281…

blender 之点云渲染(论文渲图)

blender 之点云渲染&#xff08;论文渲图&#xff09; 一、导入点云1.新建2.导入点云3.位置移动&放大缩小 二、Geometry Nodes实体化点云1.新建节点2.实体化 三、给实体化点云添加材质四、设置渲染引擎更换为Cycles。 五、对准视角1.新建一个球2.创建相机视角跟踪3.将uv球挪…

二、Spring Cloud Eureka 简介、快速入门

注册发现中心 Eureka 来源于古希腊词汇&#xff0c;意为“发现了”。在软件领域&#xff0c; Eureka 是 Netflix 在线影片公司开源的一个服务注册与发现的组件&#xff0c;和其他 Netflix 公司的服务组件&#xff08;例如负载均衡、熔断器、网关等&#xff09; 一起&#xff0…

LLM prompt提示构造案例

参考&#xff1a; https://github.com/PlexPt/awesome-chatgpt-prompts-zh 吴恩达 prompt工程应用&#xff1a; https://www.bilibili.com/video/BV1No4y1t7Zn prompt构造案例代码 prompt """文本分类任务&#xff1a;将一段用户给外卖服务的评论进行分类…

初级保育员专业知识配合教育考试题库及答案

本题库是根据最新考试大纲要求&#xff0c;结合近年来考试真题的重难点进行汇编整理组成的全真模拟试题&#xff0c;考生们可以进行专项训练&#xff0c;查漏补缺巩固知识点。本题库对热点考题和重难点题目都进行了仔细的整理和编辑&#xff0c;相信考生在经过了针对性的刷题练…

Linux基础笔记

已经有很长很长一段时间没有更新帖子了&#xff0c;一眨眼2023 已经过半&#xff0c;这些日子里&#xff0c;有太多太多事情要做了&#xff0c;今年只更新了几篇&#xff0c;这几天刚好有空&#xff0c;浅浅更新一篇叭&#xff01;~~~ 首先&#xff0c;Linux是一种开源的操作系…

手搓GPT系列之 - 通过理解LSTM的反向传播过程,理解LSTM解决梯度消失的原理 - 逐条解释LSTM创始论文全部推导公式,配超多图帮助理解(下篇)

本文承接上篇上篇在此和中篇中篇在此&#xff0c;继续就Sepp Hochreiter 1997年的开山大作 Long Short-term Memory 中APPENDIX A.1和A.2所载的数学推导过程进行详细解读。希望可以帮助大家理解了这个推导过程&#xff0c;进而能顺利理解为什么那几个门的设置可以解决RNN里的梯…

git push origin masterEverything up-to-date解决方法

按住这个看一下很简单的问题&#xff0c;我在网上看了很多就是没找到能用的&#xff0c;最后找到了这个看起来写的很简单的一个文章&#xff0c;但他写的真的有用。 出现的问题 解决步骤第一步 git add . 第二步 git commit -m “message” 第三步 git push origin master…

PyTorch示例——ResNet34模型和Fruits图像数据

PyTorch示例——ResNet34模型和Fruits图像数据 前言导包数据探索查看数据集构建构建模型 ResNet34模型训练绘制训练曲线 前言 ResNet34模型&#xff0c;做图像分类数据使用水果图片数据集&#xff0c;下载见Kaggle Fruits Dataset (Images)Kaggle的Notebook示例见 PyTorch——…

综合实验---基于卷积神经网络的目标分类案例

文章目录 配置环境猫狗数据分类建模猫狗分类的实例基准模型猫狗分类的实例基准模型之数据增强问题回答 配置环境 ①首先打开 cmd&#xff0c;创建虚拟环境。 conda create -n tf1 python3.6如果报错&#xff1a;‘conda’ 不是内部或外部命令,也不是可运行的程序 或批处理文件…

[github-100天机器学习]day1 data preprocessing

https://github.com/LiuChuang0059/100days-ML-code/blob/master/Day1_Data_preprocessing/README.md#step-6-feature-scaling—特征缩放 数据预处理 数据帧(Data Frame) 二维的表格形式&#xff0c;类似于电子表格或关系型数据库中的表。数据帧通常被用来存储和操作结构化数据…

科技项目验收测试报告有什么注意事项和疑惑?

科技项目验收测试报告是一份重要的文件&#xff0c;用于评估科技项目的质量和可靠性&#xff0c;对项目的成功交付具有关键作用。在项目完成的最后阶段&#xff0c;通过对项目进行全面测试和评估&#xff0c;以确保项目符合预期的目标和需求&#xff0c;并满足用户的期望。 一…

3D深度视觉与myCobot 320机械臂无序抓取

今天我记录使用myCobot320 M5跟FS820-E1深度相机进行一个无序抓取物体的分享。 为什么会选择深度相机和机械臂做一个案例呢&#xff1f; 2D相机&#xff08;最常见使用的相机&#xff09;可以捕捉二维图像&#xff0c;也就是在水平和垂直方向上的像素值。它们通常用于拍摄静态…

卷积神经网络--猫狗系列【VGG16】

数据集&#xff1a;【文末】 ​ 数据集预处理 定义读取数据辅助类&#xff08;继承torch.utils.data.Dataset&#xff09; import osimport PILimport torchimport torchvisionimport matplotlib.pyplot as pltimport torch.utils.dataimport PIL.Image # 数据集路径train_p…

nohup命令解决SpringBoot/java -jar命令启动项目运行一段时间自动停止问题

问题描述&#xff1a; 在centos7上部署多个springcloud项目。出现了服务莫名其妙会挂掉一两个的问题&#xff0c;重新启动挂掉的服务之后又会出现其他服务挂掉的情况&#xff0c;查看启动日志也并没有发现有异常抛出。令人费解的是所有的服务都是通过nohup java -jar xxx.jar …

强化学习路径优化:基于Q-learning算法的机器人路径优化(MATLAB)

一、强化学习之Q-learning算法 Q-learning算法是强化学习算法中的一种&#xff0c;该算法主要包含&#xff1a;Agent、状态、动作、环境、回报和惩罚。Q-learning算法通过机器人与环境不断地交换信息&#xff0c;来实现自我学习。Q-learning算法中的Q表是机器人与环境交互后的…

图像视频基础

图像视频基础 文章目录 图像视频基础图像颜色深度分辨率 视频帧率比特率帧类型 YUV模型色度子采样 图像 颜色深度 存储颜色的强度&#xff0c;需要占用一定大小的数据空间&#xff0c;这个大小被称为颜色深度。假如每个颜色的强度占用 8 bit&#xff08;取值范围为 0 到 255&…

nginx+tomcat负载均衡和动静分离

目录 1.部署nginx 2.部署两台tomcat 3.配置nginx 1.部署nginx vim /vim/lib/systemd/system/nginx.service 2.部署两台tomcat 进入第一台装第一个tomcat vim /etc/profile vim /usr/local/tomcat/webapps/test/index.jsp 重启 进入第二台安装第二台tomcat vim /usr/local/tom…

(0021) H5-Vuejs配合 mint-ui 开发移动端web

mint-ui 初衷 element-ui主打pcweb&#xff0c;导致移动端上UI适配问题突出&#xff0c;趟了很多坑。这次更加理智些&#xff0c;选择了饿了么团队的主打移动端的mint-ui&#xff0c;目前来说体验很好。 认识Mint-ui 首先在手机上体验其demo&#xff0c;扫描链接&#xff1a;…

在 Jetpack Compose 中创建 Drawer

Jetpack Compose 是一个现代的构建 Android UI 的工具集&#xff0c;它使得构建 UI 变得更加简单快速。在本篇博客中&#xff0c;我们将讨论如何在 Jetpack Compose 中创建 Drawer&#xff0c;也就是我们常见的侧边抽屉。 什么是 Drawer&#xff1f; Drawer 是一个提供导航选项…