RocketMQ 消费者Rebalance算法 解析——图解、源码级解析

news/2024/5/4 17:11:49/文章来源:https://blog.csdn.net/HNU_Csee_wjw/article/details/123531191

🍊 Java学习:Java从入门到精通总结

🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想

🍊 绝对不一样的职场干货:大厂最佳实践经验指南


📆 最近更新:2022年10月15日

🍊 个人简介:通信工程本硕💪、Java程序员🌕。做过科研paper,发过专利,优秀的程序员不应该只是CRUD

🍊 点赞 👍 收藏 ⭐留言 📝 都是我最大的动力!


文章目录

  • 平均分配算法
  • 环形平均分配算法
  • 一致性哈希算法
  • 指定机房算法
  • 就进机房算法
  • 手动配置负载均衡参数

平均分配算法

这也是消息消费时候的默认算法,所谓平均,就是同一个Topic主题下的所有队列被同一个消费者组中的所有Consumer平均消费掉。

例如有5个队列和2和Consumer,就会根据下面的步骤进行分配:

  1. 5除以2不能整除,所以队列无法均分
  2. 每个消费者先分到2个队列
  3. 多出来的1个队列按照顺序分配给了第一个Consumer

在这里插入图片描述
具体的源码如下:

public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {private final InternalLogger log = ClientLogger.getLog();@Overridepublic List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,List<String> cidAll) {if (currentCID == null || currentCID.length() < 1) {throw new IllegalArgumentException("currentCID is empty");}if (mqAll == null || mqAll.isEmpty()) {throw new IllegalArgumentException("mqAll is null or mqAll empty");}if (cidAll == null || cidAll.isEmpty()) {throw new IllegalArgumentException("cidAll is null or cidAll empty");}List<MessageQueue> result = new ArrayList<MessageQueue>();if (!cidAll.contains(currentCID)) {log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",consumerGroup,currentCID,cidAll);return result;}// 当前分配到的Consumer的索引int index = cidAll.indexOf(currentCID);// 余数int mod = mqAll.size() % cidAll.size();// 队列总数小于Consumer总数时,给当前Consumer分配一个队列消费// 不能均分且当前编号小于余数时,需要给当前Consumer分配x + 1个队列,否则分配x个队列int averageSize =mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()+ 1 : mqAll.size() / cidAll.size());int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;// 取min的原因是,如果Consumer多,队列少,多出来的Consumer分配不到队列int range = Math.min(averageSize, mqAll.size() - startIndex);for (int i = 0; i < range; i++) {result.add(mqAll.get((startIndex + i) % mqAll.size()));}return result;}@Overridepublic String getName() {return "AVG";}
}

环形平均分配算法

使用方法:

consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragelyByCircle());

也可以自定义消费策略:

consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueStrategy() {@Overridepublic List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) {// 自定义负载策略return null;}@Overridepublic String getName() {return null;}});

所谓环形分配算法,就是把消息队列按照环形进行排列,然后同一个组下的所有Consumer按照顺序进行匹配即可,如下图所示:
在这里插入图片描述
上图中Topic下共有10个消息队列,假设消费者组里有4个Consumer,分配过程如下:

  1. 对所有的消息队列和Consumer分别排序
  2. 按照顺序让Consumer和消息队列进行匹配

第一轮分配Queue1到Queue4,第二轮分配Queue5到Queue8,第三轮分配Queue9和Queue10。经过3轮分配完毕

具体源码如下所示:

public class AllocateMessageQueueAveragelyByCircle implements AllocateMessageQueueStrategy {private final InternalLogger log = ClientLogger.getLog();@Overridepublic List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,List<String> cidAll) {if (currentCID == null || currentCID.length() < 1) {throw new IllegalArgumentException("currentCID is empty");}if (mqAll == null || mqAll.isEmpty()) {throw new IllegalArgumentException("mqAll is null or mqAll empty");}if (cidAll == null || cidAll.isEmpty()) {throw new IllegalArgumentException("cidAll is null or cidAll empty");}List<MessageQueue> result = new ArrayList<MessageQueue>();if (!cidAll.contains(currentCID)) {log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",consumerGroup,currentCID,cidAll);return result;}int index = cidAll.indexOf(currentCID);for (int i = index; i < mqAll.size(); i++) {if (i % cidAll.size() == index) {result.add(mqAll.get(i));}}return result;}@Overridepublic String getName() {return "AVG_BY_CIRCLE";}
}



一致性哈希算法

首先先介绍一下一致性哈希算法:

hash算法带来的问题:
假设后台有多个服务器,我们就可以做负载均衡将前端来的请求“平均”分配到各个服务器上来处理,如果是按照用户id对服务器个数N取模来计算hash的话,如果有一台服务器宕机,之前所有的求模计算都要重来,开销较大。


一致性哈希的思路就是:用户按照顺时针方向做排列,离哪个节点近,就去访问哪个节点。
请添加图片描述
用户按照顺时针方向,离哪个节点近,就去访问哪个节点。

此时如果有服务器宕机,直接顺着找下一个服务器节点就可以了。
请添加图片描述如果要增加节点:
请添加图片描述


RocketMQ中对于一致性哈希的源码级实现:

public class AllocateMessageQueueConsistentHash implements AllocateMessageQueueStrategy {private final InternalLogger log = ClientLogger.getLog();private final int virtualNodeCnt;private final HashFunction customHashFunction;public AllocateMessageQueueConsistentHash() {this(10);}// 设计虚拟节点数量public AllocateMessageQueueConsistentHash(int virtualNodeCnt) {this(virtualNodeCnt, null);}public AllocateMessageQueueConsistentHash(int virtualNodeCnt, HashFunction customHashFunction) {if (virtualNodeCnt < 0) {throw new IllegalArgumentException("illegal virtualNodeCnt :" + virtualNodeCnt);}this.virtualNodeCnt = virtualNodeCnt;this.customHashFunction = customHashFunction;}// 负载均衡算法主要实现@Overridepublic List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,List<String> cidAll) {if (currentCID == null || currentCID.length() < 1) {throw new IllegalArgumentException("currentCID is empty");}if (mqAll == null || mqAll.isEmpty()) {throw new IllegalArgumentException("mqAll is null or mqAll empty");}if (cidAll == null || cidAll.isEmpty()) {throw new IllegalArgumentException("cidAll is null or cidAll empty");}List<MessageQueue> result = new ArrayList<MessageQueue>();if (!cidAll.contains(currentCID)) {log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",consumerGroup,currentCID,cidAll);return result;}// 把所有消费者放到一个List里Collection<ClientNode> cidNodes = new ArrayList<ClientNode>();for (String cid : cidAll) {cidNodes.add(new ClientNode(cid));}// 创建hash环形结构final ConsistentHashRouter<ClientNode> router; //for building hash ringif (customHashFunction != null) {router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt, customHashFunction);} else {router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt);}// 根据一致性hash算法,基于客户端节点,把分配到当前消费者组的MQ添加到集合里并返回List<MessageQueue> results = new ArrayList<MessageQueue>();for (MessageQueue mq : mqAll) {ClientNode clientNode = router.routeNode(mq.toString());if (clientNode != null && currentCID.equals(clientNode.getKey())) {results.add(mq);}}return results;}@Overridepublic String getName() {return "CONSISTENT_HASH";}private static class ClientNode implements Node {private final String clientID;public ClientNode(String clientID) {this.clientID = clientID;}@Overridepublic String getKey() {return clientID;}}}

上面代码在ConsistentHashRouter中创建了hash环,算法的主要流程是在这个类中实现的,主要是基于TreeMap,感兴趣的小伙伴可以深入研究一下它的源码~



指定机房算法

假设有两个机房,则对应的消费关系如下图:
在这里插入图片描述
指定机房分配算法先根据MQ所述的Broker找出有效的机房里的所有MQ,然后再平分给所有的Consumer

public class AllocateMessageQueueByMachineRoom implements AllocateMessageQueueStrategy {private Set<String> consumeridcs;@Overridepublic List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,List<String> cidAll) {List<MessageQueue> result = new ArrayList<MessageQueue>();// 计算出当前消费者ID在消费者集合中的具体位置int currentIndex = cidAll.indexOf(currentCID);if (currentIndex < 0) {return result;}// 拿出BrokerName下的所有MQList<MessageQueue> premqAll = new ArrayList<MessageQueue>();for (MessageQueue mq : mqAll) {String[] temp = mq.getBrokerName().split("@");if (temp.length == 2 && consumeridcs.contains(temp[0])) {premqAll.add(mq);}}// 队列长度除以客户端长度int mod = premqAll.size() / cidAll.size();// 队列长度mod客户端长度int rem = premqAll.size() % cidAll.size();// 给Consumer分配MQint startIndex = mod * currentIndex;int endIndex = startIndex + mod;for (int i = startIndex; i < endIndex; i++) {result.add(premqAll.get(i));}if (rem > currentIndex) {result.add(premqAll.get(currentIndex + mod * cidAll.size()));}return result;}@Overridepublic String getName() {return "MACHINE_ROOM";}public Set<String> getConsumeridcs() {return consumeridcs;}public void setConsumeridcs(Set<String> consumeridcs) {this.consumeridcs = consumeridcs;}
}



就进机房算法

顾名思义,就近机房分配策略是一种基于Consumer和机房距离来分配的策略。部署在同一个机房的MQ会被先分配给同一个机房里的Consumer

具体步骤是先统计ConsumerBroker所在的机房,之后再将Broker中的MQ分配给同机房的Consumer消费,如果本机房里没有Consumer,则再尝试分配给其他机房的Consumer

在这里插入图片描述

public class AllocateMachineRoomNearby implements AllocateMessageQueueStrategy {private final InternalLogger log = ClientLogger.getLog();private final AllocateMessageQueueStrategy allocateMessageQueueStrategy;//actual allocate strategyprivate final MachineRoomResolver machineRoomResolver;public AllocateMachineRoomNearby(AllocateMessageQueueStrategy allocateMessageQueueStrategy,MachineRoomResolver machineRoomResolver) throws NullPointerException {if (allocateMessageQueueStrategy == null) {throw new NullPointerException("allocateMessageQueueStrategy is null");}if (machineRoomResolver == null) {throw new NullPointerException("machineRoomResolver is null");}this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;this.machineRoomResolver = machineRoomResolver;}@Overridepublic List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,List<String> cidAll) {if (currentCID == null || currentCID.length() < 1) {throw new IllegalArgumentException("currentCID is empty");}if (mqAll == null || mqAll.isEmpty()) {throw new IllegalArgumentException("mqAll is null or mqAll empty");}if (cidAll == null || cidAll.isEmpty()) {throw new IllegalArgumentException("cidAll is null or cidAll empty");}List<MessageQueue> result = new ArrayList<MessageQueue>();if (!cidAll.contains(currentCID)) {log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",consumerGroup,currentCID,cidAll);return result;}// 将MQ按照不同的机房归纳Map<String/*machine room */, List<MessageQueue>> mr2Mq = new TreeMap<String, List<MessageQueue>>();for (MessageQueue mq : mqAll) {String brokerMachineRoom = machineRoomResolver.brokerDeployIn(mq);if (StringUtils.isNoneEmpty(brokerMachineRoom)) {if (mr2Mq.get(brokerMachineRoom) == null) {mr2Mq.put(brokerMachineRoom, new ArrayList<MessageQueue>());}mr2Mq.get(brokerMachineRoom).add(mq);} else {throw new IllegalArgumentException("Machine room is null for mq " + mq);}}// 将consumer按照不同的机房归纳Map<String/*machine room */, List<String/*clientId*/>> mr2c = new TreeMap<String, List<String>>();for (String cid : cidAll) {String consumerMachineRoom = machineRoomResolver.consumerDeployIn(cid);if (StringUtils.isNoneEmpty(consumerMachineRoom)) {if (mr2c.get(consumerMachineRoom) == null) {mr2c.put(consumerMachineRoom, new ArrayList<String>());}mr2c.get(consumerMachineRoom).add(cid);} else {throw new IllegalArgumentException("Machine room is null for consumer id " + cid);}}List<MessageQueue> allocateResults = new ArrayList<MessageQueue>();// 1. 分配与当前消费者部署在同一机房的MQString currentMachineRoom = machineRoomResolver.consumerDeployIn(currentCID);List<MessageQueue> mqInThisMachineRoom = mr2Mq.remove(currentMachineRoom);List<String> consumerInThisMachineRoom = mr2c.get(currentMachineRoom);if (mqInThisMachineRoom != null && !mqInThisMachineRoom.isEmpty()) {allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, mqInThisMachineRoom, consumerInThisMachineRoom));}//2.如果机房没有活着的消费者,则将其MQ分配给每个其他的机房for (String machineRoom : mr2Mq.keySet()) {if (!mr2c.containsKey(machineRoom)) { // no alive consumer in the corresponding machine room, so all consumers share these queuesallocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, mr2Mq.get(machineRoom), cidAll));}}return allocateResults;}@Overridepublic String getName() {return "MACHINE_ROOM_NEARBY" + "-" + allocateMessageQueueStrategy.getName();}/*** 一个解析器对象,用于确定消息队列或客户端部署在哪个机房。** AllocateMachineRoomNearby将使用该结果按机房对消息队列和客户端进行分组。** 返回值不能为null*/public interface MachineRoomResolver {String brokerDeployIn(MessageQueue messageQueue);String consumerDeployIn(String clientID);}
}



手动配置负载均衡参数

除了使用内置的负载均衡算法以外,还可以手动配置相关的参数,例如设置消费的队列、消费的Topic、消费的机器等,在消费端直接设置消费队列即可:

consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueByConfig(){{this.setMessageQueueList(Collections.<MessageQueue>singletonList(new MessageQueue(){{this.setQueueId(0);this.setTopic("Topic name");this.setBrokerName("Broker name");}}));}});

上面的代码里,手动指定了消费队列的索引,Topic和Broker服务器的名称,之后Consumer就会在指定的服务器中进行消费,源码如下:

public class AllocateMessageQueueByConfig implements AllocateMessageQueueStrategy {private List<MessageQueue> messageQueueList;@Overridepublic List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,List<String> cidAll) {return this.messageQueueList;}@Overridepublic String getName() {return "CONFIG";}public List<MessageQueue> getMessageQueueList() {return messageQueueList;}public void setMessageQueueList(List<MessageQueue> messageQueueList) {this.messageQueueList = messageQueueList;}
}

可以看到源码里只提供了一个消息队列集合,就是我们上面传入的自定义配置的MQ列表,配置完成之后就可以进行负载均衡及消费。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_23992.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

(附源码)计算机毕业设计大学生网上书店

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

(附源码)计算机毕业设计电脑外设销售系统小程序

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

操作系统基本功能(操作系统)

目录 一、处理机管理 二、存储器管理 三、设备管理 四、文件管理 五、作业管理 一、处理机管理 中央处理机&#xff08;CPU&#xff09;是计算机系统中一个举足轻重的资源。用户程序进入内存后&#xff0c;只有获得CPU&#xff0c;才能真正得以运行。 为了提高CPU的利用率…

前端都应该了解的 NodeJs 知识及原理浅析

node.js 初探 Node.js 是一个 JS 的服务端运行环境&#xff0c;简单的来说&#xff0c;它是在 JS 语言规范的基础上&#xff0c;封装了一些服务端的运行时对象&#xff0c;让我们能够简单实现非常多的业务功能。 如果我们只使用 JS 的话&#xff0c;实际上只是能进行一些简单…

docker mysql8使用SSL及使用openssl生成自定义证书

《docker安装MySQL8》 修改my.cnf vi /docker_data/mysql/conf/my.cnf[client] default-character-setutf8mb4 [mysql] default-character-setutf8mb4 [mysqld] character-set-serverutf8mb4 default_authentication_pluginmysql_native_password #增加ssl ssl保存&#xff0…

【让你从0到1学会c语言】文件操作

作者&#xff1a;喜欢猫咪的的程序员 专栏&#xff1a;《C语言》 喜欢的话&#xff1a;世间因为少年的挺身而出&#xff0c;而更加瑰丽。 ——《人民日报》 目录 什么是文件&#xff1a; 我们为什么要使用文件呢&#xff1f; 文件分类&#x…

rbf神经网络和bp神经网络,rbf神经网络百度百科

1、rbf神经网络算法是什么? RBF神经网络算法是由三层结构组成&#xff0c;输入层至隐层为非线性的空间变换&#xff0c;一般选用径向基函数的高斯函数进行运算&#xff1b;从隐层至输出层为线性空间变换&#xff0c;即矩阵与矩阵之间的变换。 RBF神经网络进行数据运算时需要…

基于springboot的旅游打卡攻略分享小程序

&#x1f496;&#x1f496;作者&#xff1a;IT跃迁谷毕设展 &#x1f499;&#x1f499;个人简介&#xff1a;曾长期从事计算机专业培训教学&#xff0c;本人也热爱上课教学&#xff0c;语言擅长Java、微信小程序、Python、Golang、安卓Android等。平常会做一些项目定制化开发…

预处理的补充知识

&#x1f3d6;️作者&#xff1a;malloc不出对象 ⛺专栏&#xff1a;《初识C语言》 &#x1f466;个人简介&#xff1a;一名双非本科院校大二在读的科班编程菜鸟&#xff0c;努力编程只为赶上各位大佬的步伐&#x1f648;&#x1f648; 目录一、宏的补充知识1.1 宏定义充当注释…

MABSA(Multimodal Aspect-Based Sentiment Analysis)2022ACL 预训练

大致浏览&#xff0c;没有细看。 论文题目&#xff08;Title&#xff09;&#xff1a; Vision-Language Pre-Training for Multimodal Aspect-Based Sentiment Analysis 研究问题&#xff08;Question&#xff09;&#xff1a;多模态情感分析 MABSA (Multimodal Aspectased S…

黑马程序员Java零基础视频教程(2022最新Java)B站视频学习笔记-Day14-面向对象进阶02

1、权限修饰符和代码块 1.1 权限修饰符 权限修饰符&#xff1a;是用来控制一个成员能够被访问的范围的。 可以修饰&#xff1a;成员变量、方法、构造方法、内部类。 巧计举例&#xff1a; private--------私有的----------相当于私房钱&#xff0c;只能自己用 默认--------…

LVS+KeepAlived高可用负载均衡集群

内容预知 1. 高可用群集的相关知识 1. 1 高可用&#xff08;HA&#xff09;群集与普通群集的比较 普通群集 高可用群集(HA) 1.2 KeepAlive 高可用方案 1.3 KeepAlived的体系模块 1.4 Keepalived实现原理 2. 高可用群集的脑裂现象及预防措施 2.1 高可用集群的脑裂现象及其…

树莓派学习笔记

记录一下树莓派的使用,包含操作系统、linux命令、python、硬件等知识。参考《树莓派开发实战》树莓派简介及型号 树莓派(Raspberry Pi)是一款基于 Linux 系统的、只有一张信用卡大小的卡片式计算机,树莓派已经成为基于 Linux 的低成本电脑和嵌入式计算机平台这个领域中的重…

Material UI – React (2022) 版的完整教程

Material UI – React (2022) 版的完整教程 这是关于 Material UI 的最期待的课程。该课程涵盖了 Material UI 的所有组件 课程英文名&#xff1a;Material UI - The Complete Guide With React (2022) Editio 此视频教程共5.5小时&#xff0c;中英双语字幕&#xff0c;画质…

【贝塞尔曲线拟合】

贝塞尔曲线拟合问题描述拟合曲线生成过程参考程序注意事项问题描述 已知一条n阶贝塞尔曲线L(P0,P1,P2,P3,...,Pn)L(P0, P1, P2, P3, ..., Pn)L(P0,P1,P2,P3,...,Pn)&#xff08;P0P0P0为起点&#xff0c;P1P1P1为第一个控制点&#xff0c;P2P2P2为第二个控制点&#xff0c;P3P…

Mysql删除重复数据只保留一条

&#xff08;1&#xff09;以这张表为例&#xff1a; CREATE TABLE test (id varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT 注解id,name varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT 名字,PRIMARY KEY…

队列的顺序存储结构

说白了,就是一个数组 ,然后在两端进行操作 ,两端用首队指针和尾指针分别指向 ,然后进行相关的删除,插入操作, 目的还是模拟现实对数据的处理 ●描述队列 •数据元素data , 元素具有同一类型ElemType ,最多为MaxSize(数组容量) •当前队首front •当前队尾 rear 定义队列的数据…

RK3588安装部署openmediavault

RK3588安装部署openmediavault部署准备Debian 10 文件系统编译和获取安装 openmediavault安装基础依赖安装 openmediavault 原秘钥环添加 openmediavault 官方原安装 openmediavault 基础依赖安装 openmediavaultopenmediavault 相关资料&#xff1a; https://docs.openmediav…

YOLOX 学习笔记

笔记来源&#xff1a;https://www.bilibili.com/video/BV1jo4y1D7CF/?vd_source2ed6e8af02f9ba8cb90b90e99bd4ccee 近年来&#xff0c;目标检测的工程应用研究中&#xff0c;YOLO系列以快速响应、高精度、结构简单以及容易部署的特点备受工程研究人员的青睐。同时&#xff0c;…

3. HDFS分布式文件系统

3.1 HDFS简介 随着数据量越来越大&#xff0c;在一个操作系统存不下所有的数据&#xff0c;那么就分配到更多的操作系统管理的磁盘中&#xff0c;但是不方便管理和维护&#xff0c;迫切需要一种系统来管理多台机器上的文件&#xff0c;这就是分布式文件管理系统。HDFS只是分布…