mqtt的使用与二次封装

news/2024/4/24 20:53:44/文章来源:https://blog.csdn.net/qq_43840143/article/details/128429011

前提:先安装Mosquitto并启动服务,可使用mqttx进行接收发送的测试。
Mosquitto以配置启动命令

mosquitto -c mosquitto.conf -v

原文链接:mqtt的使用
本文为测试使用固无账号密码,可在原文查看

封装后实现效果,加入一个新的topic时新建一个类进行自动订阅,该类写所订阅topic方法的处理。发送消息看原文使用方法。

所用依赖

<!--mqtt包--><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId><version>5.5.14</version></dependency><!--gson包--><dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId><version>2.9.0</version></dependency>

配置文件

spring:mqtt:url: tcp://127.0.0.1:1883clientId: mqttx_867916f3completionTimeout: 2000

封装后代码,链接mqtt工具类

package Ceshi.configure.mqtt;import javax.annotation.PostConstruct;import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;@Configuration
public class MqttConsumerConfig {@Value("${spring.mqtt.url}")private String hostUrl;@Value("${spring.mqtt.clientId}")private String clientId;/*** 客户端对象*/private MqttClient client;/*** 在bean初始化后连接到服务器*/@PostConstructpublic void init(){connect();}/*** 客户端连接服务端*/public void connect(){try {//创建MQTT客户端对象client = new MqttClient(hostUrl,clientId,new MemoryPersistence());//连接设置MqttConnectOptions options = new MqttConnectOptions();//是否清空session,设置为false表示服务器会保留客户端的连接记录,客户端重连之后能获取到服务器在客户端断开连接期间推送的消息//设置为true表示每次连接到服务端都是以新的身份options.setCleanSession(true);//设置超时时间,单位为秒options.setConnectionTimeout(100);//设置心跳时间 单位为秒,表示服务器每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线options.setKeepAliveInterval(20);//设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息options.setWill("willTopic",(clientId + "与服务器断开连接").getBytes(),0,false);//设置回调client.setCallback(new MqttConsumerCallBack());client.connect(options);} catch (MqttException e) {e.printStackTrace();}}/*** 断开连接*/public void disConnect(){try {client.disconnect();} catch (MqttException e) {e.printStackTrace();}}/*** 订阅主题*/public void subscribe(String topic,int qos){try {client.subscribe(topic,qos);} catch (MqttException e) {e.printStackTrace();}}/*** @Description: 消息发布*/public void publish(int qos,boolean retained,String topic,String message){MqttMessage mqttMessage = new MqttMessage();mqttMessage.setQos(qos);mqttMessage.setRetained(retained);mqttMessage.setPayload(message.getBytes());//主题的目的地,用于发布/订阅信息MqttTopic mqttTopic = client.getTopic(topic);//提供一种机制来跟踪消息的传递进度//用于在以非阻塞方式(在后台运行)执行发布是跟踪消息的传递进度MqttDeliveryToken token;try {//将指定消息发布到主题,但不等待消息传递完成,返回的token可用于跟踪消息的传递状态//一旦此方法干净地返回,消息就已被客户端接受发布,当连接可用,将在后台完成消息传递。token = mqttTopic.publish(mqttMessage);token.waitForCompletion();} catch (MqttException e) {e.printStackTrace();}}
}

监听消息工具类

package Ceshi.configure.mqtt;import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Value;import java.util.HashMap;
import java.util.Map;public class MqttConsumerCallBack implements MqttCallback{@Value("${spring.mqtt.clientId}")private String clientId;private static Map<String,JIantingChuli> maps=new HashMap<>();public static void setJIantingChulis(String k,JIantingChuli y){maps.put(k,y);}/*** 客户端断开连接的回调*/@Overridepublic void connectionLost(Throwable throwable) {System.out.println(clientId+"与服务器断开连接,可重连");}/*** 消息到达的回调*/@Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {System.out.println(String.format("接收消息主题 : %s",topic));System.out.println(String.format("接收消息Qos : %d",message.getQos()));System.out.println(String.format("接收消息内容 : %s",new String(message.getPayload())));System.out.println(String.format("接收消息retained : %b",message.isRetained()));maps.get(topic).xiaoxifenhua(new String(message.getPayload()));//调用方法进行消费}/*** 消息发布成功的回调*/@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {IMqttAsyncClient client = iMqttDeliveryToken.getClient();System.out.println(client.getClientId()+"发布消息成功!");}}

统一处理消息接口

public  interface JIantingChuli {//根据fname找到要执行的方法,后者为传参void xiaoxifenhua(String message);
}

接收报文格式类

import lombok.AllArgsConstructor;
import lombok.Data;import java.io.Serializable;@Data
@AllArgsConstructor
public class MqResult implements Serializable{private String key;//方法名private Object data;//方法所需参数
}

一个测试订阅类(重点)(自己根据需求几个topic复制几个,内部topic与qos看情况修改,xiaoxifenhua内有几个方法就写几个分支,k为方法名)

import Ceshi.configure.mqtt.JIantingChuli;
import Ceshi.configure.mqtt.MqResult;
import Ceshi.configure.mqtt.MqttConsumerCallBack;
import Ceshi.configure.mqtt.MqttConsumerConfig;
import Ceshi.param.AppCodeApiParam;
import com.google.gson.Gson;
import lombok.Data;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import javax.annotation.Resource;@Component
public class Ceshi1 implements JIantingChuli {@Resourceprivate MqttConsumerConfig mqttConsumerConfig;private Gson gson=new Gson();private String topic="ceshi";private int qos=2;@PostConstructpublic void chushihua(){mqttConsumerConfig.subscribe(topic,qos);//添加到订阅MqttConsumerCallBack.setJIantingChulis(topic,this);//监听到消息时找到此类对象System.out.println(topic+","+qos+"已注册订阅");}@Overridepublic void xiaoxifenhua(String message) {MqResult mqres=gson.fromJson(message,MqResult.class);//json解析为对象switch (mqres.getKey()){case "ceshi":ceshi(gson.fromJson(mqres.getData().toString(), AppCodeApiParam.class));//AppCodeApiParam是随便一个类,内部有个getPassword方法显示参数值break;}}//随便写的测试方法tpublic void ceshi(AppCodeApiParam appCodeApiParam){System.out.println(appCodeApiParam.getPassword());}
}

QoS0,At most once,至多一次;
QoS1,At least once,至少一次;
QoS2,Exactly once,确保只有一次。
因上方测试类topic=“ceshi”,所以我们用MqttX进行测试一下(试了一下发送时qos等于啥都能接收到,只要topic相等)
在这里插入图片描述
参数(按照参数格式来key是要调用的方法名,data是所传参数)

{"key": "ceshi",//方法名"data": {//要传的参数"password":"132465"}
}

启动后可看见已注册订阅
在这里插入图片描述

发送消息测试结果,可见触发的方法与传递的参数均无误
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_239667.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

耗时二周,万字总结Maven简明教程,与君共勉!

什么是Mavne Maven 是一个项目管理工具&#xff0c;它包含了一个项目对象模型 (POM&#xff1a;Project Object Model)&#xff0c;一组标准集合。由于 Maven 使用标准目录布局和默认构建生命周期&#xff0c;开发团队几乎可以立即自动化项目的构建基础设施。在多个开发团队环…

消息队列RabbitMQ学习笔记(四)死信队列和延迟队列

1. 死信的概念 先从概念解释上搞清楚这个定义&#xff0c;死信&#xff0c;顾名思义就是无法被消费的消息&#xff0c;字面意思可以这样理 解&#xff0c;一般来说&#xff0c;producer 将消息投递到 broker 或者直接到queue 里了&#xff0c;consumer 从 queue 取出消息 进行…

Linux 下 使用点阵在LCD上显示汉字,字符

文章目录前言一、显示字符1.获取点阵&#xff1a;2.描点&#xff08;显示字符函数&#xff09;&#xff1a;3. 要打开LCD设备&#xff1a;4. 通过ioctl 获取Framebuffer参数:5. 通过mmap映射出Framebuffer的地址&#xff1a;6.清屏并显示字符&#xff1a;二、显示汉字1.区位码&…

多线程基础入门

文章目录前言一、认识线程&#xff08;一&#xff09;概念1.线程是什么2.为啥要有线程&#xff08;轻量级进程&#xff09;为什么线程比进程更轻量经典面试题&#xff1a;谈谈进程和线程的区别和联系3.线程的结构&#xff08;二&#xff09;第一个多线程程序&#xff08;三&…

我国用电信息采集系统行业应用需求及市场容量分析 现6省上线运行

用户用电信息采集系统是通过对配电变压器和终端用户的用电数据的采集和分析&#xff0c;实现用电监控、推行阶梯定价、负荷管理、线损分析&#xff0c;最终达到自动抄表、错峰用电、用电检查&#xff08;防窃电&#xff09;、负荷预测和节约用电成本等目的。建立全面的用户用电…

RabbitMQ 第一天 基础 4 RabbitMQ 的工作模式 4.4 Topic 通配符模式 4.5 工作模式总结

RabbitMQ 【黑马程序员RabbitMQ全套教程&#xff0c;rabbitmq消息中间件到实战】 文章目录RabbitMQ第一天 基础4 RabbitMQ 的工作模式4.4 Topic 通配符模式4.4.1 模式说明4.4.2 代码编写4.4.3 小结4.5 工作模式总结第一天 基础 4 RabbitMQ 的工作模式 4.4 Topic 通配符模式 …

32天高效突击:开源框架+性能优化+微服务架构+分布式,面阿里获P7(脑图、笔记、面试考点全都有)

今年的大环境不佳&#xff0c;所以大部分的人在今年的招聘旺季都没有收获到好的结果。 但不要着急&#xff0c;今天分享的内容则是由 一位阿里P7的面试心得&#xff0c;通过32天的高效突击训练&#xff0c;成功拿下offer的学习方法。 篇章分为三大章节&#xff0c;可以根据自…

day 10 模拟和高精度

P1328 [NOIP2014 提高组] 生活大爆炸版石头剪刀布 #include<bits/stdc.h> using namespace std; int n, na, nb, fa, fb;//f:得分 int a[205], b[205];void fun(int ta, int tb){if(ta 0 && tb 1) fb;if(ta 1 && tb 0) fa;if(ta 0 && tb …

【nowcoder】笔试强训Day2

目录 一、选择题 二、编程题 2.1排序子序列 2.2倒置字符串 一、选择题 1.A 派生出子类 B &#xff0c; B 派生出子类 C &#xff0c;并且在 java 源代码有如下声明&#xff1a; 1. A a0new A(); 2. A a1new B(); 3. A a2new C(); 问以下哪个说法是正确的&#xff08;&…

机器学习 | 线性回归

一.基本原理 利用回归方程&#xff08;函数&#xff09;对一个或多个自变量&#xff08;特征值&#xff09;和因变量&#xff08;目标值&#xff09;之间关系进行建模的一种分析方式 根据线性代数&#xff0c;我们可以定义方程 xwy&#xff0c;在线性回归问题中&#xff0c;x…

前端小知识:赋予变量默认值(逻辑与运算符、空值合并运算符、逻辑空运算符)

8. 逻辑与运算符、空值合并运算符、逻辑空运算符&#xff08;可用赋予默认值&#xff09; &#xff08;空值合并运算符&#xff09;官方文档&#xff1a; https://developer.mozilla.org/zh-CN/docs/Web/JavaScript/Reference/Operators/Nullish_coalescing   &#xff08;逻辑…

【推荐收藏】这份图解算法数据结构的材料太良心

5年前发生的一件事&#xff0c;成为了我职业生涯的重要转折点。当时的我在交大读研&#xff0c;对互联网求职一无所知&#xff0c;但仍然硬着头皮申请了 Microsoft 实习生。面试官让我在白板上写出“快速排序”代码&#xff0c;我畏畏缩缩地写了一个“冒泡排序”&#xff0c;并…

1754. 构造字典序最大的合并字符串

摘要 1754. 构造字典序最大的合并字符串 一 贪心算法分析 题目要求合并两个字符串 word1 与 word2&#xff0c;且要求合并后的字符串字典序最大。首先需要观察一下合并的选择规律&#xff0c;假设当前需要从 word1​ 的第 i 个字符和 word2​ 的第 j个字符选择一个字符加入到…

自制macOS安装镜像iso虚拟机用

在网上下载的用于在虚拟机中安装的镜像版本相对比较旧。安装完成后还要进行升级比较麻烦。于是我就想自己制作安装镜像了。 精华 #创建空白磁盘镜像 hdiutil create -o /tmp/ventura -size 13800m -volname ventura -layout SPUD -fs HFSJ #挂载上面创建的镜像 hdiutil attac…

内容资产管理11问

&#x1f447;点击一键关注主笔&#xff1a;邹小困、邝晴岚主持人&#xff1a;增长黑盒分析师Emma出品&#xff1a;增长黑盒研究组前言在这个信息爆炸的数据时代&#xff0c;各个行业正积极推进数字化转型&#xff0c;产业升级与技术赋能成为主题之一。在推进企业线上线下融合的…

最近面试遇到一个算法题,简单写一点。

第⼀题&#xff08;必答&#xff09; 请针对有重复数字的数组设计⼀个快排算法&#xff0c;⽐如&#xff1a;[34, 34, 89, 1, 1, 20, 12]&#xff0c;排序后结果为 [89,34,34,20,12,1,1] 第⼆题&#xff08;必答&#xff09; 请利⽤Redis 实现⼀个通⽤分布式锁&#xff0c;并…

B+树 [数据结构与算法][Java]

B树 B树是B树的一种变形 我们通过一颗四阶B树来理解认识一下B树:(如下:) 我们其实从图上就可以看出B树和B树是有很多不同之处的 比如我们的B树中将叶子结点层的所有结点使用一个链表串联了起来B树中对于非叶子结点都是只是存储的索引(指针), 并没有存储关键字, 所以我们最终查…

Linux系统基础——BIOS和Bootloader

BIOS和Bootloader 特此说明: 刘超的趣谈linux操作系统是比较重要的参考资料&#xff0c;本文大部分内容和所有图片来源于这个专栏。 1 了解背景 1.1 目的 操作系统不是在板子上电就直接运行的&#xff0c;上电到系统启动的中间过程要搞明白&#xff0c;比如了解linux系统启动…

火山引擎 DataTester 上线“流程画布”功能,支持组合型 A/B 实验分析

更多技术交流、求职机会&#xff0c;欢迎关注字节跳动数据平台微信公众号&#xff0c;回复【1】进入官方交流群 在精细化运营的时代&#xff0c;运营活动同样需要有精细化的策略&#xff0c;例如在年末大促活动中&#xff0c;设计 APP 弹窗提醒、满减、会员领券时&#xff0c;我…

TikTok 加速团结独立站,跨境电商的又一次红利期?

TikTok近年来在国际上非常流行。2021年8月&#xff0c;TikTok的全球下载量首次超过Facebook&#xff0c;成为全球最大的下载量。TikTok的诞生打破了海外社交媒体的垄断&#xff0c;TikTok营销成为许多跨境卖家的重点之一。 封号事件发生后&#xff0c;许多跨境卖家开始向独立站…