🍊 Java学习:Java从入门到精通总结
🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想
🍊 绝对不一样的职场干货:大厂最佳实践经验指南
📆 最近更新:2022年10月15日
🍊 个人简介:通信工程本硕💪、Java程序员🌕。做过科研paper,发过专利,优秀的程序员不应该只是CRUD
🍊 点赞 👍 收藏 ⭐留言 📝 都是我最大的动力!
文章目录
- 平均分配算法
- 环形平均分配算法
- 一致性哈希算法
- 指定机房算法
- 就进机房算法
- 手动配置负载均衡参数
平均分配算法
这也是消息消费时候的默认算法,所谓平均,就是同一个Topic
主题下的所有队列被同一个消费者组中的所有Consumer平均消费掉。
例如有5个队列和2和Consumer,就会根据下面的步骤进行分配:
- 5除以2不能整除,所以队列无法均分
- 每个消费者先分到2个队列
- 多出来的1个队列按照顺序分配给了第一个Consumer
具体的源码如下:
public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {private final InternalLogger log = ClientLogger.getLog();@Overridepublic List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,List<String> cidAll) {if (currentCID == null || currentCID.length() < 1) {throw new IllegalArgumentException("currentCID is empty");}if (mqAll == null || mqAll.isEmpty()) {throw new IllegalArgumentException("mqAll is null or mqAll empty");}if (cidAll == null || cidAll.isEmpty()) {throw new IllegalArgumentException("cidAll is null or cidAll empty");}List<MessageQueue> result = new ArrayList<MessageQueue>();if (!cidAll.contains(currentCID)) {log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",consumerGroup,currentCID,cidAll);return result;}// 当前分配到的Consumer的索引int index = cidAll.indexOf(currentCID);// 余数int mod = mqAll.size() % cidAll.size();// 队列总数小于Consumer总数时,给当前Consumer分配一个队列消费// 不能均分且当前编号小于余数时,需要给当前Consumer分配x + 1个队列,否则分配x个队列int averageSize =mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()+ 1 : mqAll.size() / cidAll.size());int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;// 取min的原因是,如果Consumer多,队列少,多出来的Consumer分配不到队列int range = Math.min(averageSize, mqAll.size() - startIndex);for (int i = 0; i < range; i++) {result.add(mqAll.get((startIndex + i) % mqAll.size()));}return result;}@Overridepublic String getName() {return "AVG";}
}
环形平均分配算法
使用方法:
consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragelyByCircle());
也可以自定义消费策略:
consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueStrategy() {@Overridepublic List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) {// 自定义负载策略return null;}@Overridepublic String getName() {return null;}});
所谓环形分配算法,就是把消息队列按照环形进行排列,然后同一个组下的所有Consumer按照顺序进行匹配即可,如下图所示:
上图中Topic
下共有10个消息队列,假设消费者组里有4个Consumer,分配过程如下:
- 对所有的消息队列和Consumer分别排序
- 按照顺序让Consumer和消息队列进行匹配
第一轮分配Queue1到Queue4,第二轮分配Queue5到Queue8,第三轮分配Queue9和Queue10。经过3轮分配完毕
具体源码如下所示:
public class AllocateMessageQueueAveragelyByCircle implements AllocateMessageQueueStrategy {private final InternalLogger log = ClientLogger.getLog();@Overridepublic List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,List<String> cidAll) {if (currentCID == null || currentCID.length() < 1) {throw new IllegalArgumentException("currentCID is empty");}if (mqAll == null || mqAll.isEmpty()) {throw new IllegalArgumentException("mqAll is null or mqAll empty");}if (cidAll == null || cidAll.isEmpty()) {throw new IllegalArgumentException("cidAll is null or cidAll empty");}List<MessageQueue> result = new ArrayList<MessageQueue>();if (!cidAll.contains(currentCID)) {log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",consumerGroup,currentCID,cidAll);return result;}int index = cidAll.indexOf(currentCID);for (int i = index; i < mqAll.size(); i++) {if (i % cidAll.size() == index) {result.add(mqAll.get(i));}}return result;}@Overridepublic String getName() {return "AVG_BY_CIRCLE";}
}
一致性哈希算法
首先先介绍一下一致性哈希算法:
hash算法带来的问题:
假设后台有多个服务器,我们就可以做负载均衡将前端来的请求“平均”分配到各个服务器上来处理,如果是按照用户id对服务器个数N取模来计算hash的话,如果有一台服务器宕机,之前所有的求模计算都要重来,开销较大。
一致性哈希的思路就是:用户按照顺时针方向做排列,离哪个节点近,就去访问哪个节点。
用户按照顺时针方向,离哪个节点近,就去访问哪个节点。
此时如果有服务器宕机,直接顺着找下一个服务器节点就可以了。
如果要增加节点:
RocketMQ中对于一致性哈希的源码级实现:
public class AllocateMessageQueueConsistentHash implements AllocateMessageQueueStrategy {private final InternalLogger log = ClientLogger.getLog();private final int virtualNodeCnt;private final HashFunction customHashFunction;public AllocateMessageQueueConsistentHash() {this(10);}// 设计虚拟节点数量public AllocateMessageQueueConsistentHash(int virtualNodeCnt) {this(virtualNodeCnt, null);}public AllocateMessageQueueConsistentHash(int virtualNodeCnt, HashFunction customHashFunction) {if (virtualNodeCnt < 0) {throw new IllegalArgumentException("illegal virtualNodeCnt :" + virtualNodeCnt);}this.virtualNodeCnt = virtualNodeCnt;this.customHashFunction = customHashFunction;}// 负载均衡算法主要实现@Overridepublic List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,List<String> cidAll) {if (currentCID == null || currentCID.length() < 1) {throw new IllegalArgumentException("currentCID is empty");}if (mqAll == null || mqAll.isEmpty()) {throw new IllegalArgumentException("mqAll is null or mqAll empty");}if (cidAll == null || cidAll.isEmpty()) {throw new IllegalArgumentException("cidAll is null or cidAll empty");}List<MessageQueue> result = new ArrayList<MessageQueue>();if (!cidAll.contains(currentCID)) {log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",consumerGroup,currentCID,cidAll);return result;}// 把所有消费者放到一个List里Collection<ClientNode> cidNodes = new ArrayList<ClientNode>();for (String cid : cidAll) {cidNodes.add(new ClientNode(cid));}// 创建hash环形结构final ConsistentHashRouter<ClientNode> router; //for building hash ringif (customHashFunction != null) {router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt, customHashFunction);} else {router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt);}// 根据一致性hash算法,基于客户端节点,把分配到当前消费者组的MQ添加到集合里并返回List<MessageQueue> results = new ArrayList<MessageQueue>();for (MessageQueue mq : mqAll) {ClientNode clientNode = router.routeNode(mq.toString());if (clientNode != null && currentCID.equals(clientNode.getKey())) {results.add(mq);}}return results;}@Overridepublic String getName() {return "CONSISTENT_HASH";}private static class ClientNode implements Node {private final String clientID;public ClientNode(String clientID) {this.clientID = clientID;}@Overridepublic String getKey() {return clientID;}}}
上面代码在ConsistentHashRouter
中创建了hash环,算法的主要流程是在这个类中实现的,主要是基于TreeMap
,感兴趣的小伙伴可以深入研究一下它的源码~
指定机房算法
假设有两个机房,则对应的消费关系如下图:
指定机房分配算法先根据MQ所述的Broker找出有效的机房里的所有MQ,然后再平分给所有的Consumer
public class AllocateMessageQueueByMachineRoom implements AllocateMessageQueueStrategy {private Set<String> consumeridcs;@Overridepublic List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,List<String> cidAll) {List<MessageQueue> result = new ArrayList<MessageQueue>();// 计算出当前消费者ID在消费者集合中的具体位置int currentIndex = cidAll.indexOf(currentCID);if (currentIndex < 0) {return result;}// 拿出BrokerName下的所有MQList<MessageQueue> premqAll = new ArrayList<MessageQueue>();for (MessageQueue mq : mqAll) {String[] temp = mq.getBrokerName().split("@");if (temp.length == 2 && consumeridcs.contains(temp[0])) {premqAll.add(mq);}}// 队列长度除以客户端长度int mod = premqAll.size() / cidAll.size();// 队列长度mod客户端长度int rem = premqAll.size() % cidAll.size();// 给Consumer分配MQint startIndex = mod * currentIndex;int endIndex = startIndex + mod;for (int i = startIndex; i < endIndex; i++) {result.add(premqAll.get(i));}if (rem > currentIndex) {result.add(premqAll.get(currentIndex + mod * cidAll.size()));}return result;}@Overridepublic String getName() {return "MACHINE_ROOM";}public Set<String> getConsumeridcs() {return consumeridcs;}public void setConsumeridcs(Set<String> consumeridcs) {this.consumeridcs = consumeridcs;}
}
就进机房算法
顾名思义,就近机房分配策略是一种基于Consumer
和机房距离来分配的策略。部署在同一个机房的MQ会被先分配给同一个机房里的Consumer
。
具体步骤是先统计Consumer
与Broker
所在的机房,之后再将Broker
中的MQ分配给同机房的Consumer
消费,如果本机房里没有Consumer
,则再尝试分配给其他机房的Consumer
public class AllocateMachineRoomNearby implements AllocateMessageQueueStrategy {private final InternalLogger log = ClientLogger.getLog();private final AllocateMessageQueueStrategy allocateMessageQueueStrategy;//actual allocate strategyprivate final MachineRoomResolver machineRoomResolver;public AllocateMachineRoomNearby(AllocateMessageQueueStrategy allocateMessageQueueStrategy,MachineRoomResolver machineRoomResolver) throws NullPointerException {if (allocateMessageQueueStrategy == null) {throw new NullPointerException("allocateMessageQueueStrategy is null");}if (machineRoomResolver == null) {throw new NullPointerException("machineRoomResolver is null");}this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;this.machineRoomResolver = machineRoomResolver;}@Overridepublic List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,List<String> cidAll) {if (currentCID == null || currentCID.length() < 1) {throw new IllegalArgumentException("currentCID is empty");}if (mqAll == null || mqAll.isEmpty()) {throw new IllegalArgumentException("mqAll is null or mqAll empty");}if (cidAll == null || cidAll.isEmpty()) {throw new IllegalArgumentException("cidAll is null or cidAll empty");}List<MessageQueue> result = new ArrayList<MessageQueue>();if (!cidAll.contains(currentCID)) {log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",consumerGroup,currentCID,cidAll);return result;}// 将MQ按照不同的机房归纳Map<String/*machine room */, List<MessageQueue>> mr2Mq = new TreeMap<String, List<MessageQueue>>();for (MessageQueue mq : mqAll) {String brokerMachineRoom = machineRoomResolver.brokerDeployIn(mq);if (StringUtils.isNoneEmpty(brokerMachineRoom)) {if (mr2Mq.get(brokerMachineRoom) == null) {mr2Mq.put(brokerMachineRoom, new ArrayList<MessageQueue>());}mr2Mq.get(brokerMachineRoom).add(mq);} else {throw new IllegalArgumentException("Machine room is null for mq " + mq);}}// 将consumer按照不同的机房归纳Map<String/*machine room */, List<String/*clientId*/>> mr2c = new TreeMap<String, List<String>>();for (String cid : cidAll) {String consumerMachineRoom = machineRoomResolver.consumerDeployIn(cid);if (StringUtils.isNoneEmpty(consumerMachineRoom)) {if (mr2c.get(consumerMachineRoom) == null) {mr2c.put(consumerMachineRoom, new ArrayList<String>());}mr2c.get(consumerMachineRoom).add(cid);} else {throw new IllegalArgumentException("Machine room is null for consumer id " + cid);}}List<MessageQueue> allocateResults = new ArrayList<MessageQueue>();// 1. 分配与当前消费者部署在同一机房的MQString currentMachineRoom = machineRoomResolver.consumerDeployIn(currentCID);List<MessageQueue> mqInThisMachineRoom = mr2Mq.remove(currentMachineRoom);List<String> consumerInThisMachineRoom = mr2c.get(currentMachineRoom);if (mqInThisMachineRoom != null && !mqInThisMachineRoom.isEmpty()) {allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, mqInThisMachineRoom, consumerInThisMachineRoom));}//2.如果机房没有活着的消费者,则将其MQ分配给每个其他的机房for (String machineRoom : mr2Mq.keySet()) {if (!mr2c.containsKey(machineRoom)) { // no alive consumer in the corresponding machine room, so all consumers share these queuesallocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, mr2Mq.get(machineRoom), cidAll));}}return allocateResults;}@Overridepublic String getName() {return "MACHINE_ROOM_NEARBY" + "-" + allocateMessageQueueStrategy.getName();}/*** 一个解析器对象,用于确定消息队列或客户端部署在哪个机房。** AllocateMachineRoomNearby将使用该结果按机房对消息队列和客户端进行分组。** 返回值不能为null*/public interface MachineRoomResolver {String brokerDeployIn(MessageQueue messageQueue);String consumerDeployIn(String clientID);}
}
手动配置负载均衡参数
除了使用内置的负载均衡算法以外,还可以手动配置相关的参数,例如设置消费的队列、消费的Topic
、消费的机器等,在消费端直接设置消费队列即可:
consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueByConfig(){{this.setMessageQueueList(Collections.<MessageQueue>singletonList(new MessageQueue(){{this.setQueueId(0);this.setTopic("Topic name");this.setBrokerName("Broker name");}}));}});
上面的代码里,手动指定了消费队列的索引,Topic和Broker服务器的名称,之后Consumer就会在指定的服务器中进行消费,源码如下:
public class AllocateMessageQueueByConfig implements AllocateMessageQueueStrategy {private List<MessageQueue> messageQueueList;@Overridepublic List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,List<String> cidAll) {return this.messageQueueList;}@Overridepublic String getName() {return "CONFIG";}public List<MessageQueue> getMessageQueueList() {return messageQueueList;}public void setMessageQueueList(List<MessageQueue> messageQueueList) {this.messageQueueList = messageQueueList;}
}
可以看到源码里只提供了一个消息队列集合,就是我们上面传入的自定义配置的MQ列表,配置完成之后就可以进行负载均衡及消费。