springboot集成mqtt
1. 前言
这里我们使用springboot搭建一个轻量级的mqtt客户端,连接mqtt的Broker服务。
连接信息写在配置文件 application.properties 里:
# MQTT connection settings; bound to MqttProperties through the "spring.mqtt" prefix.
# Credentials passed to MqttConnectOptions (setUserName / setPassword).
spring.mqtt.username=admin
spring.mqtt.mqpassword=admin
# Broker URI handed to MqttConnectOptions.setServerURIs.
spring.mqtt.host-url= tcp://127.0.0.1:1883
# Client id: used as-is by the outbound handler and with an "_inbound" suffix by the inbound adapter.
spring.mqtt.client-id= server_client_${random.value}
# Split on "," for the inbound subscriptions; also used unsplit as the outbound default topic.
spring.mqtt.default-topic= $SYS/brokers/+/clients/#
# Passed to the inbound adapter's setCompletionTimeout after being parsed as a long.
spring.mqtt.completionTimeout= 3000
# Passed to MqttConnectOptions.setKeepAliveInterval.
spring.mqtt.keepAlive= 60
2. 引入依赖
<!-- MQTT support -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>
3. 配置文件
新建MqttProperties.java文件,用于绑定application.properties中以spring.mqtt为前缀的mqtt配置项
/**
 * Holder for the MQTT settings declared under the {@code spring.mqtt} prefix.
 *
 * <p>Accessors are generated by Lombok ({@code @Getter} / {@code @Setter}).
 *
 * @author Eric
 * @date 2020-05-14
 */
@ConfigurationProperties("spring.mqtt")
@Component
@Getter
@Setter
public class MqttProperties {

    /** User name used when connecting to the broker. */
    private String username;

    /** Password used when connecting to the broker. */
    private String mqpassword;

    /** Broker URI. */
    private String hostUrl;

    /** Base MQTT client id. */
    private String clientId;

    /** Default topic setting. */
    private String defaultTopic;

    /** Completion timeout, kept as text exactly as configured. */
    private String completionTimeout;

    /** Keep-alive interval. */
    private Integer keepAlive;
}
新建MqttConfiguration.java文件,为mqtt做初始化配置
/*** @author Eric* @date 2020年5月14日*/
@Configuration
@Slf4j
public class MqttConfiguration {@Autowiredprivate MqttProperties mqttProperties;/*** 事件触发*/@Autowiredprivate ApplicationEventPublisher eventPublisher;@Beanpublic MqttConnectOptions getMqttConnectOptions(){MqttConnectOptions mqttConnectOptions=new MqttConnectOptions();mqttConnectOptions.setUserName(mqttProperties.getUsername());mqttConnectOptions.setPassword(mqttProperties.getMqpassword().toCharArray());mqttConnectOptions.setServerURIs(new String[]{mqttProperties.getHostUrl()});mqttConnectOptions.setKeepAliveInterval(2);mqttConnectOptions.setKeepAliveInterval(mqttProperties.getKeepAlive());return mqttConnectOptions;}@Beanpublic MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();factory.setConnectionOptions(getMqttConnectOptions());return factory;}@Beanpublic MessageChannel mqttInputChannel() {return new DirectChannel();}/*** 配置client,监听的topic*/@Beanpublic MessageProducer inbound() {MqttPahoMessageDrivenChannelAdapter adapter =new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getClientId()+"_inbound", mqttClientFactory(),mqttProperties.getDefaultTopic().split(","));adapter.setCompletionTimeout(Long.valueOf(mqttProperties.getCompletionTimeout()));adapter.setConverter(new DefaultPahoMessageConverter());//默认添加TopicName中所有tipicadapter.addTopic("+/+/test");adapter.setQos(2);adapter.setOutputChannel(mqttInputChannel());return adapter;}@Bean@ServiceActivator(inputChannel = "mqttInputChannel")public MessageHandler handler() {return new MessageHandler() {@Overridepublic void handleMessage(Message<?> message) throws MessagingException {String topic = message.getHeaders().get("mqtt_receivedTopic").toString();String qos = message.getHeaders().get("mqtt_receivedQos").toString();//触发事件 这里不再做业务处理,包 listener中做处理eventPublisher.publishEvent(new MqttEvent(this,topic,message.getPayload().toString()));}};}/*** 发送消息和消费消息Channel可以使用相同MqttPahoClientFactory** @return*/@Bean@ServiceActivator(inputChannel = 
"mqttOutboundChannel")public MessageHandler mqttOutbound() {// 在这里进行mqttOutboundChannel的相关设置MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttProperties.getClientId(), mqttClientFactory());// 如果设置成true,发送消息时将不会阻塞。messageHandler.setAsync(true);messageHandler.setDefaultTopic(mqttProperties.getDefaultTopic());return messageHandler;}@Beanpublic MessageChannel mqttOutboundChannel() {return new DirectChannel();}
}
4. MQTT消息类
新建MqttEvent.java 消息类。用于承载接收到的mqtt消息(主题和内容),并作为Spring事件发布
/**
 * Application event carrying one MQTT message: its topic and its payload as text.
 *
 * <p>Getters are generated by Lombok ({@code @Getter}).
 *
 * @author Eric
 * @date 2020-05-23
 */
@Getter
public class MqttEvent extends ApplicationEvent {

    /** Topic associated with the message. */
    private String topic;

    /** Message content. */
    private String message;

    /**
     * Creates an event for a single message.
     *
     * @param source  the object publishing the event
     * @param topic   the topic of the message
     * @param message the message content
     */
    public MqttEvent(Object source, String topic, String message) {
        super(source);
        this.message = message;
        this.topic = topic;
    }
}
5. MQTT消息接收器
新建JobListener.java文件作为 mqtt的消息接收类
/*** 触发event topic 事件* @author Eric* @date 2020年5月23日*/
@Slf4j
@Component
public class JobListener {@AutowiredDeviceDao deviceDao;/*** 监听topic* @param mqttEvent*/@EventListener(condition = "#mqttEvent.topic.startsWith('pay')")public void onEmqttCall1(MqttEvent mqttEvent) throws Exception {String topic = mqttEvent.getTopic();//写逻辑处理}/*** 监听topic* @param mqttEvent*/@EventListener(condition = "#mqttEvent.topic.equals('device')")public void onEmqttCallT(MqttEvent mqttEvent){log.info("接收到消11111111111:"+mqttEvent.getMessage());}
}
6. MQTT消息发送器
新建MqttGateway.java 提供发送mqtt消息的接口服务
/**
 * Messaging gateway for sending MQTT messages; requests go to {@code mqttOutboundChannel}.
 *
 * @author Eric
 * @date 2020-05-23
 */
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {

    /**
     * Sends a payload without specifying a topic header.
     *
     * @param data the message payload
     */
    void sendToMqtt(String data);

    /**
     * Sends a payload with an explicit topic header.
     *
     * @param topic   value of the {@code MqttHeaders.TOPIC} header
     * @param payload the message payload
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);

    /**
     * Sends a payload with explicit topic and QoS headers.
     *
     * @param topic   value of the {@code MqttHeaders.TOPIC} header
     * @param qos     value of the {@code MqttHeaders.QOS} header
     * @param payload the message payload
     */
    void sendToMqtt(
            @Header(MqttHeaders.TOPIC) String topic,
            @Header(MqttHeaders.QOS) int qos,
            String payload);
}
7. 测试MQTT发送消息
/*** @version 1.0* @author: eric* @date: 2022/7/1 上午 11:03*/
@SpringBootTest
public class Test3 {@AutowiredMqttGateway mqttGateway;@Testpublic void mqttTest () {mqttGateway.sendToMqtt("111//222/33","消息内容");}
}