Java实现Mqtt收发消息
文章目录 Java实现Mqtt收发消息 windows mqtt 平台服务搭建 mqtt 客户端工具:mqttbox 整体代码结构 mqtt基础参数配置类 mqtt客户端连接 mqtt接收的消息处理类 对应的MqttService注解和MqttTopic注解 MqttGateway 发送消息 指定topic接收处理方法
java实现mqtt对消息的交互,mqtt 的topic主题概念是相互的,这个要先理解好,
发布者和订阅者是对等的,它们之间可以相互发送消息,而不需要建立任何连接或状态
使用到windows mqtt 平台服务搭建( 不是必须安装,仅 windows 测试需要此步骤)
mqtt 客户端工具:mqttbox
废话不多说,直接上代码,上工具,准备工作先做好,以及我的实现过程
windows mqtt 平台服务搭建
下载apache-apollo-1.7.1-windows版本,这里提供一个链接地址
http://archive.apache.org/dist/activemq/activemq-apollo/1.7.1/提供一个现有教程:
https://blog.csdn.net/qq_42315062/article/details/125890181
搭建完成后:登录 http://127.0.0.1:61680 即可,默认账号 admin,密码 password,
注意 这里网页的端口是 61680 ,但是 mqtt 服务的端口是 61613
mqtt 客户端工具:mqttbox
这里提供一个下载地方,也可以自行下载
https://download.csdn.net/download/qq_39671088/85740566?utm_medium= distribute.pc_relevant_download.none-task-download-2~default~BlogCommendFromBaidu~Rate-1-85740566-download-12755743.257%5Ev10%5Etop_income_click_base3& depth_1-utm_source= distribute.pc_relevant_download.none-task-download-2~default~BlogCommendFromBaidu~Rate-1-85740566-download-12755743.257%5Ev10%5Etop_income_click_base3& spm = 1003.2020 .3001.6616.1
整体代码结构
mqtt基础参数配置类
@Data
@Component
@ConfigurationProperties ( "mqtt" )
public class MqttProperties { private String username; private String password; private String hostUrl; private String inClientId; private String outClientId; private String clientId; private String defaultTopic; private int timeout; private int keepalive; private boolean clearSession;
}
mqtt客户端连接
import com. bsj. boyun. core. tool. utils. ExceptionUtil ;
import org. eclipse. paho. client. mqttv3. MqttConnectOptions ;
import org. eclipse. paho. client. mqttv3. MqttException ;
import org. springframework. beans. factory. annotation. Autowired ;
import org. springframework. context. annotation. Bean ;
import org. springframework. context. annotation. Configuration ;
import org. springframework. integration. channel. ExecutorChannel ;
import org. springframework. integration. dsl. IntegrationFlow ;
import org. springframework. integration. dsl. IntegrationFlows ;
import org. springframework. integration. mqtt. core. DefaultMqttPahoClientFactory ;
import org. springframework. integration. mqtt. core. MqttPahoClientFactory ;
import org. springframework. integration. mqtt. inbound. MqttPahoMessageDrivenChannelAdapter ;
import org. springframework. integration. mqtt. outbound. MqttPahoMessageHandler ;
import org. springframework. integration. mqtt. support. DefaultPahoMessageConverter ;
import org. springframework. scheduling. concurrent. ThreadPoolTaskExecutor ;
import java. util. concurrent. ThreadPoolExecutor ; @Configuration
public class MqttConfig { @Autowired private MqttProperties mqttProperties; @Autowired private MqttMessageHandle mqttMessageHandle; private static String outboundChannel = "mqttOutboundChannel" ; @Bean public MqttPahoClientFactory mqttPahoClientFactory ( ) throws MqttException { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory ( ) ; try { MqttConnectOptions options = new MqttConnectOptions ( ) ; options. setServerURIs ( mqttProperties. getHostUrl ( ) . split ( "," ) ) ; options. setUserName ( mqttProperties. getUsername ( ) ) ; options. setPassword ( mqttProperties. getPassword ( ) . toCharArray ( ) ) ; factory. setConnectionOptions ( options) ; } catch ( Exception e) { System . out. println ( "mqtt初始化连接异常:" + ExceptionUtil . getStackStr ( e) ) ; } return factory; } @Bean public MqttPahoMessageDrivenChannelAdapter adapter ( MqttPahoClientFactory factory) { return new MqttPahoMessageDrivenChannelAdapter ( mqttProperties. getInClientId ( ) , factory, mqttProperties. getDefaultTopic ( ) . split ( "," ) ) ; } @Bean public IntegrationFlow mqttInbound ( MqttPahoMessageDrivenChannelAdapter adapter) { adapter. setCompletionTimeout ( 5000 ) ; adapter. setQos ( 1 ) ; return IntegrationFlows . from ( adapter) . channel ( new ExecutorChannel ( mqttThreadPoolTaskExecutor ( ) ) ) . handle ( mqttMessageHandle) . get ( ) ; } @Bean public ThreadPoolTaskExecutor mqttThreadPoolTaskExecutor ( ) { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor ( ) ; int maxPoolSize = 200 ; executor. setMaxPoolSize ( maxPoolSize) ; int corePoolSize = 50 ; executor. setCorePoolSize ( corePoolSize) ; int queueCapacity = 1000 ; executor. setQueueCapacity ( queueCapacity) ; int keepAliveSeconds = 300 ; executor. setKeepAliveSeconds ( keepAliveSeconds) ; executor. setRejectedExecutionHandler ( new ThreadPoolExecutor. CallerRunsPolicy ( ) ) ; return executor; } @Bean public IntegrationFlow mqttOutboundFlow ( MqttPahoClientFactory factory) { MqttPahoMessageHandler handler = new MqttPahoMessageHandler ( mqttProperties. getOutClientId ( ) , factory) ; handler. setAsync ( true ) ; handler. setConverter ( new DefaultPahoMessageConverter ( ) ) ; handler. setDefaultTopic ( mqttProperties. getDefaultTopic ( ) . split ( "," ) [ 0 ] ) ; return IntegrationFlows . from ( outboundChannel) . handle ( handler) . get ( ) ; } }
mqtt接收的消息处理类
import com. bsj. studentcard. gateway. attendance. mqtt. annotation. MqttService ;
import com. bsj. studentcard. gateway. attendance. mqtt. annotation. MqttTopic ;
import com. bsj. studentcard. gateway. attendance. util. SpringUtils ;
import lombok. extern. slf4j. Slf4j ;
import org. springframework. integration. mqtt. support. MqttHeaders ;
import org. springframework. messaging. Message ;
import org. springframework. messaging. MessageHandler ;
import org. springframework. messaging. MessagingException ;
import org. springframework. stereotype. Component ;
import java. lang. reflect. InvocationTargetException ;
import java. lang. reflect. Method ;
import java. util. Map ;
@Component
@Slf4j
public class MqttMessageHandle implements MessageHandler { public static Map < String , Object > mqttServices; @Override public void handleMessage ( Message < ? > message) throws MessagingException { getMqttTopicService ( message) ; } public Map < String , Object > getMqttServices ( ) { if ( mqttServices == null ) { mqttServices = SpringUtils . getBeansByAnnotation ( MqttService . class ) ; } return mqttServices; } public void getMqttTopicService ( Message < ? > message) { String receivedTopic = ( String ) message. getHeaders ( ) . get ( MqttHeaders . RECEIVED_TOPIC) ; if ( receivedTopic == null || "" . equals ( receivedTopic) ) { return ; } for ( Map. Entry < String , Object > entry : getMqttServices ( ) . entrySet ( ) ) { Class < ? > clazz = entry. getValue ( ) . getClass ( ) ; Method [ ] methods = clazz. getDeclaredMethods ( ) ; for ( Method method : methods) { if ( method. isAnnotationPresent ( MqttTopic . class ) ) { MqttTopic handleTopic = method. getAnnotation ( MqttTopic . class ) ; if ( isMatch ( receivedTopic, handleTopic. value ( ) ) ) { try { method. invoke ( SpringUtils . getBean ( clazz) , message) ; return ; } catch ( IllegalAccessException e) { e. printStackTrace ( ) ; } catch ( InvocationTargetException e) { log. error ( "执行 {} 方法出现错误" , handleTopic. value ( ) , e) ; } } } } } } public static boolean isMatch ( String topic, String pattern) { if ( ( topic == null ) || ( pattern == null ) ) { return false ; } if ( topic. equals ( pattern) ) { return true ; } if ( "#" . equals ( pattern) ) { return true ; } String [ ] splitTopic = topic. split ( "/" ) ; String [ ] splitPattern = pattern. split ( "/" ) ; boolean match = true ; for ( int i = 0 ; i < splitPattern. length; i++ ) { if ( ! "#" . equals ( splitPattern[ i] ) ) { if ( i >= splitTopic. length) { match = false ; break ; } if ( ! splitTopic[ i] . equals ( splitPattern[ i] ) && ! "+" . equals ( splitPattern[ i] ) ) { match = false ; break ; } } else { break ; } } return match; }
}
对应的MqttService注解和MqttTopic注解
import org. springframework. core. annotation. AliasFor ;
import org. springframework. stereotype. Component ; import java. lang. annotation. * ;
@Documented
@Target ( { ElementType . TYPE} )
@Retention ( RetentionPolicy . RUNTIME)
@Component
public @interface MqttService { @AliasFor ( annotation = Component . class ) String value ( ) default "" ;
}
import java. lang. annotation. ElementType ;
import java. lang. annotation. Retention ;
import java. lang. annotation. RetentionPolicy ;
import java. lang. annotation. Target ;
@Target ( ElementType . METHOD)
@Retention ( RetentionPolicy . RUNTIME)
public @interface MqttTopic { String value ( ) default "" ; }
MqttGateway 发送消息
import org. springframework. integration. annotation. MessagingGateway ;
import org. springframework. integration. mqtt. support. MqttHeaders ;
import org. springframework. messaging. handler. annotation. Header ;
import org. springframework. stereotype. Component ; @Component
@MessagingGateway ( defaultRequestChannel = "mqttOutboundChannel" )
public interface MqttGateway { void sendToMqtt ( @Header ( MqttHeaders . TOPIC) String topic, String data) ; void sendToMqtt ( @Header ( MqttHeaders . TOPIC) String topic, @Header ( MqttHeaders . QOS) Integer qos, String data) ;
}
指定topic接收处理方法
@MqttService
@Slf4j
@RequiredArgsConstructor
public class MqttTopicHandle { private final MqttGateway mqttGateway; @MqttTopic ( "mqtt/face/basic" ) public void basic ( Message < ? > message) throws MqttException { String receivedTopic = ( String ) message. getHeaders ( ) . get ( MqttHeaders . RECEIVED_TOPIC) ; String payload = ( String ) message. getPayload ( ) ; log. info ( "接收到的topic为:{},内容:{}" , receivedTopic, payload ) ; mqttGateway. sendToMqtt ( topic, 0 , "收到消息!" ) ; }
}