Rocketmq-Mqtt 开发实例

news/2024/5/10 19:25:34/文章来源:https://blog.csdn.net/leesinbad/article/details/129697029

一、RocketMQ MQTT 概览

传统的消息队列MQ主要应用于服务(端)之间的消息通信,比如电商领域的交易消息、支付消息、物流消息等等。然而在消息这个大类下,还有一个非常重要且常见的消息领域,即IoT类终端设备消息。近些年,我们看到随着智能家居、工业互联而兴起的面向IoT设备类的消息正在呈爆炸式增长,而且已经发展十余年的移动互联网的手机APP端消息仍然是数量级庞大。面向终端设备的消息数量级比传统服务端的消息要大很多量级并仍然在快速增长。

如果可以有一个统一的消息系统(产品)来提供多场景计算(如stream、event)、多场景(IoT、APP)接入,其实是非常有价值的,因为消息也是一种重要数据,数据如果只存在一个系统内,可以最大地降低存储成本,同时可以有效地避免数据因在不同系统间同步带来的一致性难题和挑战。

基于此,我们引入了RocketMQ-MQTT这个扩展项目来实现RocketMQ统一接入IoT设备和服务端的消息,提供一体化消息存储和互通能力。

1、MQTT协议

在IoT终端场景,目前业界广泛使用的是MQTT协议,是起源于物联网IoT场景,OASIS联盟定义的标准的开放式协议。因为IoT设备种类繁多,运行环境各异,一个标准的接入协议尤为关键。

MQTT协议定义的是一个Pub/Sub的通信模型,这个与RocketMQ是类似的,不过其在订阅方式上比较灵活,可以支持多级Topic订阅(如 “/t/t1/t2”),甚至可以支持通配符订阅(如 “/t/t1/+”)。

2、模型介绍

队列存储模型

我们设计了一种多维度分发的Topic队列模型,如上图所示,消息可以来自各个接入场景(如服务端的MQ/AMQP、客户端的MQTT),但只会写一份存到commitlog里面,然后分发出多个需求场景的队列索引(ConsumerQueue),如服务端场景(MQ/AMQP)可以按照一级Topic队列进行传统的服务端消费,客户端MQTT场景可以按照MQTT多级Topic以及通配符订阅进行消费消息。

这样的一个队列模型就可以同时支持服务端和终端场景的接入和消息收发,达到一体化的目标。

推拉模型

上图展示的是一个推拉模型,图中的P节点是一个协议网关或broker插件,终端设备通过MQTT协议连到这个网关节点。消息可以来自多种场景(MQ/AMQP/MQTT)发送过来,存到Topic队列后会有一个notify逻辑模块来实时感知这个新消息到达,然后会生成消息事件(就是消息的Topic名称),将该事件推送至网关节点,网关节点根据其连上的终端设备订阅情况进行内部匹配,找到哪些终端设备能匹配上,然后会触发pull请求去存储层读取消息再推送至终端设备。

3、架构概览

我们的目标是期望基于RocketMQ实现一体化且自闭环,但不希望Broker被侵入更多场景逻辑,我们抽象了一个协议计算层,这个计算层可以是一个网关,也可以是一个broker插件。Broker专注解决Queue的事情以及为了满足上面的计算需求做一些Queue存储的适配或改造。协议计算层负责协议接入,并且要可插拔部署。

本实例分四部分:

  • MqttConsumer.java // MQTT客户端启动订阅消息

  • MqttProducer.java // MQTT客户端启动发布消息

  • RocketMQConsumer.java //RocketMQ客户端启动订阅消息

  • RocketMQProducer.java // RocketMQ客户端启动发布消息

二、引入依赖

<dependencies><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.2</version></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>mqtt-common</artifactId></dependency></dependencies>

三、Mqtt消息生产者MqttProducer:

import org.apache.rocketmq.mqtt.common.util.HmacSHA1Util;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;import java.nio.charset.StandardCharsets;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.text.SimpleDateFormat;
import java.util.Date;public class MqttProducer {public static void main(String[] args) throws InterruptedException, MqttException, NoSuchAlgorithmException, InvalidKeyException {MemoryPersistence memoryPersistence = new MemoryPersistence();String brokerUrl = "tcp://" + System.getenv("host") + ":1883";String firstTopic = System.getenv("topic");String sendClientId = "send01";MqttConnectOptions mqttConnectOptions = buildMqttConnectOptions(sendClientId);MqttClient mqttClient = new MqttClient(brokerUrl, sendClientId, memoryPersistence);mqttClient.setTimeToWait(5000L);mqttClient.setCallback(new MqttCallbackExtended() {@Overridepublic void connectComplete(boolean reconnect, String serverURI) {System.out.println(sendClientId + " connect success to " + serverURI);}@Overridepublic void connectionLost(Throwable throwable) {throwable.printStackTrace();}@Overridepublic void messageArrived(String topic, MqttMessage mqttMessage) {}@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {}});try {mqttClient.connect(mqttConnectOptions);} catch (Exception e) {e.printStackTrace();}long interval = 1000;for (int i = 0; i < 1000; i++) {String msg = "r1_" + System.currentTimeMillis() + "_" + i;MqttMessage message = new MqttMessage(msg.getBytes(StandardCharsets.UTF_8));message.setQos(1);String mqttSendTopic = firstTopic + "/r1";mqttClient.publish(mqttSendTopic, message);System.out.println(now() + "send: " + mqttSendTopic + ", " + msg);Thread.sleep(interval);mqttSendTopic = firstTopic + "/r/wc";msg = "wc_" + System.currentTimeMillis() + "_" + i;MqttMessage messageWild = new MqttMessage(msg.getBytes(StandardCharsets.UTF_8));messageWild.setQos(1);mqttClient.publish(mqttSendTopic, messageWild);System.out.println(now() + "send: " + mqttSendTopic + ", " + msg);Thread.sleep(interval);mqttSendTopic = firstTopic + "/r2";msg = "msgQ2_" + System.currentTimeMillis() + "_" + i;message = new MqttMessage(msg.getBytes(StandardCharsets.UTF_8));message.setQos(2);mqttClient.publish(mqttSendTopic, message);System.out.println(now() + "send: " + mqttSendTopic + ", " + msg);Thread.sleep(interval);}}private static MqttConnectOptions buildMqttConnectOptions(String clientId) throws NoSuchAlgorithmException, InvalidKeyException {MqttConnectOptions connOpts = new MqttConnectOptions();connOpts.setCleanSession(true);connOpts.setKeepAliveInterval(60);connOpts.setAutomaticReconnect(true);connOpts.setMaxInflight(10000);connOpts.setUserName(System.getenv("username"));connOpts.setPassword(HmacSHA1Util.macSignature(clientId, System.getenv("password")).toCharArray());return connOpts;}private static String now() {SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");return sf.format(new Date()) + "\t";}
}

四、Mqtt消费者MqttConsumer

import org.apache.rocketmq.mqtt.common.util.HmacSHA1Util;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.text.SimpleDateFormat;
import java.util.Date;public class MqttConsumer {public static void main(String[] args) throws MqttException, NoSuchAlgorithmException, InvalidKeyException {String brokerUrl = "tcp://" + System.getenv("host") + ":1883";String firstTopic = System.getenv("topic");MemoryPersistence memoryPersistence = new MemoryPersistence();String recvClientId = "recv01";MqttConnectOptions mqttConnectOptions = buildMqttConnectOptions(recvClientId);MqttClient mqttClient = new MqttClient(brokerUrl, recvClientId, memoryPersistence);mqttClient.setTimeToWait(5000L);mqttClient.setCallback(new MqttCallbackExtended() {@Overridepublic void connectComplete(boolean reconnect, String serverURI) {System.out.println(recvClientId + " connect success to " + serverURI);try {final String topicFilter[] = {firstTopic + "/r1", firstTopic + "/r/+", firstTopic + "/r2"};final int[] qos = {1, 1, 2};mqttClient.subscribe(topicFilter, qos);} catch (Exception e) {e.printStackTrace();}}@Overridepublic void connectionLost(Throwable throwable) {throwable.printStackTrace();}@Overridepublic void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {try {String payload = new String(mqttMessage.getPayload());String[] ss = payload.split("_");System.out.println(now() + "receive:" + topic + "," + payload);} catch (Exception e) {e.printStackTrace();}}@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {}});try {mqttClient.connect(mqttConnectOptions);} catch (Exception e) {e.printStackTrace();System.out.println("connect fail");}}private static MqttConnectOptions buildMqttConnectOptions(String clientId) throws NoSuchAlgorithmException, InvalidKeyException {MqttConnectOptions connOpts = new MqttConnectOptions();connOpts.setCleanSession(true);connOpts.setKeepAliveInterval(60);connOpts.setAutomaticReconnect(true);connOpts.setMaxInflight(10000);connOpts.setUserName(System.getenv("username"));connOpts.setPassword(HmacSHA1Util.macSignature(clientId, System.getenv("password")).toCharArray());return connOpts;}private static String now() {SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");return sf.format(new Date()) + "\t";}
}

五、RocketMQ客户端启动发布消息RocketMQProducer

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.mqtt.common.util.TopicUtils;
import org.apache.rocketmq.remoting.exception.RemotingException;import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;public class RocketMQProducer {private static DefaultMQProducer producer;private static String firstTopic = System.getenv("firstTopic");private static String recvClientId = "recv01";public static void main(String[] args) throws Exception {//Instantiate with a producer group name.producer = new DefaultMQProducer("PID_TEST");// Specify name server addresses.producer.setNamesrvAddr(System.getenv("namesrv"));//Launch the instance.producer.start();for (int i = 0; i < 1000; i++) {//Create a message instance, specifying topic, tag and message body.//Call send message to deliver message to one of brokers.try {sendMessage(i);Thread.sleep(1000);sendWithWildcardMessage(i);Thread.sleep(1000);} catch (Exception e) {e.printStackTrace();}}//Shut down once the producer instance is not longer in use.producer.shutdown();}private static void setLmq(Message msg, Set<String> queues) {msg.putUserProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH,StringUtils.join(queues.stream().map(s -> StringUtils.replace(s, "/", "%")).map(s -> MixAll.LMQ_PREFIX + s).collect(Collectors.toSet()),MixAll.MULTI_DISPATCH_QUEUE_SPLITTER));}private static void sendMessage(int i) throws MQBrokerException, RemotingException, InterruptedException, MQClientException {Message msg = new Message(firstTopic,"MQ2MQTT",("MQ_" + System.currentTimeMillis() + "_" + i).getBytes(StandardCharsets.UTF_8));String secondTopic = "/r1";setLmq(msg, new HashSet<>(Arrays.asList(TopicUtils.wrapLmq(firstTopic, secondTopic))));SendResult sendResult = producer.send(msg);System.out.println(now() + "sendMessage: " + new String(msg.getBody()));}private static void sendWithWildcardMessage(int i) throws MQBrokerException, RemotingException, InterruptedException, MQClientException {Message msg = new Message(firstTopic,"MQ2MQTT",("MQwc_" + System.currentTimeMillis() + "_" + i).getBytes(StandardCharsets.UTF_8));String secondTopic = "/r/wc";Set<String> lmqSet = new HashSet<>();lmqSet.add(TopicUtils.wrapLmq(firstTopic, secondTopic));lmqSet.addAll(mapWildCardLmq(firstTopic, secondTopic));setLmq(msg, lmqSet);SendResult sendResult = producer.send(msg);System.out.println(now() + "sendWcMessage: " + new String(msg.getBody()));}private static Set<String> mapWildCardLmq(String firstTopic, String secondTopic) {// todo by yourselfreturn new HashSet<>(Arrays.asList(TopicUtils.wrapLmq(firstTopic, "/r/+")));}private static String now() {SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");return sf.format(new Date()) + "\t";}}

六、RocketMQ客户端启动订阅消息RocketMQConsumer

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.mqtt.common.model.Constants;import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;public class RocketMQConsumer {public static void main(String[] args) throws MQClientException {// Instantiate with specified consumer group name.DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GID_test01");// Specify name server addresses.consumer.setNamesrvAddr(System.getenv("namesrv"));// Subscribe one more more topics to consume.String firstTopic = System.getenv("firstTopic");consumer.subscribe(firstTopic, Constants.MQTT_TAG);// Register callback to execute on arrival of messages fetched from brokers.consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {MessageExt messageExt = msgs.get(0);System.out.println(now() + "Receive: " + new String(messageExt.getBody()));return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//Launch the consumer instance.consumer.start();System.out.printf("Consumer Started.%n");}private static String now() {SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");return sf.format(new Date()) + "\t";}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_274472.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Tomcat源码:启动类Bootstrap与Catalina的加载

参考资料&#xff1a; 《Tomcat源码解析系列&#xff08;一&#xff09;Bootstrap》 《Tomcat源码解析系列&#xff08;二&#xff09;Catalina》 《Tomcat - 启动过程&#xff1a;初始化和启动流程》 《Tomcat - 启动过程:类加载机制详解》 《Tomcat - 启动过程:Catalina…

不用科学上网,免费的GPT-4 IDE工具Cursor保姆级使用教程

大家好&#xff0c;我是可乐。 过去的一周&#xff0c;真是疯狂的一周。 GPT-4 震撼发布&#xff0c;拥有了多模态能力&#xff0c;不仅能和GPT3一样进行文字对话&#xff0c;还能读懂图片&#xff1b; 然后斯坦福大学发布 Alpaca 7 B&#xff0c;性能匹敌 GPT-3.5&#xff…

易基因:PIWI/piRNA在人癌症中的表观遗传调控机制(DNA甲基化+m6A+组蛋白修饰)|综述

大家好&#xff0c;这里是专注表观组学十余年&#xff0c;领跑多组学科研服务的易基因。2023年03月07日&#xff0c;南华大学衡阳医学院李二毛团队在《Molecular Cancer》杂志发表了题为“The epigenetic regulatory mechanism of PIWI/piRNAs in human cancers”的综述文章&am…

数据处理时代,绕不开的数据分析

数据分析的出现是因为人类难以理解海量数据所呈现出来的信息&#xff0c;不能从中找到相应的规律来对现实中的事物进行对应&#xff0c;我们都知道数据有很高的价值&#xff0c;但不能利用的价值&#xff0c;没有任何意义。 为了解决这一问题&#xff0c;数据分析在长期的数据…

Golang每日一练(leetDay0012)

目录 34. 查找元素首末位置 Find-first-and-last-position-of-element-in-sorted-array &#x1f31f;&#x1f31f; 35. 搜索插入位置 Search Insert Position &#x1f31f; 36. 有效的数独 Valid Sudoku &#x1f31f;&#x1f31f; &#x1f31f; 每日一练刷题专栏 …

[vue问题]Uncaught SyntaxError: Not available in legacy mode

[vue问题]Uncaught SyntaxError: Not available in legacy mode问题描述问题分析解决方案直接回退vue-i18n的版本解决错误提示的问题问题描述 Uncaught SyntaxError: Not available in legacy modeat Object.createCompileError (message-compiler.cjs.js?af13:58:1)at creat…

GTC 2023 | 「皮衣刀客」黄仁勋畅谈 AI Top 5,科学计算、生成式 AI、Omniverse 榜上有名

内容一览&#xff1a;北京时间 3 月 21 日 23:00&#xff0c;英伟达创始人兼 CEO 黄仁勋在 GTC 2023 上发表主题演讲&#xff0c;介绍了生成式 AI、元宇宙、大语言模型、云计算等领域最新进展。 关键词&#xff1a;英伟达 黄仁勋 GTC 2023 「Don’t Miss This Defining Momen…

辉煌优配|沪指震荡涨0.25%,建筑、家居等板块拉升,数字经济概念活跃

22日早盘&#xff0c;两市股指盘中强势上扬&#xff0c;接近午盘涨幅收窄&#xff1b;两市半日成交近6000亿元&#xff0c;北向资金小幅净流出。 到午间收盘&#xff0c;沪指涨0.25%报3263.85点&#xff0c;深成指涨0.39%&#xff0c;创业板指微跌0.01%&#xff0c;两市合计成交…

html(1)

创建html项目 新建html项目&#xff0c;用记事本打开&#xff1a; 只需要浏览器就可以执行里面的代码&#xff0c;不需要安装额外的运行环境&#xff08;例如JDK&#xff09; html不需要编译&#xff0c;浏览器读取后就可以执行 上述hello world在文件是如下代码&#xff1a…

静态版通讯录的实现(详解)

前言&#xff1a;内容包括三个模块&#xff1a;测试通讯录模块&#xff0c;声明模块&#xff0c;通讯录实现模块 实现一个通讯录&#xff1a; 1 可以存放100个人的信息 每个人的信息&#xff1a; 名字 性别 年龄 电话 地址 2 增加联系人信息 删除联系人信息 查找联系人信息…

windows无盘启动技术开发之传统BIOS(Legacy BIOS)引导程序开发之二

by fanxiushu 2023-03-21 转载或引用请注明原始作者&#xff0c;接上文&#xff0c;这篇文章其实主要就是讲述上文中 Int13HookEntry 这个C函数的实现过程&#xff0c;看起来就一个函数&#xff0c;可实现起来一点也不轻松。首先得准备编译环境&#xff0c;因为是16位程序&…

LeetCode岛屿问题通用解决模板

文章目录前言第一题&#xff1a;求岛屿的周长模板整理遍历方向确定边界重复遍历问题处理模板解第一题第二题&#xff1a;求岛屿数量第三题&#xff1a;岛屿的最大面积第四题&#xff1a;统计子岛屿第五题&#xff1a;统计封闭岛屿的数目第六题&#xff1a;最大人工岛总结前言 …

04.hadoopHDFS

win java访问hadoop //复制文件夹,配置环境变量//配置HADOOP_HOME为我们的路径 ,hadoop-3.3.0 ,记得JAVA_HOME不要带有空格,!!!默认java安装环境有空格C:\Program Files//要在cmd hadoop -fs 查看是否配置成功//%HADOOP_HOME%\bin到path//maven添加依赖hadoop3.1.0//创建目录Be…

常见的CMS后台getshell姿势总结

目录 WordPress dedecms aspcms 南方数据企业系统 phpmyadmin日志 pageadmin 无忧企业系统 WordPress 默认后台登录地址 /wp-login.php /wp-admin 登录后在外观的编辑里面找一个模板&#xff0c;我们在404模板 (404.php)里面写入一句话后门 可以蚁剑连接 上传一个压缩…

自定义类型的超详细讲解ᵎᵎ了解结构体和位段这一篇文章就够了ᵎ

目录 1.结构体的声明 1.1基础知识 1.2结构体的声明 1.3结构体的特殊声明 1.4结构体的自引用 1.5结构体变量的定义和初始化 1.6结构体内存对齐 那对齐这么浪费空间&#xff0c;为什么要对齐 1.7修改默认对齐数 1.8结构体传参 2.位段 2.1什么是位段 2.2位段的内存分配…

web前端框架——Vue的特性

目录 前言&#xff1a; 一.vue 二.特性 1.轻量级 2.数据绑定 3.指令 4.插件 三.比较Angular 、React 、Vue 框架之间的比较 1. Angular Angular的优点&#xff1a; 2. React React 的优点&#xff1a; 3.vue 3.Vue的优点&#xff1a; 前言&#xff1a; 本篇文章…

QT开发笔记(多媒体)

多媒体 多媒体&#xff08;Multimedia&#xff09;是多种媒体的综合&#xff0c;一般包括文本&#xff0c;声音和图像等多种媒体形式。 在计算机系统中&#xff0c;多媒体指组合两种或两种以上媒体的一种人机交互式信息交流和传播媒体。 使用的媒体包括文字、图片、照片、声音…

头歌c语言实训项目-函数(2)

(创作不易&#xff0c;感谢有你&#xff0c;你的支持&#xff0c;就是我前行的最大动力&#xff0c;如果看完对你有帮助&#xff0c;请留下您的足迹&#xff09; 目录 第1关&#xff1a;模拟投掷骰子游戏: 题目&#xff1a; 代码思路&#xff1a; 代码表示&#xff1a; 第…

20230322英语学习

Why Are So Many Gen Z-Ers Drawn to Old Digital Cameras? 老式数码相机&#xff1a;Z世代的复古潮流 The latest digital cameras boast ever-higher resolutions, better performance in low light, smart focusing and shake reduction – and they’re built right into …

牛客C/C++刷题笔记(五)

122、对于"int *pa[5]&#xff1b;"的描述中&#xff0c;&#xff08; &#xff09;是正确的。 123、以下叙述中正确的是&#xff08;&#xff09; C语言的源程序中对缩进没有要求,所以A选项错误。C语言中区分大小写,main函数不能写成Main或_main,所以B选项错误。一…