Java实现Mqtt收发消息

news/2024/5/20 5:11:29/文章来源:https://blog.csdn.net/weixin_41451078/article/details/131067270

Java实现Mqtt收发消息

文章目录

  • Java实现Mqtt收发消息
    • windows mqtt 平台服务搭建
    • mqtt 客户端工具:mqttbox
    • 整体代码结构
    • mqtt基础参数配置类
    • mqtt客户端连接
    • mqtt接收的消息处理类
      • 对应的MqttService注解和MqttTopic注解
    • MqttGateway 发送消息
    • 指定topic接收处理方法

java实现mqtt对消息的交互,mqtt 的topic主题概念是相互的,这个要先理解好,
发布者和订阅者是对等的,它们之间可以相互发送消息,而不需要建立任何连接或状态
使用到windows mqtt 平台服务搭建(不是必须安装,仅 windows 测试需要此步骤)
mqtt 客户端工具:mqttbox
废话不多说,直接上代码,上工具,准备工作先做好,以及我的实现过程

windows mqtt 平台服务搭建

下载apache-apollo-1.7.1-windows版本,这里提供一个链接地址
http://archive.apache.org/dist/activemq/activemq-apollo/1.7.1/提供一个现有教程:
https://blog.csdn.net/qq_42315062/article/details/125890181
搭建完成后:登录 http://127.0.0.1:61680 即可,默认账号 admin,密码 password,
注意 这里网页的端口是 61680,但是 mqtt 服务的端口是 61613

mqtt 客户端工具:mqttbox

这里提供一个下载地方,也可以自行下载
https://download.csdn.net/download/qq_39671088/85740566?utm_medium=distribute.pc_relevant_download.none-task-download-2~default~BlogCommendFromBaidu~Rate-1-85740566-download-12755743.257%5Ev10%5Etop_income_click_base3&depth_1-utm_source=distribute.pc_relevant_download.none-task-download-2~default~BlogCommendFromBaidu~Rate-1-85740566-download-12755743.257%5Ev10%5Etop_income_click_base3&spm=1003.2020.3001.6616.1

在这里插入图片描述

整体代码结构

在这里插入图片描述

mqtt基础参数配置类

在这里插入图片描述

@Data
@Component
@ConfigurationProperties("mqtt")
public class MqttProperties {/*** 用户名*/private String username;/*** 密码*/private String password;/*** 连接地址*/private String hostUrl;/*** 进-客户Id*/private String inClientId;/*** 出-客户Id*/private String outClientId;/*** 客户Id*/private String clientId;/*** 默认连接话题*/private String defaultTopic;/*** 超时时间*/private int timeout;/*** 保持连接数*/private int keepalive;/*** 是否清除session*/private boolean clearSession;
}

mqtt客户端连接

import com.bsj.boyun.core.tool.utils.ExceptionUtil;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.ExecutorChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;@Configuration
public class MqttConfig {@Autowiredprivate MqttProperties mqttProperties;@Autowiredprivate MqttMessageHandle mqttMessageHandle;/*** 出站通道,MqttGateway类*/private static String outboundChannel = "mqttOutboundChannel";/*** Mqtt 客户端工厂 所有客户端从这里产生** @return*/@Beanpublic MqttPahoClientFactory mqttPahoClientFactory() throws MqttException {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();try {MqttConnectOptions options = new MqttConnectOptions();options.setServerURIs(mqttProperties.getHostUrl().split(","));options.setUserName(mqttProperties.getUsername());options.setPassword(mqttProperties.getPassword().toCharArray());factory.setConnectionOptions(options);} catch (Exception e) {System.out.println("mqtt初始化连接异常:" + ExceptionUtil.getStackStr(e));}return factory;}/*** Mqtt 管道适配器** @param factory* @return*/@Beanpublic MqttPahoMessageDrivenChannelAdapter adapter(MqttPahoClientFactory factory) {return new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getInClientId(), factory, mqttProperties.getDefaultTopic().split(","));}/*** 消息消费者 (接收,处理来自mqtt的消息)** @param adapter* @return*/@Beanpublic IntegrationFlow mqttInbound(MqttPahoMessageDrivenChannelAdapter adapter) {adapter.setCompletionTimeout(5000);adapter.setQos(1);//默认的消息通道,它将消息发送给为一个订阅者,然后阻碍发送直到消息被接收,传输方式都是同步的方式,都是由一个线程来运行的//这里我们可以将入站channel改成 ExecutorChannel一个可以使用多线程的channelreturn IntegrationFlows.from(adapter).channel(new ExecutorChannel(mqttThreadPoolTaskExecutor())).handle(mqttMessageHandle).get();}@Beanpublic ThreadPoolTaskExecutor mqttThreadPoolTaskExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();// 最大可创建的线程数int maxPoolSize = 200;executor.setMaxPoolSize(maxPoolSize);// 核心线程池大小int corePoolSize = 50;executor.setCorePoolSize(corePoolSize);// 队列最大长度int queueCapacity = 1000;executor.setQueueCapacity(queueCapacity);// 线程池维护线程所允许的空闲时间int keepAliveSeconds = 300;executor.setKeepAliveSeconds(keepAliveSeconds);// 线程池对拒绝任务(无线程可用)的处理策略executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());return executor;}/*** 出站处理器 (向 mqtt 发送消息 生产者)** @param factory* @return*/@Beanpublic IntegrationFlow mqttOutboundFlow(MqttPahoClientFactory factory) {MqttPahoMessageHandler handler = new MqttPahoMessageHandler(mqttProperties.getOutClientId(), factory);handler.setAsync(true);handler.setConverter(new DefaultPahoMessageConverter());handler.setDefaultTopic(mqttProperties.getDefaultTopic().split(",")[0]);return IntegrationFlows.from(outboundChannel).handle(handler).get();}}

mqtt接收的消息处理类

import com.bsj.studentcard.gateway.attendance.mqtt.annotation.MqttService;
import com.bsj.studentcard.gateway.attendance.mqtt.annotation.MqttTopic;
import com.bsj.studentcard.gateway.attendance.util.SpringUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Map;/*** 所有mqtt到达的消息都会在这里处理* 参考MVC @RequestMapping的方式* 使用注解映射到专门的Topic去处理(MqttTopicHandle类),不写 if else**/
@Component
@Slf4j
public class MqttMessageHandle implements MessageHandler {/*** 包含 @MqttService注解 的类(Component)*/public static Map<String, Object> mqttServices;/*** 所有mqtt到达的消息都会在这里处理* 要注意这个方法是在线程池里面运行的** @param message message*/@Overridepublic void handleMessage(Message<?> message) throws MessagingException {getMqttTopicService(message);}/*** 获取有@MqttService 的类,专门处理topic消息的类** @return*/public Map<String, Object> getMqttServices() {if (mqttServices == null) {mqttServices = SpringUtils.getBeansByAnnotation(MqttService.class);}return mqttServices;}/*** topic 匹配** @param message*/public void getMqttTopicService(Message<?> message) {// 在这里 我们根据不同的 主题 分发不同的消息String receivedTopic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);if (receivedTopic == null || "".equals(receivedTopic)) {return;}for (Map.Entry<String, Object> entry : getMqttServices().entrySet()) {// 把所有带有 @MqttService 的类遍历Class<?> clazz = entry.getValue().getClass();// 获取他所有方法Method[] methods = clazz.getDeclaredMethods();for (Method method : methods) {if (method.isAnnotationPresent(MqttTopic.class)) {// 如果这个方法有 这个注解MqttTopic handleTopic = method.getAnnotation(MqttTopic.class);if (isMatch(receivedTopic, handleTopic.value())) {// 并且 这个 topic 匹配成功try {method.invoke(SpringUtils.getBean(clazz), message);return;} catch (IllegalAccessException e) {e.printStackTrace();} catch (InvocationTargetException e) {log.error("执行 {} 方法出现错误", handleTopic.value(), e);}}}}}}/*** mqtt 订阅的主题与我实际的主题是否匹配** @param topic   是实际的主题* @param pattern 是我订阅的主题 可以是通配符模式* @return 是否匹配*/public static boolean isMatch(String topic, String pattern) {if ((topic == null) || (pattern == null)) {return false;}if (topic.equals(pattern)) {// 完全相等是肯定匹配的return true;}if ("#".equals(pattern)) {// # 号代表所有主题  肯定匹配的return true;}String[] splitTopic = topic.split("/");String[] splitPattern = pattern.split("/");boolean match = true;// 如果包含 # 则只需要判断 # 前面的for (int i = 0; i < splitPattern.length; i++) {if (!"#".equals(splitPattern[i])) {// 不是# 号 正常判断if (i >= splitTopic.length) {// 此时长度不相等 不匹配match = false;break;}if (!splitTopic[i].equals(splitPattern[i]) && !"+".equals(splitPattern[i])) {// 不相等 且不等于 +match = false;break;}} else {// 是# 号  肯定匹配的break;}}return match;}
}

对应的MqttService注解和MqttTopic注解

import org.springframework.core.annotation.AliasFor;
import org.springframework.stereotype.Component;import java.lang.annotation.*;/*** 自定义注解:消息处理类*/
@Documented
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface MqttService {@AliasFor(annotation = Component.class)String value() default "";
}
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;/*** 自定义注解:topic处理方法*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface MqttTopic {/*** 主题名字*/String value() default "";}

MqttGateway 发送消息

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;/*** MqttGateway*/@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String data);void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) Integer qos, String data);
}

指定topic接收处理方法

/*** MqttTopicHandle  指定topic消息处理*/
@MqttService
@Slf4j
@RequiredArgsConstructor
public class MqttTopicHandle {private final MqttGateway mqttGateway;/*** 上线通知** @param message*/@MqttTopic("mqtt/face/basic")public void basic(Message<?> message) throws MqttException {String receivedTopic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);String payload = (String) message.getPayload();log.info("接收到的topic为:{},内容:{}", receivedTopic, payload );// 要回复当前主题,不回复不需要处理mqttGateway.sendToMqtt(topic, 0, "收到消息!");}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_312464.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Servlet Cookie基本概念和使用方法

目录 Cookie 介绍 Cookie 主要有两种类型&#xff1a;会话 Cookie 和持久 Cookie。 Cookie使用步骤 使用Servlet和Cookie实现客户端存储的登录功能示例&#xff1a; LoginServlet类 index.jsp 删除Cookie 浏览器中查看Cookie的方法 Cookie 介绍 Cookie 是一种在网站和…

测试老鸟总结,自动化测试难点挑战应对方法,我的进阶之路...

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 Python自动化测试&…

不定积分练习

不定积分练习 在看视频的时候遇到了一道比较有趣的题&#xff0c;在这里给大家分享一下。 题目 计算 ∫ ( 1 x − 1 x ) e x 1 x d x \int(1x-\dfrac 1x)e^{x\frac 1x}dx ∫(1x−x1​)exx1​dx 解&#xff1a; \qquad 原式 ∫ e x 1 x d x ∫ x ( 1 − 1 x 2 ) e x 1…

Promise-用法

目录 1.处理异步的几种方案 2.理解 3.promise状态&#xff1a;初始化 4.执行异步任务 5.执行异步任务成功 6.执行异步任务失败 7.执行异步任务成功-返回 8.执行异步任务失败-返回 1.处理异步的几种方案 纯粹callback&#xff0c;会剥夺函数return的能力promise&#xf…

【Java基础学习打卡01】计算机概述

目录 引言一、计算机是什么&#xff1f;1.计算机vs计算器2.计算机定义 二、计算机发展简史三、计算机分类四、计算机基本工作原理1.冯诺依曼2.冯诺依曼原理 总结 引言 其实我们在学习Java编程之前应该要对计算机有所了解&#xff0c;这里的了解不是说我们日常接触电脑就算是了…

postgres篇---docker安装postgres,python连接postgres数据库

postgres篇---docker安装postgres&#xff0c;python连接postgres数据库 一、docker安装postgres1.1 安装Docker&#xff1a;1.2 从Docker Hub获取PostgreSQL镜像1.3 创建PostgreSQL容器1.4 访问PostgreSQL 二. python连接postgres数据库2.1 connect连接2.2 cursor2.3 excute执…

docker-compose 搭建 zipkin 服务端

目录 基于docker-compose搭建服务端 数据库 服务器 docker-compose.yaml 问题 测试 基于docker-compose搭建服务端 数据库 我这边存储选择了Mysql存储&#xff0c;新建了 zipkin库&#xff0c;数据库脚本如下 -- -- Copyright 2015-2019 The OpenZipkin Authors -- -- Li…

Springboot3 + SpringSecurity + JWT + OpenApi3 实现认证授权

Springboot3 SpringSecurity JWT OpenApi3 实现双token 目前全网最新的 Spring Security JWT 实现双 Token 的案例&#xff01;收藏就对了&#xff0c;欢迎各位看友学习参考。此项目由作者个人创作&#xff0c;可以供大家学习和项目实战使用&#xff0c;创作不易&#xff…

linux 部署mysql

本文介绍下Centos7中mysql的安装(Centos7以下版本中有些命令和centos7中有些不同&#xff0c;安时需注意下自己的linux版本) 事先准备 1、查看系统中是否自带安装mysql yum list installed | grep mysql ![在这里插入图片描述](https://img-blog.csdnimg.cn/e322b2f4036c4d9…

电力能耗监测系统是如何运作的

电力能耗监测系统数据的采集主要通过多功能仪表、通讯管理机、通讯协议实现能耗数据的采集&#xff0c;能耗数据上传后经大数据计算实现能耗数据的展示&#xff0c;满足用户对能耗监测的需求。下面对电力能耗监测系统的是怎么采集数据的展开介绍&#xff1a; 1.多功能仪表 对高…

中国的互联网技术有多厉害?

1 很多人没有意识到&#xff0c;中国的互联网技术是相当厉害的。 给大家举几个例子。 我和朋友聊天的时候&#xff0c;手机上的app都在“侧耳倾听”&#xff0c;聊天的一些关键字很快就会出现在手机浏览器的搜索栏中。 携程会给我自动推荐景点&#xff0c;美团会给我推荐美食&…

【FLoyd算法(Floyd到底做什么用 进来看看)】牛的旅行【进来学习做一道题的步骤-1.读题根据样例 概括出题意-2.分析算法-3.优化算法】

专注 效率 记忆 预习 笔记 复习 做题 欢迎观看我的博客&#xff0c;如有问题交流&#xff0c;欢迎评论区留言&#xff0c;一定尽快回复&#xff01;&#xff08;大家可以去看我的专栏&#xff0c;是所有文章的目录&#xff09;   文章字体风格&#xff1a; 红色文字表示&#…

混合动力汽车耐久测试

一 背景 整车厂可通过发动机和电机驱动的结合为多款车型提供混合动力驱动技术。汽车集成电机驱动可大大减少二氧化碳的排放&#xff0c;不仅如此&#xff0c;全电动驱动或混合动力驱动的汽车还将使用户体验到更好的驾驶感受&#xff0c;且这种汽车可通过电动机来实现更快的加速…

操作系统原理 —— 内存覆盖与交换(十九)

什么情况下需要覆盖与交换 要弄清楚什么是覆盖与交换的概念&#xff0c;首先我们要知道在什么情况下才会使用到覆盖与交换。 在早期的计算机内存很小的时候&#xff0c;比如 IBM 推出的第一台 PC 机最大只支持 1 MB 大小的内存&#xff0c;因此会经常出现内存大小不够的情况&…

Dell服务器安装Ubuntu系统

1、下载镜像&#xff0c;做启动盘 镜像链接 http://old-releases.ubuntu.com/releases/20.04.2/ubuntu-20.04.2-live-server-amd64.iso 版本可以根据自己要求选择。 做启动盘 我用的是ultraiso 记得先格式化&#xff0c;再写入。 2、 设置BIOS启动 按F11&#xff0c;进入BIOS…

708教室使用方法

一、教室平面图 708教室的布局如下&#xff0c;重要的设备已经在图中标出。总开关、一体机和机柜。   二、使用方法 2.1 房间机器上电 进门后首先走到“总开关位置”&#xff0c;将电匝闭合。 原来的开关如图所示&#xff0c;有3组开关&#xff0c;1号组开关用于控制插座、…

Linux工具之htop(含移植到arm-linux系统)

文章目录 介绍安装使用一些参数讲解功能键说明一些快捷键一些指令参数 拓展&#xff1a;Linux进程PRI与NI值拓展&#xff1a;VIRT(虚拟内存)RES(常驻内存)和SHR(共享内存)拓展&#xff1a;编译成应用放到开发板上使用源码下载解压编译 介绍 Htop是一个免费的&#xff08;GPL&a…

spring实例化bean之实例化

1.关键方法createBean doGetBean中调用getSingleton方法中调用singletonFactory.getObject()触发lambda表达式中的createBean方法 AbstractAutowireCapableBeanFactory#createBean protected Object createBean(String beanName, RootBeanDefinition mbd, Nullable Object[] …

不愧是华为出来的,太强了。。。

前言 实习去了博彦科技&#xff08;外包&#xff09;&#xff0c;做的就是螺丝钉的活&#xff0c;后面还因为人效不佳&#xff0c;被开了。 正式毕业后去了另外一个做电子发票的公司&#xff0c;但是都是功能测试和一点点APP测试&#xff0c;然后经常被开发怼&#xff0c;测试…

Redis进阶:缓存穿透|缓存击穿|缓存雪崩问题

Redis应用问题 1. 缓存穿透问题1.1 问题描述1.2 解决方案方法一&#xff1a;空值缓存方法二&#xff1a;设置可访问的名单&#xff08;白名单&#xff09;方法三&#xff1a;采用布隆过滤器方法四&#xff1a;进行实时监控 2. 缓存击穿问题2.1 问题描述2.2 解决方案方法一&…