springboot整合mqtt的步骤示例详解

2025-08-14 22:50

本文主要是介绍springboot整合mqtt的步骤示例详解,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

《springboot整合mqtt的步骤示例详解》MQTT(MessageQueuingTelemetryTransport)是一种轻量级的消息传输协议,适用于物联网设备之间的通信,本文介绍Sprin...

使用场景:
mqtt可用于消息发送接收,一方面完成系统解耦,一方面可用于物联网设备的数据采集和指令控制
话不多说,下面直接干货

1、引入依赖包

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>
        <dependency>
            <groupId>com.fasterXML.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>

2、yml配置

若需要搭建mqtt服务教程,留言我下期出哦!

spring:
  application:
    name: device-control
  profiles:
    active: local
device:
  mqtt:
    enable: true
    username: admin
    password: 123456
    host-url: tcp://192.168.1.12:1883         # mqtt服务连接tcp地址
    in-client-id: ${random.value}              # 随机值,使出入站 client ID 不同
    out-client-id: ${random.value}
    client-id: ${random.int}                   # 客户端Id,不能相同,采用随机数 ${random.value}
    default-topic: pubDevice                      # 默认主题
    timeout: 60                                # 超时时间
    keepalive: 60                              # 保持连接
    clearSession: true                         # 清除会话(设置为false,断开连接,重连后使用原来的会话 保留订阅的主题,能接收离线期间的消息)

3、创建配置

创建MqttAutoConfiguration

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.channel.ExecutorChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessage编程Converter;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import Javax.annotation.Resource;
import java.util.concurrent.ThreadPoolExecutor;
@AutoConfiguration
@ConditionalOnProperty(value = "device.mqtt.enable", havingValue = "true")
@IntegrationComponentScan
public class MqttAutoConfiguration {
    @Resource
    MqttProperties mqttProperties;
    @Resource
    MqttMessageHandle mqttMessageHandle;
    /**
     * Mqtt 客户端工厂 所有客户端从这里产生
     * @return
     */
    @Bean
    public MqttPahoClientFactory mqttPahoClientFactory(){
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(mqttProperties.getHostUrl().split(","));
        options.setUserName(mqttProperties.getUsername());
        options.setPassword(mqttProperties.getPassword().toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }
    /**
     * Mqtt 管道适配器
     * @param factory
     * @return
     */
    @Bean
    public MqttPahoMessageDrivenChannelAdapter adapter(MqttPahoClientFactory factory){
        return new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getInClientId(),factory,mqttProperties.getDefaultTopic().split(","));
    }
    /**
     * 消息生产者 (接收,处理来自mqtt的消息)
     * @param adapter
     * @return
     */
    @Bean
    public IntegrationFlow mqttInbound(MqttPahoMessageDrivenChannelAdapter adapter) {
        adapter.setCompletionTimeout(5000);
        adapter.setQos(1);
        return IntegrationFlows.from( adapter)
                .channel(new ExecutorChannel(mqttThreadPoolTaskExecutor()))
                .handle(mqttMessageHandle)
                .get();
    }
    @Bean
    public ThreadPoolTaskExecutor mqttThreadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 最大可创建的线程数
        int maxPoolSize = 200;
        executor.setMaxPoolSize(maxPoolSize);
        // 核心线程池大小
        int corePoolSize = 50;
        executor.setCorePoolSize(corePoolSize);
        // 队列最大长度
        int queueCapacity = 1000;
        executor.setQueueCapacity(queueCapacity);
        // 线程池维护线程所允许的空闲时间
        int keepAliveSeconds = 300;
        executor.setKeepAliveSeconds(keepAliveSeconds);
        // 线程池对拒绝任务(无线程可用)的处理策略
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }
    /**
     * 出站处理器 (向 mqtt 发送消息)
     * @param fact编程ory
     * @return
     */
    @Bean
    public IntegrationFlow mqttOutboundFlow(MqttPahoClientFactory factory) {
        MqttPahoMessageHandler handler = new MqttPahoMessageHandler(mqttProperties.getOutClientId(),factory);
        handler.setAsync(true);
        handler.setConverter(new DefaultPahoMessageConverter());
        handler.setDefaultTopic(mqttProperties.getDefaultTopic().split(",")[0]);
        return IntegrationFlows.from( "mqttOutboundChannel").handle(handler).get();
    }
}

创建MqttGateway

import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Lazy;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
@Component
@Lazy
@ConditionalOnProperty(value = "device.mqtt.enable", havingValue = "true")
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
    /**
     * @param topic String
     * @param data  String
     * @return void
     * @throws
     * @description <description you method purpose>
     * @author lwt
     * @time 2024/1/24 09:29
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String data);
    /**
     * @param topic String
     * @param Qos   Integer
     * @param data  String
     * @return void
     * @throws
     * @description <description you method purpose>
     * @author lwt
     * @time 2024/1/24 09:31
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,python @Header(MqttHeaders.QOS) Integer Qos, String data);
}

创建MqttMessageHandle

import cn.hutool.extra.spring.SpringUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Map;
@Slf4j
@AutoConfiguration
public class MqttMessageHandle implements MessageHandler {
    public static Map<String, Object> mqttServices;
    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        getMqttTopicService(message);
    }
    public Map<String, Object> getMqttServices() {
        if (mqttServices == null) {
            mqttServices = SpringUtil.getConfigurableBeanFactory().getBeansWithAnnotation(MqttService.class);
        }
        return mqttServices;
    }
    public void getMqttTopicService(Message<?> message) {
        // 在这里 我们根据不同的 主题 分发不同的消息
        String receivedTopic = message.getHeaders().get("mqtt_receivedTopic", String.class);
        if (receivedTopic == null || "".equals(receivedTopic)) {
            return;
        }
        //updateTopicStatus(receivedTopic);
        for (Map.Entry<String, Object> entry : getMqttServices().entrySet()) {
            // 把所有带有 @MqttService 的类遍历
            Class<?> clazz = entry.getValue().getClass();
            // 获取他所有方法
            Method[] methods = clazz.getSuperclass().getDeclaredMethods();
            for (Method method : methods) {
                if (method.isAnnotationPresent(MqttTopic.class)) {
                    // 如果这个方法有 这个注解
                    MqttTopic handleTopic = method.getAnnotation(MqttTopic.class);
                    if (isMatch(receivedTopic, handleTopic.value())) {
                        // 并且 这个 topic 匹配成功
                        try {
                            method.invoke(SpringUtil.getBean(clazz),receivedTopic, message);
                            return;
                        } catch (IllegalAccessException e) {
                            e.printStackTrace();
                            log.error("代理炸了");
                        } catch (InvocationTargetException e) {
                            log.error("执行 {} 方法出现错误", handleTopic.value(), e);
                        }
                    }
                }
            }
        }
    }
    /**
     * mqtt 订阅的主题与我实际的主题是否匹配
     * @param topic   是实际的主题
     * @param pattern 是我订阅的主题 可以是通配符模式
     * @return 是否匹配
     */
    public static boolean isMatch(String topic, String pattern) {
        if ((topic == null) || (pattern == null)) {
            return false;
        }
        if (topic.equals(pattern)) {
            // 完全相等是肯定匹配的
            return true;
        }
        if ("#".equals(pattern)) {
            // # 号代表所有主题  肯定匹配的
            return true;
        }
        String[] splitTopic = topic.split("_");
        String[] splitPattern = pattern.split("_");
        boolean match = true;
        // 如果包含 # 则只需要判断 # 前面的
        for (int i = 0; i < splitPattern.length; i++) {
            if (!"#".equals(splitPattern[i])) {
                // 不是# 号 正常判断
                if (i >= splitTopic.length) {
                    // 此时长度不相等 不匹配
                    match = false;
                    break;
                }
                if (!splitTopic[i].equals(splitPattern[i]) && !"+".equals(splitPattern[i])) {
                    // 不相等 且不等于 +
                    match = false;
                    break;
                }
            } else {
                // 是# 号  肯定匹配的
                break;
            }
        }
        return match;
    }
}

创建MqttProperties

import lombok.Data;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.context.properties.ConfigurationProperties;
@ConfigurationProperties(prefix = "device.mqtt")
@Data
@AutoConfiguration
public class MqttProperties {
    /**
     * 用户名
     */
    private String username;
    /**
     * 密码
     */
    private String password;
    /**
     * 连接地址
     */
    private String hostUrl;
    /**
     * 进-客户Id
     */
    private String inClientId;
    /**
     * 出-客户Id
     */
    private String outClientId;
    /**
     * 客户Id
     */
    private String clientId;
    /**
     * 默认连接话题
     */
    private String defaultTopic;
    /**
     * 超时时间
     */
    private int timeout;
    /**
     * 保持连接数
     */
    private int keepalive;
    /**是否清除session*/
    private boolean clearSession;
}

创建MqttConstants

public classChina编程 MqttConstants {
    public static final Shttp://www.chinasem.cntring MQTT_DEVICE_INFO = "mqtt:device:info";
    public static final String TOPIC_PUB_DEVICE = "pubDevice";
    public static final String TOPIC_SUB_DEVICE = "subDevice";
}

创建初始化

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
@Slf4j
@Component
@ConditionalOnProperty(value = "device.mqtt.enable", havingValue = "true")
public class InitMqttSubscriberTopic {
    @Resource
    MqttSubscriberService mqttSubscriberService;
    @PostConstruct
    public void initSubscriber() {
        try {
            mqttSubscriberService.addTopic(MqttConstants.TOPIC_PUB_DEVICE);
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }
}

4、自定义注解

创建MqttService

import org.springframework.core.annotation.AliasFor;
import org.springframework.stereotype.Component;
import java.lang.annotation.*;
@Documented
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface MqttService {
    @AliasFor(annotation = Component.class)
    String value() default "";
}

创建MqttTopic

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface MqttTopic {
    /**
     * 主题名字
     */
    String value() default "";
}

创建如图:

springboot整合mqtt的步骤示例详解

6、使用示例

import cn.hutool.extra.spring.SpringUtil;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.Message;
import org.springframework.transaction.annotation.Transactional;
@Slf4j
@MqttService
public class MqttTopicHandle {
    /**
     * 监听到指定主题的消息
     * @param topic
     * @param message
     */
    @SneakyThrows
    @MqttTopic("pubDevice")
    @Transactional(rollbackFor = Exception.class)
    public void receive(String topic, Message<?> message) {
        log.info("message:{}", message.getPayload());
        String value = message.getPayload().toString();
        // 进行逻辑处理
    }
    /**
     * 发送消息到指定主题
     * @param topic
     * @param message
     */
    @Transactional(rollbackFor = Exception.class)
    public Boolean send(String topic, String message) {
        try {
            MqttGateway mqttGateway = SpringUtil.getBean(MqttGateway.class);
            mqttGateway.sendToMqtt(topic,message);
        } catch (Exception e){
            return false;
        }
        return true;
    }
}

到此这篇关于springboot整合mqtt的文章就介绍到这了,更多相关springboot整合mqtt内容请搜索China编程(www.chinasem.cn)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程China编程(www.chinasem.cn)!

这篇关于springboot整合mqtt的步骤示例详解的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1155737

相关文章

MyBatis-Plus 与 Spring Boot 集成原理实战示例

《MyBatis-Plus与SpringBoot集成原理实战示例》MyBatis-Plus通过自动配置与核心组件集成SpringBoot实现零配置,提供分页、逻辑删除等插件化功能,增强MyBa... 目录 一、MyBATis-Plus 简介 二、集成方式(Spring Boot)1. 引入依赖 三、核心机制

MySQL设置密码复杂度策略的完整步骤(附代码示例)

《MySQL设置密码复杂度策略的完整步骤(附代码示例)》MySQL密码策略还可能包括密码复杂度的检查,如是否要求密码包含大写字母、小写字母、数字和特殊字符等,:本文主要介绍MySQL设置密码复杂度... 目录前言1. 使用 validate_password 插件1.1 启用 validate_passwo

Java高效实现Word转PDF的完整指南

《Java高效实现Word转PDF的完整指南》这篇文章主要为大家详细介绍了如何用Spire.DocforJava库实现Word到PDF文档的快速转换,并解析其转换选项的灵活配置技巧,希望对大家有所帮助... 目录方法一:三步实现核心功能方法二:高级选项配置性能优化建议方法补充ASPose 实现方案Libre

Java List 使用举例(从入门到精通)

《JavaList使用举例(从入门到精通)》本文系统讲解JavaList,涵盖基础概念、核心特性、常用实现(如ArrayList、LinkedList)及性能对比,介绍创建、操作、遍历方法,结合实... 目录一、List 基础概念1.1 什么是 List?1.2 List 的核心特性1.3 List 家族成

Go中select多路复用的实现示例

《Go中select多路复用的实现示例》Go的select用于多通道通信,实现多路复用,支持随机选择、超时控制及非阻塞操作,建议合理使用以避免协程泄漏和死循环,感兴趣的可以了解一下... 目录一、什么是select基本语法:二、select 使用示例示例1:监听多个通道输入三、select的特性四、使用se

Java 中编码与解码的具体实现方法

《Java中编码与解码的具体实现方法》在Java中,字符编码与解码是处理数据的重要组成部分,正确的编码和解码可以确保字符数据在存储、传输、读取时不会出现乱码,本文将详细介绍Java中字符编码与解码的... 目录Java 中编码与解码的实现详解1. 什么是字符编码与解码?1.1 字符编码(Encoding)1

Python Flask实现定时任务的不同方法详解

《PythonFlask实现定时任务的不同方法详解》在Flask中实现定时任务,最常用的方法是使用APScheduler库,本文将提供一个完整的解决方案,有需要的小伙伴可以跟随小编一起学习一下... 目录完js整实现方案代码解释1. 依赖安装2. 核心组件3. 任务类型4. 任务管理5. 持久化存储生产环境

详解Java中三种状态机实现方式来优雅消灭 if-else 嵌套

《详解Java中三种状态机实现方式来优雅消灭if-else嵌套》这篇文章主要为大家详细介绍了Java中三种状态机实现方式从而优雅消灭if-else嵌套,文中的示例代码讲解详细,感兴趣的小伙伴可以跟... 目录1. 前言2. 复现传统if-else实现的业务场景问题3. 用状态机模式改造3.1 定义状态接口3

Java集合中的链表与结构详解

《Java集合中的链表与结构详解》链表是一种物理存储结构上非连续的存储结构,数据元素的逻辑顺序的通过链表中的引用链接次序实现,文章对比ArrayList与LinkedList的结构差异,详细讲解了链表... 目录一、链表概念与结构二、当向单链表的实现2.1 准备工作2.2 初始化链表2.3 打印数据、链表长

Linux查询服务器 IP 地址的命令详解

《Linux查询服务器IP地址的命令详解》在服务器管理和网络运维中,快速准确地获取服务器的IP地址是一项基本但至关重要的技能,下面我们来看看Linux中查询服务器IP的相关命令使用吧... 目录一、hostname 命令:简单高效的 IP 查询工具命令详解实际应用技巧注意事项二、ip 命令:新一代网络配置全