本文主要是介绍MQTT SpringBoot整合实战教程,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
《MQTT SpringBoot整合实战教程》:本文主要介绍MQTT SpringBoot整合实战教程,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考...
MQTT-SpringBoot
创建简单 SpringBoot 项目
导入必须依赖
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the Spring Boot + MQTT demo application. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.study</groupId>
    <artifactId>MqttDemo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>SpringBootMqttDemo</name>
    <description>SpringBootMqttDemo</description>

    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <spring-boot.version>2.6.13</spring-boot.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- Starter for Spring Boot web development -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Base starter for Spring Integration (messaging) support -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <!-- Spring Integration MQTT client support -->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
            <version>5.4.3</version>
        </dependency>
        <!-- Lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!-- fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>
    </dependencies>

    <!-- Imports the Spring Boot BOM so that starters above need no explicit version. -->
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>${spring-boot.version}</version>
                <configuration>
                    <mainClass>com.study.mqtt.demo.MqttDemoApplication</mainClass>
                    <!-- NOTE(review): skip=true disables this plugin's execution, so the
                         repackage goal below will not produce an executable jar.
                         Confirm this is intended. -->
                    <skip>true</skip>
                </configuration>
                <executions>
                    <execution>
                        <id>repackage</id>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
增加MQTT相关配置
application.yml
spring:
  mqtt:
    # MQTT broker address
    url: tcp://192.168.40.128:1883
    # Client ID used by the subscribing connection
    subClientId: sub_client_id_1
    # Topic to subscribe to
    subTopic: lq/iot/demo/
    # Client ID used by the publishing connection
    pubClientId: pub_client_id_1
    # Broker user name
    username: admin
    # Broker password
    password: admin123456
编写对应Java类
配置类
MqttConfig.java
package com.study.mqtt.demo.domain;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;

/**
 * Holds the MQTT settings bound from configuration keys under the
 * {@code spring.mqtt} prefix.
 *
 * <p>Accessors are generated by Lombok's {@code @Data}.
 */
@Data
@ConfigurationProperties(prefix = "spring.mqtt")
public class MqttConfig {

    /** Bound from {@code spring.mqtt.username}. */
    private String username;

    /** Bound from {@code spring.mqtt.password}. */
    private String password;

    /** Bound from {@code spring.mqtt.url}. */
    private String url;

    /** Bound from {@code spring.mqtt.subClientId}. */
    private String subClientId;

    /** Bound from {@code spring.mqtt.subTopic}. */
    private String subTopic;

    /** Bound from {@code spring.mqtt.pubClientId}. */
    private String pubClientId;
}
启动类增加开启配置
MqttDemoApplication.java
package com.study.mqtt.demo;

import com.study.mqtt.demo.domain.MqttConfig;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;

/**
 * Application entry point. Registers {@link MqttConfig} as a
 * configuration-properties bean.
 */
@SpringBootApplication
@EnableConfigurationProperties(MqttConfig.class)
public class MqttDemoApplication {

    /**
     * Starts the Spring application.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        new SpringApplication(MqttDemoApplication.class).run(args);
    }
}
创建MQTT连接工厂类
MqttFactory.java
package com.study.mqtt.demo.factory; import com.study.mqtt.demo.domain.MqttConfig; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; @Configuration public class MqttFactory { @Autowired private MqttConfig mqttConfig; @Bean public MqttPahoClientFactory mqttClientFactory() { // 创建客户端工厂 DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); MqttConnectOptions options = new MqttConnectOptions(); options.setUserName(mqttConfig.getUsername()); options.setPassword(mqttConfig.getPassword().toCharArray()); options.setServerURIs(new String[]{mqttConfig.getUrl()}); options.setCleanSession(true); factory.setConnectionOptions(options); return factory; } }
接收消息处理类
ReceiveMsgHandler.java
package com.study.mqtt.demo.handler; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.MessagingException; import org.springframework.stereotype.Component; @Component public class ReceiveMsgHandler implements MessageHandler { @Override public void handleMessage(Message<?> message) throws MessagingException { System.out.println("接收到消息对象:" + message); // 消息内容 Object payload = message.getPayload(); MessageHeaders headers = message.getHeaders(); Object mqttReceivedTopic = headers.get("mqtt_receivedTopic"); System.out.println("接收的消息主题:" + mqttReceivedTopic); System.out.println("接收的消息内容:" + payload); } }
接收消息配置类
MqttInboundConfig.java
package com.study.mqtt.demo.inbound; import com.study.mqtt.demo.domain.MqttConfig; import com.study.mqtt.demo.factory.MqttFactory; import com.study.mqtt.demo.handler.ReceiveMsgHandler; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.core.MessageProducer; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; @Configuration public class MqttInboundConfig { @Autowired privateChina编程 MqttConfig mqttConfig ; @Autowired private ReceiveMsgHandler receiveMsgHandler; /** * 配置消息接收通道 * @return */ @Bean public MessageChannel mqttInputChannel() { return new DirectChannel(); } /** * 配置接收适配器 */ @Bean public MessageProducer messageProducer(MqttPahoClientFactory mqttPahoClientFactory) { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getUrl() , mqttConfig.getSubClientId() , mqttPahoClientFactory , mqttConfig.getSubTopic().split(",")) ; adapter.setConverter(new DefaultPahoMessageConverter()); // 质量服务等级 adapter.setQos(1); adapter.setOutputChannel(mqttInputChannel()); return adapter ; } /** * 配置接收消息处理器 * @return */ @Bean @ServiceActivator(inputChannel = "mqttInputChannel") // 指定处理消息使用得通道 public MessageHandler messageHandler() { return this.receiveMsgHandler ; } }
发送消息配置类
MqttOutboundConfig.java
package com.study.mqtt.demo.outbound; import com.study.mqtt.demo.domain.MqttConfig; import org.springframework.beans.factory.annowww.chinasem.cntation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; @Configuration public class MqttOutboundConfig { @Autowired private MqttConfig mqttConfig; @Autowired private MqttPahoClientFactory pahoClientFactory ; @Bean public MessageChannel mqttOutputChannel() { return new DirectChannel(); } @Bean @ServiceActivator(inputChannel = "mqttOutputChannel") public MessageHandler mqttOutboundMassageHandler() { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttConfig.getUrl() , mqttConfig.getPubClientId() , pahoClientFactory ) ; messageHandler.setAsync(true); messageHandler.setDefaultQos(0); messageHandler.setDefaultTopic("default"); return messageHandler ; } }
发送消息网关接口类
MqttGateway.java
package com.study.mqtt.demo.gateway;

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

/**
 * Messaging gateway whose calls are sent to the {@code mqttOutputChannel}
 * request channel.
 */
@MessagingGateway(defaultRequestChannel = "mqttOutputChannel")
public interface MqttGateway {

    /**
     * Sends an MQTT message.
     *
     * @param topic   topic, passed as the {@link MqttHeaders#TOPIC} header
     * @param payload message content
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);

    /**
     * Sends an MQTT message with an explicit QoS level.
     *
     * <ul>
     *   <li>0 - done once sent; messages may be lost</li>
     *   <li>1 - adds retry; failed sends are resent, duplicates may occur</li>
     *   <li>2 - adds de-duplication so the subscriber gets the message only once</li>
     * </ul>
     *
     * @param topic   topic, passed as the {@link MqttHeaders#TOPIC} header
     * @param qos     QoS level, passed as the {@link MqttHeaders#QOS} header
     * @param payload message body
     */
    void sendToMqtt(
            @Header(MqttHeaders.TOPIC) String topic,
            @Header(MqttHeaders.QOS) int qos,
            String payload);
}
发送消息服务类
MqttMsgSenderService.java
package com.study.mqtt.demo.service;

import com.study.mqtt.demo.gateway.MqttGateway;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * Service facade that delegates message sending to {@link MqttGateway}.
 */
@Service
public class MqttMsgSenderService {

    @Autowired
    private MqttGateway mqttGateway;

    /**
     * Sends a payload to the given topic.
     *
     * @param topic   target topic
     * @param payload message content
     */
    public void send(String topic, String payload) {
        this.mqttGateway.sendToMqtt(topic, payload);
    }

    /**
     * Sends a payload to the given topic with an explicit QoS value.
     *
     * @param topic   target topic
     * @param qos     QoS value forwarded to the gateway
     * @param payload message content
     */
    public void send(String topic, int qos, String payload) {
        this.mqttGateway.sendToMqtt(topic, qos, payload);
    }
}
测试验证
订阅消息验证
启动项目
发送消息
- 主题为配置文件中配置的订阅主题
lq/iot/demo/
- 发送时间:
2025-05-25 21:29:26:439
订阅收到消息
- 接收到消息的时间:
Sun May 25 21:29:26 GMT+08:00 2025
- 接收到的主题:
lq/iot/demo/
- 接收到的内容:
{ "msg":"spring boot mqtt demo" }
发送消息验证
- 编写测试类
- 发送主题:
sb/mqtt/test
- 发送内容:
hello world ! => 当前时间
- 测试类代码:
package com.study.mqtt.demo;

import com.study.mqtt.demo.service.MqttMsgSenderService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.Date;

/**
 * Spring Boot tests for the demo application: a context-load check and a
 * publish through {@link MqttMsgSenderService}.
 */
@SpringBootTest(classes = MqttDemoApplication.class)
class MqttDemoApplicationTests {

    @Autowired
    private MqttMsgSenderService mqttMsgSenderService;

    /** Passes when the application context starts. */
    @Test
    void contextLoads() {
    }

    /** Publishes a greeting followed by the current date to {@code sb/mqtt/test}. */
    @Test
    void sendMsg() {
        final String topic = "sb/mqtt/test";
        final String payload = "hello world ! => " + new Date();
        mqttMsgSenderService.send(topic, payload);
    }
}
创建订阅者
订阅主题: sb/mqtt/test
运行测试类
订阅者接收消息
主题:sb/mqtt/test
到此这篇关于MQTT SpringBoot整合的文章就介绍到这了,更多相关MQTT SpringBoot整合内容请搜索China编程(www.chinasem.cn)以前的文章或继续浏览下面的相关文章,希望大家以后多多支持China编程(www.chinasem.cn)!
这篇关于MQTT SpringBoot整合实战教程的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!