Java对接MQTT协议的完整实现示例代码

2025-08-10 22:50

本文主要是介绍Java对接MQTT协议的完整实现示例代码,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

《Java对接MQTT协议的完整实现示例代码》MQTT是一个基于客户端-服务器的消息发布/订阅传输协议,MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛,:本文主要介绍Ja...

前言

本文将详细介绍如何使用Java和Spring Integration框架实现MQTT协议的对接。代码包括MQTT客户端的配置、消息的订阅与发布、以及消息的处理逻辑。

前置依赖

<!-- MQTT 依赖 -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>

1. MQTT配置类

代码解析

MqttConfig类是MQTT的核心配置类,负责MQTT客户端的初始化、连接选项的设置以及消息通道的创建。

package com.ruoyi.framework.config;

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

import java.io.IOException;

@Configuration
public class MqttConfig {

    @Value("${mqtt.broker-url}")
    private String brokerUrl;

    @Value("${mqtt.client-id}")
    private String clientId;

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[]{brokerUrl}); // Broker 地址
        options.setAutomaticReconnect(true); // 自动重连
        factory.setConnectionOptions(options);
        System.out.println("Connecting to broker: " + brokerUrl + " OK.");
        return factory;
    }

    @Bean
    public MqttPahoMessageDrivenChannelAdapter mqttInbound() {
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
                clientId + "-inbound",  // 客户端ID(唯一)
                mqttClientFactory(),   // 使用工厂创建客户端
                "testSub/#" // 订阅的主题
        );
        adapter.setOutputChannelName("mqttInputChannel"); // 关键:绑定到输入通道 消息输出通道
        adapter.setQos(1); // 设置 QoS 级别
        return adapter;
    }

    // 出站适配器(发送)
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler handler = new MqttPahoMessageHandler(
                clientId + "-outbound",
                mqttClientFactory()
        );
        handler.setAsync(true);
        handler.setDefaultQos(1);
        return handler;
    }

    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel(); // 使用直连通道
    }

    // 出站通道(发送消息)
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

}

1.1 MQTT客户端工厂

mqttClientFactory方法创建了一个MqttPahoClientFactory实例,用于配置MQTT客户端的连接选项。

@Bean
public MqttPahoClientFactory mqttClientFactory() {
    DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
    MqttConnectOptions options = new MqttConnectOptions();
    options.setServerURIs(new String[]{brokerUrl}); // 设置MQTT服务端地址
    options.setAutomaticReconnect(true); // 开启自动重连
    factory.setConnectionOptions(options);
    System.out.println("Connecting to broker: " + brokerUrl + " OK.");
    return factory;
}
  • brokerUrl:MQTT服务端的地址,通常为tcp://<IP>:<端口>
  • automaticReconnect:开启自动重连功能,确保网络波动时客户端能够自动恢复连接。

1.2 MQTT消息订阅适配器

mqttInbound方法创建了一个MqttPahoMessageDrivenChannelAdapter实例,用于订阅MQTT主题并接收消息。

@Bean
public MqttPahoMessageDrivenChannelAdapter mqttInbound() {
    MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
            clientId + "-inbound", mqttClientFactory(), "testSub/#");
    adapter.setOutputChannelName("mqttInputChannel"); // 绑定到输入通道
    adapter.setQos(1); // 设置QoS级别
    return adapter;
}
  • clientId + "-inbound":客户端ID,需保证唯一性。
  • "testSub/#":订阅的主题,#表示匹配所有子主题。

1.3 MQTT消息发布适配器

mqttOutbound方法创建了一个MqttPahoMessageHandler实例,用于发布消息到MQTT主题。

@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
    MqttPahoMessageHandler handler = new MqttPahoMessageHandler(
            clientId + "-outbound", mqttClientFactory());
    handler.setAsync(true); // 异步发送
    handler.setDefaultQos(1); // 设置QoS级别
    return handler;
}
  • clientId + "-outbound":客户端ID,需保证唯一性。
  • mqttOutboundChannel:消息发送通道。

1.4 消息通道

mqttInputChannelmqttOutboundChannel方法分别创建了输入和输出通道,用于消息的传递。

@Bean
public MessageChannel mqttInputChannel()js {
    return new DirectChannel(); // 直连通道
}

@Bean
public MessageChannel mqttOutboundChannel() {
    return new DirectChannel();
}

2. MQTT消息监听器

代码解析

MqttMessageListener类负责处理从MQTT主题接收到的消息

package com.ruoyi.framework.mqtt;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;

@Component
public class MqttMessageListener {

    @Autowired
    private IMqttService mqttService;

    // 处理入站消息
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
                String payload = message.getPayload().toString();
                Logger log = LoggerFactory.getLogger(MqttMessageListener.class);
                log.info("[MQTT] 收到消息: topic={}, payload={}", topic, payload);

                try {
                    if (topic.startsWith("heartbeat/")) {  //心跳上报
                        mqttService.handleHeartbeat(payload);
                    } else if (topic.startsWith("report/")) {  //数据上报
                        mqttService.handleReport(payload);
                    }
                } catch (Exception e) {
                    log.error("[MQTT] 消息处理失败: {}", e.getMessage());
                }
            }
        };
    }
}

 或Ⅱ

package com.ruoyi.framework.mqtt;

import com.alibaba.fastjson2.JSON;
import com.ruoyi.common.constant.MessageConstants;
import com.ruoyi.power.domain.protocol.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;

@Component
public class MqttMessageListener {

    @Autowired
    private IMqttService mqttService;

    // 处理入站消息
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
                String payload = message.getPayload().toString();
                Loghttp://www.chinasem.cnger log = LoggerFactory.getLogger(MqttMessageListener.class);
                log.info("[MQTT] 收到消息: topic={}, payload={}", topic, payload);

                try {
                    if (topic.startsWith("testSub/")) {
                        BaseMessage baseMsg = JSON.parseobject(payload, BaseMessage.class);
                        switch (baseMsg.getType()) {
                            case MessageConstants.HEART_BEAT:
                                HeartbeatMessage heartbeat = JSON.parseObject(payload, HeartbeatMessage.class);
                                mqttService.handleHeartbeat(heartbeat);
                                break;
                            case MessageConstants.REPORT, MessageConstants.GET_CURRENT_DATA:
                                ReportMessage report = JSON.parseObject(payload, ReportMessage.class);
                                mqttService.handleReport(report);
                                break;
                            case MessageConstants.ALARM:
                                AlarmMessage alarm = JSON.parseObject(payload, AlarmMessage.class);
                         js       mqttService.handleAlarm(alarm);
                                break;
                            case MessageConstants.CALL_ACK:
                                mqttService.handleCallReadAck(baseMsg);
                                break;
                            case MessageConstants.CONTROL_ACK:
                                ControlMessage controlAck = JSON.parseObject(payload, ControlMessage.class);
                                mqttService.handleControlAck(controlAck);
                                break;
                            default:
                                System.err.println("Unknown message type: " + baseMsg.getType());
                        }
                    } else if (topic.startsWith("report/allpoints")) {
                        BaseMessage baseMsg = JSON.parseObject(payload, BaseMessage.class);
                        switch (baseMsg.getType()) {
                            // 如果没收到callAck 则代表采集器没收到callRead
                            case MessageConstants.CALL_ACK:
                                mqttService.handleCallReadAck(baseMsg);
                                break;
                            case MessageConstants.REPORT, MessageConstants.GET_CURRENT_DATA:
                                ReportMessage report = JSON.parseObject(payload, ReportMessage.class);
                                mqttService.handleReport(report);
                                break;
                            case MessageConstants.CONTROL_ACK:
                                ControlMessage controlAck = JSON.parseObject(payload, ControlMessage.class);
                                mqttService.handleControlAck(controlAck);
                                break;
                            case MessageConstants.MULTIVALUESET_ACK:
                                MultiValueSetMessage multvaluesetAck = JSON.parseObject(payload, MultiValueSetMessage.class);
                                mqttService.handleMultiValueSet(multvaluesetAck);
                                break;
                        }
                    }
                } catch (Exception e) {
                    log.error("[MQTT] 消息处理失败: {}", e.getMessage());
                }
            }
        };
    }
}

2.1 消息处理逻辑

handler方法是一个MessageHandler,用于处理从mqttInputChannel接收到的消息。

@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
    return new MessageHandler() {
        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            String topic = message.getHeaders().get("mqtt_receivedTopic").toString(); // 获取主题
            String payload = message.getPayload().toString(); // 获取消息内容
            Logger log = LoggerFactory.getLogger(MqttMessageListener.class);
            log.info("[MQTT] 收到消息: topic={}, payload={}", topic, payload);

            try {
                if (topic.startsWith("testSub/")) {
                    // 处理订阅主题为testSub的消息
                    BaseMessage baseMsg = JSON.parseObject(payload, BaseMessage.class);
                    switch (baseMsg.getType()) {
                        case MessageConstants.HEART_BEAT:
                            mqttService.handleHeartbeat(JSON.parseObject(payload, HeartbeatMessage.class));
                            break;
                        case MessageConstants.REPORT:
                            mqttService.handleReport(JSON.parseObject(payload, ReportMessage.class));
                            break;
                        // 其他消息类型的处理逻辑
                    }
                } else if (topic.startsWith("report/allpoints")) {
                    // 处理订阅主题为report/allpoints的消息
                }
            } catch (Exception e) {
                log.error("[MQTT] 消息处理失败: {}", e.getMessage());
            }
        }
    };
}
  • mqtt_receivedTopic:从消息头中获取主题。
  • payload:消息内容,通常是JSON格式的字符串。
  • 使用switch语句根据消息类型调用不同的处理方法。

3. MQTT消息网关

代码解析

MqttMessageGateway接口提供了一个简单的发送消息的方法。

package com.ruoyi.framework.mqtt;

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;

@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttMessageGateway {

    void sendMessage(@Header(MqttHeaders.TOPIC) String topic, String payload);

}
  • @MessagingGateway:声明一个消息网关,defaultRequestChannel指定默认的发送通道。
  • @Header(MqttHeaders.TOPIC):指定消息的主题。

使用示例:

@Autowired
private MqttMessageGateway mqttMessageGateway;

public void publishMessage() {
    mqttMessageGateway.sendMessage("testSub/topic", "{\"key\":\"value\"}");
}

4. MQTT服务接口

代码解析

IMqttService接口定义了处理不同类型消息的方法。

package com.ruoyi.framework.mqtt;

import com.ruoyi.power.domain.protocol.*;

public interface IMqttService {

    /**
     * 处理心跳数据
     * @param heartbeat MQTT 消息内容
     */
    void handleHeartbeat(HeartbeatMessage heartbeat);

    /**
     * 处理上报数据
     * @param report MQTT 消息内容
     */
    void handleReport(ReportMessage report);

    /**
     * 服务器发送遥控命令到采集器
     * 服务器发送遥调命令到采集器
     * @param controlMessage 遥控命令
     */
    void sendControl(ControlMessage controlMessage);

    /**
     * 处理上报仪表报警
     * @param alarm 报警内容
     * @return String 配置内容
     */
    void handleAlarm(AlarmMessage alarm);

    /**
     * 下发控制命令到指定网关
     * @param saleid 配电站ID
     * @param gateid 网关ID
     */
php    void sendCallRead(String saleid, String gateid, String startTime, String endTime);

    /**
     * 采集器响应召读命令(响应召读命令回复包,不代表召读时间段的数据一定存在,采集器收到召读命令后首先回复
     * 此数据包,下一不再查找相应历史数据, 存在即发送,不存在不发送 )
     * @param baseMsg 采集器响应召读命令(
     */
    void handleCallReadAck(BaseMessage baseMsg);

    /**
     * 采集器发送执行结果到服务器
     * @param controlAck
     */
    void handleControlAck(ControlMessage controlAck);

    /**
     * 由服务器发布获取数据命令到采集器
     * @param baseMessage
     */
    void getCurrentData(BaseMessage baseMessage);

    /**
     *
     * @param multiValueSetMessage
     */
    void sendMultiValueSet(MultiValueSetMessage multiValueSetMessage);

    /**
     * 处理相应采集器接收到服务器的命令
     * @param multiValueSetMessage
     */
    void handleMultiValueSet(MultiValueSetMessage multiValueSetMessage);

}
  • 每个方法对应一种消息类型的处理逻辑。
  • 实现该接口的类需要提供具体的业务逻辑。

5. 使用说明

5.1 配置MQTT参数

application.yml中配置MQTT的相关参数:

mqtt:
  broker-url: tcp://127.0.0.1:1883
  client-id: mqtt-client-123

5.2 实现IMqttService接口

创建一个类实现IMqttService接口,并提供具体的业务逻辑。例如:

package com.ruoyi.framework.mqtt;

import com.alibaba.fastjson2.JSON;
import com.baomidou.myBATisplus.core.toolkit.ObjectUtils;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.ruoyi.common.constant.MessageConstants;
import com.ruoyi.common.constant.OperationConstants;
import com.ruoyi.common.constant.ResultConstants;
import com.ruoyi.common.utils.bean.BeanUtils;
import com.ruoyi.power.config.CustomIdGenerator;
import com.ruoyi.power.domain.*;
import com.ruoyi.power.domain.protocol.*;
import com.ruoyi.power.mapper.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.time.LocalDateTime;
import java.util.*;

@Slf4j
@Service
public class MqttServiceImpl implements IMqttService {

    @Autowired
    private MqttMessageGateway mqttGateway;

    @Autowired
    private HeartBeatMapper heartbeatMapper;

    @Autowired
    private GatewayInfoMapper gatewayInfoMapper;

    @Autowired
    private ReportMapper reportMapper;

    @Autowired
    private ReportMeterMapper reportMeterMapper;

    @Autowired
    private AlarmMapper alarmMapper;

    @Autowired
    private AlarmDetailMapper alarmDetailMapper;

    @Autowired
    private ControlMapper controlMapper;

    // 处理心跳数据
    @Override
    public void handleHeartbeat(HeartbeatMessage heartbeat) {
        try {
            // 心跳存储到数据库
            HeartBeat heartBeat = new HeartBeat();
            heartBeat.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
            heartBeat.setGateId(heartbeat.getGateid());
            heartBeat.setType(heartbeat.getType());
            heartBeat.setSaleId(heartbeat.getSaleid());
            heartBeat.setTime(heartbeat.getTime());
            heartBeat.setOperation(heartbeat.getOperation());
            heartbeatMapper.insertHeartBeat(heartBeat);
            log.info("[心跳数据] 存储成功: substationId={}, gatewayId={}",
                    heartbeat.getSaleid(), heartbeat.getGateid());

            // 查询或创建网关记录
            GatewayInfo gatewayInfo = gatewayInfoMapper.selectOne(Wrappers.<GatewayInfo>lambdaQuery().eq(GatewayInfo::getGateid, heartbeat.getGateid()));
            if(ObjectUtils.isNull(gatewayInfo)) {
                createNewGateway(heartbeat.getSaleid(), heartbeat.getGateid());
            } else {
                gatewayInfo.setLastHeartbeatTime(LocalDateTime.now());
                gatewayInfo.setUpdateTime(LocalDateTime.now());
                int updated = gatewayInfoMapper.updateGatewayInfo(gatewayInfo);
                if(updated == 0) {
                    log.warn("心跳更新冲突 saleid:{}, gateid:{}", heartbeat.getSaleid(), heartbeat.getGateid());
                }
            }

            // 如果网关请求心跳,响应心跳
            sendHeartbeat(heartbeat.getSaleid(), heartbeat.getGateid(), heartbeat);
        } catch (Exception e) {
            log.error("[心跳数据] 处理失败: {}", e.getMessage());
        }
    }

    // 创建新网关记录
    private void createNewGateway(String saleid, String gateid) {
        GatewayInfo newGateway = new GatewayInfo();
        newGateway.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
        newGateway.setSaleid(saleid);
        newGateway.setGateid(gateid);
        newGateway.setLastHeartbeatTime(LocalDateTime.now());
        newGateway.setStatus("0");
        newGateway.setCheckInterval(60L); // 默认间隔
        newGateway.setCreateTime(LocalDateTime.now());
        gatewayInfoMapper.insertGatewayInfo(newGateway);
    }

    // 下发心跳
    private void sendHeartbeat(String saleid, String gateid, HeartbeatMessage heartbeat) {
        String topic = String.format("report/allpoints", saleid, gateid);
        heartbeat.setOperation(OperationConstants.TIME);
        mqttGateway.sendMessage(topic, JSON.toJSONString(heartbeat));
        log.info("[配置下发] topic={}, config={}", topic, JSON.toJSONString(heartbeat));
    }

    // 处理上报数据
    @Override
    public void handleReport(ReportMessage report) {
        try {
            // 存储到仪表信息表 转换为仪表信息表(meterMapper)
            String reportId = createReportData(report);
            // 批量存储仪表数据
            List<ReportMeter> meterEntities = report.getMeter().stream()
                    .map(m -> {
                        ReportMeter entity = new ReportMeter();
                        entity.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
                        entity.setReportId(reportId);
                        entity.setMeterId(m.getId());
                        entity.setStatus(m.getStatus());
                        entity.setName(m.getName());
                        entity.setValuesJson(JSON.toJSONString(m.getValues()));
                        return entity;
                    }).toList();
            for (ReportMeter meter : meterEntities) {
                reportMeterMapper.insertReportMeter(meter);
            }
            log.info("[上报数据] 存储成功: substationId={}, gatewayId={}",
                    report.getSaleid(), report.getGateid());
        } catch (Exception e) {
            log.error("[上报数据] 处理失败: {}", e.getMessage());
        }
    }

    // 创建新数据记录
    private String createReportData(ReportMessage report) {
        Report rep = new Report();
        rep.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
        rep.setSaleid(report.getSaleid());
        rep.setGateid(report.getGateid());
        rep.setTime(report.getTime());
        rep.setType(report.getType());
        rep.setSequence(report.getSequence());
        rep.setSource(report.getSource());
        rep.setCreateTime(LocalDateTime.now());
        reportMapper.insert(rep);
        return rep.getId();
    }

    // 下发控制命令
    @Override
    public void sendControl(ControlMessage controlMessage) {
        ControlMessage message = new ControlMessage();
        message.setSaleid(controlMessage.getSaleid());
        message.setGateid(controlMessage.getGateid());
        message.setType(controlMessage.getType());
        message.setCuuid(LocalDateTime.now().toString());
        message.setTime(LocalDateTime.now());
        message.setMeterid(controlMessage.getMeterid());
        message.setName(controlMessage.getName());
        message.setFunctionid(controlMessage.getFunctionid());
        message.setValue(controlMessage.getValue());
        // 存储到控制记录表
        createControl(controlMessjavascriptage);
        String topic = String.format("report/allpoints", message);
        mqttGateway.sendMessage(topic, JSON.toJSONString(message));
        log.info("[控制命令] topic={}, command={}", topic, JSON.toJSONString(message));
    }

    private void createControl(ControlMessage controlMessage) {
        Control control = new Control();
        control.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
        control.setSaleid(controlMessage.getSaleid());
        control.setGateid(controlMessage.getGateid());
        control.setType(controlMessage.getType());
        control.setCuuid(controlMessage.getCuuid());
        control.setTime(controlMessage.getTime());
        control.setMeterid(controlMessage.getMeterid());
        control.setName(controlMessage.getName());
        control.setFunctionid(controlMessage.getFunctionid());
        control.setValue(controlMessage.getValue());
        control.setResult(controlMessage.getResult());
        control.setErrordesc(controlMessage.getErrordesc());
        controlMapper.insertControl(control);
    }

    @Override
    public void handleAlarm(AlarmMessage alarmMessage) {
        try {
            // 存储报警信息表 转换为报警信息表(alarmMapper)
            String alarmId = createAlarmData(alarmMessage);
            // 批量存储仪表数据
            List<AlarmDetail> alarmEntities = alarmMessage.getFunction().stream()
                    .map(m -> {
                        AlarmDetail entity = new AlarmDetail();
                        entity.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
                        entity.setAlarmId(alarmId);
                        entity.setPtId(m.getId());
                        entity.setAlarmType(m.getAlarmType());
                        entity.setLabel(m.getLabel());
                        entity.setCurrentValue(m.getCurrentValue());
                        entity.setSettingValue(m.getSettingValue());
                        entity.setLevel(m.getLevel());
                        return entity;
                    }).toList();
            for (AlarmDetail alarm : alarmEntities) {
                alarmDetailMapper.insertAlarmDetail(alarm);
            }
            log.info("[上报数据] 存储成功: substationId={}, gatewayId={}",
                    alarmMessage.getSaleid(), alarmMessage.getGateid());

        } catch (Exception e) {
            log.error("[上报数据] 处理失败: {}", e.getMessage());
        }

    }

    @Override
    public void sendCallRead(String saleid, String gateid, String startTime, String endTime) {
        HashMap<String, String> protocol = new HashMap<>();
        protocol.put("saleid", saleid);
        protocol.put("gateid", gateid);
        protocol.put("type", MessageConstants.CALL_READ);
        protocol.put("time", String.valueOf(LocalDateTime.now()));
        protocol.put("startTime", startTime);
        protocol.put("endTime", endTime);
        String topic = String.format("report/allpoints", saleid, gateid);
        mqttGateway.sendMessage(topic, JSON.toJSONString(protocol));
        log.info("[控制命令] topic={}, command={}", topic, JSON.toJSONString(protocol));
    }

    @Override
    public void handleCallReadAck(BaseMessage baseMsg) {
        Report report = new Report();
        report.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
        report.setSaleid(baseMsg.getSaleid());
        report.setGateid(baseMsg.getGateid());
        report.setTime(baseMsg.getTime());
        report.setType(baseMsg.getType());
        reportMapper.insert(report);
    }

    @Override
    public void handleControlAck(ControlMessage controlAck) {
        if(ResultConstants.FAILURE.equals(controlAck.getResult())) {
            createControl(controlAck);
            // 配置或设备问题,记录错误并报警
            log.error("控制失败(不可重试): {}", controlAck.getErrordesc());
        } else if(ResultConstants.SUCCESS.equals(controlAck.getResult())) {
            createControl(controlAck);
            log.info("控制成功: {}", controlAck.getCuuid());
        }
    }

    @Override
    public void getCurrentData(BaseMessage baseMessage) {
        String topic = String.format("report/allpoints", baseMessage.getSaleid(), baseMessage.getGateid());
        mqttGateway.sendMessage(topic, JSON.toJSONString(baseMessage));
        log.info("[控制命令] topic={}, command={}", topic, JSON.toJSONString(baseMessage));
    }

    @Override
    public void sendMultiValueSet(MultiValueSetMessage multiValueSetMessage) {

    }

    @Override
    public void handleMultiValueSet(MultiValueSetMessage multiValueSetMessage) {
        String topic = String.format("report/allpoints", multiValueSetMessage.getSaleid(), multiValueSetMessage.getGateid());
        ControlMessage controlMessage = new ControlMessage();
        try {
            mqttGateway.sendMessage(topic, JSON.toJSONString(multiValueSetMessage));
            log.info("[控制命令] topic={}, command={}", topic, JSON.toJSONString(multiValueSetMessage));
            if(ResultConstants.SUCCESS.equals(multiValueSetMessage.getResult())) {
                BeanUtils.copyProperties(multiValueSetMessage, controlMessage);
                createControl(controlMessage);
                log.info("[控制命令] topic={}, command={}", topic, JSON.toJSONString(multiValueSetMessage));
            } else if(ResultConstants.FAILURE.equals(multiValueSetMessage.getResult())){
                BeanUtils.copyProperties(multiValueSetMessage, controlMessage);
                createControl(controlMessage);
                log.error("[控制失败] topic={}, command={}", topic, JSON.toJSONString(multiValueSetMessage));
            }
        } catch (Exception e) {
            log.error("[控制命令] topic={}, command={}", topic, JSON.toJSONString(multiValueSetMessage));
        }

    }

    private String createAlarmData(AlarmMessage alarmMessage) {
        Alarm alarm = new Alarm();
        alarm.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
        alarm.setSaleid(alarmMessage.getSaleid());
        alarm.setGateid(alarmMessage.getGateid());
        alarm.setTime(alarmMessage.getTime());
        alarm.setType(alarmMessage.getType());
        alarm.setSequence(alarmMessage.getSequence());
        alarm.setName(alarmMessage.getName());
        alarm.setMeterid(alarmMessage.getMeterid());
        alarm.setCreateTime(LocalDateTime.now());
        alarmMapper.insert(alarm);
        return alarm.getId();
    }

}

5.3 发送MQTT消息

通过MqttMessageGateway发送消息:

@Autowired
private MqttMessageGateway mqttMessageGateway;

public void sendTestMessage() {
    mqttMessageGateway.sendMessage("testSub/topic", "{\"key\":\"value\"}");
}

6. 总结

本文介绍了如何使用Spring Integration框架实现MQTT协议的对接,包括客户端的配置、消息的订阅与发布、以及消息的处理逻辑。通过上述代码,您可以快速实现Java与MQTT的集成,并根据业务需求扩展消息的处理逻辑。

到此这篇关于Java对接MQTT协议的文章就介绍到这了,更多相关Java对接MQTT协议内容请搜索China编程(www.chinasem.cn)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程China编程(www.chinasem.cn)!

这篇关于Java对接MQTT协议的完整实现示例代码的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1155663

相关文章

MySQL容灾备份的实现方案

《MySQL容灾备份的实现方案》进行MySQL的容灾备份是确保数据安全和业务连续性的关键步骤,容灾备份可以分为本地备份和远程备份,主要包括逻辑备份和物理备份两种方式,下面就来具体介绍一下... 目录一、逻辑备份1. 使用mysqldump进行逻辑备份1.1 全库备份1.2 单库备份1.3 单表备份2. 恢复

MySQL中处理数据的并发一致性的实现示例

《MySQL中处理数据的并发一致性的实现示例》在MySQL中处理数据的并发一致性是确保多个用户或应用程序同时访问和修改数据库时,不会导致数据冲突、数据丢失或数据不一致,MySQL通过事务和锁机制来管理... 目录一、事务(Transactions)1. 事务控制语句二、锁(Locks)1. 锁类型2. 锁粒

Spring Boot项目如何使用外部application.yml配置文件启动JAR包

《SpringBoot项目如何使用外部application.yml配置文件启动JAR包》文章介绍了SpringBoot项目通过指定外部application.yml配置文件启动JAR包的方法,包括... 目录Spring Boot项目中使用外部application.yml配置文件启动JAR包一、基本原理

SpringBoot加载profile全面解析

《SpringBoot加载profile全面解析》SpringBoot的Profile机制通过多配置文件和注解实现环境隔离,支持开发、测试、生产等不同环境的灵活配置切换,无需修改代码,关键点包括配置文... 目录题目详细答案什么是 Profile配置 Profile使用application-{profil

MyBatis流式查询两种实现方式

《MyBatis流式查询两种实现方式》本文详解MyBatis流式查询,通过ResultHandler和Cursor实现边读边处理,避免内存溢出,ResultHandler逐条回调,Cursor支持迭代... 目录MyBATis 流式查询详解:ResultHandler 与 Cursor1. 什么是流式查询?

Java中InputStream重复使用问题的几种解决方案

《Java中InputStream重复使用问题的几种解决方案》在Java开发中,InputStream是用于读取字节流的类,在许多场景下,我们可能需要重复读取InputStream中的数据,这篇文章主... 目录前言1. 使用mark()和reset()方法(适用于支持标记的流)2. 将流内容缓存到字节数组

Java慢查询排查与性能调优完整实战指南

《Java慢查询排查与性能调优完整实战指南》Java调优是一个广泛的话题,它涵盖了代码优化、内存管理、并发处理等多个方面,:本文主要介绍Java慢查询排查与性能调优的相关资料,文中通过代码介绍的非... 目录1. 事故全景:从告警到定位1.1 事故时间线1.2 关键指标异常1.3 排查工具链2. 深度剖析:

Springboot项目登录校验功能实现

《Springboot项目登录校验功能实现》本文介绍了Web登录校验的重要性,对比了Cookie、Session和JWT三种会话技术,分析其优缺点,并讲解了过滤器与拦截器的统一拦截方案,推荐使用JWT... 目录引言一、登录校验的基本概念二、HTTP协议的无状态性三、会话跟android踪技术1. Cook

C++归并排序代码实现示例代码

《C++归并排序代码实现示例代码》归并排序将待排序数组分成两个子数组,分别对这两个子数组进行排序,然后将排序好的子数组合并,得到排序后的数组,:本文主要介绍C++归并排序代码实现的相关资料,需要的... 目录1 算法核心思想2 代码实现3 算法时间复杂度1 算法核心思想归并排序是一种高效的排序方式,需要用

mybatis用拦截器实现字段加解密全过程

《mybatis用拦截器实现字段加解密全过程》本文通过自定义注解和MyBatis拦截器实现敏感信息加密,处理Parameter和ResultSet,确保数据库存储安全且查询结果解密可用... 目录前言拦截器的使用总结前言根据公司业务需要,灵活对客户敏感信息进行加解密,这里采用myBATis拦截器进行简单实