RocketMQ 源码分析——Producer

news/2024/4/29 12:16:25/文章来源:https://blog.csdn.net/qq_28314431/article/details/133035661

文章目录

  • 消息发送代码实现
  • 消息发送者启动流程
    • 检查配置
    • 获得MQ客户端实例
    • 启动实例
    • 定时任务
  • Producer 消息发送流程
    • 选择队列
      • 默认选择队列策略
      • 故障延迟机制策略*
      • 两种策略的选择
  • 技术亮点:ThreadLocal

消息发送代码实现

下面是一个生产者发送消息的demo(同步发送)

image.png

主要做了几件事:

  • 初始化一个生产者(DefaultMQProducer)对象
  • 设置 NameServer 的地址
  • 启动生产者
  • 发送消息

消息发送者启动流程

image.png

DefaultMQProducerImpl类start()

image.png

检查配置

DefaultMQProducerImpl

image.png

获得MQ客户端实例

整个JVM中只存在一个MQClientManager实例,维护一个MQClientInstance缓存表。DefaultMQProducerImpl类start()

image.png

一个clientId只会创建一个MQClientInstance

image.png

clientId生成规则:IP@instanceName@unitName

image.png

RocketMQ中消息发送者、消息消费者都属于”客户端“。每一个客户端就是一个MQClientInstance,每一个ClientConfig对应一个实例。

不同的生产者、消费端如果引用同一个客户端配置(ClientConfig),则它们共享一个MQClientInstance实例。所以我们在定义的的时候要注意这种问题(生产者和消费者如果分组名相同容易导致这个问题)

image.png

启动实例

MQClientInstance类start()

image.png

定时任务

MQClientInstance类startScheduledTask()

image.png

Producer 消息发送流程

我们从一个生产者案例的代码进入代码可知:DefaultMQProducerImpl中的sendDefaultImpl()是生产者消息发送的核心方法

image.png

image.png

从核心方法可知消息发送就是4个步骤:验证消息、查找路由、选择队列、消息发送。

image.png

image.png

选择队列

默认选择队列策略

采用了最简单的轮询算法,这种算法有个很好的特性就是,保证每一个Queue队列的消息投递数量尽可能均匀。这种算法只要消息投递过程中没有发生重试的话,基本上可以保证每一个Queue队列的消息投递数量尽可能均匀。当然如果投递中发生问题,比如第一次投递就失败,那么很大的可能性是集群状态下的一台Broker挂了,所以在重试发送中进行规避。这样设置也是比较合理的。

故障延迟机制策略*

采用此策略后,每次向Broker成功或者异常的发送,RocketMQ都会计算出该Borker的可用时间(发送结束时间-发送开始时间,失败的按照30S计算),并且保存,方便下次发送时做筛选。

image.png

除了记录Broker的发送消息时长之外,还要计算一个Broker的不可用时长。这里采用一个经验值:

如果发送时长在550ms之内,不可用时长为0。

达到550ms,不可用时长为30S

达到1000ms,不可用时长为60S

达到2000ms,不可用时长为120S

达到3000ms,不可用时长为180S

达到15000ms,不可用时长为600S

image.png

image.png

有了以上的Broker规避信息后发送消息就非常简单了。

在开启故障延迟机制策略步骤如下:

  1. 根据消息队列表时做轮训
  2. 选好一个队列
  3. 判断该队列所在Broker是否可用
  4. 如果是可用则返回该队列,队列选择逻辑结束
  5. 如果不可用,则接着步骤2继续
  6. 如果都不可用,则随机选一个

代码如下:

image.png

两种策略的选择

从这种策略上可以很明显看到,默认队列选择是轮训策略,而故障延迟选择队列则是优先考虑消息的发送时长短的队列。那么如何选择呢?

首先RocketMQ默认的发送失败有重试策略,默认是2,也就是如果向不同的Broker发送三次都失败了那么这条消息的发送就失败了,作为RocketMQ肯定是尽力要确保消息发送成功。所以给出以下建议。

如果是网络比较好的环境,推荐默认策略,毕竟网络问题导致的发送失败几率比较小。

如果是网络不太好的环境,推荐故障延迟机制,消息队列选择时,会在一段时间内过滤掉RocketMQ认为不可用的broker,以此来避免不断向宕机的broker发送消息,从而实现消息发送高可用。

当然以上成立的条件是一个Topic创建在2个Broker以上的的基础上。

技术亮点:ThreadLocal

image.png

image.png

image.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_173574.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Unity丨移动相机朝向目标并确定目标在摄像机可视范围内丨摄像机注释模型丨摄像机移动丨不同尺寸模型优化丨

文章目录 问题描述功能展示技术细节小结 问题描述 本文提供的功能是摄像机朝向目标移动,并确定整个目标出现在摄像机视角内,针对不同尺寸的模型优化。 功能展示 提示:这里可以添加技术名词解释 技术细节 直接上代码 using UnityEngine;…

【Linux】【网络】传输层协议:UDP

文章目录 UDP 协议1. 面向数据报2. UDP 协议端格式3. UDP 的封装和解包4. UDP 的缓冲区 UDP 协议 UDP传输的过程类似于寄信。 无连接:知道对端的IP和端口号就直接进行传输,不需要建立连接。不可靠:没有确认机制,没有重传机制&am…

基于微信小程序的超市售货管理平台设计与实现(源码+lw+部署文档+讲解等)

前言 💗博主介绍:✌全网粉丝10W,CSDN特邀作者、博客专家、CSDN新星计划导师、全栈领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java、小程序技术领域和毕业项目实战✌💗 👇🏻…

面向面试知识--MySQL数据库与索引

面向面试知识–MySQL数据库与索引 优化难点与面试点 什么是MySQL索引? 索引的MySQL官方定义:索引是帮助MySQL快速获取数据的数据结构。 动力节点原文: MysQL官方对于索引的定义:索引是帮助MySQL高效获取数据的数据结构。 MysQL在存储数据之…

【Unity实战】从零手戳一个库存背包系统

文章目录 前言素材开始一、绘制背包UI二、背包开启关闭三、初始化背包网格四、 添加物品五、 拖拽交换功能物品六、 物品拆分七、 物品堆叠八、 拖拽还原九、 引入字典存储数据十、 拾取物品十一、 丢弃物品 最终效果源码完结 前言 库存背包系统是大多数游戏的关键部分&#x…

python项目2to3方案预研

目录 官方工具2to3工具安装参数解释基本使用工具缺陷 future工具安装参数解释基本使用工具缺陷 python-modernize工具安装参数解释基本使用工具缺陷 pyupgrade工具安装参数解释基本使用工具缺陷 对比 官方工具2to3 2to3 是Python官方提供的用于将Python 2代码转换为Python 3代…

解决qml编译时出现错误ninja: build stopped: subcommand failed.

qml编译时出现错误ninja: build stopped: subcommand failed. 如下图: 解决这个编译错误其实很简单,我把Window写错了,写成了window, 如果有类似的报错,可以检查一下qml代码是否有问题。当然在Qt Creator里也没有错误提示&#x…

Canal实现Mysql数据同步至Redis、Elasticsearch

文章目录 1.Canal简介1.1 MySQL主备复制原理1.2 canal工作原理 2.开启MySQL Binlog3.安装Canal3.1 下载Canal3.2 修改配置文件3.3 启动和关闭 4.SpringCloud集成Canal4.1 Canal数据结构![在这里插入图片描述](https://img-blog.csdnimg.cn/c64b40c2231a4ea39a95aac81d771bd1.pn…

重装系统(配置环境)

这里写目录标题 0.重装系统1.python1.1 anaconda1.2 pycharm1.3 深度学习环境配置 2.java2.1.安装JDK2.2.配置JDK环境变量2.3IDEA2.4 Maven 3.大数据3.1 虚拟机3.2 Hadoop平台3.3 存储3.4 采集3.5 计算3.6 查询3.7 可视化 0.重装系统 // An highlighted block var foo bar;1.…

【性能优化下】组织结构同步优化二,全量同步/增量同步,断点续传实现方式

看到这一篇文章的 xdm ,应该对组织结构同步有一些想法了吧,如果没有,可以看前面两篇文章,可以通过如下地址查看一下: 【性能优化上】第三方组织结构同步优化一,你 get 到了吗? 坑爹&#xff0c…

UE5读取json文件

一、下载插件 在工程中启用 二、定义读取外部json文件的函数,参考我之前的文章 ue5读取外部文件_艺菲的博客-CSDN博客 三、读取文件并解析为json对象 这里Load Text就是自己定义的函数,ResourceBundle为一个字符串常量,通常是读取的文件夹…

数据预处理方式合集

删除空行 #del all None value data_all.dropna(axis1, howall, inplaceTrue) 删除空列 #del all None value data_all.dropna(axis0, howall, inplaceTrue) 缺失值处理 观测缺失值 观测数据缺失值有一个比较好用的工具包——missingno,直接传入DataFrame&…

【Html】用CSS定义咖啡 - 咖啡配料展示

显示效果 代码 index.html <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><title>CodePen - For The Love Of Coffee</title><link rel"stylesheet" href"./style.css">&l…

Jenkins结合Gitlab,实现镜像构建及推送

docker-compose jenkins的docker-compose目录为为/home/jenkins&#xff0c;这个后面写脚本的时候需要对应上 version: 3 services:docker_jenkins:restart: alwaysimage: jenkins/jenkins:ltscontainer_name: docker_jenkinsprivileged: true ports:- 8080:8080- 50000:5000…

MyBatis基础之执行SQL

文章目录 执行 SQL 语句1. 增删改操作insert 元素insert 过程中的主键回填delete 元素 和 update 元素 2. getMapper 方法3. 查操作select 元素select 与 聚合函数 4. 传递多个参数使用 Map 传递多参数使用 JavaBean 传递多参使用注解方式传递多参数 执行 SQL 语句 Mapper 是 …

(2) Java 8 实战第二版——补充 收集数据、并行数据处理能力与性能

第6章 用Collectors类创建和使用收集器将数据流归约为一个值汇总&#xff1a;归约的特殊情况数据分组和分区开发你的自定义收集器 对一个交易列表按货币分组&#xff0c;获得该货币的所有交易额总和&#xff08;返回一个Map<Currency, Integer>&#xff09;。将交易列表…

面向组织分析的内容

声明 本文是学习GB-T 42859-2023 航天产品质量问题三个面向分析方法实施要求. 而整理的学习笔记,分享出来希望更多人受益,如果存在侵权请及时联系我们 1 范围 本文件规定了航天产品质量问题三个面向分析方法实施的一般要求、程序和分析内容。 本文件适用于承担航天产品研制…

性能测试 —— Tomcat监控与调优:Jconsole监控

JConsole的图形用户界面是一个符合Java管理扩展(JMX)规范的监测工具&#xff0c;JConsole使用Java虚拟机(Java VM)&#xff0c;提供在Java平台上运行的应用程序的性能和资源消耗的信息。在Java平台&#xff0c;标准版(Java SE平台)6&#xff0c;JConsole的已经更新到目前的外观…

阿里云服务器价格表,轻量和服务器最新活动价格表汇总

租用阿里云服务器怎么收费&#xff1f;阿里云服务器配置不同一年价格也不同&#xff0c;阿里云2核2G3M带宽108元一年、2核4G4M带宽297.98元12个月&#xff0c;云服务器u1公网带宽可选1M到5M&#xff0c;系统盘为ESSD云盘40GB起&#xff0c;CPU内存配置可选2核2G、2核4G、4核8G、…

抖音seo矩阵系统源码分享-技术梳理

抖音seo源码&#xff0c;抖音seo矩阵系统源码技术搭建&#xff0c;抖音seo源码技术开发思路梳理搭建 抖音账号矩阵系统部分源代码分享 if (empty($video_item)) {$this->displayJsonError(参数错误);}$curr_platform json_decode($video_item[dv_platform], 1);$curr_plat…