大家心心念念的RocketMQ5.x入门手册来喽

news/2024/5/9 4:27:28/文章来源:https://blog.csdn.net/prestigeding/article/details/128995213

1、前言

为了更好的拥抱云原生,RocketMQ5.x架构进行了大的重构,提出了存储与计算分离的设计架构,架构设计图如下所示:

00

RocketMQ5.x提供了一套非常建议的消息发送、消费API,并统一放在Apache顶级开源项目rocketmq-clients下,链接:https://github.com/apache/rocketmq-clients,提供了cpp、go、java、php、rust的实现,多语言生态初现,如下图所示:

01

2、源码级调试 RocketMQ 5.x

当RocketMQ为了顺应云原生大潮,提出存储与计算分离后,想必我相信很多粉丝朋友和我一样,都希望尽快一睹RocketMQ5.x的”芳颜“,如果还没有在IDE中调试通过的小伙伴,那就跟着我的步骤来,带你一起体验RocketMQ 5.x。

Step1:从github(https://github.com/apache/rocketmq)下载源码,并导入到IDEA中,如下图所示:

02

相比RocketMQ4.x,5.x主要是增加了一个代理模块(rocketmq-proxy),将路由、计算等功能从Broker中剥离出来。

Step2:创建一个RocketMQ主目录,并在主目录中创建conf文件夹,并把源码中distribution模块中conf下的文件拷贝到当前目录,如下图所示:

03

Step3:从namesrv模块中找到类NamesrvStartup类,配置后运行,如下图所示:

04

这里的关键点在于需要配置环境变量ROCKETMQ_HOME,其路径设置为【Step2】中创建的目录,然后启动该类,输出如下所示表示NameServer启动成功。

The Name Server boot success. serializeType=JSON

Step4:从broker模块中找到类BrokerStartup,配置后运行,效果如下图所示:

05

这里有两个要点:

  • 通过 -c 参数指定broker配置文件的位置
  • 设置ROCKETMQ_HOME环境变量,其路径就是上文中conf目录所在的父目录

Step5:启动proxy模块,如下图所示:

06

设置好环境变量RMQ_PROXY_HOME环境变量,直接启动,会抛出如下错误:

07

原因是RocketMQ Proxy在启动时会RMQ_PROXY_HOME加载日志文件,我们从源码模块中distribution中logback_proxy.xml拷贝到proxy主目录的conf文件夹下。

再次尝试启动,抛出如下错误:

08

需要再从源码模块中distribution中rmq-proxy.json拷贝到proxy主目录的conf文件夹下,启动成功如下所示:

09

那问题来了,rmq-proxy.json文件中的内容是多少呢?

{"rocketMQClusterName": "DefaultCluster"
}

那这个文件中又可以陪着哪些参数呢?这个目前无法从官方网站中获取,大家可以去查看org.apache.rocketmq.proxy.config.ProxyConfig,里面所有的属性都可以在这个文件中配置。

Nameserver、broker、Proxy都已经启动成功了,那我们如何发送消息呢?

由于RocketMQ 5.x引入了Proxy,原先的RocketMQ Client API 不能直接使用,RocketMQ官方提供了一套极简API,API的完整定义在Apache顶级开源项目rocketmq-apis(https://github.com/apache/rocketmq-apis),具体的定义如下图所示:

10

具体的实现在https://github.com/apache/rocketmq-clients,实现了cpp、golang、java、php、rust的实现。

接下来,我们使用一下java版本的客户端尝试发送一条消息,代码如下所示:

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client-apis</artifactId><version>5.0.0</version></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client-java</artifactId><version>5.0.0</version></dependency>import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;public class RocketMQProxyTest {public static void main(String[] args) throws Exception {final ClientServiceProvider provider = ClientServiceProvider.loadService();// Credential provider is optional for client configuration.String accessKey = "yourAccessKey";String secretKey = "yourSecretKey";SessionCredentialsProvider sessionCredentialsProvider =new StaticSessionCredentialsProvider(accessKey, secretKey);String endpoints = "127.0.0.1:8081";ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(endpoints).setCredentialProvider(sessionCredentialsProvider).setRequestTimeout(Duration.ofSeconds(30)).build();String topic = "TopicTest";final Producer producer = provider.newProducerBuilder().setClientConfiguration(clientConfiguration)// Set the topic name(s), which is optional. It makes producer could prefetch the topic route before// message publishing..setTopics(topic)// May throw {@link ClientException} if the producer is not initialized..build();// Define your message body.byte[] body = "This is a normal message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8);String tag = "yourMessageTagA";final Message message = provider.newMessageBuilder()// Set topic for the current message..setTopic(topic)// Message secondary classifier of message besides topic..setTag(tag)// Key(s) of the message, another way to mark message besides message id..setKeys("yourMessageKey-0e094a5f9d85").setBody(body).build();final CompletableFuture<SendReceipt> future = producer.sendAsync(message);future.whenComplete((sendReceipt, throwable) -> {if (null == throwable) {System.out.println("Send message successfully, messageId=" + sendReceipt.getMessageId());} else {System.out.println("Failed to send message");}});// Block to avoid exist of background threads.Thread.sleep(Long.MAX_VALUE);// Close the producer when you don't need it anymore.producer.close();}
}

运行结果:

Send message successfully, messageId=01C6A0F34F62CB328C03EFF3EF00000000

运行成功,在这里给大家留一个作业,那消息消费如何写呢?

原文首发:https://www.codingw.net/Article?id=783

一键三连(关注、点赞、留言)是对我最大的鼓励。

各位技术朋友们,我是《RocketMQ技术内幕》一书作者,CSDN2020博客之星TOP2,热衷于中间件领域的技术分享,维护「中间件兴趣圈」公众号,旨在成体系剖析Java主流中间件,构建完备的分布式架构体系,欢迎大家大家关注我,回复「专栏」可获取15个专栏;回复「PDF」可获取海量学习资料,回复「加群」可以拉你入技术交流群,零距离与BAT大厂的大神交流。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_256843.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

TC3xx FlexRay™ 协议控制器 (E-Ray)-01

1 FlexRay™ 协议控制器 (E-Ray) E-Ray IP 模块根据为汽车应用开发的 FlexRay™ 协议规范 v2.1 执行通信【performs communication according to the FlexRay™ 1) protocol specification v2.1】。使用最大指定时钟&#xff0c;比特率可以编程为高达 10 Mbit/s 的值。连接到物…

就现在!为元宇宙和Web3对互联网的改造做准备!

欢迎来到Hubbleverse &#x1f30d; 关注我们 关注宇宙新鲜事 &#x1f4cc; 预计阅读时长&#xff1a;8分钟 本文仅代表作者个人观点&#xff0c;不代表平台意见&#xff0c;不构成投资建议。 如今&#xff0c;互联网是各种不同的网站、应用程序和平台的集合。由于彼此分离…

STM32单片机GSM短信自动存取快递柜

实践制作DIY- GC0104-自动存取快递柜 一、功能说明&#xff1a; 基于STM32单片机设计-自动存取快递柜 二、功能介绍&#xff1a; STM32F103C系列最小系统板0.96寸OLED显示器DY-SV17F串口语音播报模块4*4矩阵键盘GSM短信模块4路舵机&#xff08;模拟4个柜子&#xff09; ***…

【openGauss实战9】深度分析分区表

&#x1f4e2;&#x1f4e2;&#x1f4e2;&#x1f4e3;&#x1f4e3;&#x1f4e3; 哈喽&#xff01;大家好&#xff0c;我是【IT邦德】&#xff0c;江湖人称jeames007&#xff0c;10余年DBA及大数据工作经验 一位上进心十足的【大数据领域博主】&#xff01;&#x1f61c;&am…

Syzkaller学习笔记---更新syz-extract/syz-sysgen(一)

Syzkaller学习笔记Syzkaller 安装文件系统内核Android common kernel参考文献syzkaller 源码阅读笔记-1前言syz-extractmainarchListcreateArchesworkerprocessArchprocessFileextractcheckUnsupportedCallsarchList小结syz-sysgenmainprocessJob()generateExecutorSyscalls()w…

2016-ICLR-Order Matters- Sequence to sequence for sets

2016-ICLR-Order Matters- Sequence to sequence for sets Paper: [https://arxiv.org/pdf/1511.06391.pdf](https://arxiv.org/pdf/1511.06391.pdf) Code: 顺序重要性&#xff1a;集合的顺序到序列 摘要 许多需要从观察序列映射或映射到观察序列的复杂任务现在可以使用序列…

C++创建多线程的方法总结

下个迭代有个任务很有趣&#xff0c;用大量的线程去访问一个接口&#xff0c;直至其崩溃为止&#xff0c;这就需要多线程的知识&#xff0c;这也不是什么难事&#xff0c;总结一下C中的多线程方法&#xff1a;std、boost、pthread、windows api。 目录 一、多线程预备知识 二…

基于SpringBoot实现ChatGPT-QQ机器人

概述 近期ChatGPT火爆全球&#xff0c;在其官方网站上也列举了非常全面的应用案例&#xff0c;仅仅上线两个月活跃用户数已经达到1亿&#xff0c;成为历史上用户数增长最快的面向消费者的应用 快速体验 OpenAI官网对外提供了标准的 API 接口&#xff0c;可以通过HTTP请求进行…

简单的密码加密

用户的密码必须被加密后再存储到数据库, 否则就存在用户账号安全问题用户使用的原始密码通常称之为"原文"或"明文", 经过算法的运算, 得到的结果通常称之为"密文"在处理密码加密时, 不可以使用任何加密算法, 因为所有加密算法都是可以被逆向运算…

centos学习记录

遇到的问题及其解决办法 centos7安装图形化界面 yum groupinstall ‘X Window System’ yum groupinstall -y ‘GNOME Desktop’ 安装完成后输入init 5进入图形化界面 centos7安装vmware-tools 第一步卸载open-vm-tools 输入命令 yum remove open-vm-tools 输入命令 reboot 在…

微前端基础

一、什么是微前端 微前端是一种软件架构&#xff0c;可以将前端应用拆解成一些更小的能够独立开发部署的微型应用&#xff0c;然后再将这些微应用进行组合使其成为整体应用的架构模式。微前端架构类似于组件架构&#xff0c;但不同的是&#xff0c;组件不能独立构建和发布&…

大数据时代的小数据神器 - asqlcell

自从Google发布了经典的MapReduce论文&#xff0c;以及Yahoo开源了Hadoop的实现&#xff0c;大数据这个词就成为了一个行业的热门。在不断提高的机器性能和各种层出不穷的工具框架加持下&#xff0c;数据分析开始从过去的采样抽查变成全量整体&#xff0c;原先被抽样丢弃的隐藏…

网络安全实验室7.综合关

7.综合关 1.渗透测试第一期 url&#xff1a;http://lab1.xseclab.com/base14_2d7aae2ae829d1d5f45c59e8046bbc54/ 进入忘记密码页面&#xff0c;右键查看源码&#xff0c;发现一个手机号 解题思路&#xff1a;通过给admin用户绑定13388758688手机号码&#xff0c;然后再进行…

使用vue3,vite,less,flask,python从零开始学习硅谷外卖(16-40集)

严正声明&#xff01; 重要的事情说一遍&#xff0c;本文章仅供分享&#xff0c;文章和代码都是开源的&#xff0c;严禁以此牟利&#xff0c;严禁侵犯尚硅谷原作视频的任何权益&#xff0c;我知道学习编程的人各种各样的心思都有&#xff0c;但这不是你对开源社区侵权的理由&am…

【算法题解】15. 设计最小栈

这是一道 中等难度 的题。 题目来自&#xff1a;leetcode 题目 设计一个支持 push &#xff0c;pop &#xff0c;top 操作&#xff0c;并能在 常数时间 内检索到最小元素的栈。 实现 MinStack 类: MinStack() 初始化堆栈对象。void push(int val) 将元素val推入堆栈。void p…

驱动 | Linux | NVMe 不完全总结

本文主要参考这里 1’ 2 的解析和 linux 源码 3。 此处推荐一个可以便捷查看 linux 源码的网站 bootlin 4。 更新&#xff1a;2022 / 02 / 11 驱动 | Linux | NVMe 不完全总结NVMe 的前世今生从系统角度看 NVMe 驱动NVMe CommandPCI 总线从架构角度看 NVMe 驱动NVMe 驱动的文件…

详细解读503服务不可用的错误以及如何解决503服务不可用

文章目录1. 问题引言2. 什么是503服务不可用错误3 尝试解决问题3.1 重新加载页面3.2 检查该站点是否为其他人关闭3.3 重新启动设备3.3 联系网站4. 其他解决问的方法1. 问题引言 你以前遇到过错误503吗&#xff1f; 例如&#xff0c;您可能会收到消息&#xff0c;如503服务不可…

三种方式查看linux终端terminal是否可以访问外网ping,curl,wget

方法1&#xff1a;ping注意不要用ping www.google.com.hk来验证&#xff0c;因为有墙&#xff0c;墙阻止了你接受网址发回的响应数据。即使你那啥过&#xff0c;浏览器都可以访问Google&#xff0c;terminal里面也是无法得到响应 百度在墙内&#xff0c;所以可以正常拿到响应信…

sklearn降维算法1 - 降维思想与PCA实现

目录1、概述1.1 维度概念2、PCA与SVD2.1 降维实现2.2 重要参数n_components2.2.1 案例&#xff1a;高维数据的可视化2.2.2 最大似然估计自选超参数2.2.3 按信息量占比选超参数1、概述 1.1 维度概念 shape返回的结果&#xff0c;几维几个方括号嵌套 特征矩阵特指二维的 一般来…

truffle 创建测试合约并部署到测试网络

1、npm 安装truffle npm install -g truffle2、创建truffle项目 mkdir imooc-on-blockchain-truffle && cd imooc-on-blockchain-truffle3、初始化truffle目录&#xff0c;会生成如下几个目录 contracts 存放.sol合约文件migrations 部署脚本目录test 测试文件目录t…