Spring Integration 是什么?

2024-03-25 09:12
文章标签 java spring integration

本文主要是介绍Spring Integration 是什么?,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Spring Integration 是什么?

Spring Integration 在 Spring 家族不太有名气,如果不是有需求,一般也不会仔细去看。那么 Spring Integration 是什么呢?用官方的一句话来解释就是:它是一种轻量级消息传递模块,并支持通过声明式适配器与外部系统集成。简单来说,Spring Integration 抽象了用于消息传递的一套规范,并且基于这套规范提供了很多企业级的中间件的集成。比如他支持基于 AMQP 的消息队列、MQTT、RMI 等等中间件。

用过 Spring 家族组件的同学应该会比较容易理解了。例如,Spring Data 抽象了数据访问的一系列接口,后端可支持多种 ORM;Spring Cache 抽象了缓存使用的接口,后端支持 Caffeine、Redis、Memcached 等缓存中间件。其实这都是一样的。好处是,我们只需要熟悉这一种规范,就可以任意的去对接各种企业级框架,起到快速开发的作用;劣势是,这些企业级的框架只能再 Spring 抽象的这套规范下工作,对于一些细节的开发,可能仍然需要使用原生的框架来实现。

本文主要介绍的是 Spring Integration,以及它是如何集成 MQTT 协议的。

Spring Integration 消息抽象

刚刚我们讲了,Spring Integration 实际上就是抽象出了消息传递的规范,然后再适配各种消息中间件。那么下面我们先简单了解下 Spring Integration 消息通信的模式。

image.png

image.png

image.png

image.png

image.png

image.png

以上几张官方提供的图可以大致厘清 Spring Integration 的各类组件和工作模式:

  1. Message 包含 Header 和 Payload 两部分。
  2. MessageChannel 用于解耦生产者和消费者,实现消息发送。
  3. MessageRouter 用来控制消息转发的 Channel。
  4. Service Activitor 用来绑定 MessageHandler 和用于消费消息的 MessageChannel。
  5. ChannelAdapter 用来连接 MessageChannel 和具体的消息端口,例如通信的 topic。

在开发上就需要去了解这些抽象组件的具体实现了,在下面讲到 MQTT 的集成上可以再体会一下 SI 的设计思路。

MQTT 协议

MQTT is an OASIS standard messaging protocol for the Internet of Things (IoT). It is designed as an extremely lightweight publish/subscribe messaging transport that is ideal for connecting remote devices with a small code footprint and minimal network bandwidth. MQTT today is used in a wide variety of industries, such as automotive, manufacturing, telecommunications, oil and gas, etc.

这是 MQTT 协议的官方描述,它是一种应用于物联网的轻量级的发布订阅协议,类似于 AMQP。详细了解可以参考:

  • MQTT Specifications
  • [emqx mqtt 协议介绍](docs.emqx.cn/broker/v4.3… 协议)
  • MQTT 协议中文版
  • 消息推送标准协议:MQTT

下面提一些重要的或者开发中需要配置的点。

通信方式

默认是发布 / 订阅模式的。

  1. 通信系统中有发布者和订阅者。发布者发布消息而订阅者接收消息。我们把发布者和订阅者统称为客户端。客户端可以同时是发布者和订阅者。
  2. 在系统中有另外一个角色,它接收发布者的消息并且将消息派发给订阅者。我们一般称这个角色为消息 Broker。
  3. 在 MQTT 中默认是广播的,也就是说订阅了相同 topic 的订阅者都能收到发布者发送的消息。

基于主题 (Topic) 消息路由

MQTT 协议基于主题 (Topic) 进行消息路由,主题 (Topic) 类似 URL 路径,例如:

 

bash

复制代码

chat/room/1 sensor/10/temperature sensor/+/temperature

主题 (Topic) 通过'/'分割层级,支持'+', '#'通配符:

  • '+': 表示通配一个层级,例如 a/+,匹配 a/x, a/y
  • '#': 表示通配多个层级,例如 a/#,匹配 a/x, a/b/c/d
  • 订阅者可以订阅含通配符主题,但发布者不允许向含通配符主题发布消息。

QoS

为了满足不同的场景,MQTT 支持三种不同级别的服务质量(Quality of Service,QoS)为不同场景提供消息可靠性:

  • 0:At most once。消息发送者会想尽办法发送消息,但是遇到意外并不会重试。
  • 1:At least once。消息接收者如果没有知会或者知会本身丢失,消息发送者会再次发送以保证消息接收者至少会收到一次,当然可能造成重复消息。
  • 2:Exactly onces。保证这种语义肯待会减少并发或者增加延时,不过丢失或者重复消息是不可接受的时候,级别 2 是最合适的。

订阅者收到 MQTT 消息的 QoS 级别,最终取决于发布消息的 QoS 和主题订阅的 QoS

Broker 选型

本文使用的 MQTT Broker 是 EMQ X 的开源版。

EMQ X (Erlang/Enterprise/Elastic MQTT Broker) 是基于 Erlang/OTP 平台开发的开源物联网 MQTT 消息服务器。 Erlang/OTP 是出色的软实时 (Soft-Realtime)、低延时 (Low-Latency)、分布式 (Distributed) 的语言平台。

客户端代码集成

Java 客户端一般使用 Eclipse Paho Java Client,此客户端为 Java SE 版本的,为了在 SpringBoot 上有更好的集成,这里我们使用 Spring Integration,Spring Integration MQTT Support 默认集成的就是 Eclipse Paho Java Client V3 版本。

依赖和参数配置

 

xml

复制代码

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency>

 

yml

复制代码

mqtt: url: tcp://172.17.218.94:1883 username: admin password: public clientId: mqtt-sender

 

java

复制代码

@Data @Component @ConfigurationProperties(prefix = "mqtt") public class MqttProperties { private String url; private String username; private String password; private String clientId; }

发布者配置

 

java

复制代码

@Configuration @IntegrationComponentScan public class MqttConfig { @Autowired private MqttProperties prop; @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); mqttConnectOptions.setServerURIs(new String[]{prop.getUrl()}); mqttConnectOptions.setUserName(prop.getUsername()); mqttConnectOptions.setPassword(prop.getPassword().toCharArray()); // 客户端断线时暂时不清除,直到超时注销 mqttConnectOptions.setCleanSession(false); mqttConnectOptions.setAutomaticReconnect(true); factory.setConnectionOptions(mqttConnectOptions); return factory; } @Bean @ServiceActivator(inputChannel = "mqttOutboundChannel") public MessageHandler mqttOutbound(MqttPahoClientFactory mqttClientFactory) { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler( prop.getClientId() + "-pub-" + Instant.now().toEpochMilli(), mqttClientFactory); messageHandler.setAsync(true); messageHandler.setDefaultRetained(false); messageHandler.setAsyncEvents(false); // Exactly Once messageHandler.setDefaultQos(2); messageHandler.setDefaultTopic(ApiConst.MQTT_TOPIC_SUFFIX); return messageHandler; } @Bean public MessageChannel mqttOutboundChannel() { return new DirectChannel(); } } @Component @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") public interface MqttTemplate { void send(String payload); void sendToTopic(String payload, @Header(MqttHeaders.TOPIC) String topic); void sendToTopic(String payload, @Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos); }

  1. @IntegrationComponentScan,开启 Spring Integration 的注解扫描。
  2. 注入客户端工厂类 MqttPahoClientFactory,此处可以配置认证参数、超时时间等 broker 连接参数。
  3. 注入 MessageChannel 实例。
  4. 注入 MessageHandler 的实例,并通过 @ServiceActivator 绑定到对应的 MessageChannel。此处可配置消息处理的模式、QoS、默认的 Topic 等。
  5. 定义一个 @MessagingGateway 修饰的接口,用于消息的发送,@MessagingGatewaydefaultRequestChannel 参数用于绑定具体的 MessageChannel
  6. 在使用的地方自动注入 MqttTemplate 的实例,即可调用方法发送消息。

订阅者配置

 

java

复制代码

@Configuration @IntegrationComponentScan public class MqttConfig { private final MqttProperties prop; private final MqttInboundMessageHandler mqttInboundMessageHandler; public MqttConfig(MqttProperties prop, MqttInboundMessageHandler mqttInboundMessageHandler) { this.prop = prop; this.mqttInboundMessageHandler = mqttInboundMessageHandler; } @Bean public MessageProducerSupport mqttInbound(MqttPahoClientFactory mqttClientFactory) { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(prop.getClientId() + "-sub-" + Instant.now().toEpochMilli(), mqttClientFactory, "facego/reply"); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setQos(2); adapter.setOutputChannel(mqttInboundChannel()); return adapter; } @Bean @ServiceActivator(inputChannel = "mqttInboundChannel") public MessageHandler InboundMessageHandler() { return mqttInboundMessageHandler; } @Bean public MessageChannel mqttInboundChannel() { return new DirectChannel(); } } @Slf4j @Component public class MqttInboundMessageHandler implements MessageHandler { @Override public void handleMessage(Message<?> message) throws MessagingException { log.info("mqtt reply: {}", message.getPayload()); } }

  1. 注入消息处理的 MessageChannel
  2. 注入自己实现的 MqttInboundMessageHandler,并通过 @ServiceActivator 绑定到对应的 MessageChannel
  3. 注入 Channel Adapter 的实例,配置客户端订阅的 Topic 和相应的 MessageChannel

Spring Integration 大致交互逻辑

对于发布者:

  1. 消息通过消息网关发送出去,由 MessageChannel 的实例 DirectChannel 处理发送的细节。
  2. DirectChannel 收到消息后,内部通过 MessageHandler 的实例 MqttPahoMessageHandler 发送到指定的 Topic。

对于订阅者:

  1. 通过注入 MessageProducerSupport 的实例 MqttPahoMessageDrivenChannelAdapter,实现订阅 Topic 和绑定消息消费的 MessageChannel
  2. 同样由 MessageChannel 的实例 DirectChannel 处理消费细节。Channel 消息后会发送给我们自定义的 MqttInboundMessageHandler 实例进行消费。

可以看到整个处理的流程和前面将的基本一致。Spring Integration 就是抽象出了这么一套消息通信的机制,具体的通信细节由它集成的中间件来决定,这里是 MQTT Eclipse Paho Java Client。

总结

本文主要介绍了 Java 使用 MQTT 通信的方式,由于使用了 SpringBoot,因此使用 Spring Integration 来集成会比直接只用 Eclipse Paho Java Client 更符合 Spring 的哲学,所有的 Bean 均单例注入统一管理。

Spring Integration 的好处在于,我们只需要了解其消息通信的基本机制,屏蔽了 Eclipse Paho Java Client 的具体细节,便于编码。从上面的代码可以看出,我们仅仅注入了相关的 Bean,给出响相应的配置信息即可。

参考文献

  • Spring Integration Reference Guide
  • Spring Integration 中文手册(完整版)
  • SpringBoot 集成 MQTT 配置
  • Spring Boot 集成 MQTT
  • 消息推送标准协议:MQTT

这篇关于Spring Integration 是什么?的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/844535

相关文章

Java+AI驱动实现PDF文件数据提取与解析

《Java+AI驱动实现PDF文件数据提取与解析》本文将和大家分享一套基于AI的体检报告智能评估方案,详细介绍从PDF上传、内容提取到AI分析、数据存储的全流程自动化实现方法,感兴趣的可以了解下... 目录一、核心流程:从上传到评估的完整链路二、第一步:解析 PDF,提取体检报告内容1. 引入依赖2. 封装

使用Spring Cache本地缓存示例代码

《使用SpringCache本地缓存示例代码》缓存是提高应用程序性能的重要手段,通过将频繁访问的数据存储在内存中,可以减少数据库访问次数,从而加速数据读取,:本文主要介绍使用SpringCac... 目录一、Spring Cache简介核心特点:二、基础配置1. 添加依赖2. 启用缓存3. 缓存配置方案方案

Java实现复杂查询优化的7个技巧小结

《Java实现复杂查询优化的7个技巧小结》在Java项目中,复杂查询是开发者面临的“硬骨头”,本文将通过7个实战技巧,结合代码示例和性能对比,手把手教你如何让复杂查询变得优雅,大家可以根据需求进行选择... 目录一、复杂查询的痛点:为何你的代码“又臭又长”1.1冗余变量与中间状态1.2重复查询与性能陷阱1.

深度剖析SpringBoot日志性能提升的原因与解决

《深度剖析SpringBoot日志性能提升的原因与解决》日志记录本该是辅助工具,却为何成了性能瓶颈,SpringBoot如何用代码彻底破解日志导致的高延迟问题,感兴趣的小伙伴可以跟随小编一起学习一下... 目录前言第一章:日志性能陷阱的底层原理1.1 日志级别的“双刃剑”效应1.2 同步日志的“吞吐量杀手”

Spring创建Bean的八种主要方式详解

《Spring创建Bean的八种主要方式详解》Spring(尤其是SpringBoot)提供了多种方式来让容器创建和管理Bean,@Component、@Configuration+@Bean、@En... 目录引言一、Spring 创建 Bean 的 8 种主要方式1. @Component 及其衍生注解

SpringBoot通过main方法启动web项目实践

《SpringBoot通过main方法启动web项目实践》SpringBoot通过SpringApplication.run()启动Web项目,自动推断应用类型,加载初始化器与监听器,配置Spring... 目录1. 启动入口:SpringApplication.run()2. SpringApplicat

Java利用@SneakyThrows注解提升异常处理效率详解

《Java利用@SneakyThrows注解提升异常处理效率详解》这篇文章将深度剖析@SneakyThrows的原理,用法,适用场景以及隐藏的陷阱,看看它如何让Java异常处理效率飙升50%,感兴趣的... 目录前言一、检查型异常的“诅咒”:为什么Java开发者讨厌它1.1 检查型异常的痛点1.2 为什么说

基于Java开发一个极简版敏感词检测工具

《基于Java开发一个极简版敏感词检测工具》这篇文章主要为大家详细介绍了如何基于Java开发一个极简版敏感词检测工具,文中的示例代码简洁易懂,感兴趣的小伙伴可以跟随小编一起学习一下... 目录你是否还在为敏感词检测头疼一、极简版Java敏感词检测工具的3大核心优势1.1 优势1:DFA算法驱动,效率提升10

Java使用正则提取字符串中的内容的详细步骤

《Java使用正则提取字符串中的内容的详细步骤》:本文主要介绍Java中使用正则表达式提取字符串内容的方法,通过Pattern和Matcher类实现,涵盖编译正则、查找匹配、分组捕获、数字与邮箱提... 目录1. 基础流程2. 关键方法说明3. 常见场景示例场景1:提取所有数字场景2:提取邮箱地址4. 高级

使用SpringBoot+InfluxDB实现高效数据存储与查询

《使用SpringBoot+InfluxDB实现高效数据存储与查询》InfluxDB是一个开源的时间序列数据库,特别适合处理带有时间戳的监控数据、指标数据等,下面详细介绍如何在SpringBoot项目... 目录1、项目介绍2、 InfluxDB 介绍3、Spring Boot 配置 InfluxDB4、I