springboot集成mqtt

news/2024/4/26 14:24:32/文章来源:https://blog.csdn.net/qq_35921773/article/details/127451916

springboot集成mqtt

1. 前言

​ 这里我们使用springboot搭建一个轻量级的mqtt客户端,连接mqtt的Broker服务。

​ 连接信息写在配置文件里application.properties

spring.mqtt.username=admin
spring.mqtt.mqpassword=admin
spring.mqtt.host-url= tcp://127.0.0.1:1883
spring.mqtt.client-id= server_client_${random.value}
spring.mqtt.default-topic= $SYS/brokers/+/clients/#
spring.mqtt.completionTimeout= 3000
spring.mqtt.keepAlive= 60

2. 引入依赖

       <!--mqtt --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-stream</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId></dependency>

3. 配置文件

​ 新建MqttProperties.java文件,初始化application里的mqtt配置项

/*** @author Eric* @date 2020年5月14日*/
@ConfigurationProperties("spring.mqtt")
@Component
@Getter
@Setter
public class MqttProperties {private String username;private String mqpassword;private String hostUrl;private String clientId;private String defaultTopic;private String completionTimeout;private Integer keepAlive;
}

​ 新建MqttConfiguration.java文件,为mqtt做初始化配置

/*** @author Eric* @date 2020年5月14日*/
@Configuration
@Slf4j
public class MqttConfiguration {@Autowiredprivate MqttProperties mqttProperties;/*** 事件触发*/@Autowiredprivate ApplicationEventPublisher eventPublisher;@Beanpublic MqttConnectOptions getMqttConnectOptions(){MqttConnectOptions mqttConnectOptions=new MqttConnectOptions();mqttConnectOptions.setUserName(mqttProperties.getUsername());mqttConnectOptions.setPassword(mqttProperties.getMqpassword().toCharArray());mqttConnectOptions.setServerURIs(new String[]{mqttProperties.getHostUrl()});mqttConnectOptions.setKeepAliveInterval(2);mqttConnectOptions.setKeepAliveInterval(mqttProperties.getKeepAlive());return mqttConnectOptions;}@Beanpublic MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();factory.setConnectionOptions(getMqttConnectOptions());return factory;}@Beanpublic MessageChannel mqttInputChannel() {return new DirectChannel();}/*** 配置client,监听的topic*/@Beanpublic MessageProducer inbound() {MqttPahoMessageDrivenChannelAdapter adapter =new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getClientId()+"_inbound", mqttClientFactory(),mqttProperties.getDefaultTopic().split(","));adapter.setCompletionTimeout(Long.valueOf(mqttProperties.getCompletionTimeout()));adapter.setConverter(new DefaultPahoMessageConverter());//默认添加TopicName中所有tipicadapter.addTopic("+/+/test");adapter.setQos(2);adapter.setOutputChannel(mqttInputChannel());return adapter;}@Bean@ServiceActivator(inputChannel = "mqttInputChannel")public MessageHandler handler() {return new MessageHandler() {@Overridepublic void handleMessage(Message<?> message) throws MessagingException {String topic = message.getHeaders().get("mqtt_receivedTopic").toString();String qos = message.getHeaders().get("mqtt_receivedQos").toString();//触发事件 这里不再做业务处理,包 listener中做处理eventPublisher.publishEvent(new MqttEvent(this,topic,message.getPayload().toString()));}};}/*** 发送消息和消费消息Channel可以使用相同MqttPahoClientFactory** @return*/@Bean@ServiceActivator(inputChannel = "mqttOutboundChannel")public MessageHandler mqttOutbound() {// 在这里进行mqttOutboundChannel的相关设置MqttPahoMessageHandler messageHandler =  new MqttPahoMessageHandler(mqttProperties.getClientId(), mqttClientFactory());// 如果设置成true,发送消息时将不会阻塞。messageHandler.setAsync(true);messageHandler.setDefaultTopic(mqttProperties.getDefaultTopic());return messageHandler;}@Beanpublic MessageChannel mqttOutboundChannel() {return new DirectChannel();}
}

4. MQTT消息类

新建MqttEvent.java 消息类。用于发送mqtt的消息

/*** topic事件* @author Eric* @date 2020年5月23日*/
@Getter
public class MqttEvent extends ApplicationEvent {private String topic;/*** 发送的消息*/private String message;public MqttEvent(Object source,String topic,String message) {super(source);this.topic = topic;this.message = message;}
}

5. MQTT消息接收器

新建JobListener.java文件作为 mqtt的消息接收类

/***  触发event topic 事件* @author Eric* @date 2020年5月23日*/
@Slf4j
@Component
public class JobListener {@AutowiredDeviceDao deviceDao;/*** 监听topic* @param mqttEvent*/@EventListener(condition = "#mqttEvent.topic.startsWith('pay')")public void onEmqttCall1(MqttEvent mqttEvent) throws Exception {String topic = mqttEvent.getTopic();//写逻辑处理}/*** 监听topic* @param mqttEvent*/@EventListener(condition = "#mqttEvent.topic.equals('device')")public void onEmqttCallT(MqttEvent mqttEvent){log.info("接收到消11111111111:"+mqttEvent.getMessage());}
}

6. MQTT消息发送器

新建MqttGateway.java 提供发送mqttt消息的接口服务

/***  触发event topic 事件* @author Eric* @date 2020年5月23日*/
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {void sendToMqtt(String data);void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}

7. 测试MQTT发送消息

/*** @version 1.0* @author: eric* @date: 2022/7/1 上午 11:03*/
@SpringBootTest
public class Test3 {@AutowiredMqttGateway mqttGateway;@Testpublic void mqttTest () {mqttGateway.sendToMqtt("111//222/33","消息内容");}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_404498.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

关于Vue项目npm操作中npm run serve或npm run dev报错以及二者区别

&#x1f3af;问题分析 我们在启动vue项目的时候&#xff0c;需要用到npm操作&#xff0c;比如使用npm run serve或npm run dev。比如说我们下载其他人的开源vue项目时&#xff0c;我们想在本地运行时&#xff0c;我们用的到npm run serve或npm run dev有时候会出现报错&#x…

Windows11+wsl2+cuda+conda+pytorch安装记录之处处都是坑

由于代码在windows上运行总是抛奇奇怪怪的错误&#xff0c;一怒之下换linux环境运行&#xff0c;win11自带的wsl很方便&#xff0c;在各种博客找了n多教程&#xff0c;历时两天&#xff0c;终于安装好了环境&#xff0c;配置这个环境的目的是为了运行深度学习的代码&#xff0c…

摄像机登录网页界面提示无法跳转登录页面,网页提示无法访问此页面

摄像机登录网页界面提示无法跳转登录页面&#xff0c;网页提示无法访问此页面 &#xff08;报错截图&#xff09; 首先通过ping设备看是否ping得通IPC&#xff0c;如不通&#xff0c;检查IPC的供电和网络。 供电排查方法&#xff1a; &#xff08;1&#xff09;、观察IPC红外…

泊松流(Poisson Flow)生成模型

又搬来了神器啊~~~ 扩散模型最早来源于物理中的热力学&#xff0c;最近却在人工智能领域大放异彩。还有什么物理理论可以推动生成模型研究的发展呢&#xff1f;最近&#xff0c;来自 MIT 的研究者受到高维电磁理论的启发&#xff0c;提出了一种称作泊松流&#xff08;Poisson…

FDTD script command(源/监视器)

adddipole : 添加偶极子光源 addplane : 添加平面波 addindex : 添加折射率监视器 addmovie :添加电影监视器 addpower : 添加功率监视器 通用设置 设置结构名字 set("name",name);设置位置/跨度 set("x",1e-6);set("x span",1e-6); set(&q…

【软考中级信安】第三章--密码学基本理论

目录 1、密码学概况 1.1 密码学发展简况 1.2 密码学基本概念 1.3 密码安全性分析 2、密码体制分类 2.1 私钥密码体制&#xff08;密钥不能公开&#xff09; 2.2 公钥密码体制&#xff08;密钥可以公开&#xff09; 2.3 混合密码体制 3、常用密码算法 3.1 DES 3.2 …

【JAVA程序设计】(C00084)基于SSM+uniapp的社区疫情防控小程序及管理系统-有文档

基于SSMuniapp的社区疫情防控小程序及管理系统项目简介项目获取开发环境项目技术运行截图项目简介 基于后台ssm框架前台vue以及elementui框架小程序使用uniapp的社区疫情管控小程序及管理系统&#xff1a;系统管理员、用户 管理员角色包含以下功能&#xff1a; 首页、个人中心…

jmeter压力测试工具,雪崩效应,容错组件Sentinel

一。jmeter测试工具&#xff1a; 概念&#xff1a; 项目要上线&#xff0c;肯定是需要进行压力测试的 可以测试服务器够不够 场景&#xff1a; 一个问题&#xff0c;如果有服务接口被阻&#xff0c;那另一个接口是否会有影响 进而引出了jmeter&#xff0c;用来模拟线程压力下载…

YOLO性能指标

术语 全称 解释 True 表示推理正确 False 表示推理错误&#xff0c; 跟ground truth(位置&#xff0c;类别&#xff09;比对之后得到的结论 positive 推理为正例&#xff0c;iou > 阈值&#xff0c;类别概率>阈值 negative 推理为反例&#xff0c;不符…

HarmonyOS系统中内核实现烟雾检测的方法

大家好&#xff0c;今天我们主要来聊一聊&#xff0c;如何使用鸿蒙系统来操作烟雾浓度传感器。 目录 第一&#xff1a;烟雾浓度传感器原理 第二&#xff1a; 鸿蒙相关API函数分析 Init_E53_SF1() MQ2_PPM_Calibration() Get_MQ2_PPM() 第三&#xff1a;硬件设计 第四&…

【单片机毕业设计】【mcuclub-hj-006-7】基于单片机的空气质量(天然气、CO、甲醛、甲苯、TVOC、CO2)检测的设计

最近设计了一个项目基于单片机的空气质量&#xff08;天然气、CO、甲醛、甲苯、TVOC、CO2&#xff09;检测系统&#xff0c;与大家分享一下&#xff1a; 一、基本介绍 项目名&#xff1a;空气质量&#xff08;天然气、CO、甲醛、甲苯、TVOC、CO2&#xff09; 项目编号&#x…

04 CSS01

目标&#xff1a; 1、基础认知 2、基础选择器 3、字体和文本相关样式 4、Chrome调试工具 5、综合案例 一、基础认知 1.1 css介绍 中文名&#xff1a;层叠样式表 作用&#xff1a;给页面中的html标签设置样式 1.2 css语法规则 css写在style标签中&#xff0c;style标签一般…

【DETR 论文解读】End-to-End Object Detection with Transformer

目录前言一、整体架构二、基于集合预测的损失函数2.1、二分图匹配确定有效预测框2.2、损失函数三、前向推理四、掉包版代码五、一些问题Reference前言 贡献/特点&#xff1a; 端到端&#xff1a;去除NMS和anchor&#xff0c;没有那么多的超参&#xff0c;计算量也大大减少&am…

二叉树的OJ练习题

1.单值二叉树 描述&#xff1a;如果二叉树每个节点都具有相同的值&#xff0c;那么该二叉树就是单值二叉树。只有给定的树是单值二叉树时&#xff0c;才返回 true&#xff1b;否则返回 false。 链接&#xff1a;965. 单值二叉树 - 力扣&#xff08;LeetCode&#xff09; 思路…

世界陶瓷卫浴100强榜单发布!

​  经过一年的严格数据审查&#xff0c;科学统计分析&#xff0c;备受全行业期待的 【世界陶瓷卫浴100强统计排行榜 】于2022年10月19日在中国佛山正式发布&#xff0c;除了陶瓷卫浴企业100强总榜以外&#xff0c;还发布了全球瓷砖企业30强、全球卫浴企业20强&#xff0c;全…

Python中的对象池是什么

在程序设计中&#xff0c;创建物体模块主要是通过生成对象来实现。当对象使用结束后&#xff0c;则会成为不再需要的模块进行销毁。 而在系统进行对象的生成与销毁过程中会大量的增加内存的消耗&#xff0c;同时对象的销毁往往会留下残留的信息&#xff0c;这样将会伴随内存泄露…

javaWeb SSM车辆调度系统myeclipse定制开发mysql数据库网页模式java编程SpringMVC

一、源码特点 JSP SSM车辆调度系统是一套完善的web设计系统&#xff0c;对理解JSP java编程开发语言有帮助&#xff0c;系统具有完整的源代码 系统采用SSM框架&#xff0c;系统主要采用B/S模式开发。开发环境为 TOMCAT7.0,Myeclipse8.5开发&#xff0c;数据库为Mysql5.0&a…

swagger动态开关实践

swagger动态开关实践1. 背景2. 配置文件监听2.1 基于注解2.2 基于jdk3. swagger改造3.1 bean刷新3.2 方法重写4. 总结5. 参考资料1. 背景 系统漏洞扫描&#xff0c;扫出了swagger的问题。这个问题其实比较基础&#xff0c;那就是生产环境不应该开启swagger&#xff01; 但是&…

FreeRTOS 软件定时器的使用

FreeRTOS中加入了软件定时器这个功能组件&#xff0c;是一个可选的、不属于freeRTOS内核的功能&#xff0c;由定时器服务任务&#xff08;其实就是一个定时器任务&#xff09;来提供。 软件定时器是当设定一个定时时间&#xff0c;当达到设定的时间之后就会执行指定的功能函数&…

el-switch接口实现

后台返回的数据&#xff1a; active-textswitch 打开时的文字描述string——inactive-textswitch 关闭时的文字描述string——active-valueswitch 打开时的值boolean / string / number—trueinactive-valueswitch 关闭时的值boolean / string / number—falseactive-colorswi…