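Complete Java Implementation Example for Integrating with the MQTT Protocol
MQTT is a publish/subscribe messaging transport protocol built on a client-server model. It is lightweight, simple, open, and easy to implement, which makes it suitable for a wide range of scenarios.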
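Preface
This article explains in detail how to integrate with the MQTT protocol using Java and the Spring Integration framework. The code covers MQTT client configuration, subscribing to and publishing messages, and the message-handling logic.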
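Prerequisite dependency
No version is declared below, so this assumes the version is managed by a parent POM or BOM (for example, Spring Boot's dependency management).
<!-- MQTT dependency -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>
1. MQTT configuration class
Code walkthrough
The MqttConfig class is the core MQTT configuration class. It creates the MQTT client factory, sets the connection options, and creates the message channels.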
package com.ruoyi.framework.config;

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

@Configuration
public class MqttConfig {

    @Value("${mqtt.broker-url}")
    private String brokerUrl;

    @Value("${mqtt.client-id}")
    private String clientId;

    // Client factory shared by the inbound and outbound adapters
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[]{brokerUrl}); // broker address
        options.setAutomaticReconnect(true);            // reconnect automatically
        factory.setConnectionOptions(options);
        // The factory only stores the options; the connection is opened later by the adapters.
        System.out.println("MQTT broker configured: " + brokerUrl);
        return factory;
    }

    // Inbound adapter (receiving)
    @Bean
    public MqttPahoMessageDrivenChannelAdapter mqttInbound() {
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
                clientId + "-inbound",  // client ID (must be unique)
                mqttClientFactory(),    // create the client through the factory
                "testSub/#"             // topic filter to subscribe to
        );
        adapter.setOutputChannelName("mqttInputChannel"); // key step: received messages go to the input channel
        adapter.setQos(1);                                 // QoS level
        return adapter;
    }

    // Outbound adapter (sending)
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler handler = new MqttPahoMessageHandler(
                clientId + "-outbound",
                mqttClientFactory()
        );
        handler.setAsync(true);
        handler.setDefaultQos(1);
        return handler;
    }

    // Inbound channel (received messages)
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel(); // direct channel
    }

    // Outbound channel (messages to send)
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }
}
1.1 MQTT client factory
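The mqttClientFactory method creates an MqttPahoClientFactory instance that holds the connection options for the MQTT clients.
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[]{brokerUrl}); // MQTT broker address
        options.setAutomaticReconnect(true);            // enable automatic reconnect
        factory.setConnectionOptions(options);
        // The factory only stores the options; the connection is opened later by the adapters.
        System.out.println("MQTT broker configured: " + brokerUrl);
        return factory;
    }
- brokerUrl: the address of the MQTT broker, usually of the form tcp://<IP>:<port>.
- automaticReconnect: enables automatic reconnect so that the client restores its connection after a network interruption.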
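To make the reconnect behaviour more concrete, here is a minimal sketch of a doubling back-off schedule of the kind Paho's documentation describes for automatic reconnect (wait about 1 second, double after each failed attempt, stop growing at roughly 2 minutes). The class name ReconnectBackoff and the constants 1,000 ms and 128,000 ms are assumptions made for illustration; this is not Paho's internal code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: computes a capped, doubling reconnect delay.
final class ReconnectBackoff {
    private final long initialDelayMs;
    private final long maxDelayMs;

    ReconnectBackoff(long initialDelayMs, long maxDelayMs) {
        if (initialDelayMs <= 0 || maxDelayMs < initialDelayMs) {
            throw new IllegalArgumentException("need 0 < initialDelayMs <= maxDelayMs");
        }
        this.initialDelayMs = initialDelayMs;
        this.maxDelayMs = maxDelayMs;
    }

    // Delay before the given 0-based attempt: initial * 2^attempt, capped at the maximum.
    long delayForAttempt(int attempt) {
        if (attempt < 0) {
            throw new IllegalArgumentException("attempt must be >= 0");
        }
        long delay = initialDelayMs;
        for (int i = 0; i < attempt && delay < maxDelayMs; i++) {
            // Compare against max / 2 first so the doubling cannot overflow.
            delay = (delay > maxDelayMs / 2) ? maxDelayMs : delay * 2;
        }
        return delay;
    }

    // Delays for the first n attempts, in order.
    List<Long> schedule(int attempts) {
        List<Long> delays = new ArrayList<>();
        for (int i = 0; i < attempts; i++) {
            delays.add(delayForAttempt(i));
        }
        return delays;
    }

    public static void main(String[] args) {
        ReconnectBackoff backoff = new ReconnectBackoff(1_000, 128_000);
        System.out.println("Reconnect delays (ms): " + backoff.schedule(10));
    }
}
```

A capped schedule like this keeps a client from hammering a broker that is down while still bounding how long recovery takes once the broker is back.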
1.2 MQTT subscription adapter
The mqttInbound method creates an MqttPahoMessageDrivenChannelAdapter instance that subscribes to MQTT topics and receives messages.
    @Bean
    public MqttPahoMessageDrivenChannelAdapter mqttInbound() {
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
                clientId + "-inbound", mqttClientFactory(), "testSub/#");
        adapter.setOutputChannelName("mqttInputChannel"); // bind to the input channel
        adapter.setQos(1);                                 // QoS level
        return adapter;
    }
- clientId + "-inbound": the client ID, which must be unique on the broker.
- "testSub/#": the subscribed topic filter; the multi-level wildcard # matches every topic under testSub.
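The wildcard rule can be checked with a few lines of plain Java. The sketch below implements the two MQTT wildcards (+ for exactly one level, # for all remaining levels); TopicFilterMatcher is a hypothetical helper written for this article, and it deliberately ignores special cases such as topics beginning with $.

```java
// Hypothetical helper: tests whether an MQTT topic filter matches a concrete topic name.
final class TopicFilterMatcher {

    static boolean matches(String filter, String topic) {
        String[] f = filter.split("/", -1);
        String[] t = topic.split("/", -1);
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                // '#' is only valid as the last level; it then matches everything that remains,
                // including the parent level itself ("testSub/#" matches "testSub").
                return i == f.length - 1;
            }
            if (i >= t.length) {
                return false; // the filter has more levels than the topic
            }
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                return false; // literal level differs
            }
        }
        return f.length == t.length; // no '#': the level counts must agree
    }

    public static void main(String[] args) {
        System.out.println(matches("testSub/#", "testSub/device/1"));
        System.out.println(matches("testSub/#", "report/allpoints"));
    }
}
```

Under these rules a subscription to testSub/# does not cover report/allpoints, so a handler branch for that topic only fires if the adapter subscribes to it as well.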
1.3 MQTT publishing adapter
The mqttOutbound method creates an MqttPahoMessageHandler instance that publishes messages to MQTT topics.
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler handler = new MqttPahoMessageHandler(
                clientId + "-outbound", mqttClientFactory());
        handler.setAsync(true);   // send asynchronously
        handler.setDefaultQos(1); // default QoS level
        return handler;
    }
- clientId + "-outbound": the client ID, which must be unique on the broker.
- mqttOutboundChannel: the channel from which outgoing messages are taken.
1.4 Message channels
The mqttInputChannel and mqttOutboundChannel methods create the input and output channels that carry the messages.
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel(); // direct channel
    }

    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }
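A DirectChannel hands each message to its subscriber on the sender's own thread, so a slow handler delays whoever is sending. The plain-Java stand-in below illustrates only that hand-off; MiniDirectChannel is a hypothetical class written for this article, not Spring Integration's implementation.

```java
import java.util.Objects;
import java.util.function.Consumer;

// Hypothetical stand-in for a point-to-point channel with same-thread delivery.
final class MiniDirectChannel<T> {
    private Consumer<T> subscriber;

    void subscribe(Consumer<T> handler) {
        this.subscriber = Objects.requireNonNull(handler, "handler");
    }

    // Delivers synchronously: this call returns only after the handler has finished.
    void send(T message) {
        if (subscriber == null) {
            throw new IllegalStateException("channel has no subscriber");
        }
        subscriber.accept(message);
    }

    public static void main(String[] args) {
        MiniDirectChannel<String> channel = new MiniDirectChannel<>();
        channel.subscribe(m ->
                System.out.println("handled '" + m + "' on " + Thread.currentThread().getName()));
        System.out.println("sending from " + Thread.currentThread().getName());
        channel.send("{\"key\":\"value\"}");
    }
}
```

If handlers do slow work such as database writes, a channel that hands messages to a separate executor may be the better fit; whether that is needed depends on the message rate.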
2. MQTT message listener
Code walkthrough
The MqttMessageListener class handles the messages received from MQTT topics. Two variants are shown.
Option I
package com.ruoyi.framework.mqtt;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;

@Component
public class MqttMessageListener {

    private static final Logger log = LoggerFactory.getLogger(MqttMessageListener.class);

    @Autowired
    private IMqttService mqttService;

    // Handles inbound messages.
    // Note: this variant passes the raw payload, so it assumes IMqttService offers
    // handleHeartbeat(String) and handleReport(String). The heartbeat/ and report/ topics
    // must also be subscribed to by the inbound adapter, which subscribes to testSub/# only.
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
                String payload = message.getPayload().toString();
                log.info("[MQTT] Message received: topic={}, payload={}", topic, payload);
                try {
                    if (topic.startsWith("heartbeat/")) {      // heartbeat upload
                        mqttService.handleHeartbeat(payload);
                    } else if (topic.startsWith("report/")) {  // data upload
                        mqttService.handleReport(payload);
                    }
                } catch (Exception e) {
                    log.error("[MQTT] Failed to process message: {}", e.getMessage(), e);
                }
            }
        };
    }
}
or Option II
package com.ruoyi.framework.mqtt;

import com.alibaba.fastjson2.JSON;
import com.ruoyi.common.constant.MessageConstants;
import com.ruoyi.power.domain.protocol.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;

@Component
public class MqttMessageListener {

    private static final Logger log = LoggerFactory.getLogger(MqttMessageListener.class);

    @Autowired
    private IMqttService mqttService;

    // Handles inbound messages
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
                String payload = message.getPayload().toString();
                log.info("[MQTT] Message received: topic={}, payload={}", topic, payload);
                try {
                    if (topic.startsWith("testSub/")) {
                        BaseMessage baseMsg = JSON.parseObject(payload, BaseMessage.class);
                        switch (baseMsg.getType()) {
                            case MessageConstants.HEART_BEAT:
                                HeartbeatMessage heartbeat = JSON.parseObject(payload, HeartbeatMessage.class);
                                mqttService.handleHeartbeat(heartbeat);
                                break;
                            case MessageConstants.REPORT, MessageConstants.GET_CURRENT_DATA:
                                ReportMessage report = JSON.parseObject(payload, ReportMessage.class);
                                mqttService.handleReport(report);
                                break;
                            case MessageConstants.ALARM:
                                AlarmMessage alarm = JSON.parseObject(payload, AlarmMessage.class);
                                mqttService.handleAlarm(alarm);
                                break;
                            case MessageConstants.CALL_ACK:
                                mqttService.handleCallReadAck(baseMsg);
                                break;
                            case MessageConstants.CONTROL_ACK:
                                ControlMessage controlAck = JSON.parseObject(payload, ControlMessage.class);
                                mqttService.handleControlAck(controlAck);
                                break;
                            default:
                                log.warn("[MQTT] Unknown message type: {}", baseMsg.getType());
                        }
                    } else if (topic.startsWith("report/allpoints")) {
                        BaseMessage baseMsg = JSON.parseObject(payload, BaseMessage.class);
                        switch (baseMsg.getType()) {
                            // If no callAck arrives, the collector did not receive the callRead command
                            case MessageConstants.CALL_ACK:
                                mqttService.handleCallReadAck(baseMsg);
                                break;
                            case MessageConstants.REPORT, MessageConstants.GET_CURRENT_DATA:
                                ReportMessage report = JSON.parseObject(payload, ReportMessage.class);
                                mqttService.handleReport(report);
                                break;
                            case MessageConstants.CONTROL_ACK:
                                ControlMessage controlAck = JSON.parseObject(payload, ControlMessage.class);
                                mqttService.handleControlAck(controlAck);
                                break;
                            case MessageConstants.MULTIVALUESET_ACK:
                                MultiValueSetMessage multvaluesetAck = JSON.parseObject(payload, MultiValueSetMessage.class);
                                mqttService.handleMultiValueSet(multvaluesetAck);
                                break;
                        }
                    }
                } catch (Exception e) {
                    log.error("[MQTT] Failed to process message: {}", e.getMessage(), e);
                }
            }
        };
    }
}
2.1 Message-handling logic
The handler method returns a MessageHandler that processes the messages arriving on mqttInputChannel. The excerpt below is abridged from Option II.
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                String topic = message.getHeaders().get("mqtt_receivedTopic").toString(); // topic
                String payload = message.getPayload().toString();                         // message body
                log.info("[MQTT] Message received: topic={}, payload={}", topic, payload);
                try {
                    if (topic.startsWith("testSub/")) {
                        // Messages received under the testSub topic
                        BaseMessage baseMsg = JSON.parseObject(payload, BaseMessage.class);
                        switch (baseMsg.getType()) {
                            case MessageConstants.HEART_BEAT:
                                mqttService.handleHeartbeat(JSON.parseObject(payload, HeartbeatMessage.class));
                                break;
                            case MessageConstants.REPORT:
                                mqttService.handleReport(JSON.parseObject(payload, ReportMessage.class));
                                break;
                            // Handling for the other message types
                        }
                    } else if (topic.startsWith("report/allpoints")) {
                        // Messages received under the report/allpoints topic
                    }
                } catch (Exception e) {
                    log.error("[MQTT] Failed to process message: {}", e.getMessage(), e);
                }
            }
        };
    }
- mqtt_receivedTopic: the message header from which the topic is read.
- payload: the message body, usually a JSON string.
- A switch statement calls a different handler method depending on the message type.
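As the number of message types grows, the switch can be replaced by a lookup table. The following is a minimal sketch of that idea in plain Java; MessageDispatcher and the type strings "heartbeat" and "report" are hypothetical (the real values live in MessageConstants, which is not shown here), and handlers receive the raw JSON payload.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical dispatcher: maps a message type to the handler that processes its payload.
final class MessageDispatcher {
    private final Map<String, Consumer<String>> handlers = new ConcurrentHashMap<>();

    // Registers a handler for a type; returns this so that calls can be chained.
    MessageDispatcher register(String type, Consumer<String> handler) {
        handlers.put(Objects.requireNonNull(type, "type"), Objects.requireNonNull(handler, "handler"));
        return this;
    }

    // Returns true if a handler was found and invoked, false for an unknown or missing type.
    boolean dispatch(String type, String payload) {
        Consumer<String> handler = (type == null) ? null : handlers.get(type);
        if (handler == null) {
            return false;
        }
        handler.accept(payload);
        return true;
    }

    public static void main(String[] args) {
        MessageDispatcher dispatcher = new MessageDispatcher()
                .register("heartbeat", p -> System.out.println("heartbeat: " + p))
                .register("report", p -> System.out.println("report: " + p));
        dispatcher.dispatch("heartbeat", "{\"type\":\"heartbeat\"}");
        if (!dispatcher.dispatch("unknown", "{}")) {
            System.out.println("no handler registered for type 'unknown'");
        }
    }
}
```

With a table, adding a message type means registering one more handler rather than editing the switch, and the boolean result gives the caller a natural place to log unknown types.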
3. MQTT message gateway
Code walkthrough
The MqttMessageGateway interface provides a simple method for sending messages.
package com.ruoyi.framework.mqtt;

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;

@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttMessageGateway {
    void sendMessage(@Header(MqttHeaders.TOPIC) String topic, String payload);
}
- @MessagingGateway: declares a messaging gateway; defaultRequestChannel names the channel that messages are sent to by default.
- @Header(MqttHeaders.TOPIC): sets the topic the message is published to.
Usage example:
    @Autowired
    private MqttMessageGateway mqttMessageGateway;

    public void publishMessage() {
        mqttMessageGateway.sendMessage("testSub/topic", "{\"key\":\"value\"}");
    }
4. MQTT service interface
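Code walkthrough
The IMqttService interface defines the methods that handle the different message types.
package com.ruoyi.framework.mqtt;

import com.ruoyi.power.domain.protocol.*;

public interface IMqttService {

    /**
     * Handles heartbeat data.
     * @param heartbeat MQTT message content
     */
    void handleHeartbeat(HeartbeatMessage heartbeat);

    /**
     * Handles reported data.
     * @param report MQTT message content
     */
    void handleReport(ReportMessage report);

    /**
     * Sends a remote-control or remote-adjustment command from the server to the collector.
     * @param controlMessage the control command
     */
    void sendControl(ControlMessage controlMessage);

    /**
     * Handles a meter alarm reported by the collector.
     * @param alarm the alarm content
     */
    void handleAlarm(AlarmMessage alarm);

    /**
     * Sends a call-read command to the specified gateway.
     * @param saleid    substation ID
     * @param gateid    gateway ID
     * @param startTime start of the period to read
     * @param endTime   end of the period to read
     */
    void sendCallRead(String saleid, String gateid, String startTime, String endTime);

    /**
     * Handles the collector's acknowledgement of a call-read command. The acknowledgement does not
     * guarantee that data exists for the requested period: the collector replies with this packet
     * first and then looks up the historical data, sending it if it exists and nothing otherwise.
     * @param baseMsg the collector's call-read acknowledgement
     */
    void handleCallReadAck(BaseMessage baseMsg);

    /**
     * Handles the execution result that the collector sends back to the server.
     * @param controlAck the execution result
     */
    void handleControlAck(ControlMessage controlAck);

    /**
     * Publishes a get-current-data command from the server to the collector.
     * @param baseMessage the command
     */
    void getCurrentData(BaseMessage baseMessage);

    /**
     * Sends a multi-value set command to the collector.
     * @param multiValueSetMessage the command
     */
    void sendMultiValueSet(MultiValueSetMessage multiValueSetMessage);

    /**
     * Handles the collector's response to a command it received from the server.
     * @param multiValueSetMessage the response
     */
    void handleMultiValueSet(MultiValueSetMessage multiValueSetMessage);
}
- Each method corresponds to the handling logic for one message type.
- A class that implements this interface must supply the concrete business logic.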
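The interface pairs sendControl with handleControlAck, and ControlMessage carries a cuuid field. One way to use such an identifier is to let the sender wait for the matching acknowledgement. The sketch below shows that pattern with the standard library only; PendingCommands is a hypothetical helper, and it assumes the collector echoes the same cuuid back in its acknowledgement, which the code in this article does not confirm.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: correlates outgoing commands with their acknowledgements by ID.
final class PendingCommands {
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Generates a correlation ID that is unique for practical purposes.
    static String newId() {
        return UUID.randomUUID().toString();
    }

    // Call before publishing the command; the future completes on ack or fails on timeout.
    CompletableFuture<String> expectAck(String cuuid, long timeout, TimeUnit unit) {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (pending.putIfAbsent(cuuid, future) != null) {
            throw new IllegalStateException("duplicate correlation id: " + cuuid);
        }
        // orTimeout returns the same future; the callback drops the entry once it is done.
        future.orTimeout(timeout, unit).whenComplete((result, error) -> pending.remove(cuuid, future));
        return future;
    }

    // Call from the ack handler; returns false for an unknown, late, or repeated ack.
    boolean complete(String cuuid, String result) {
        CompletableFuture<String> future = pending.remove(cuuid);
        return future != null && future.complete(result);
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingCommands commands = new PendingCommands();
        String id = newId();
        CompletableFuture<String> ack = commands.expectAck(id, 5, TimeUnit.SECONDS);
        // ... the command would be published here with this id as its cuuid ...
        commands.complete(id, "success"); // normally called when the ack arrives
        System.out.println("ack for " + id + ": " + ack.join());
    }
}
```

A random UUID is used here because a timestamp can repeat when two commands are issued within the same clock tick; whether the device protocol accepts arbitrary strings as cuuid is something to verify against its specification.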
5. Usage
5.1 Configure the MQTT parameters
Configure the MQTT parameters in application.yml:
mqtt:
  broker-url: tcp://127.0.0.1:1883
  client-id: mqtt-client-123
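A malformed broker-url tends to surface only when the client first tries to connect, so it can help to validate the value at startup. The following is a minimal sketch using java.net.URI; BrokerUrl is a hypothetical helper, and accepting only the tcp and ssl schemes is an assumption of this sketch (add others if your client supports them).

```java
import java.net.URI;
import java.util.Set;

// Hypothetical helper: validates a broker URL of the form scheme://host:port.
final class BrokerUrl {
    // Assumption of this sketch: only these schemes are accepted.
    private static final Set<String> ALLOWED_SCHEMES = Set.of("tcp", "ssl");

    static URI parse(String url) {
        URI uri = URI.create(url); // throws IllegalArgumentException on a syntax error
        if (uri.getScheme() == null || !ALLOWED_SCHEMES.contains(uri.getScheme())) {
            throw new IllegalArgumentException("unsupported or missing scheme in: " + url);
        }
        if (uri.getHost() == null) {
            throw new IllegalArgumentException("missing host in: " + url);
        }
        if (uri.getPort() == -1) {
            throw new IllegalArgumentException("missing port in: " + url);
        }
        return uri;
    }

    public static void main(String[] args) {
        URI uri = parse("tcp://127.0.0.1:1883");
        System.out.println("host=" + uri.getHost() + ", port=" + uri.getPort());
    }
}
```

Calling such a check from the configuration class before the client factory is built turns a late connection failure into an immediate startup error.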
5.2 Implement the IMqttService interface
Create a class that implements IMqttService and supplies the concrete business logic. For example:
package com.ruoyi.framework.mqtt;

import com.alibaba.fastjson2.JSON;
import com.baomidou.mybatisplus.core.toolkit.ObjectUtils;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.ruoyi.common.constant.MessageConstants;
import com.ruoyi.common.constant.OperationConstants;
import com.ruoyi.common.constant.ResultConstants;
import com.ruoyi.common.utils.bean.BeanUtils;
import com.ruoyi.power.config.CustomIdGenerator;
import com.ruoyi.power.domain.*;
import com.ruoyi.power.domain.protocol.*;
import com.ruoyi.power.mapper.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.time.LocalDateTime;
import java.util.*;

@Slf4j
@Service
public class MqttServiceImpl implements IMqttService {

    // Every downlink message is published to this fixed topic.
    private static final String COMMAND_TOPIC = "report/allpoints";

    @Autowired
    private MqttMessageGateway mqttGateway;
    @Autowired
    private HeartBeatMapper heartbeatMapper;
    @Autowired
    private GatewayInfoMapper gatewayInfoMapper;
    @Autowired
    private ReportMapper reportMapper;
    @Autowired
    private ReportMeterMapper reportMeterMapper;
    @Autowired
    private AlarmMapper alarmMapper;
    @Autowired
    private AlarmDetailMapper alarmDetailMapper;
    @Autowired
    private ControlMapper controlMapper;

    // Handles heartbeat data
    @Override
    public void handleHeartbeat(HeartbeatMessage heartbeat) {
        try {
            // Store the heartbeat in the database
            HeartBeat heartBeat = new HeartBeat();
            heartBeat.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
            heartBeat.setGateId(heartbeat.getGateid());
            heartBeat.setType(heartbeat.getType());
            heartBeat.setSaleId(heartbeat.getSaleid());
            heartBeat.setTime(heartbeat.getTime());
            heartBeat.setOperation(heartbeat.getOperation());
            heartbeatMapper.insertHeartBeat(heartBeat);
            log.info("[Heartbeat] Stored: substationId={}, gatewayId={}", heartbeat.getSaleid(), heartbeat.getGateid());

            // Look up the gateway record, creating it if it does not exist
            GatewayInfo gatewayInfo = gatewayInfoMapper.selectOne(
                    Wrappers.<GatewayInfo>lambdaQuery().eq(GatewayInfo::getGateid, heartbeat.getGateid()));
            if (ObjectUtils.isNull(gatewayInfo)) {
                createNewGateway(heartbeat.getSaleid(), heartbeat.getGateid());
            } else {
                gatewayInfo.setLastHeartbeatTime(LocalDateTime.now());
                gatewayInfo.setUpdateTime(LocalDateTime.now());
                int updated = gatewayInfoMapper.updateGatewayInfo(gatewayInfo);
                if (updated == 0) {
                    log.warn("Heartbeat update conflict saleid:{}, gateid:{}", heartbeat.getSaleid(), heartbeat.getGateid());
                }
            }

            // Reply to the gateway's heartbeat
            sendHeartbeat(heartbeat);
        } catch (Exception e) {
            log.error("[Heartbeat] Processing failed: {}", e.getMessage(), e);
        }
    }

    // Creates a new gateway record
    private void createNewGateway(String saleid, String gateid) {
        GatewayInfo newGateway = new GatewayInfo();
        newGateway.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
        newGateway.setSaleid(saleid);
        newGateway.setGateid(gateid);
        newGateway.setLastHeartbeatTime(LocalDateTime.now());
        newGateway.setStatus("0");
        newGateway.setCheckInterval(60L); // default interval
        newGateway.setCreateTime(LocalDateTime.now());
        gatewayInfoMapper.insertGatewayInfo(newGateway);
    }

    // Sends the heartbeat reply
    private void sendHeartbeat(HeartbeatMessage heartbeat) {
        String topic = COMMAND_TOPIC;
        heartbeat.setOperation(OperationConstants.TIME);
        mqttGateway.sendMessage(topic, JSON.toJSONString(heartbeat));
        log.info("[Heartbeat reply] topic={}, message={}", topic, JSON.toJSONString(heartbeat));
    }

    // Handles reported data
    @Override
    public void handleReport(ReportMessage report) {
        try {
            // Store the report header, then the meter rows that belong to it
            String reportId = createReportData(report);
            List<ReportMeter> meterEntities = report.getMeter().stream()
                    .map(m -> {
                        ReportMeter entity = new ReportMeter();
                        entity.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
                        entity.setReportId(reportId);
                        entity.setMeterId(m.getId());
                        entity.setStatus(m.getStatus());
                        entity.setName(m.getName());
                        entity.setValuesJson(JSON.toJSONString(m.getValues()));
                        return entity;
                    }).toList();
            for (ReportMeter meter : meterEntities) {
                reportMeterMapper.insertReportMeter(meter);
            }
            log.info("[Report] Stored: substationId={}, gatewayId={}", report.getSaleid(), report.getGateid());
        } catch (Exception e) {
            log.error("[Report] Processing failed: {}", e.getMessage(), e);
        }
    }

    // Creates a new report record and returns its ID
    private String createReportData(ReportMessage report) {
        Report rep = new Report();
        rep.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
        rep.setSaleid(report.getSaleid());
        rep.setGateid(report.getGateid());
        rep.setTime(report.getTime());
        rep.setType(report.getType());
        rep.setSequence(report.getSequence());
        rep.setSource(report.getSource());
        rep.setCreateTime(LocalDateTime.now());
        reportMapper.insert(rep);
        return rep.getId();
    }

    // Sends a control command
    @Override
    public void sendControl(ControlMessage controlMessage) {
        ControlMessage message = new ControlMessage();
        message.setSaleid(controlMessage.getSaleid());
        message.setGateid(controlMessage.getGateid());
        message.setType(controlMessage.getType());
        message.setCuuid(LocalDateTime.now().toString());
        message.setTime(LocalDateTime.now());
        message.setMeterid(controlMessage.getMeterid());
        message.setName(controlMessage.getName());
        message.setFunctionid(controlMessage.getFunctionid());
        message.setValue(controlMessage.getValue());
        // Store in the control record table
        createControl(controlMessage);
        String topic = COMMAND_TOPIC;
        mqttGateway.sendMessage(topic, JSON.toJSONString(message));
        log.info("[Control command] topic={}, command={}", topic, JSON.toJSONString(message));
    }

    // Stores a control record
    private void createControl(ControlMessage controlMessage) {
        Control control = new Control();
        control.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
        control.setSaleid(controlMessage.getSaleid());
        control.setGateid(controlMessage.getGateid());
        control.setType(controlMessage.getType());
        control.setCuuid(controlMessage.getCuuid());
        control.setTime(controlMessage.getTime());
        control.setMeterid(controlMessage.getMeterid());
        control.setName(controlMessage.getName());
        control.setFunctionid(controlMessage.getFunctionid());
        control.setValue(controlMessage.getValue());
        control.setResult(controlMessage.getResult());
        control.setErrordesc(controlMessage.getErrordesc());
        controlMapper.insertControl(control);
    }

    // Handles a reported alarm
    @Override
    public void handleAlarm(AlarmMessage alarmMessage) {
        try {
            // Store the alarm header, then the detail rows that belong to it
            String alarmId = createAlarmData(alarmMessage);
            List<AlarmDetail> alarmEntities = alarmMessage.getFunction().stream()
                    .map(m -> {
                        AlarmDetail entity = new AlarmDetail();
                        entity.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
                        entity.setAlarmId(alarmId);
                        entity.setPtId(m.getId());
                        entity.setAlarmType(m.getAlarmType());
                        entity.setLabel(m.getLabel());
                        entity.setCurrentValue(m.getCurrentValue());
                        entity.setSettingValue(m.getSettingValue());
                        entity.setLevel(m.getLevel());
                        return entity;
                    }).toList();
            for (AlarmDetail alarm : alarmEntities) {
                alarmDetailMapper.insertAlarmDetail(alarm);
            }
            log.info("[Alarm] Stored: substationId={}, gatewayId={}", alarmMessage.getSaleid(), alarmMessage.getGateid());
        } catch (Exception e) {
            log.error("[Alarm] Processing failed: {}", e.getMessage(), e);
        }
    }

    // Sends a call-read command
    @Override
    public void sendCallRead(String saleid, String gateid, String startTime, String endTime) {
        HashMap<String, String> protocol = new HashMap<>();
        protocol.put("saleid", saleid);
        protocol.put("gateid", gateid);
        protocol.put("type", MessageConstants.CALL_READ);
        protocol.put("time", String.valueOf(LocalDateTime.now()));
        protocol.put("startTime", startTime);
        protocol.put("endTime", endTime);
        String topic = COMMAND_TOPIC;
        mqttGateway.sendMessage(topic, JSON.toJSONString(protocol));
        log.info("[Control command] topic={}, command={}", topic, JSON.toJSONString(protocol));
    }

    // Records the collector's call-read acknowledgement
    @Override
    public void handleCallReadAck(BaseMessage baseMsg) {
        Report report = new Report();
        report.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
        report.setSaleid(baseMsg.getSaleid());
        report.setGateid(baseMsg.getGateid());
        report.setTime(baseMsg.getTime());
        report.setType(baseMsg.getType());
        reportMapper.insert(report);
    }

    // Records the result of a control command
    @Override
    public void handleControlAck(ControlMessage controlAck) {
        if (ResultConstants.FAILURE.equals(controlAck.getResult())) {
            createControl(controlAck);
            // Configuration or device problem: record the error and raise an alert
            log.error("Control failed (not retryable): {}", controlAck.getErrordesc());
        } else if (ResultConstants.SUCCESS.equals(controlAck.getResult())) {
            createControl(controlAck);
            log.info("Control succeeded: {}", controlAck.getCuuid());
        }
    }

    // Publishes a get-current-data command
    @Override
    public void getCurrentData(BaseMessage baseMessage) {
        String topic = COMMAND_TOPIC;
        mqttGateway.sendMessage(topic, JSON.toJSONString(baseMessage));
        log.info("[Control command] topic={}, command={}", topic, JSON.toJSONString(baseMessage));
    }

    @Override
    public void sendMultiValueSet(MultiValueSetMessage multiValueSetMessage) {
    }

    @Override
    public void handleMultiValueSet(MultiValueSetMessage multiValueSetMessage) {
        String topic = COMMAND_TOPIC;
        ControlMessage controlMessage = new ControlMessage();
        try {
            mqttGateway.sendMessage(topic, JSON.toJSONString(multiValueSetMessage));
            log.info("[Control command] topic={}, command={}", topic, JSON.toJSONString(multiValueSetMessage));
            if (ResultConstants.SUCCESS.equals(multiValueSetMessage.getResult())) {
                BeanUtils.copyProperties(multiValueSetMessage, controlMessage);
                createControl(controlMessage);
                log.info("[Control succeeded] topic={}, command={}", topic, JSON.toJSONString(multiValueSetMessage));
            } else if (ResultConstants.FAILURE.equals(multiValueSetMessage.getResult())) {
                BeanUtils.copyProperties(multiValueSetMessage, controlMessage);
                createControl(controlMessage);
                log.error("[Control failed] topic={}, command={}", topic, JSON.toJSONString(multiValueSetMessage));
            }
        } catch (Exception e) {
            log.error("[Control command] Processing failed: topic={}, command={}", topic, JSON.toJSONString(multiValueSetMessage), e);
        }
    }

    // Creates a new alarm record and returns its ID
    private String createAlarmData(AlarmMessage alarmMessage) {
        Alarm alarm = new Alarm();
        alarm.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
        alarm.setSaleid(alarmMessage.getSaleid());
        alarm.setGateid(alarmMessage.getGateid());
        alarm.setTime(alarmMessage.getTime());
        alarm.setType(alarmMessage.getType());
        alarm.setSequence(alarmMessage.getSequence());
        alarm.setName(alarmMessage.getName());
        alarm.setMeterid(alarmMessage.getMeterid());
        alarm.setCreateTime(LocalDateTime.now());
        alarmMapper.insert(alarm);
        return alarm.getId();
    }
}
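Every downlink message in the service above ends up on the single topic report/allpoints. If per-gateway topics are wanted instead, the format string needs a placeholder for each ID. The sketch below assumes a hypothetical layout report/allpoints/<saleid>/<gateid>; GatewayTopics and that layout are illustrations only, and the real topic scheme must come from the device protocol.

```java
// Hypothetical helper: builds a per-gateway command topic from validated IDs.
final class GatewayTopics {

    // Assumed layout for this sketch: report/allpoints/<saleid>/<gateid>
    static String commandTopic(String saleid, String gateid) {
        return String.format("report/allpoints/%s/%s", requireLevel(saleid), requireLevel(gateid));
    }

    // A value used as one topic level must not be empty or contain '/', '+', '#', or NUL.
    private static String requireLevel(String value) {
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException("topic level must not be empty");
        }
        for (int i = 0; i < value.length(); i++) {
            char c = value.charAt(i);
            if (c == '/' || c == '+' || c == '#' || c == '\u0000') {
                throw new IllegalArgumentException("illegal character in topic level: " + value);
            }
        }
        return value;
    }

    public static void main(String[] args) {
        System.out.println(commandTopic("s01", "g02"));
    }
}
```

Rejecting wildcards and separators in the IDs keeps a malformed ID from silently changing which topic a command is published to.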
5.3 Send an MQTT message
Send messages through MqttMessageGateway:
    @Autowired
    private MqttMessageGateway mqttMessageGateway;

    public void sendTestMessage() {
        mqttMessageGateway.sendMessage("testSub/topic", "{\"key\":\"value\"}");
    }
6. Summary
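This article showed how to integrate with the MQTT protocol using the Spring Integration framework, covering client configuration, subscribing to and publishing messages, and the message-handling logic. The code above gives you a working starting point for connecting a Java application to MQTT, and the message handling can be extended to fit your business requirements.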