RocketMQ源码分析之消息ACK机制(消费进度)

news/2024/4/19 16:27:54/文章来源:https://blog.csdn.net/qq_33291299/article/details/129175624

1、消息消费进度概述

首先简要阐述一下消息消费进度:

消费者订阅消息消费队列(MessageQueue), 当生产者将消息负载发送到 MessageQueue 中时,消费订阅者开始消费消息,消息消费过程中,为了避免重复消费,需要一个地方存储消费进度(消费偏移量)。

消息模式主要分为集群模式、广播模式:

  • 集群模式:一条消息被集群中任何一个消费者消费。
  • 广播模式:每条消息都被每一个消费者消费。

广播模式,既然每条消息要被每一个消费者消费,则消费进度可以与消费者保存在一起,也就是本地保存,但由于集群模式下,一条消息只能被集群内的一个消费者消费,进度不能保存在消费端,只能集中保存在一个地方,比较合适的是在 Broker 端。

2、消息消费进度存储接口

接下来我们先分析一下消息消费进度接口:OffsetStore。

/*** Offset store interface*/
public interface OffsetStore {/*** Load** @throws MQClientException*/void load() throws MQClientException;/*** Update the offset,store it in memory** @param mq* @param offset* @param increaseOnly*/void updateOffset(final MessageQueue mq, final long offset, final boolean increaseOnly);/*** Get offset from local storage** @param mq* @param type* @return The fetched offset*/long readOffset(final MessageQueue mq, final ReadOffsetType type);/*** Persist all offsets,may be in local storage or remote name server** @param mqs*/void persistAll(final Set<MessageQueue> mqs);/*** Persist the offset,may be in local storage or remote name server** @param mq*/void persist(final MessageQueue mq);/*** Remove offset** @param mq*/void removeOffset(MessageQueue mq);/*** @param topic* @return The cloned offset table of given topic*/Map<MessageQueue, Long> cloneOffsetTable(String topic);/*** @param mq* @param offset* @param isOneway*/void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,MQBrokerException, InterruptedException, MQClientException;
}

入口代码:DefaultMQPushConsumerImpl#start()。

根据消息消费模式(集群模式、广播模式)会创建不同的 OffsetStore 对象。

由于上篇文章,谈到广播模式消息,如果返回 CONSUME_LATER,竟然不会重试,而是直接丢弃,为什么呢?由于这个原因,这次破天荒的从广播模式的OffsetStore开始学习。

2.1 LocalFileOffsetStore (广播模式)

消息进度以本地文件方式保存。源码路径:org.apache.rocketmq.client.consumer.store.LocalFileOffsetStore。

2.1.1 核心属性与构造函数

  • LOCAL_OFFSET_STORE_DIR
    offset 存储根目录,默认为用户主目录,例如 /home/dingw,可以在消费者启动的JVM参数中,通过 – Drocketmq.client.localOffsetStoreDir=路径。
  • groupName
    消费组名称。
  • storePath
    具体的消费进度保存文件名(全路径)。
  • offsetTable
    内存中的 offfset 进度保持,以 MessageQueue 为键,偏移量为值。

继续看一下构造函数:

LocalFileOffsetStore 首先在 DefaultMQPushConsumerImpl#start 方法中创建,并 执行load方法加载消费进度。接下来结束一下几个关键的实现方法。

2.1.2 load()方法

public void load() throws MQClientException {OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset();if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) {offsetTable.putAll(offsetSerializeWrapper.getOffsetTable());for (MessageQueue mq : offsetSerializeWrapper.getOffsetTable().keySet()) {AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq);log.info("load consumer's offset, {} {} {}",this.groupName,mq,offset.get());}}

该方法,主要就是读取 offsets.json 或 offsets.json.bak 中的内容,然后将json转换成map。

然后更新或获取消息队列的消费进度,就是从内存(Map)或 store 中获取,接下来看一下初次保存offsets.json文件。

@Overridepublic void persistAll(Set<MessageQueue> mqs) {if (null == mqs || mqs.isEmpty())return;OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper();for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {if (mqs.contains(entry.getKey())) {AtomicLong offset = entry.getValue();offsetSerializeWrapper.getOffsetTable().put(entry.getKey(), offset);}}String jsonString = offsetSerializeWrapper.toJson(true);if (jsonString != null) {try {MixAll.string2File(jsonString, this.storePath);} catch (IOException e) {log.error("persistAll consumer offset Exception, " + this.storePath, e);}}

保存逻辑很简单,就没必要一一分析,重点看一下,该方法的调用入口:

MQClientInstance#startScheduledTask

保存逻辑很简单,就没必要一一分析,其调用入口为:MQClientInstance#startScheduledTask。

顺藤摸瓜,原来是一个定时任务,默认消费端启动10秒后,每隔5s的频率持久化一次。

广播模式消费进度存储容易,但其实还是不明白为什么RocketMQ广播模式,如果消费失败,则丢弃,因为广播模式有时候也必须确保每个消费者都成功消费,,通常的场景为,通过MQ刷新本地缓存等。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_72944.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Photon Vectorized Engine 学习记录

Photon Hash Aggregation Vectorization Photon Hash Join 的向量化的要点是&#xff1a;使用开放地址法。步骤&#xff1a; 向量化计算 hash 值基于 hash 向量化计算 bucket 下标&#xff0c;得到 bucket index 向量基于 bucket index 向量中记录的下标找到 bucket&#xff…

(考研湖科大教书匠计算机网络)第六章应用层-第四节:域名系统DNS

获取pdf&#xff1a;密码7281专栏目录首页&#xff1a;【专栏必读】考研湖科大教书匠计算机网络笔记导航 文章目录一&#xff1a;DNS概述二&#xff1a;层次域名结构&#xff08;1&#xff09;概述&#xff08;2&#xff09;顶级域名分类&#xff08;3&#xff09;因特网命名空…

部署跨云容灾的五大难点

为什么企业需要跨云容灾&#xff1f; 据统计&#xff0c;全球已有70%的企业使用云计算服务。上云帮助企业更高效地管理数据资产&#xff0c;但它并非绝对安全。如停电、漏水等机房事故&#xff1b;地震、火灾等自然性灾害&#xff1b;亦或是人为失误&#xff0c;都有可能造成数…

视频技术基础知识

一、视频图像基础 像素&#xff1a;图像的基本单元&#xff0c;即一个带有颜色的小块分辨率&#xff1a;图像的大小或尺寸&#xff0c;用像素个数来表示。原始图像分辨率越高&#xff0c;图像就越清晰位深&#xff1a;存储每位像素需要的二进制位数&#xff1b;位深越大&#…

华为OD机试 C++ 实现 - 第 N 个排列

最近更新的博客 华为OD机试 - 入栈出栈(C++) | 附带编码思路 【2023】 华为OD机试 - 箱子之形摆放(C++) | 附带编码思路 【2023】 华为OD机试 - 简易内存池 2(C++) | 附带编码思路 【2023】 华为OD机试 - 第 N 个排列(C++) | 附带编码思路 【2023】 华为OD机试 - 考古…

模电学习7. 三极管特性曲线与静态工作点

模电学习7. 三极管特性曲线与静态工作点一、三极管的伏安特性曲线1. 三极管的伏安特性曲线2. 三极管的静态工作点二、合适的静态工作点选择1. 合适静态工作点条件2. 静态工作点的确定三、使用立创EDA仿真查看静态工作点1. 搭建如下图所示测试电路2. 点击菜单仿真、仿真设置3. 运…

springboot整合springdata jpa全能书

一&#xff1a;spring data jpa介绍 spring data:其实spring data就是spring提供了一个操作数据的框架。而spirng data jpa只是spring data框架下的一个基于jpa标准操作数据的模块。 spring data jpa&#xff1a;基于jpa的标准对数据进行操作。简化操作持久层的代码。只需要编…

【离线数仓-4-数据仓库设计】

离线数仓-4-数据仓库设计离线数仓-4-数据仓库设计1.数据仓库分层规划2.数据仓库构建流程1.数据调研1.业务调研2.需求分析3.总结2.明确数据域3.构建业务总线矩阵&维度模型设计4.明确统计指标1.指标体系相关概念1.原子指标2.派生指标3.衍生指标2.指标体系对于数仓建模的意义5…

儿童全脑九大能力,3-6岁的家长都应该知道

什么是全脑&#xff1f; 人的大脑分左右两个半球&#xff0c;形态虽然相似&#xff0c;功能却各有不同。其中&#xff0c;左脑负责文字、数学、计算、分析、逻辑、顺序、事实和记忆&#xff0c;掌管右侧肢体的感觉和运动&#xff1b;右脑则负责颜色、音乐、想象、韵律、感觉、…

其它 Composition API

1.shallowReactive 与 shallowRef shallowReactive&#xff1a;只处理对象最外层属性的响应式&#xff08;浅响应式&#xff09;。 shallowRef&#xff1a;只处理基本数据类型的响应式, 不进行对象的响应式处理。 什么时候使用? 如果有一个对象数据&#xff0c;结构比较深, …

vue-print-nb使用

下载 pnpm add vue-print-nb --save 全局注册&#xff0c;使用插件的注册方式 或 局部注册自定义指令 import print from vue-print-nb directives: {print } 绑定到点击按钮上 <button v-print"content">Print!</button> 设置配置项-常用 id和popTi…

总结:NodeJS

一、介绍Nodejs就像是Java中的JVM&#xff0c;是js的运行环境。nodejs不是一个js框架&#xff0c;千万不要认为是类似jquery的框架。nodejs的作用和jvm的一样一样的&#xff0c;也是js的运行环境&#xff0c;不管你是什么操作系统&#xff0c;只要安装对应版本的nodejs&#xf…

华为OD机试真题 用 C++ 实现 - 字符串加密 | 多看题,提高通过率

最近更新的博客 华为OD机试 - 入栈出栈(C++) | 附带编码思路 【2023】 华为OD机试 - 箱子之形摆放(C++) | 附带编码思路 【2023】 华为OD机试 - 简易内存池 2(C++) | 附带编码思路 【2023】 华为OD机试 - 第 N 个排列(C++) | 附带编码思路 【2023】 华为OD机试 - 考古…

angular

1. angular获取不到DOM结点 angular中的ngOnInit钩子函数获取不到DOM节点&#xff1b; 这个钩子函数中&#xff0c;表示组件和指令初始化完成&#xff0c;并不是真正的DOM加载完成&#xff1b; 所以这时候需要利用另外一个钩子函数ngAfterViewInit()&#xff0c;是在视图加载完…

界面组件Kendo UI for Angular——让网格数据信息显示更全面

Kendo UI致力于新的开发&#xff0c;来满足不断变化的需求&#xff0c;通过React框架的Kendo UI JavaScript封装来支持React Javascript框架。Kendo UI for Angular是专用于Angular开发的专业级Angular组件&#xff0c;telerik致力于提供纯粹的高性能Angular UI组件&#xff0c…

Leetcode之消失的数字轮转数组

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录一、消失的数字一、消失的数字 二、旋转数组 提示&#xff1a;以下是本篇文章正文内容&#xff0c;下面案例可供参考 一、消失的数字 这题找出消失的一个数字&#…

(二十三)、实现评论功能(3)【uniapp+uinicloud多用户社区博客实战项目(完整开发文档-从零到完整项目)】

1&#xff0c;删除评论的样式和实现逻辑 1.1 添加删除评论的样式 在comment-item组件中&#xff1a; <view class"username">{{giveName(item)}}<text class"iconfont icon-a-43-guanbi" click.stop"delComment"></text><…

【总结】python3启动web服务引发的一系列问题

背景 在某行的实施项目&#xff0c;需要使用python3环境运行某些py脚本。 由于行内交付的机器已自带python3 &#xff0c;没有采取自行安装python3&#xff0c;但是运行python脚本时报没有tornado module。 错误信息 ModuleNotFoundError&#xff1a;No module named ‘torn…

计算机网络第3章(数据链路层)学习笔记

❤ 作者主页&#xff1a;欢迎来到我的技术博客&#x1f60e; ❀ 个人介绍&#xff1a;大家好&#xff0c;本人热衷于Java后端开发&#xff0c;欢迎来交流学习哦&#xff01;(&#xffe3;▽&#xffe3;)~* &#x1f34a; 如果文章对您有帮助&#xff0c;记得关注、点赞、收藏、…

JVM面试总结

文章目录栈帧中存放的信息&#xff1a;对象的创建过程对象的内存布局&#xff1f;对象的访问定位方式&#xff1f;如何判断对象已死&#xff1f;可以作为GC Root的点&#xff1a;谈一下引用对象再被回收时如何逃脱&#xff1f;回收方法区如何判断常量是否废弃&#xff1f;垃圾回…