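A Complete Java Implementation Example for Integrating with MQTT
MQTT is a client-server publish/subscribe messaging transport protocol. It is lightweight, simple, open, and easy to implement, which makes it suitable for a very wide range of uses.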
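Preface
This article explains in detail how to integrate with the MQTT protocol using Java and the Spring Integration framework. The code covers MQTT client configuration, message subscription and publishing, and the message-handling logic.
Prerequisite dependency
<!-- MQTT dependency -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>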
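1. MQTT configuration class
Code walkthrough
The MqttConfig class is the core MQTT configuration class. It sets up the MQTT client factory, the connection options, and the message channels.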
package com.ruoyi.framework.config;

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

@Configuration
public class MqttConfig {

    @Value("${mqtt.broker-url}")
    private String brokerUrl;

    @Value("${mqtt.client-id}")
    private String clientId;

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[]{brokerUrl}); // broker address
        options.setAutomaticReconnect(true);            // reconnect automatically
        factory.setConnectionOptions(options);
        // Note: the factory only stores the options here; the adapters open the connections later.
        System.out.println("Connecting to broker: " + brokerUrl + " OK.");
        return factory;
    }

    // Inbound adapter (receiving)
    @Bean
    public MqttPahoMessageDrivenChannelAdapter mqttInbound() {
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
                clientId + "-inbound", // client ID (must be unique)
                mqttClientFactory(),   // clients are created by the factory
                "testSub/#"            // topic filter to subscribe to
        );
        adapter.setOutputChannelName("mqttInputChannel"); // key step: received messages go to this channel
        adapter.setQos(1);                                // QoS level
        return adapter;
    }

    // Outbound adapter (sending)
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler handler = new MqttPahoMessageHandler(
                clientId + "-outbound",
                mqttClientFactory()
        );
        handler.setAsync(true);
        handler.setDefaultQos(1);
        return handler;
    }

    // Inbound channel (received messages)
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel(); // direct channel
    }

    // Outbound channel (messages to send)
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }
}
1.1 MQTT client factory
The mqttClientFactory method creates an MqttPahoClientFactory instance that holds the connection options for the MQTT clients.
@Bean
public MqttPahoClientFactory mqttClientFactory() {
    DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
    MqttConnectOptions options = new MqttConnectOptions();
    options.setServerURIs(new String[]{brokerUrl}); // address of the MQTT broker
    options.setAutomaticReconnect(true);            // enable automatic reconnection
    factory.setConnectionOptions(options);
    // Note: the factory only stores the options here; the adapters open the connections later.
    System.out.println("Connecting to broker: " + brokerUrl + " OK.");
    return factory;
}
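- brokerUrl: the address of the MQTT broker, usually of the form tcp://<IP>:<port>.
- automaticReconnect: enables automatic reconnection, so the client can restore the connection on its own after a network fluctuation.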
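Because a malformed broker-url only shows up when a client tries to connect, it can help to check the value early. The following is a minimal sketch using only the JDK. The class name BrokerUrlParser and its methods are hypothetical helpers, not part of Spring Integration or Paho.

```java
import java.net.URI;

/** Hypothetical helper that splits a broker URL of the form scheme://host:port. */
final class BrokerUrlParser {
    private BrokerUrlParser() {}

    static String scheme(String brokerUrl) {
        return URI.create(brokerUrl).getScheme();
    }

    static String host(String brokerUrl) {
        return URI.create(brokerUrl).getHost();
    }

    static int port(String brokerUrl) {
        return URI.create(brokerUrl).getPort(); // -1 when no port is given
    }

    /** Throws IllegalArgumentException if the host or the port is missing. */
    static void requireHostAndPort(String brokerUrl) {
        if (host(brokerUrl) == null || port(brokerUrl) < 0) {
            throw new IllegalArgumentException("Expected scheme://host:port but got: " + brokerUrl);
        }
    }
}
```

Calling BrokerUrlParser.requireHostAndPort(brokerUrl) at the start of mqttClientFactory would be one place to use it. Whether a missing port should be an error is an assumption of this sketch.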
1.2 MQTT subscription adapter
The mqttInbound method creates an MqttPahoMessageDrivenChannelAdapter instance, which subscribes to MQTT topics and receives messages.
@Bean
public MqttPahoMessageDrivenChannelAdapter mqttInbound() {
    MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
            clientId + "-inbound", mqttClientFactory(), "testSub/#");
    adapter.setOutputChannelName("mqttInputChannel"); // bind to the input channel
    adapter.setQos(1);                                // QoS level
    return adapter;
}
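- clientId + "-inbound": the client ID, which must be unique.
- "testSub/#": the topic filter to subscribe to; # is the multi-level wildcard and matches all subtopics.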
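To make the wildcard behavior concrete, here is a minimal sketch of MQTT topic-filter matching written with the JDK only. The broker performs this matching itself. The class TopicFilterMatcher is hypothetical and simplified: it does not validate filters and ignores the special rules for topics that start with $.

```java
/** Hypothetical, simplified MQTT topic-filter matcher for illustration. */
final class TopicFilterMatcher {
    private TopicFilterMatcher() {}

    /** Returns true if the topic name matches the filter, honoring '+' and '#'. */
    static boolean matches(String filter, String topic) {
        String[] f = filter.split("/", -1);
        String[] t = topic.split("/", -1);
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                // '#' is only valid as the last level; it matches the parent
                // level and any number of levels below it.
                return i == f.length - 1;
            }
            if (i >= t.length) {
                return false; // the filter has more levels than the topic
            }
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                return false; // literal level does not match
            }
        }
        return f.length == t.length; // no trailing '#': level counts must agree
    }
}
```

With this sketch, testSub/# matches testSub/a and testSub/a/b but not report/allpoints, while the single-level wildcard in testSub/+ matches testSub/a only.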
1.3 MQTT publishing adapter
The mqttOutbound method creates an MqttPahoMessageHandler instance, which publishes messages to MQTT topics.
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
    MqttPahoMessageHandler handler = new MqttPahoMessageHandler(
            clientId + "-outbound", mqttClientFactory());
    handler.setAsync(true);   // send asynchronously
    handler.setDefaultQos(1); // default QoS level
    return handler;
}
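- clientId + "-outbound": the client ID, which must be unique and therefore differs from the inbound client's ID.
- mqttOutboundChannel: the channel from which this handler takes the messages to publish.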
1.4 Message channels
The mqttInputChannel and mqttOutboundChannel methods create the input and output channels that carry the messages.
@Bean
public MessageChannel mqttInputChannel() {
    return new DirectChannel(); // direct channel
}

@Bean
public MessageChannel mqttOutboundChannel() {
    return new DirectChannel();
}
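A direct channel hands each message to its subscriber on the sender's own thread, so the send call returns only after the handler has finished. The sketch below imitates that behavior with the JDK only. SimpleDirectChannel is a hypothetical stand-in with a single subscriber, not the Spring Integration class.

```java
import java.util.function.Consumer;

/** Hypothetical stand-in for a direct channel with exactly one subscriber. */
final class SimpleDirectChannel<T> {
    private Consumer<T> subscriber;

    void subscribe(Consumer<T> handler) {
        this.subscriber = handler;
    }

    /** Delivers synchronously: the handler runs on the calling thread. */
    void send(T message) {
        if (subscriber == null) {
            throw new IllegalStateException("channel has no subscriber");
        }
        subscriber.accept(message);
    }
}
```

One consequence of this design is that a slow handler on mqttInputChannel delays the thread that delivers the MQTT message, so long-running work may be better handed off to another thread.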
2. MQTT message listener
Code walkthrough
The MqttMessageListener class handles the messages received from MQTT topics.
Variant I:
package com.ruoyi.framework.mqtt;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;

@Component
public class MqttMessageListener {

    private static final Logger log = LoggerFactory.getLogger(MqttMessageListener.class);

    // This variant assumes IMqttService methods that accept the raw payload String.
    @Autowired
    private IMqttService mqttService;

    // Handles inbound messages
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
                String payload = message.getPayload().toString();
                log.info("[MQTT] message received: topic={}, payload={}", topic, payload);
                try {
                    if (topic.startsWith("heartbeat/")) {
                        // heartbeat upload
                        mqttService.handleHeartbeat(payload);
                    } else if (topic.startsWith("report/")) {
                        // data upload
                        mqttService.handleReport(payload);
                    }
                } catch (Exception e) {
                    log.error("[MQTT] message handling failed: {}", e.getMessage());
                }
            }
        };
    }
}
Or variant II:
package com.ruoyi.framework.mqtt;

import com.alibaba.fastjson2.JSON;
import com.ruoyi.common.constant.MessageConstants;
import com.ruoyi.power.domain.protocol.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;

@Component
public class MqttMessageListener {

    private static final Logger log = LoggerFactory.getLogger(MqttMessageListener.class);

    @Autowired
    private IMqttService mqttService;

    // Handles inbound messages
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
                String payload = message.getPayload().toString();
                log.info("[MQTT] message received: topic={}, payload={}", topic, payload);
                try {
                    if (topic.startsWith("testSub/")) {
                        BaseMessage baseMsg = JSON.parseObject(payload, BaseMessage.class);
                        switch (baseMsg.getType()) {
                            case MessageConstants.HEART_BEAT:
                                HeartbeatMessage heartbeat = JSON.parseObject(payload, HeartbeatMessage.class);
                                mqttService.handleHeartbeat(heartbeat);
                                break;
                            case MessageConstants.REPORT, MessageConstants.GET_CURRENT_DATA:
                                ReportMessage report = JSON.parseObject(payload, ReportMessage.class);
                                mqttService.handleReport(report);
                                break;
                            case MessageConstants.ALARM:
                                AlarmMessage alarm = JSON.parseObject(payload, AlarmMessage.class);
                                mqttService.handleAlarm(alarm);
                                break;
                            case MessageConstants.CALL_ACK:
                                mqttService.handleCallReadAck(baseMsg);
                                break;
                            case MessageConstants.CONTROL_ACK:
                                ControlMessage controlAck = JSON.parseObject(payload, ControlMessage.class);
                                mqttService.handleControlAck(controlAck);
                                break;
                            default:
                                System.err.println("Unknown message type: " + baseMsg.getType());
                        }
                    } else if (topic.startsWith("report/allpoints")) {
                        BaseMessage baseMsg = JSON.parseObject(payload, BaseMessage.class);
                        switch (baseMsg.getType()) {
                            // If no callAck arrives, the collector did not receive the callRead
                            case MessageConstants.CALL_ACK:
                                mqttService.handleCallReadAck(baseMsg);
                                break;
                            case MessageConstants.REPORT, MessageConstants.GET_CURRENT_DATA:
                                ReportMessage report = JSON.parseObject(payload, ReportMessage.class);
                                mqttService.handleReport(report);
                                break;
                            case MessageConstants.CONTROL_ACK:
                                ControlMessage controlAck = JSON.parseObject(payload, ControlMessage.class);
                                mqttService.handleControlAck(controlAck);
                                break;
                            case MessageConstants.MULTIVALUESET_ACK:
                                MultiValueSetMessage multvaluesetAck = JSON.parseObject(payload, MultiValueSetMessage.class);
                                mqttService.handleMultiValueSet(multvaluesetAck);
                                break;
                        }
                    }
                } catch (Exception e) {
                    log.error("[MQTT] message handling failed: {}", e.getMessage());
                }
            }
        };
    }
}
2.1 Message-handling logic
The handler method returns a MessageHandler that processes the messages arriving on mqttInputChannel.
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
    return new MessageHandler() {
        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            String topic = message.getHeaders().get("mqtt_receivedTopic").toString(); // topic
            String payload = message.getPayload().toString();                         // message content
            Logger log = LoggerFactory.getLogger(MqttMessageListener.class);
            log.info("[MQTT] message received: topic={}, payload={}", topic, payload);
            try {
                if (topic.startsWith("testSub/")) {
                    // Messages on topics under testSub
                    BaseMessage baseMsg = JSON.parseObject(payload, BaseMessage.class);
                    switch (baseMsg.getType()) {
                        case MessageConstants.HEART_BEAT:
                            mqttService.handleHeartbeat(JSON.parseObject(payload, HeartbeatMessage.class));
                            break;
                        case MessageConstants.REPORT:
                            mqttService.handleReport(JSON.parseObject(payload, ReportMessage.class));
                            break;
                        // handling for the other message types
                    }
                } else if (topic.startsWith("report/allpoints")) {
                    // Messages on the report/allpoints topic
                }
            } catch (Exception e) {
                log.error("[MQTT] message handling failed: {}", e.getMessage());
            }
        }
    };
}
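- mqtt_receivedTopic: the message header from which the topic is read.
- payload: the message content, usually a JSON string.
- A switch statement calls a different handler method depending on the message type.
Note that the inbound adapter in section 1.2 subscribes only to testSub/#, so the report/allpoints branch is reached only if that topic is also added to the adapter's subscriptions, for example as an additional topic argument to its constructor.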
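As the number of message types grows, the switch statement can become long. One alternative is a registry that maps each type to a handler. The following is a minimal sketch with the JDK only. MessageDispatcher and the type strings used with it are hypothetical, and the handlers take the raw payload string rather than the parsed message classes used in the article.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical type-to-handler registry as an alternative to a long switch. */
final class MessageDispatcher {
    private final Map<String, Consumer<String>> handlers = new HashMap<>();

    /** Registers the handler to call for the given message type. */
    void register(String type, Consumer<String> handler) {
        handlers.put(type, handler);
    }

    /** Routes the payload to the registered handler; returns false for unknown types. */
    boolean dispatch(String type, String payload) {
        Consumer<String> handler = handlers.get(type);
        if (handler == null) {
            return false;
        }
        handler.accept(payload);
        return true;
    }
}
```

In the listener, each case branch would become one register call made at startup, and the false return value plays the role of the default branch.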
3. MQTT message gateway
Code walkthrough
The MqttMessageGateway interface provides a simple method for sending messages.
package com.ruoyi.framework.mqtt;

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;

@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttMessageGateway {

    void sendMessage(@Header(MqttHeaders.TOPIC) String topic, String payload);
}
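- @MessagingGateway: declares a messaging gateway; defaultRequestChannel names the channel to which messages are sent by default.
- @Header(MqttHeaders.TOPIC): passes the argument as the message header that holds the target topic.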
Usage example:
@Autowired
private MqttMessageGateway mqttMessageGateway;

public void publishMessage() {
    mqttMessageGateway.sendMessage("testSub/topic", "{\"key\":\"value\"}");
}
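The gateway is only an interface; the framework supplies an implementation at runtime that turns each method call into a message. The sketch below shows the general idea with a JDK dynamic proxy. It is a conceptual illustration, not Spring Integration's actual code, and DemoGateway, GatewayProxyDemo, and the sink callback are all hypothetical names.

```java
import java.lang.reflect.Proxy;
import java.util.function.BiConsumer;

/** Hypothetical gateway interface with the same shape as MqttMessageGateway. */
interface DemoGateway {
    void sendMessage(String topic, String payload);
}

/** Hypothetical factory that backs the interface with a dynamic proxy. */
final class GatewayProxyDemo {
    private GatewayProxyDemo() {}

    /** Creates a DemoGateway whose calls are forwarded to sink as (topic, payload). */
    static DemoGateway create(BiConsumer<String, String> sink) {
        return (DemoGateway) Proxy.newProxyInstance(
                DemoGateway.class.getClassLoader(),
                new Class<?>[]{DemoGateway.class},
                (proxy, method, args) -> {
                    if (method.getName().equals("sendMessage")) {
                        // Turn the method call into a (topic, payload) pair.
                        sink.accept((String) args[0], (String) args[1]);
                        return null;
                    }
                    // Sketch only: other methods (toString, hashCode, ...) are not supported.
                    throw new UnsupportedOperationException(method.getName());
                });
    }
}
```

In the real setup the sink's role is played by mqttOutboundChannel, and the topic travels as a message header rather than as a second argument.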
4. MQTT service interface
Code walkthrough
The IMqttService interface defines the methods that handle the different message types.
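package com.ruoyi.framework.mqtt;

import com.ruoyi.power.domain.protocol.*;

public interface IMqttService {

    /**
     * Handles heartbeat data.
     * @param heartbeat MQTT message content
     */
    void handleHeartbeat(HeartbeatMessage heartbeat);

    /**
     * Handles reported data.
     * @param report MQTT message content
     */
    void handleReport(ReportMessage report);

    /**
     * Sends a remote-control or remote-adjustment command from the server to the collector.
     * @param controlMessage the control command
     */
    void sendControl(ControlMessage controlMessage);

    /**
     * Handles a reported meter alarm.
     * @param alarm alarm content
     */
    void handleAlarm(AlarmMessage alarm);

    /**
     * Sends a call-read command to the specified gateway.
     * @param saleid substation ID
     * @param gateid gateway ID
     * @param startTime start of the period to read
     * @param endTime end of the period to read
     */
    void sendCallRead(String saleid, String gateid, String startTime, String endTime);

    /**
     * Handles the collector's acknowledgement of a call-read command. The acknowledgement does
     * not guarantee that data exists for the requested period: the collector replies with this
     * packet first, then looks up the historical data and sends it only if it exists.
     * @param baseMsg the collector's call-read acknowledgement
     */
    void handleCallReadAck(BaseMessage baseMsg);

    /**
     * Handles the execution result that the collector sends to the server.
     * @param controlAck the execution result
     */
    void handleControlAck(ControlMessage controlAck);

    /**
     * Publishes a get-current-data command from the server to the collector.
     * @param baseMessage the command
     */
    void getCurrentData(BaseMessage baseMessage);

    /**
     * @param multiValueSetMessage
     */
    void sendMultiValueSet(MultiValueSetMessage multiValueSetMessage);

    /**
     * Handles the collector's response to a command it received from the server.
     * @param multiValueSetMessage the response
     */
    void handleMultiValueSet(MultiValueSetMessage multiValueSetMessage);
}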
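- Each method corresponds to the handling logic for one message type.
- A class that implements this interface has to supply the concrete business logic.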
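5. Usage
5.1 Configure the MQTT parameters
Configure the MQTT parameters in application.yml:
mqtt:
  broker-url: tcp://127.0.0.1:1883
  client-id: mqtt-client-123
5.2 Implement the IMqttService interface
Create a class that implements the IMqttService interface and supplies the concrete business logic. For example: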
package com.ruoyi.framework.mqtt;

import com.alibaba.fastjson2.JSON;
import com.baomidou.mybatisplus.core.toolkit.ObjectUtils;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.ruoyi.common.constant.MessageConstants;
import com.ruoyi.common.constant.OperationConstants;
import com.ruoyi.common.constant.ResultConstants;
import com.ruoyi.common.utils.bean.BeanUtils;
import com.ruoyi.power.config.CustomIdGenerator;
import com.ruoyi.power.domain.*;
import com.ruoyi.power.domain.protocol.*;
import com.ruoyi.power.mapper.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.time.LocalDateTime;
import java.util.*;

@Slf4j
@Service
public class MqttServiceImpl implements IMqttService {

    @Autowired
    private MqttMessageGateway mqttGateway;
    @Autowired
    private HeartBeatMapper heartbeatMapper;
    @Autowired
    private GatewayInfoMapper gatewayInfoMapper;
    @Autowired
    private ReportMapper reportMapper;
    @Autowired
    private ReportMeterMapper reportMeterMapper;
    @Autowired
    private AlarmMapper alarmMapper;
    @Autowired
    private AlarmDetailMapper alarmDetailMapper;
    @Autowired
    private ControlMapper controlMapper;

    // Handles heartbeat data
    @Override
    public void handleHeartbeat(HeartbeatMessage heartbeat) {
        try {
            // Store the heartbeat in the database
            HeartBeat heartBeat = new HeartBeat();
            heartBeat.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
            heartBeat.setGateId(heartbeat.getGateid());
            heartBeat.setType(heartbeat.getType());
            heartBeat.setSaleId(heartbeat.getSaleid());
            heartBeat.setTime(heartbeat.getTime());
            heartBeat.setOperation(heartbeat.getOperation());
            heartbeatMapper.insertHeartBeat(heartBeat);
            log.info("[Heartbeat] stored: substationId={}, gatewayId={}", heartbeat.getSaleid(), heartbeat.getGateid());

            // Look up the gateway record, or create it
            GatewayInfo gatewayInfo = gatewayInfoMapper.selectOne(
                    Wrappers.<GatewayInfo>lambdaQuery().eq(GatewayInfo::getGateid, heartbeat.getGateid()));
            if (ObjectUtils.isNull(gatewayInfo)) {
                createNewGateway(heartbeat.getSaleid(), heartbeat.getGateid());
            } else {
                gatewayInfo.setLastHeartbeatTime(LocalDateTime.now());
                gatewayInfo.setUpdateTime(LocalDateTime.now());
                int updated = gatewayInfoMapper.updateGatewayInfo(gatewayInfo);
                if (updated == 0) {
                    log.warn("Heartbeat update conflict saleid:{}, gateid:{}", heartbeat.getSaleid(), heartbeat.getGateid());
                }
            }
            // If the gateway requested a heartbeat, reply with one
            sendHeartbeat(heartbeat.getSaleid(), heartbeat.getGateid(), heartbeat);
        } catch (Exception e) {
            log.error("[Heartbeat] handling failed: {}", e.getMessage());
        }
    }

    // Creates a new gateway record
    private void createNewGateway(String saleid, String gateid) {
        GatewayInfo newGateway = new GatewayInfo();
        newGateway.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
        newGateway.setSaleid(saleid);
        newGateway.setGateid(gateid);
        newGateway.setLastHeartbeatTime(LocalDateTime.now());
        newGateway.setStatus("0");
        newGateway.setCheckInterval(60L); // default interval
        newGateway.setCreateTime(LocalDateTime.now());
        gatewayInfoMapper.insertGatewayInfo(newGateway);
    }

    // Sends a heartbeat to the gateway
    private void sendHeartbeat(String saleid, String gateid, HeartbeatMessage heartbeat) {
        // Note: the format string has no placeholders, so saleid and gateid are ignored here.
        String topic = String.format("report/allpoints", saleid, gateid);
        heartbeat.setOperation(OperationConstants.TIME);
        mqttGateway.sendMessage(topic, JSON.toJSONString(heartbeat));
        log.info("[Config push] topic={}, config={}", topic, JSON.toJSONString(heartbeat));
    }

    // Handles reported data
    @Override
    public void handleReport(ReportMessage report) {
        try {
            // Store the report header record
            String reportId = createReportData(report);
            // Store the meter data
            List<ReportMeter> meterEntities = report.getMeter().stream()
                    .map(m -> {
                        ReportMeter entity = new ReportMeter();
                        entity.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
                        entity.setReportId(reportId);
                        entity.setMeterId(m.getId());
                        entity.setStatus(m.getStatus());
                        entity.setName(m.getName());
                        entity.setValuesJson(JSON.toJSONString(m.getValues()));
                        return entity;
                    }).toList();
            for (ReportMeter meter : meterEntities) {
                reportMeterMapper.insertReportMeter(meter);
            }
            log.info("[Report] stored: substationId={}, gatewayId={}", report.getSaleid(), report.getGateid());
        } catch (Exception e) {
            log.error("[Report] handling failed: {}", e.getMessage());
        }
    }

    // Creates a new report record
    private String createReportData(ReportMessage report) {
        Report rep = new Report();
        rep.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
        rep.setSaleid(report.getSaleid());
        rep.setGateid(report.getGateid());
        rep.setTime(report.getTime());
        rep.setType(report.getType());
        rep.setSequence(report.getSequence());
        rep.setSource(report.getSource());
        rep.setCreateTime(LocalDateTime.now());
        reportMapper.insert(rep);
        return rep.getId();
    }

    // Sends a control command
    @Override
    public void sendControl(ControlMessage controlMessage) {
        ControlMessage message = new ControlMessage();
        message.setSaleid(controlMessage.getSaleid());
        message.setGateid(controlMessage.getGateid());
        message.setType(controlMessage.getType());
        message.setCuuid(LocalDateTime.now().toString());
        message.setTime(LocalDateTime.now());
        message.setMeterid(controlMessage.getMeterid());
        message.setName(controlMessage.getName());
        message.setFunctionid(controlMessage.getFunctionid());
        message.setValue(controlMessage.getValue());
        // Store in the control record table (note: this stores the incoming object, not the copy built above)
        createControl(controlMessage);
        // Note: the format string has no placeholders, so the argument is ignored here.
        String topic = String.format("report/allpoints", message);
        mqttGateway.sendMessage(topic, JSON.toJSONString(message));
        log.info("[Control command] topic={}, command={}", topic, JSON.toJSONString(message));
    }

    private void createControl(ControlMessage controlMessage) {
        Control control = new Control();
        control.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
        control.setSaleid(controlMessage.getSaleid());
        control.setGateid(controlMessage.getGateid());
        control.setType(controlMessage.getType());
        control.setCuuid(controlMessage.getCuuid());
        control.setTime(controlMessage.getTime());
        control.setMeterid(controlMessage.getMeterid());
        control.setName(controlMessage.getName());
        control.setFunctionid(controlMessage.getFunctionid());
        control.setValue(controlMessage.getValue());
        control.setResult(controlMessage.getResult());
        control.setErrordesc(controlMessage.getErrordesc());
        controlMapper.insertControl(control);
    }

    // Handles a reported alarm
    @Override
    public void handleAlarm(AlarmMessage alarmMessage) {
        try {
            // Store the alarm header record
            String alarmId = createAlarmData(alarmMessage);
            // Store the alarm details
            List<AlarmDetail> alarmEntities = alarmMessage.getFunction().stream()
                    .map(m -> {
                        AlarmDetail entity = new AlarmDetail();
                        entity.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
                        entity.setAlarmId(alarmId);
                        entity.setPtId(m.getId());
                        entity.setAlarmType(m.getAlarmType());
                        entity.setLabel(m.getLabel());
                        entity.setCurrentValue(m.getCurrentValue());
                        entity.setSettingValue(m.getSettingValue());
                        entity.setLevel(m.getLevel());
                        return entity;
                    }).toList();
            for (AlarmDetail alarm : alarmEntities) {
                alarmDetailMapper.insertAlarmDetail(alarm);
            }
            log.info("[Alarm] stored: substationId={}, gatewayId={}", alarmMessage.getSaleid(), alarmMessage.getGateid());
        } catch (Exception e) {
            log.error("[Alarm] handling failed: {}", e.getMessage());
        }
    }

    // Sends a call-read command
    @Override
    public void sendCallRead(String saleid, String gateid, String startTime, String endTime) {
        HashMap<String, String> protocol = new HashMap<>();
        protocol.put("saleid", saleid);
        protocol.put("gateid", gateid);
        protocol.put("type", MessageConstants.CALL_READ);
        protocol.put("time", String.valueOf(LocalDateTime.now()));
        protocol.put("startTime", startTime);
        protocol.put("endTime", endTime);
        String topic = String.format("report/allpoints", saleid, gateid);
        mqttGateway.sendMessage(topic, JSON.toJSONString(protocol));
        log.info("[Control command] topic={}, command={}", topic, JSON.toJSONString(protocol));
    }

    @Override
    public void handleCallReadAck(BaseMessage baseMsg) {
        Report report = new Report();
        report.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
        report.setSaleid(baseMsg.getSaleid());
        report.setGateid(baseMsg.getGateid());
        report.setTime(baseMsg.getTime());
        report.setType(baseMsg.getType());
        reportMapper.insert(report);
    }

    @Override
    public void handleControlAck(ControlMessage controlAck) {
        if (ResultConstants.FAILURE.equals(controlAck.getResult())) {
            createControl(controlAck);
            // Configuration or device problem: record the error and raise an alert
            log.error("Control failed (not retryable): {}", controlAck.getErrordesc());
        } else if (ResultConstants.SUCCESS.equals(controlAck.getResult())) {
            createControl(controlAck);
            log.info("Control succeeded: {}", controlAck.getCuuid());
        }
    }

    @Override
    public void getCurrentData(BaseMessage baseMessage) {
        String topic = String.format("report/allpoints", baseMessage.getSaleid(), baseMessage.getGateid());
        mqttGateway.sendMessage(topic, JSON.toJSONString(baseMessage));
        log.info("[Control command] topic={}, command={}", topic, JSON.toJSONString(baseMessage));
    }

    @Override
    public void sendMultiValueSet(MultiValueSetMessage multiValueSetMessage) {
    }

    @Override
    public void handleMultiValueSet(MultiValueSetMessage multiValueSetMessage) {
        String topic = String.format("report/allpoints", multiValueSetMessage.getSaleid(), multiValueSetMessage.getGateid());
        ControlMessage controlMessage = new ControlMessage();
        try {
            mqttGateway.sendMessage(topic, JSON.toJSONString(multiValueSetMessage));
            log.info("[Control command] topic={}, command={}", topic, JSON.toJSONString(multiValueSetMessage));
            if (ResultConstants.SUCCESS.equals(multiValueSetMessage.getResult())) {
                BeanUtils.copyProperties(multiValueSetMessage, controlMessage);
                createControl(controlMessage);
                log.info("[Control command] topic={}, command={}", topic, JSON.toJSONString(multiValueSetMessage));
            } else if (ResultConstants.FAILURE.equals(multiValueSetMessage.getResult())) {
                BeanUtils.copyProperties(multiValueSetMessage, controlMessage);
                createControl(controlMessage);
                log.error("[Control failed] topic={}, command={}", topic, JSON.toJSONString(multiValueSetMessage));
            }
        } catch (Exception e) {
            log.error("[Control command] topic={}, command={}", topic, JSON.toJSONString(multiValueSetMessage));
        }
    }

    // Creates a new alarm record
    private String createAlarmData(AlarmMessage alarmMessage) {
        Alarm alarm = new Alarm();
        alarm.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
        alarm.setSaleid(alarmMessage.getSaleid());
        alarm.setGateid(alarmMessage.getGateid());
        alarm.setTime(alarmMessage.getTime());
        alarm.setType(alarmMessage.getType());
        alarm.setSequence(alarmMessage.getSequence());
        alarm.setName(alarmMessage.getName());
        alarm.setMeterid(alarmMessage.getMeterid());
        alarm.setCreateTime(LocalDateTime.now());
        alarmMapper.insert(alarm);
        return alarm.getId();
    }
}
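Several methods in the implementation build their topic with String.format("report/allpoints", saleid, gateid). Because that pattern contains no placeholders, the arguments have no effect and every command goes to the same topic. The sketch below demonstrates this and shows what a per-gateway topic could look like. The cmd/%s/%s layout is purely an assumption for illustration and does not come from the article.

```java
/** Hypothetical demo of topic construction with String.format. */
final class TopicFormatDemo {
    private TopicFormatDemo() {}

    /** Same call shape as in the service: extra arguments are silently ignored. */
    static String fixedTopic(String saleid, String gateid) {
        return String.format("report/allpoints", saleid, gateid);
    }

    /** Assumed per-gateway layout; the pattern "cmd/%s/%s" is not from the article. */
    static String perGatewayTopic(String saleid, String gateid) {
        return String.format("cmd/%s/%s", saleid, gateid);
    }
}
```

If a single shared topic is what the device protocol requires, a plain string constant would state that intent more clearly than a format call.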
5.3 Send MQTT messages
Send messages through MqttMessageGateway:
@Autowired
private MqttMessageGateway mqttMessageGateway;

public void sendTestMessage() {
    mqttMessageGateway.sendMessage("testSub/topic", "{\"key\":\"value\"}");
}
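6. Summary
This article showed how to integrate with the MQTT protocol using the Spring Integration framework, covering client configuration, message subscription and publishing, and the message-handling logic. With the code above you can integrate Java with MQTT quickly and extend the message handling to fit your business requirements.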