本文主要是介绍springboot整合mqtt的步骤示例详解,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
《springboot整合mqtt的步骤示例详解》MQTT(MessageQueuingTelemetryTransport)是一种轻量级的消息传输协议,适用于物联网设备之间的通信,本文介绍Sprin...
使用场景:
mqtt可用于消息发送接收,一方面完成系统解耦,一方面可用于物联网设备的数据采集和指令控制
话不多说,下面直接干货
1、引入依赖包
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency> <dependency> <groupId>com.fasterXML.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency>
2、yml配置
若需要搭建mqtt服务教程,留言我下期出哦!
spring: application: name: device-control profiles: active: local device: mqtt: enable: true username: admin password: 123456 host-url: tcp://192.168.1.12:1883 # mqtt服务连接tcp地址 in-client-id: ${random.value} # 随机值,使出入站 client ID 不同 out-client-id: ${random.value} client-id: ${random.int} # 客户端Id,不能相同,采用随机数 ${random.value} default-topic: pubDevice # 默认主题 timeout: 60 # 超时时间 keepalive: 60 # 保持连接 clearSession: true # 清除会话(设置为false,断开连接,重连后使用原来的会话 保留订阅的主题,能接收离线期间的消息)
3、创建配置
创建MqttAutoConfiguration
import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.springframework.boot.autoconfigure.AutoConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.integration.annotation.IntegrationComponentScan; import org.springframework.integration.channel.ExecutorChannel; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.IntegrationFlows; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; import org.springframework.integration.mqtt.support.DefaultPahoMessage编程Converter; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import Javax.annotation.Resource; import java.util.concurrent.ThreadPoolExecutor; @AutoConfiguration @ConditionalOnProperty(value = "device.mqtt.enable", havingValue = "true") @IntegrationComponentScan public class MqttAutoConfiguration { @Resource MqttProperties mqttProperties; @Resource MqttMessageHandle mqttMessageHandle; /** * Mqtt 客户端工厂 所有客户端从这里产生 * @return */ @Bean public MqttPahoClientFactory mqttPahoClientFactory(){ DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); MqttConnectOptions options = new MqttConnectOptions(); options.setServerURIs(mqttProperties.getHostUrl().split(",")); options.setUserName(mqttProperties.getUsername()); options.setPassword(mqttProperties.getPassword().toCharArray()); factory.setConnectionOptions(options); return factory; } /** * Mqtt 管道适配器 * @param factory * @return */ @Bean public MqttPahoMessageDrivenChannelAdapter adapter(MqttPahoClientFactory factory){ return new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getInClientId(),factory,mqttProperties.getDefaultTopic().split(",")); } /** * 消息生产者 (接收,处理来自mqtt的消息) * @param adapter * @return */ @Bean public IntegrationFlow mqttInbound(MqttPahoMessageDrivenChannelAdapter adapter) { adapter.setCompletionTimeout(5000); adapter.setQos(1); return IntegrationFlows.from( adapter) .channel(new ExecutorChannel(mqttThreadPoolTaskExecutor())) .handle(mqttMessageHandle) .get(); } @Bean public ThreadPoolTaskExecutor mqttThreadPoolTaskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // 最大可创建的线程数 int maxPoolSize = 200; executor.setMaxPoolSize(maxPoolSize); // 核心线程池大小 int corePoolSize = 50; executor.setCorePoolSize(corePoolSize); // 队列最大长度 int queueCapacity = 1000; executor.setQueueCapacity(queueCapacity); // 线程池维护线程所允许的空闲时间 int keepAliveSeconds = 300; executor.setKeepAliveSeconds(keepAliveSeconds); // 线程池对拒绝任务(无线程可用)的处理策略 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); return executor; } /** * 出站处理器 (向 mqtt 发送消息) * @param fact编程ory * @return */ @Bean public IntegrationFlow mqttOutboundFlow(MqttPahoClientFactory factory) { MqttPahoMessageHandler handler = new MqttPahoMessageHandler(mqttProperties.getOutClientId(),factory); handler.setAsync(true); handler.setConverter(new DefaultPahoMessageConverter()); handler.setDefaultTopic(mqttProperties.getDefaultTopic().split(",")[0]); return IntegrationFlows.from( "mqttOutboundChannel").handle(handler).get(); } }
创建MqttGateway
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Lazy; import org.springframework.integration.annotation.MessagingGateway; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component; @Component @Lazy @ConditionalOnProperty(value = "device.mqtt.enable", havingValue = "true") @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") public interface MqttGateway { /** * @param topic String * @param data String * @return void * @throws * @description <description you method purpose> * @author lwt * @time 2024/1/24 09:29 */ void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String data); /** * @param topic String * @param Qos Integer * @param data String * @return void * @throws * @description <description you method purpose> * @author lwt * @time 2024/1/24 09:31 */ void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,python @Header(MqttHeaders.QOS) Integer Qos, String data); }
创建MqttMessageHandle
import cn.hutool.extra.spring.SpringUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.AutoConfiguration; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.Map; @Slf4j @AutoConfiguration public class MqttMessageHandle implements MessageHandler { public static Map<String, Object> mqttServices; @Override public void handleMessage(Message<?> message) throws MessagingException { getMqttTopicService(message); } public Map<String, Object> getMqttServices() { if (mqttServices == null) { mqttServices = SpringUtil.getConfigurableBeanFactory().getBeansWithAnnotation(MqttService.class); } return mqttServices; } public void getMqttTopicService(Message<?> message) { // 在这里 我们根据不同的 主题 分发不同的消息 String receivedTopic = message.getHeaders().get("mqtt_receivedTopic", String.class); if (receivedTopic == null || "".equals(receivedTopic)) { return; } //updateTopicStatus(receivedTopic); for (Map.Entry<String, Object> entry : getMqttServices().entrySet()) { // 把所有带有 @MqttService 的类遍历 Class<?> clazz = entry.getValue().getClass(); // 获取他所有方法 Method[] methods = clazz.getSuperclass().getDeclaredMethods(); for (Method method : methods) { if (method.isAnnotationPresent(MqttTopic.class)) { // 如果这个方法有 这个注解 MqttTopic handleTopic = method.getAnnotation(MqttTopic.class); if (isMatch(receivedTopic, handleTopic.value())) { // 并且 这个 topic 匹配成功 try { method.invoke(SpringUtil.getBean(clazz),receivedTopic, message); return; } catch (IllegalAccessException e) { e.printStackTrace(); log.error("代理炸了"); } catch (InvocationTargetException e) { log.error("执行 {} 方法出现错误", handleTopic.value(), e); } } } } } } /** * mqtt 订阅的主题与我实际的主题是否匹配 * @param topic 是实际的主题 * @param pattern 是我订阅的主题 可以是通配符模式 * @return 是否匹配 */ public static boolean isMatch(String topic, String pattern) { if ((topic == null) || (pattern == null)) { return false; } if (topic.equals(pattern)) { // 完全相等是肯定匹配的 return true; } if ("#".equals(pattern)) { // # 号代表所有主题 肯定匹配的 return true; } String[] splitTopic = topic.split("_"); String[] splitPattern = pattern.split("_"); boolean match = true; // 如果包含 # 则只需要判断 # 前面的 for (int i = 0; i < splitPattern.length; i++) { if (!"#".equals(splitPattern[i])) { // 不是# 号 正常判断 if (i >= splitTopic.length) { // 此时长度不相等 不匹配 match = false; break; } if (!splitTopic[i].equals(splitPattern[i]) && !"+".equals(splitPattern[i])) { // 不相等 且不等于 + match = false; break; } } else { // 是# 号 肯定匹配的 break; } } return match; } }
创建MqttProperties
import lombok.Data; import org.springframework.boot.autoconfigure.AutoConfiguration; import org.springframework.boot.context.properties.ConfigurationProperties; @ConfigurationProperties(prefix = "device.mqtt") @Data @AutoConfiguration public class MqttProperties { /** * 用户名 */ private String username; /** * 密码 */ private String password; /** * 连接地址 */ private String hostUrl; /** * 进-客户Id */ private String inClientId; /** * 出-客户Id */ private String outClientId; /** * 客户Id */ private String clientId; /** * 默认连接话题 */ private String defaultTopic; /** * 超时时间 */ private int timeout; /** * 保持连接数 */ private int keepalive; /**是否清除session*/ private boolean clearSession; }
创建MqttConstants
public classChina编程 MqttConstants { public static final Shttp://www.chinasem.cntring MQTT_DEVICE_INFO = "mqtt:device:info"; public static final String TOPIC_PUB_DEVICE = "pubDevice"; public static final String TOPIC_SUB_DEVICE = "subDevice"; }
创建初始化
import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.Resource; @Slf4j @Component @ConditionalOnProperty(value = "device.mqtt.enable", havingValue = "true") public class InitMqttSubscriberTopic { @Resource MqttSubscriberService mqttSubscriberService; @PostConstruct public void initSubscriber() { try { mqttSubscriberService.addTopic(MqttConstants.TOPIC_PUB_DEVICE); } catch (Exception e) { e.printStackTrace(); throw new RuntimeException(e); } } }
4、自定义注解
创建MqttService
import org.springframework.core.annotation.AliasFor; import org.springframework.stereotype.Component; import java.lang.annotation.*; @Documented @Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Component public @interface MqttService { @AliasFor(annotation = Component.class) String value() default ""; }
创建MqttTopic
import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) public @interface MqttTopic { /** * 主题名字 */ String value() default ""; }
创建如图:
6、使用示例
import cn.hutool.extra.spring.SpringUtil; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.springframework.messaging.Message; import org.springframework.transaction.annotation.Transactional; @Slf4j @MqttService public class MqttTopicHandle { /** * 监听到指定主题的消息 * @param topic * @param message */ @SneakyThrows @MqttTopic("pubDevice") @Transactional(rollbackFor = Exception.class) public void receive(String topic, Message<?> message) { log.info("message:{}", message.getPayload()); String value = message.getPayload().toString(); // 进行逻辑处理 } /** * 发送消息到指定主题 * @param topic * @param message */ @Transactional(rollbackFor = Exception.class) public Boolean send(String topic, String message) { try { MqttGateway mqttGateway = SpringUtil.getBean(MqttGateway.class); mqttGateway.sendToMqtt(topic,message); } catch (Exception e){ return false; } return true; } }
到此这篇关于springboot整合mqtt的文章就介绍到这了,更多相关springboot整合mqtt内容请搜索China编程(www.chinasem.cn)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程China编程(www.chinasem.cn)!
这篇关于springboot整合mqtt的步骤示例详解的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!