RocketMQ实现延迟队列精确到秒级实现

news/2024/4/27 12:37:48/文章来源:https://blog.csdn.net/qq_19734597/article/details/129194124

前言篇:

为了节约成本,决定通过自研来改造rocketmq,添加任意时间延迟的延时队列,开源版本的rocketmq只有支持18个等级的延迟时间,

其实对于大部分的功能是够用了的,但是以前的项目,全部都是使用了阿里云的rocketmq,原因是不同的供应商的订单的延时时间是不同的

(部分供应商的订单未支付30分钟取消,有些1个半小时取消,各种时间都有),

所以使用了大量的延时队列,但是开源版本不支持任意时间延时(希望官方支持这个功能)

为了实现这个功能,网上查询了不少资料,查询到不少相关的文章,主要实现都是基于时间轮来实现的,

但是比较少开源的代码实现(也许大家都没有这个需求吧)

debug实践篇:

1. 撸起袖子加油干,首先,下载源代码 https://github.com/apache/rocketmq.git,导入ide

运行mvn package 生成jar包,如果成功的话,会生成到distribution目录下面

2. 查看文档,发现要运行namesvr 和 broker

找到 src\main\java\org\apache\rocketmq\namesrv\NamesrvStartup.java ,开心的执行main方法,

哦哦哦哦哦,果然报错了,提示 rocketmq.home.dir 目录不存在,查看源码, 原来是从system.propeties读取的,

为了调试,我毫不犹豫的加上了配置文件,

再次运行,不报错了,控制台显示,成功啦( 生活是多么美好,空气是多么清晰! )

3.运行 broker ,打开 src\main\java\org\apache\rocketmq\broker\BrokerStartup.java,执行main方法,

添加 配置文件 ( D:\\mq\\rocketmq-rocketmq-all-4.9.2是我本地的路径,你要修改成自己的 )

1 System.setProperty("rocketmq.home.dir", "D:\\mq\\rocketmq-rocketmq-all-4.9.2\\rb");2 System.setProperty("user.home", "D:\\mq\\rocketmq-rocketmq-all-4.9.2\\rb\\home\\");

运行一下,成功了,开心的发一条消息,试试,哦哦哦哦哦。 发不出去哦( 人生最痛苦的事情是,快要成功了,却没有成功 ) 。

原来还要配置namesvr地址,在启动命令,添加 -n localhost:9876 ( 上面的namesvr 启动的ip和端口)

4.漫长的改造之路 (我们是勇敢的斯巴达勇士,一直勇往直前)

用了阿里云的延时队列,发现它的message 可以传一个时间过来(任意的延时时间)

来来来,我们复制一下( 不要告诉别人,我们一直是复制,粘贴的,没有原创, 嘘 ...... )

1/** 2      * 该类预定义一些系统键. 3      */4     static public class SystemPropKey {5         public static final String TAG = "__TAG";6         public static final String KEY = "__KEY";7         public static final String MSGID = "__MSGID";8         public static final String SHARDINGKEY = "__SHARDINGKEY";9         public static final String RECONSUMETIMES = "__RECONSUMETIMES";10         public static final String BORNTIMESTAMP = "__BORNTIMESTAMP";11         public static final String BORNHOST = "__BORNHOST";12/**13          * 设置消息的定时投递时间(绝对时间). <p>例1: 延迟投递, 延迟3s投递, 设置为: System.currentTimeMillis() + 3000; <p>例2: 定时投递,14          * 2016-02-01 11:30:00投递, 设置为: new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2016-02-0115          * 11:30:00").getTime()16          */17         public static final String STARTDELIVERTIME = "__STARTDELIVERTIME";18     }
/**     * <p> 设置消息的定时投递时间(绝对时间),最大延迟时间为7天. </p> <ol> <li>延迟投递: 延迟3s投递, 设置为: System.currentTimeMillis() + 3000;</li>     * <li>定时投递: 2016-02-01 11:30:00投递, 设置为: new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2016-02-01     * 11:30:00").getTime()</li> </ol>     */    public void setStartDeliverTime(final long value) {        putSystemProperties(SystemPropKey.STARTDELIVERTIME, String.valueOf(value));    }

5.既然要改造rocketmq,在哪里改呢,debug,debug,debug(一直到天荒地老),功夫不负有心人,找到啦,

找到 \src\main\java\org\apache\rocketmq\broker\processor\SendMessageProcessor.java, 发现

public RemotingCommand processRequest(ChannelHandlerContext ctx,                                          RemotingCommand request) throws RemotingCommandException {        SendMessageContext mqtraceContext;switch (request.getCode()) {case RequestCode.CONSUMER_SEND_MSG_BACK:return this.consumerSendMsgBack(ctx, request);default:                SendMessageRequestHeader requestHeader = parseRequestHeader(request);if (requestHeader == null) {return null;                }                mqtraceContext = buildMsgContext(ctx, requestHeader);                this.executeSendMessageHookBefore(ctx, request, mqtraceContext);                RemotingCommand response;if (requestHeader.isBatch()) {                    response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader);                } else {                    response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);                }                this.executeSendMessageHookAfter(response, mqtraceContext);return response;        }    }

继续debug,发现 sendMessage 就是处理发送消息的,

如果我们在这里判断是否延时消息就写入文件,然后返回成功到客户端,等到了时间就发送延迟消息,不就搞定了吗?

oh,yes,就是这么干的

//处理延迟消息 delay message        String startTime = msgInner.getProperty(Message.SystemPropKey.STARTDELIVERTIME);        boolean isDelayMsg = false;        long nextStartTime = 0;if (startTime != null && msgInner.getDelayTimeLevel() <= 0) {            nextStartTime = Long.parseLong(startTime);if (nextStartTime >= System.currentTimeMillis()) {                isDelayMsg = true;            }        }if (isDelayMsg) {return delayProcessor.handlePutMessageResultFuture(response, request, msgInner, ctx, queueIdInt, nextStartTime);        } else {if (traFlag != null && Boolean.parseBoolean(traFlag)) {if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {                    response.setCode(ResponseCode.NO_PERMISSION);                    response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()                                    + "] sending transaction message is forbidden");return response;                }                putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);            } else {                putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);            }return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);        }    }

其中 delayProcessor.handlePutMessageResultFuture 是我们用来处理延迟消息的地方

我们按照每个时间一个文件夹来保存延时消息,等延时消息到达后,定时的写入延时队列里面。

详细原理,请查考 rocketmq 原理实现篇 https://www.cnblogs.com/tomj2ee/p/15815186.html

<em> </em>
package org.apache.rocketmq.broker.delay;import io.netty.channel.ChannelHandlerContext;import org.apache.commons.lang3.time.DateFormatUtils;import org.apache.rocketmq.broker.BrokerController;import org.apache.rocketmq.common.protocol.ResponseCode;import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;import org.apache.rocketmq.logging.InternalLogger;import org.apache.rocketmq.logging.InternalLoggerFactory;import org.apache.rocketmq.remoting.protocol.RemotingCommand;import org.apache.rocketmq.store.MessageExtBrokerInner;import java.io.*;import java.net.InetSocketAddress;import java.net.SocketAddress;import java.util.Date;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.ThreadLocalRandom;public class DelayProcessor implements  Runnable {    protected static final InternalLogger log = InternalLoggerFactory.getLogger(DelayProcessor.class.getCanonicalName());    protected final BrokerController brokerController;    protected final SocketAddress storeHost;    private ExecutorService jobTaskExecute = Executors.newFixedThreadPool(16);    public DelayProcessor(final BrokerController brokerController) {        this.brokerController = brokerController;        this.storeHost = new InetSocketAddress(brokerController.getBrokerConfig().getBrokerIP1(), brokerController                .getNettyServerConfig().getListenPort());        Thread thread = new Thread(this);        thread.setName("delayProcessor-run---thread");        thread.setDaemon(true);new File(getDelayPath()).mkdirs();        thread.start();        Thread missCallThread = new Thread(() -> {            try {for(;;) {                    Thread.sleep(10 * 1000);                    sendMissCallMsg();                }            } catch (InterruptedException e) {                e.printStackTrace();            }        });        missCallThread.setName("delayProcessor-callback-thread");        missCallThread.start();        System.out.println("init delay success " +getDelayPath());    }    public RemotingCommand handlePutMessageResultFuture(RemotingCommand response,                                                        RemotingCommand request,                                                        MessageExtBrokerInner msgInner,                                                        ChannelHandlerContext ctx,int queueIdInt, long nextStartTime) {return handlePutMessageResult(response, request, msgInner, ctx, queueIdInt, nextStartTime);    }    private RemotingCommand handlePutMessageResult(RemotingCommand response,                                                   RemotingCommand request, MessageExtBrokerInner msg,                                                   ChannelHandlerContext ctx,int queueIdInt, long nextStartTime) {        boolean svOk = saveMsgFile(nextStartTime, msg);        SendMessageResponseHeader sendMessageResponseHeader = new SendMessageResponseHeader();        sendMessageResponseHeader.setQueueId(1);        sendMessageResponseHeader.setMsgId("0");        sendMessageResponseHeader.setQueueOffset(0l);        sendMessageResponseHeader.setTransactionId("");        RemotingCommand newCommand = RemotingCommand.createRequestCommand(ResponseCode.SUCCESS, sendMessageResponseHeader);if (svOk) {            newCommand.setCode(ResponseCode.SUCCESS);        } else {            newCommand.setCode(ResponseCode.SYSTEM_ERROR);            newCommand.setRemark("发送消息延迟失败!");        }        newCommand.setExtFields(request.getExtFields());        newCommand.setVersion(response.getVersion());        newCommand.setOpaque(response.getOpaque());        newCommand.setLanguage(response.getLanguage());        newCommand.setBody(request.getBody());if (!request.isOnewayRPC()) {            try {                ctx.writeAndFlush(newCommand);            } catch (Throwable e) {                log.error("DelayProcessor process request over, but response failed", e);                log.error(request.toString());                log.error(response.toString());            }        }return newCommand;    }    public void putMessage(MessageExtBrokerInner msgInner) {        this.brokerController.getMessageStore().putMessage(msgInner);    }    @Override    public void run() {for (; ; ) {            long curTime = System.currentTimeMillis() / 1000;            jobTaskExecute.submit(() -> sendMsg(curTime));            try {                Thread.sleep(1000);            } catch (InterruptedException e) {            }        }    }    private String getDelayPath() {        String delayPath = "./delay-store"+ File.separator + "delay";return delayPath;    }    private boolean saveMsgFile(long startTime, MessageExtBrokerInner msgInner) {        ObjectOutputStream objectOutputStream = null;        try {            String msgId =(startTime/1000 )+"-"+ System.currentTimeMillis() + "-" + ThreadLocalRandom.current().nextInt(99999999);            System.out.println( getCurrentTime()+"写入延迟消息 >>" + msgId);            String parentDir = getDelayPath() + File.separator + startTime / 1000;            File parentFile = new File(parentDir);if (!parentFile.exists()) {                parentFile.mkdirs();            }            String fileName = parentDir + File.separator + msgId;            FileOutputStream fos = new FileOutputStream(fileName);            BufferedOutputStream bos = new BufferedOutputStream(fos);            objectOutputStream = new ObjectOutputStream(bos);            objectOutputStream.writeObject(msgInner);returntrue;        } catch (Exception ex) {            log.error("saveMsgFile ex:", ex);returnfalse;        } finally {            try {if (objectOutputStream != null) {                    objectOutputStream.close();                }            } catch (Exception ex) {                log.error("saveMsgFile ex:", ex);            }        }    }    private MessageExtBrokerInner readFile(File f) {        ObjectInputStream ois = null;        try {            ois = new ObjectInputStream(new FileInputStream(f));return (MessageExtBrokerInner) ois.readObject();        } catch (Exception ex) {return null;        } finally {if (ois != null) {                try {                    ois.close();                } catch (IOException e) {                    e.printStackTrace();                }            }        }    }    private  void sendMissCallMsg() {        File lst = new File(getDelayPath());        File[] files = lst.listFiles();        long startTime = System.currentTimeMillis() / 1000 - 10 * 1000;for (File f : files) {            String name = f.getName();if (f.isDirectory() && !name.equals(".") && !name.equals("..")) {                try {                    Long fileTime = Long.parseLong(name);if (fileTime <= startTime) {                        sendMsg(fileTime);                    }                } catch (Exception ex) {                }            }        }    }    private String  getCurrentTime(){return  Thread.currentThread().getName()+ ">>["+DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss")+"] ";    }    private void sendMsg(long startTime) {        File lst = new File(getDelayPath() + File.separator + startTime);        File[] files = lst.listFiles();if (files != null) {for (File f : files) {                System.out.println( getCurrentTime()+"时间到发送>> "+ startTime+" to commitLog " + f.getName());                MessageExtBrokerInner msgInner = readFile(f);if (msgInner != null) {                    putMessage(msgInner);                    System.out.println( getCurrentTime()+"写入log >> "+ startTime+" to commitLog " + f.getName()+" success");                    f.delete();                }            }            lst.delete();        }    }}

总结:rocketmq延迟队列实现主要是通过时间轮和文件来保存延时消息,等到了时间后,再写入延时队列,来达到延时的目的。

总共有4种方式来实现延时队列,可以参考延时队列的实现原理篇

https://www.cnblogs.com/tomj2ee/p/15815157.html

开源rocketmq延迟队列实现:

https://gitee.com/venus-suite/rocketmq-with-delivery-time.git

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_73517.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

STM32 OTA应用开发——通过USB实现OTA升级

STM32 OTA应用开发——通过USB实现OTA升级 目录STM32 OTA应用开发——通过USB实现OTA升级前言1 环境搭建2 功能描述3 BootLoader的制作4 APP的制作5 烧录下载配置6 运行测试结束语前言 什么是OTA&#xff1f; 百度百科&#xff1a;空中下载技术&#xff08;Over-the-Air Techn…

【SpringCloud系列】SpringCloudConfig配置中心

前言 我们在开发过程中总是会有各种各样的配置&#xff0c;比较如数据库连接配置&#xff0c;Mybatis配置等等各种组件的配置&#xff0c;这些配置都放在yml中&#xff0c;如果想要变更这些配置&#xff0c;需要修改yml文件&#xff0c;然后重新部署项目才能生效&#xff0c;同…

后端接收格式为x-www-form-urlencoded的数据

1.x-www-form-urlencoded是什么&#xff1f; x-www-form-urlencoded纸面翻译即所谓url格式的编码&#xff0c;是post的默认Content-Type&#xff0c;其实就是一种编码格式&#xff0c;类似json也是一种编码传输格式。form表单中使用 form的enctype属性为编码方式&#xff0…

GoFrame工程目录设计介绍

GoFrame框架针对业务项目的目录设计&#xff0c;主体的思想来源于三层架构&#xff0c;但在具体实现中&#xff0c;对其进行了一定的改进和细化使其更符合工程实践和时代进步。 一.工程目录结构 GoFrame业务项目基本目录结构如下&#xff1a; 二.目录结构解释 对外接口 对…

AWS攻略——使用中转网关(Transit Gateway)连接不同区域(Region)VPC

文章目录Peering方案Transit Gateway方案环境准备创建Transit Gateway Peering Connection接受邀请修改中转网关路由修改被邀请方中转网关路由修改邀请方中转网关路由测试修改Public子网路由知识点参考资料区别于 《AWS攻略——使用中转网关(Transit Gateway)连接同区域(Region…

记录--前端项目中运行 npm run xxx 的时候发生了什么?

这里给大家分享我在网上总结出来的一些知识&#xff0c;希望对大家有所帮助 npm 是 node 捆绑的依赖管理器&#xff0c;常用程度可想而知。那么你每天都在 npm/yarn run 的命令到底是如何运行项目的呢&#xff1f; 前端项目中运行 npm run xxx 的时候发生了什么&#xff1f;大家…

LinkSLA智能运维技术派-Redis的监控

Redis是一个开源&#xff0c;内存存储的数据服务器&#xff0c;可用作数据库、高速缓存和消息队列代理等场景。 首先我们对内存进行监控&#xff0c;主要指标如下&#xff1a; - used_memory:使用内存 - used_memory_rss:从操作系统分配的内存 - mem_fragmentation_ratio:内…

Mac mini 外接移动硬盘无法写入或者无法显示的解决方法

文章目录1. 背景2. 让NTFS格式的移动硬盘正常读写方法3. 打开“启动安全性实用工具”4. 更改“安全启动”设置1. 背景 刚买mac min&#xff08;2023年2月3日&#xff09;不久&#xff0c;发现macOS的玩起来并不容易&#xff0c;勇习惯了windows系统的习惯&#xff0c;感觉 mac…

免去打包烦恼,自动构建你的GitHub Pages|玩转GitHub Pages三部曲(二)

本文讲述了如何利用 GitHub Actions 来自动构建 GitHub Pages 项目&#xff0c;免去繁琐的手动构建再提交过程&#xff0c;让你专注于写作。大家的点赞和互动是我更文的动力 /(ㄒoㄒ)/ 所以我决定发起一项活动&#xff0c;到三月三十一日统计&#xff0c;留言次数和赞赏次数最多…

selenium基本操作

爬虫与反爬虫之间的斗争爬虫&#xff1a;对某个网站数据或图片感兴趣&#xff0c;开始抓取网站信息&#xff1b;网站&#xff1a;请求次数频繁&#xff0c;并且访问ip固定&#xff0c;user_agent也是python&#xff0c;开始限制访问&#xff1b;爬虫&#xff1a;通过设置user_a…

ifconfig不显示ipv4地址,ifconfig eth0 192.168.5.9失败

ifconfig eth0 192.168.5.9设置ip地址后&#xff0c;通过ifconfig仍然没有ipv4地址&#xff1a; 一、 执行ifup eth0启动eth0: ifconfig、ifup、ifdown &#xff1a;这三个命令的用途都是启动网络接口&#xff0c;不过&#xff0c;ifup 与 ifdown 仅就 /etc/sysconfig/network-…

【数据存储】浮点型在内存中的存储

目录 一、存储现象 二、IEEE标准规范 1.存储 2.读取 三、举例验证 1.存储 2.读取 浮点型存储的标准是IEEE&#xff08;电气电子工程师学会&#xff09;754制定的。 一、存储现象 浮点数由于其有小数点的特殊性&#xff0c;有很多浮点数是不能精确存储的&#xff0c;如&#…

阅读HAL源码之重点总结

HAL封装中有如下特点&#xff08;自己总结的&#xff09;&#xff1a; 特定外设要设置的参数组成一个结构体&#xff1b; 特定外设所有寄存器组成一个结构体&#xff1b; 地址基本都是通过宏来定义的&#xff0c;定义了各外设的起始地址&#xff0c;也就是对应寄存器结构体的地…

优秀外贸业务员必备的业务技能

2023年的春天&#xff0c;可谓是外贸企业三年寒冬后的第一个春天。外贸行业离不开的就是优秀的外贸业务员&#xff0c;那么一个优秀的外贸业务员需要有哪些必备的技能呢&#xff1f;跟着我一起来看看吧&#xff01;一、电话开发客户能力首先&#xff0c;要知道&#xff0c;声音…

【unittest学习】unittest框架主要功能

1.认识unittest在 Python 中有诸多单元测试框架&#xff0c;如 doctest、unittest、pytest、nose 等&#xff0c;Python 2.1 及其以后的版本已经将 unittest 作为一个标准模块放入 Python 开发包中。2.认识单元测试不用单元测试框架能写单元测试吗&#xff1f;答案是肯定的。单…

华为OD机试题,用 Java 解【最小施肥机能效】问题

最近更新的博客 华为OD机试 - 猴子爬山 | 机试题算法思路 【2023】华为OD机试 - 分糖果(Java) | 机试题算法思路 【2023】华为OD机试 - 非严格递增连续数字序列 | 机试题算法思路 【2023】华为OD机试 - 消消乐游戏(Java) | 机试题算法思路 【2023】华为OD机试 - 组成最大数…

无线通信时代的新技术----信标( Beacon)

随着IT技术的发展&#xff0c;无线通信技术也在不断发展。 现已根据预期用途开发了各种无线通信技术&#xff0c;例如 NFC、WIFI、Bluetooth和 RFID。 车辆内部结构的复杂化和数字化&#xff0c;车载通信网络技术的重要性也越来越高。 一个典型的例子是远程信息处理。 远程信息…

ESP32 Arduino EspNow点对点双向通讯

ESP32 Arduino EspNow点对点双向通讯✨本案例分别采用esp32和esp32C3之间点对点单播无线通讯方式。 &#x1f33f;esp32开发板 &#x1f33e;esp32c3开发板 &#x1f527;所需库(需要自行导入到Arduino IDE library文件夹中&#xff0c;无法在IDE 管理库界面搜索下载到该库)&am…

git cherry-pick could not apply fb2cde669...问题解决

最近多个分支修复bug&#xff0c;在使用git cherry-pick进行小功能合并时经常会出现类似could not apply fb2cde669...的错误。具体如下图&#xff1a;具体原因是cherry-pick指定的commit内容中和当前分支有冲突导致的。具体解决分以下步骤&#xff1a;1&#xff1a;首先使用gi…

京东前端二面必会vue面试题(持续更新中)

说一下Vue的生命周期 Vue 实例有⼀个完整的⽣命周期&#xff0c;也就是从开始创建、初始化数据、编译模版、挂载Dom -> 渲染、更新 -> 渲染、卸载 等⼀系列过程&#xff0c;称这是Vue的⽣命周期。 beforeCreate&#xff08;创建前&#xff09;&#xff1a;数据观测和初…