springboot整合mqtt向EMQX发送信息

2024-05-29 08:08

本文主要是介绍springboot整合mqtt向EMQX发送信息,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一 spingboot整合mqtt

原理:

 二 操作案例

2.1 工程结构

 2.2 配置pom文件

<dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.13</version><scope>test</scope></dependency><!-- mqtt --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-stream</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId></dependency><!-- lombok --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.16.16</version></dependency><!-- springBoot的启动器 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>2.0.1.RELEASE</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-web</artifactId><version>5.1.5.RELEASE</version></dependency>

 2.3 配置application配置文件

server:port: 8081
spring:mqtt:username: admin							# 账号password: public						# 密码host-url: tcp://172.16.71.150:1883					# mqtt连接tcp地址client-id: mq-dky-0813						# 客户端Id,每个启动的id要不同default-topic: mq-dky-guolu					# 默认主题timeout: 100							# 超时时间keepalive: 100

2.4 读取配置文件,初始客户端

package com.ljf.mqtt.demo.config;import com.ljf.mqtt.demo.client.MqttPushClient;
import lombok.Getter;
import lombok.Setter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;/*** @ClassName: MqttConfig* @Description: TODO* @Author: liujianfu* @Date: 2021/08/16 14:43:39 * @Version: V1.0**/
@Component
@ConfigurationProperties("spring.mqtt")
@Setter
@Getter
public class MqttConfig {@Autowiredprivate MqttPushClient mqttPushClient;/*** 用户名*/private String username;/*** 密码*/private String password;/*** 连接地址*/private String hostUrl;/*** 客户Id*/private String clientId;/*** 默认连接话题*/private String defaultTopic;/*** 超时时间*/private int timeout;/*** 保持连接数*/private int keepalive;@Beanpublic MqttPushClient getMqttPushClient() {mqttPushClient.connect(hostUrl, clientId, username, password, timeout, keepalive);// 以/#结尾表示订阅所有以test开头的主题mqttPushClient.subscribe(defaultTopic, 0);return mqttPushClient;}
}

2.4 订阅推送客户端

package com.ljf.mqtt.demo.client;import com.ljf.mqtt.demo.listener.PushCallback;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/*** @ClassName: MqttPushClient* @Description: TODO* @Author: liujianfu* @Date: 2021/08/16 14:48:38 * @Version: V1.0**/
@Component
public class MqttPushClient {private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);@Autowiredprivate PushCallback pushCallback;private static MqttClient client;private static MqttClient getClient() {return client;}private static void setClient(MqttClient client) {MqttPushClient.client = client;}/*** 客户端连接** @param host      ip+端口* @param clientID  客户端Id* @param username  用户名* @param password  密码* @param timeout   超时时间* @param keepalive 保留数*/public void connect(String host, String clientID, String username, String password, int timeout, int keepalive) {MqttClient client;try {client = new MqttClient(host, clientID, new MemoryPersistence());MqttConnectOptions options = new MqttConnectOptions();options.setCleanSession(true);options.setUserName(username);options.setPassword(password.toCharArray());options.setConnectionTimeout(timeout);options.setKeepAliveInterval(keepalive);MqttPushClient.setClient(client);try {client.setCallback(pushCallback);client.connect(options);} catch (Exception e) {e.printStackTrace();}} catch (Exception e) {e.printStackTrace();}}/*** 发布** @param qos         连接方式* @param retained    是否保留* @param topic       主题* @param pushMessage 消息体*/public void publish(int qos, boolean retained, String topic, String pushMessage) {MqttMessage message = new MqttMessage();message.setQos(qos);message.setRetained(retained);message.setPayload(pushMessage.getBytes());MqttTopic mTopic = MqttPushClient.getClient().getTopic(topic);if (null == mTopic) {logger.error("topic not exist");}MqttDeliveryToken token;try {token = mTopic.publish(message);token.waitForCompletion();} catch (MqttPersistenceException e) {e.printStackTrace();} catch (MqttException e) {e.printStackTrace();}}/*** 订阅某个主题** @param topic 主题* @param qos   连接方式*/public void subscribe(String topic, int qos) {logger.info("==============开始订阅主题=========" + topic);try {MqttPushClient.getClient().subscribe(topic, qos);} catch (MqttException e) {e.printStackTrace();}}
}

2.5 定制监听订阅者

package com.ljf.mqtt.demo.listener;import com.ljf.mqtt.demo.config.MqttConfig;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/*** @ClassName: PushCallback* @Description: TODO* @Author: liujianfu* @Date: 2021/08/16 14:52:20 * @Version: V1.0**/
@Component
public class PushCallback implements MqttCallback {private static final Logger logger = LoggerFactory.getLogger(PushCallback.class);@Autowiredprivate MqttConfig mqttConfig;private static MqttClient client;@Overridepublic void connectionLost(Throwable throwable) {// 连接丢失后,一般在这里面进行重连logger.info("连接断开,可以做重连");if (client == null || !client.isConnected()) {mqttConfig.getMqttPushClient();}}@Overridepublic void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {// subscribe后得到的消息会执行到这里面logger.info("接收消息主题 : " + topic);logger.info("接收消息Qos : " + mqttMessage.getQos());logger.info("接收消息内容 : " + new String(mqttMessage.getPayload()));}@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {logger.info("deliveryComplete---------" + iMqttDeliveryToken.isComplete());}
}

2.6 发布数据

package com.ljf.mqtt.demo.controller;import com.ljf.mqtt.demo.client.MqttPushClient;
import com.ljf.mqtt.demo.utils.R;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/*** @ClassName: PullController* @Description: TODO* @Author: liujianfu* @Date: 2021/08/16 14:56:18 * @Version: V1.0**/
@RestController
@RequestMapping("/")
public class PullController {@Autowiredprivate MqttPushClient mqttPushClient;/*** @author liujianfu* @description    测试发布主题* @date 2021/8/16 15:04* @param []* @return RUtils*/@GetMapping(value = "/publishTopic")public R publishTopic(String sendMessage) {System.out.println("message:"+sendMessage);sendMessage=sendMessage+" : {\"name\":\"ljf\",\"age\":345}";mqttPushClient.publish(0,false,"mq-dky-guolu",sendMessage);return R.ok("OK");}
}

2.7 发布数据

1.发布数据:

 2.订阅消费数据

 3.emqx页面

4.在页面进行模拟

连接

订阅

 推送:

java代码客户端:

这篇关于springboot整合mqtt向EMQX发送信息的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1013103

相关文章

SpringBoot 获取请求参数的常用注解及用法

《SpringBoot获取请求参数的常用注解及用法》SpringBoot通过@RequestParam、@PathVariable等注解支持从HTTP请求中获取参数,涵盖查询、路径、请求体、头、C... 目录SpringBoot 提供了多种注解来方便地从 HTTP 请求中获取参数以下是主要的注解及其用法:1

HTTP 与 SpringBoot 参数提交与接收协议方式

《HTTP与SpringBoot参数提交与接收协议方式》HTTP参数提交方式包括URL查询、表单、JSON/XML、路径变量、头部、Cookie、GraphQL、WebSocket和SSE,依据... 目录HTTP 协议支持多种参数提交方式,主要取决于请求方法(Method)和内容类型(Content-Ty

深度解析Java @Serial 注解及常见错误案例

《深度解析Java@Serial注解及常见错误案例》Java14引入@Serial注解,用于编译时校验序列化成员,替代传统方式解决运行时错误,适用于Serializable类的方法/字段,需注意签... 目录Java @Serial 注解深度解析1. 注解本质2. 核心作用(1) 主要用途(2) 适用位置3

深入浅出Spring中的@Autowired自动注入的工作原理及实践应用

《深入浅出Spring中的@Autowired自动注入的工作原理及实践应用》在Spring框架的学习旅程中,@Autowired无疑是一个高频出现却又让初学者头疼的注解,它看似简单,却蕴含着Sprin... 目录深入浅出Spring中的@Autowired:自动注入的奥秘什么是依赖注入?@Autowired

Spring 依赖注入与循环依赖总结

《Spring依赖注入与循环依赖总结》这篇文章给大家介绍Spring依赖注入与循环依赖总结篇,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录1. Spring 三级缓存解决循环依赖1. 创建UserService原始对象2. 将原始对象包装成工

Java中如何正确的停掉线程

《Java中如何正确的停掉线程》Java通过interrupt()通知线程停止而非强制,确保线程自主处理中断,避免数据损坏,线程池的shutdown()等待任务完成,shutdownNow()强制中断... 目录为什么不强制停止为什么 Java 不提供强制停止线程的能力呢?如何用interrupt停止线程s

SpringBoot请求参数传递与接收示例详解

《SpringBoot请求参数传递与接收示例详解》本文给大家介绍SpringBoot请求参数传递与接收示例详解,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋... 目录I. 基础参数传递i.查询参数(Query Parameters)ii.路径参数(Path Va

SpringBoot路径映射配置的实现步骤

《SpringBoot路径映射配置的实现步骤》本文介绍了如何在SpringBoot项目中配置路径映射,使得除static目录外的资源可被访问,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一... 目录SpringBoot路径映射补:springboot 配置虚拟路径映射 @RequestMapp

Java MCP 的鉴权深度解析

《JavaMCP的鉴权深度解析》文章介绍JavaMCP鉴权的实现方式,指出客户端可通过queryString、header或env传递鉴权信息,服务器端支持工具单独鉴权、过滤器集中鉴权及启动时鉴权... 目录一、MCP Client 侧(负责传递,比较简单)(1)常见的 mcpServers json 配置

GSON框架下将百度天气JSON数据转JavaBean

《GSON框架下将百度天气JSON数据转JavaBean》这篇文章主要为大家详细介绍了如何在GSON框架下实现将百度天气JSON数据转JavaBean,文中的示例代码讲解详细,感兴趣的小伙伴可以了解下... 目录前言一、百度天气jsON1、请求参数2、返回参数3、属性映射二、GSON属性映射实战1、类对象映