解析 RocketMQ 业务消息--“顺序消息”

news/2024/5/2 10:23:47/文章来源:https://www.cnblogs.com/alisystemsoftware/p/16620101.html

作者:绍舒

引言

Apache RocketMQ 诞生至今,历经十余年大规模业务稳定性打磨,服务了阿里集团内部业务以及阿里云数以万计的企业客户。作为金融级可靠的业务消息方案,RocketMQ 从创建之初就一直专注于业务集成领域的异步通信能力构建。本篇将继续业务消息集成的场景,从功能原理、应用案例、最佳实践以及实战等角度介绍 RocketMQ 的顺序消息功能。

简介

顺序消息是消息队列 RocketMQ 版提供的一种对消息发送和消费顺序有严格要求的消息。对于一个指定的 Topic,同一 MessageGroup 的消息按照严格的先进先出(FIFO)原则进行发布和消费,即先发布的消息先消费,后发布的消息后消费,服务端严格按照发送顺序进行存储、消费。同一 MessageGroup 的消息保证顺序,不同 MessageGroup 之间的消息顺序不做要求,因此需做到两点,发送的顺序性和消费的顺序性。

1.jpeg

功能原理

在这里首先抛出一个问题,在日常的接触中,许多 RocketMQ 使用者会认为,既然顺序消息能在普通消息的基础上实现顺序,看起来就是普通消息的加强版,那么为什么不全部都使用顺序消息呢?接下来就会围绕这个问题,对比普通消息和顺序消息进行阐述。

顺序发送

在分布式环境下,保证消息的全局顺序性是十分困难的,例如两个 RocketMQ Producer A 与 Producer B,它们在没有沟通的情况下各自向 RocketMQ 服务端发送消息 a 和消息 b,由于分布式系统的限制,我们无法保证 a 和 b 的顺序。因此业界消息系统通常保证的是分区的顺序性,即保证带有同一属性的消息的顺序,我们将该属性称之为 MessageGroup。如图所示,ProducerA 发送了 MessageGroup 属性为 A 的两条消息 A1,A2 和 MessageGroup 属性为 B 的 B1,B2,而 ProducerB 发送了 MessageGroup 属性为 C 的两条属性 C1,C2。

2.jpeg

同时,对于同一 MessageGroup,为了保证其发送顺序的先后性,比较简单的做法是构造一个单线程的场景,即不同的 MessageGroup 由不同的 Producer 负责,并且对于每一个 Producer 而言,顺序消息是同步发送的。同步发送的好处是显而易见的,在客户端得到上一条消息的发送结果后再发送下一条,即能准确保证发送顺序,若使用异步发送或多线程则很难保证这一点。

3.png

因此可以看到,虽然在底层原理上,顺序消息发送和普通消息发送并无二异,但是为了保证顺序消息的发送顺序性,同步发送的方式相比较普通消息,实际上降低了消息的最大吞吐。

顺序消费

与顺序消息不同的是,普通消息的消费实际上没有任何限制,消费者拉取的消息是被异步、并发消费的,而顺序消息,需要保证对于同一个 MessageGroup,同一时刻只有一个客户端在消费消息,并且在该条消息被确认消费完成之前(或者进入死信队列),消费者无法消费同一 MessageGroup 的下一条消息,否则消费的顺序性将得不到保证。因此这里存在着一个消费瓶颈,该瓶颈取决于用户自身的业务处理逻辑。极端情况下当某一 MessageGroup 的消息过多时,就可能导致消费堆积。当然也需要明确的是,这里的语境都指的是同一 MessageGroup,不同 MessageGroup 的消息之间并不存在顺序性的关联,是可以进行并发消费的。因此全文中提到的顺序实际上是一种偏序。

4.png

小结

无论对于发送还是消费,我们通过 MessageGroup 的方式将消息分组,即并发的基本单元是 MessageGroup,不同的 MessageGroup 可以并发的发送和消费,从而一定程度具备了可拓展性,支持多队列存储、水平拆分、并发消费,且不受影响。回顾普通消息,站在顺序消息的视角,可以认为普通消息的并发基本单元是单条消息,即每条消息均拥有不同的 MessageGroup。

我们回到开头那个问题:

既然顺序消息能在普通消息的基础上实现顺序,看起来就是普通消息的加强版,那么为什么不全部都使用顺序消息呢?

现在大家对于这个问题可能有一个基本的印象了,消息的顺序性当然很好,但是为了实现顺序性也是有代价的。

下述是一个表格,简要对比了顺序消息和普通消息。

5.jpeg

最佳实践

合理设置 MessageGroup

MessageGroup 会有很多错误的选择,以某电商平台为例,某电商平台将商家 ID 作为 MessageGroup,因为部分规模较大的商家会产出较多订单,由于下游消费能力的限制,因此这部分商家所对应的订单就发生了严重的堆积。正确的做法应当是将订单号作为 MessageGroup,而且站在背后的业务逻辑上来说,同一订单才有顺序性的要求。即选择 MessageGroup 的最佳实践是:MessageGroup 生命周期最好较为短暂,且不同 MessageGroup 的数量应当尽量相同且均匀。

同步发送和发送重试

如之前章节所述,需使用同步发送和发送重试来保证发送的顺序性。

消费幂等

消息传输链路在异常场景下会有少量重复,业务消费是需要做消费幂等,避免重复处理带来的风险。

应用案例

  • 用户注册需要发送验证码,以用户 ID 作为 MessageGroup,那么同一个用户发送的消息都会按照发布的先后顺序来消费。

  • 电商的订单创建,以订单 ID 作为 MessageGroup,那么同一个订单相关的创建订单消息、订单支付消息、订单退款消息、订单物流消息都会按照发布的先后顺序来消费。

6.png

实战

发送

可以看到,该发送案例设置了 MessageGroup 并且使用了同步发送,发送的代码如下:

public class ProducerFifoMessageExample {private static final Logger LOGGER = LoggerFactory.getLogger(ProducerFifoMessageExample.class);private ProducerFifoMessageExample() {}public static void main(String[] args) throws ClientException, IOException {final ClientServiceProvider provider = ClientServiceProvider.loadService();// Credential provider is optional for client configuration.String accessKey = "yourAccessKey";String secretKey = "yourSecretKey";SessionCredentialsProvider sessionCredentialsProvider =new StaticSessionCredentialsProvider(accessKey, secretKey);String endpoints = "foobar.com:8080";ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(endpoints).setCredentialProvider(sessionCredentialsProvider).build();String topic = "yourFifoTopic";final Producer producer = provider.newProducerBuilder().setClientConfiguration(clientConfiguration)// Set the topic name(s), which is optional. It makes producer could prefetch the topic route before // message publishing..setTopics(topic)// May throw {@link ClientException} if the producer is not initialized..build();// Define your message body.byte[] body = "This is a FIFO message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8);String tag = "yourMessageTagA";final Message message = provider.newMessageBuilder()// Set topic for the current message..setTopic(topic)// Message secondary classifier of message besides topic..setTag(tag)// Key(s) of the message, another way to mark message besides message id..setKeys("yourMessageKey-1ff69ada8e0e")// Message group decides the message delivery order..setMessageGroup("youMessageGroup0").setBody(body).build();try {final SendReceipt sendReceipt = producer.send(message);LOGGER.info("Send message successfully, messageId={}", sendReceipt.getMessageId());} catch (Throwable t) {LOGGER.error("Failed to send message", t);}// Close the producer when you don't need it anymore.producer.close();}
}

消费

消费的代码如下:

public class SimpleConsumerExample {private static final Logger LOGGER = LoggerFactory.getLogger(SimpleConsumerExample.class);private SimpleConsumerExample() {}public static void main(String[] args) throws ClientException, IOException {final ClientServiceProvider provider = ClientServiceProvider.loadService();// Credential provider is optional for client configuration.String accessKey = "yourAccessKey";String secretKey = "yourSecretKey";SessionCredentialsProvider sessionCredentialsProvider =new StaticSessionCredentialsProvider(accessKey, secretKey);String endpoints = "foobar.com:8080";ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(endpoints).setCredentialProvider(sessionCredentialsProvider).build();String consumerGroup = "yourConsumerGroup";Duration awaitDuration = Duration.ofSeconds(30);String tag = "yourMessageTagA";String topic = "yourTopic";FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);SimpleConsumer consumer = provider.newSimpleConsumerBuilder().setClientConfiguration(clientConfiguration)// Set the consumer group name..setConsumerGroup(consumerGroup)// set await duration for long-polling..setAwaitDuration(awaitDuration)// Set the subscription for the consumer..setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)).build();// Max message num for each long polling.int maxMessageNum = 16;// Set message invisible duration after it is received.Duration invisibleDuration = Duration.ofSeconds(5);final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration);for (MessageView message : messages) {try {consumer.ack(message);} catch (Throwable t) {LOGGER.error("Failed to acknowledge message, messageId={}", message.getMessageId(), t);}}// Close the simple consumer when you don't need it anymore.consumer.close();}
}

今天通过对 RocketMQ 顺序消息的介绍,希望能够帮大家对顺序消息的原理和应用有更深入的了解,同时也期望 RocketMQ 的顺序消息能够帮助您更有效的解决业务问题。如果您对 RocktMQ 的业务消息感兴趣,也欢迎您扫描下方二维码加入钉钉群一起沟通交流~

7.png

点击此处,进入官网了解更多详情~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_2612.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C# 读取MotherBoard的信息

通过C# 来读取PC 的MotherBoard 上的信息,如 产品名称,制造商,版本等,方法如下:Reference中添加 System.Management,并在头文件中引入该 Assemble 添加对应的类,并进行使用,如下实例:public static class MotherBoardInfo{private static ManagementObjectSearcher …

万物皆可集成系列:低代码释放用友U8+深度价值(2)—数据拓展应用

在上一篇内容我们介绍了如何利用低代码开发套件实现低代码应用与U8+系统的对接集成,本次给大家带来的是如何将用友U8+系统中的数据进行价值扩展和实际应用。 我们以生产物料齐套分析为例来说明如何利用低代码将U8+系统中的系统进行扩展和应用。在开始之前,先来看看什么是生产…

java数据类型转换问题

我们知道java中的各个数据类型的取值范围不同,可以理解成容量大小,而针对容量大小可以对他们进行一个由低到高的排序,也就是优先级。 优先级 低-----------------------------------------------------------------------高 (byte,short,char)=> int => long => …

记esxi linux主机调整分区大小

调整前效果:调整后效果: 方法如下: 工具:VMware vCenter Converter

顶象为飞凡汽车App提供全方位防护 助力新能源汽车发展

汽车App集展示、体验、交互、交易和远程服务于一身,已成为智能汽车的一部分。日前,飞凡汽车与顶象达成合作,为飞凡汽车App提供安全加固服务,为用户提供全方位的安全保障。 用户眼里的“宝藏App” 每个智能汽车都有一个App,大多数将其当做车的附加服务,飞凡汽车把App当做汽…

Fecify 跨境电商系统,有哪些优势?

独立站是最近几年重新火爆起来的跨境电商经营方式。人都喜欢一切都掌握在自己手里的感觉,而独立站的好处就是自己说了算,足够自由。因为可以自己说了算,风险自己把控,自己做的事情,自己负责,独立站的这些自主特性深受货代人的喜爱。独立站以其独立自主的特性,引领未来跨…

删除RAID

场景 当服务器不需要RAID或重新配置RAID时,要先删除当前RAID配置来释放硬盘。 LSI SAS2208 (Legacy/Dual) 操作步骤 步骤1 通过远程虚拟控制台登录到服务器的实时操作桌面。 步骤2 登录CU管理界面。 步骤3 备份删除RAID中的数据。 步骤4 删除RAID。 1. 在CU主界面左侧区域中选…

HTML基础(二):标签学习

html学习笔记排版标签 标题标签场景:在新闻和文章的页面中,都离不开标题,用来突出显示文章主题代码:h系列标签<h1>1级标题</h1> <h2>2级标题</h2> <h3>3级标题</h3> <h4>4级标题</h4> <h5>5级标题</h5> <h…

电商行业:全链路监测广告投放效果,用数据驱动业务增长

哪个营销任务、营销渠道的引流用户更多? 买量用户的活跃、留存情况如何? 哪个营销任务引流的用户后续的加购、下单转化最多? HMS Core分析服务作为广告转化跟踪工具,广告主可实现从“曝光、点击、下载、激活、注册、留存、收藏、加入购物车、下单、开始结算、支付成功、复购…

35. Redis---缓存问题

1. 前言 在实际的业务场景中,Redis 一般和其他数据库搭配使用,用来减轻后端数据库的压力,比如和关系型数据库 MySQL 配合使用。Redis 会把 MySQL 中经常被查询的数据缓存起来,比如热点数据,这样当用户来访问的时候,就不需要到 MySQL 中去查询了,而是直接获取 Redis 中的…

面试突击77:Spring 依赖注入有几种?各有什么优缺点?

IoC 和 DI 是 Spring 中最重要的两个概念,其中 IoC(Inversion of Control)为控制反转的思想,而 DI(Dependency Injection)依赖注入为其(IoC)具体实现。那么 DI 实现依赖注入的方式有几种?这些注入方式又有什么不同?接下来,我们一起来看。 0.概述 在 Spring 中实现依…

[AcWing 167] 木棒

DFS 剪枝点击查看代码 #include<bits/stdc++.h>using namespace std;typedef long long LL;const int N = 1e6 + 10;int n; int w[N]; int sum, len; bool st[N];bool dfs(int u, int s, int start) {if (u * len == sum)return true;if (s == len)return dfs(u + 1, 0, …

Word修订内容批量标红

Word修订标红Plus,适用于科研狗最近改文章,期刊要求提供所有修改内容都标红的修订稿,本着能不手改就不手改的原则,我尝试检索了一下自动修改的方法,最先找到的是简书上的一篇使用VB宏命令批量修改的文章 (Word-接受全部修订为标红字体),但是尝试之后发现运行时间很长,且…

《GB27607-2011》PDF下载

《GB27607-2011 机械压力机 安全技术要求》PDF下载 《GB27607-2011》简介本标准规定了机械压力机类产品的设计、制造、改造、使用的术语和定义、严重危险、安全要求和(或)措施、检验和使用信息; 本标准适用于压力机及作为压力机组成部分的辅助设备的设计、制造、改造和使用,也…

二进制位运算

二进制位运算基础及其应用: 一、基本位运算符: 1.& 按位与:(从左到右)二进制中对应位都是1则为1,否则为0; 2. | 按位或:(从左到右)二进制中对应位有一个是1则为1,否则为0; 3. ^按位异或:(从左到右)二进制中对应位相同则为0,不同为1; 4. <<左移:右侧…

《GB27887-2011》PDF下载

《GB27887-2011 机动车儿童乘员用约束系统》PDF下载 《GB27887-2011》简介 本标准规定了机动车儿童乘员用约束系统术语、定义,在车辆上的安装及固定要求,约束系统的结构,以及对约束系统总成及其组成部件的性能要求和试验方法;本标准适用于适合安装在三个车轮或三个车轮以上…

JS基础:数组、函数、对象

字符串要用英文双引号括起来。字符串与其他类型数据之间用加号+连接起来 // -------------------------------------------------------- JS中定义声明变量是用关键字var,JS中变量名函数名都可以用中文。 JS中定义数组不用写函数长度[],JS中可以定义字符串数组向数组添加新元…

《GB12523-2011》PDF下载

《GB12523-2011 建筑施工场界环境噪声排放标准》PDF下载 《GB12523-2011》简介本标准规定了建筑施工场界环境噪声排放限值及测定方法; 本标准适用于周围有噪声敏感建筑物的建筑施工噪声排放的管理、评价及控制。市政、通信、交通、水利等其他类型的施工噪声排放可参照本标准执…

CATIA——什么是汽车设计硬点和骨架?

什么是汽车设计「硬点」? 汽车设计硬点(Hard point)的概念: 所谓硬点,是通过英文的"hardpoint"直译过来的。 由于零部件设计要在整车总布置基本完成后才开始,在总布置设计阶段中往往没有零部件的详细资料,还不能解决零部件和总成内部的细节问题。所以在布置设…

方法引用-通过this引用成员方法和类的构造器引用

通过this引用成员方法 this代表当前对象 如果需要引用的方法就是当前类中的成员方法 那么可以使用this::成员方法 的格式来使用方法引用函数式接口:public interface Richanle {void buy(); }测试类:public class Husband {//重写父类的成员方法public void buyHouse() {Sys…