Spring Integration 是什么?

2024-03-25 09:12
文章标签 java spring integration

本文主要是介绍Spring Integration 是什么?,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Spring Integration 是什么?

Spring Integration 在 Spring 家族不太有名气,如果不是有需求,一般也不会仔细去看。那么 Spring Integration 是什么呢?用官方的一句话来解释就是:它是一种轻量级消息传递模块,并支持通过声明式适配器与外部系统集成。简单来说,Spring Integration 抽象了用于消息传递的一套规范,并且基于这套规范提供了很多企业级的中间件的集成。比如他支持基于 AMQP 的消息队列、MQTT、RMI 等等中间件。

用过 Spring 家族组件的同学应该会比较容易理解了。例如,Spring Data 抽象了数据访问的一系列接口,后端可支持多种 ORM;Spring Cache 抽象了缓存使用的接口,后端支持 Caffeine、Redis、Memcached 等缓存中间件。其实这都是一样的。好处是,我们只需要熟悉这一种规范,就可以任意的去对接各种企业级框架,起到快速开发的作用;劣势是,这些企业级的框架只能再 Spring 抽象的这套规范下工作,对于一些细节的开发,可能仍然需要使用原生的框架来实现。

本文主要介绍的是 Spring Integration,以及它是如何集成 MQTT 协议的。

Spring Integration 消息抽象

刚刚我们讲了,Spring Integration 实际上就是抽象出了消息传递的规范,然后再适配各种消息中间件。那么下面我们先简单了解下 Spring Integration 消息通信的模式。

image.png

image.png

image.png

image.png

image.png

image.png

以上几张官方提供的图可以大致厘清 Spring Integration 的各类组件和工作模式:

  1. Message 包含 Header 和 Payload 两部分。
  2. MessageChannel 用于解耦生产者和消费者,实现消息发送。
  3. MessageRouter 用来控制消息转发的 Channel。
  4. Service Activitor 用来绑定 MessageHandler 和用于消费消息的 MessageChannel。
  5. ChannelAdapter 用来连接 MessageChannel 和具体的消息端口,例如通信的 topic。

在开发上就需要去了解这些抽象组件的具体实现了,在下面讲到 MQTT 的集成上可以再体会一下 SI 的设计思路。

MQTT 协议

MQTT is an OASIS standard messaging protocol for the Internet of Things (IoT). It is designed as an extremely lightweight publish/subscribe messaging transport that is ideal for connecting remote devices with a small code footprint and minimal network bandwidth. MQTT today is used in a wide variety of industries, such as automotive, manufacturing, telecommunications, oil and gas, etc.

这是 MQTT 协议的官方描述,它是一种应用于物联网的轻量级的发布订阅协议,类似于 AMQP。详细了解可以参考:

  • MQTT Specifications
  • [emqx mqtt 协议介绍](docs.emqx.cn/broker/v4.3… 协议)
  • MQTT 协议中文版
  • 消息推送标准协议:MQTT

下面提一些重要的或者开发中需要配置的点。

通信方式

默认是发布 / 订阅模式的。

  1. 通信系统中有发布者和订阅者。发布者发布消息而订阅者接收消息。我们把发布者和订阅者统称为客户端。客户端可以同时是发布者和订阅者。
  2. 在系统中有另外一个角色,它接收发布者的消息并且将消息派发给订阅者。我们一般称这个角色为消息 Broker。
  3. 在 MQTT 中默认是广播的,也就是说订阅了相同 topic 的订阅者都能收到发布者发送的消息。

基于主题 (Topic) 消息路由

MQTT 协议基于主题 (Topic) 进行消息路由,主题 (Topic) 类似 URL 路径,例如:

 

bash

复制代码

chat/room/1 sensor/10/temperature sensor/+/temperature

主题 (Topic) 通过'/'分割层级,支持'+', '#'通配符:

  • '+': 表示通配一个层级,例如 a/+,匹配 a/x, a/y
  • '#': 表示通配多个层级,例如 a/#,匹配 a/x, a/b/c/d
  • 订阅者可以订阅含通配符主题,但发布者不允许向含通配符主题发布消息。

QoS

为了满足不同的场景,MQTT 支持三种不同级别的服务质量(Quality of Service,QoS)为不同场景提供消息可靠性:

  • 0:At most once。消息发送者会想尽办法发送消息,但是遇到意外并不会重试。
  • 1:At least once。消息接收者如果没有知会或者知会本身丢失,消息发送者会再次发送以保证消息接收者至少会收到一次,当然可能造成重复消息。
  • 2:Exactly onces。保证这种语义肯待会减少并发或者增加延时,不过丢失或者重复消息是不可接受的时候,级别 2 是最合适的。

订阅者收到 MQTT 消息的 QoS 级别,最终取决于发布消息的 QoS 和主题订阅的 QoS

Broker 选型

本文使用的 MQTT Broker 是 EMQ X 的开源版。

EMQ X (Erlang/Enterprise/Elastic MQTT Broker) 是基于 Erlang/OTP 平台开发的开源物联网 MQTT 消息服务器。 Erlang/OTP 是出色的软实时 (Soft-Realtime)、低延时 (Low-Latency)、分布式 (Distributed) 的语言平台。

客户端代码集成

Java 客户端一般使用 Eclipse Paho Java Client,此客户端为 Java SE 版本的,为了在 SpringBoot 上有更好的集成,这里我们使用 Spring Integration,Spring Integration MQTT Support 默认集成的就是 Eclipse Paho Java Client V3 版本。

依赖和参数配置

 

xml

复制代码

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency>

 

yml

复制代码

mqtt: url: tcp://172.17.218.94:1883 username: admin password: public clientId: mqtt-sender

 

java

复制代码

@Data @Component @ConfigurationProperties(prefix = "mqtt") public class MqttProperties { private String url; private String username; private String password; private String clientId; }

发布者配置

 

java

复制代码

@Configuration @IntegrationComponentScan public class MqttConfig { @Autowired private MqttProperties prop; @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); mqttConnectOptions.setServerURIs(new String[]{prop.getUrl()}); mqttConnectOptions.setUserName(prop.getUsername()); mqttConnectOptions.setPassword(prop.getPassword().toCharArray()); // 客户端断线时暂时不清除,直到超时注销 mqttConnectOptions.setCleanSession(false); mqttConnectOptions.setAutomaticReconnect(true); factory.setConnectionOptions(mqttConnectOptions); return factory; } @Bean @ServiceActivator(inputChannel = "mqttOutboundChannel") public MessageHandler mqttOutbound(MqttPahoClientFactory mqttClientFactory) { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler( prop.getClientId() + "-pub-" + Instant.now().toEpochMilli(), mqttClientFactory); messageHandler.setAsync(true); messageHandler.setDefaultRetained(false); messageHandler.setAsyncEvents(false); // Exactly Once messageHandler.setDefaultQos(2); messageHandler.setDefaultTopic(ApiConst.MQTT_TOPIC_SUFFIX); return messageHandler; } @Bean public MessageChannel mqttOutboundChannel() { return new DirectChannel(); } } @Component @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") public interface MqttTemplate { void send(String payload); void sendToTopic(String payload, @Header(MqttHeaders.TOPIC) String topic); void sendToTopic(String payload, @Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos); }

  1. @IntegrationComponentScan,开启 Spring Integration 的注解扫描。
  2. 注入客户端工厂类 MqttPahoClientFactory,此处可以配置认证参数、超时时间等 broker 连接参数。
  3. 注入 MessageChannel 实例。
  4. 注入 MessageHandler 的实例,并通过 @ServiceActivator 绑定到对应的 MessageChannel。此处可配置消息处理的模式、QoS、默认的 Topic 等。
  5. 定义一个 @MessagingGateway 修饰的接口,用于消息的发送,@MessagingGatewaydefaultRequestChannel 参数用于绑定具体的 MessageChannel
  6. 在使用的地方自动注入 MqttTemplate 的实例,即可调用方法发送消息。

订阅者配置

 

java

复制代码

@Configuration @IntegrationComponentScan public class MqttConfig { private final MqttProperties prop; private final MqttInboundMessageHandler mqttInboundMessageHandler; public MqttConfig(MqttProperties prop, MqttInboundMessageHandler mqttInboundMessageHandler) { this.prop = prop; this.mqttInboundMessageHandler = mqttInboundMessageHandler; } @Bean public MessageProducerSupport mqttInbound(MqttPahoClientFactory mqttClientFactory) { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(prop.getClientId() + "-sub-" + Instant.now().toEpochMilli(), mqttClientFactory, "facego/reply"); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setQos(2); adapter.setOutputChannel(mqttInboundChannel()); return adapter; } @Bean @ServiceActivator(inputChannel = "mqttInboundChannel") public MessageHandler InboundMessageHandler() { return mqttInboundMessageHandler; } @Bean public MessageChannel mqttInboundChannel() { return new DirectChannel(); } } @Slf4j @Component public class MqttInboundMessageHandler implements MessageHandler { @Override public void handleMessage(Message<?> message) throws MessagingException { log.info("mqtt reply: {}", message.getPayload()); } }

  1. 注入消息处理的 MessageChannel
  2. 注入自己实现的 MqttInboundMessageHandler,并通过 @ServiceActivator 绑定到对应的 MessageChannel
  3. 注入 Channel Adapter 的实例,配置客户端订阅的 Topic 和相应的 MessageChannel

Spring Integration 大致交互逻辑

对于发布者:

  1. 消息通过消息网关发送出去,由 MessageChannel 的实例 DirectChannel 处理发送的细节。
  2. DirectChannel 收到消息后,内部通过 MessageHandler 的实例 MqttPahoMessageHandler 发送到指定的 Topic。

对于订阅者:

  1. 通过注入 MessageProducerSupport 的实例 MqttPahoMessageDrivenChannelAdapter,实现订阅 Topic 和绑定消息消费的 MessageChannel
  2. 同样由 MessageChannel 的实例 DirectChannel 处理消费细节。Channel 消息后会发送给我们自定义的 MqttInboundMessageHandler 实例进行消费。

可以看到整个处理的流程和前面将的基本一致。Spring Integration 就是抽象出了这么一套消息通信的机制,具体的通信细节由它集成的中间件来决定,这里是 MQTT Eclipse Paho Java Client。

总结

本文主要介绍了 Java 使用 MQTT 通信的方式,由于使用了 SpringBoot,因此使用 Spring Integration 来集成会比直接只用 Eclipse Paho Java Client 更符合 Spring 的哲学,所有的 Bean 均单例注入统一管理。

Spring Integration 的好处在于,我们只需要了解其消息通信的基本机制,屏蔽了 Eclipse Paho Java Client 的具体细节,便于编码。从上面的代码可以看出,我们仅仅注入了相关的 Bean,给出响相应的配置信息即可。

参考文献

  • Spring Integration Reference Guide
  • Spring Integration 中文手册(完整版)
  • SpringBoot 集成 MQTT 配置
  • Spring Boot 集成 MQTT
  • 消息推送标准协议:MQTT

这篇关于Spring Integration 是什么?的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/844535

相关文章

Java 实用工具类Spring 的 AnnotationUtils详解

《Java实用工具类Spring的AnnotationUtils详解》Spring框架提供了一个强大的注解工具类org.springframework.core.annotation.Annot... 目录前言一、AnnotationUtils 的常用方法二、常见应用场景三、与 JDK 原生注解 API 的

Java controller接口出入参时间序列化转换操作方法(两种)

《Javacontroller接口出入参时间序列化转换操作方法(两种)》:本文主要介绍Javacontroller接口出入参时间序列化转换操作方法,本文给大家列举两种简单方法,感兴趣的朋友一起看... 目录方式一、使用注解方式二、统一配置场景:在controller编写的接口,在前后端交互过程中一般都会涉及

Java中的StringBuilder之如何高效构建字符串

《Java中的StringBuilder之如何高效构建字符串》本文将深入浅出地介绍StringBuilder的使用方法、性能优势以及相关字符串处理技术,结合代码示例帮助读者更好地理解和应用,希望对大家... 目录关键点什么是 StringBuilder?为什么需要 StringBuilder?如何使用 St

使用Java将各种数据写入Excel表格的操作示例

《使用Java将各种数据写入Excel表格的操作示例》在数据处理与管理领域,Excel凭借其强大的功能和广泛的应用,成为了数据存储与展示的重要工具,在Java开发过程中,常常需要将不同类型的数据,本文... 目录前言安装免费Java库1. 写入文本、或数值到 Excel单元格2. 写入数组到 Excel表格

Java并发编程之如何优雅关闭钩子Shutdown Hook

《Java并发编程之如何优雅关闭钩子ShutdownHook》这篇文章主要为大家详细介绍了Java如何实现优雅关闭钩子ShutdownHook,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起... 目录关闭钩子简介关闭钩子应用场景数据库连接实战演示使用关闭钩子的注意事项开源框架中的关闭钩子机制1.

Maven中引入 springboot 相关依赖的方式(最新推荐)

《Maven中引入springboot相关依赖的方式(最新推荐)》:本文主要介绍Maven中引入springboot相关依赖的方式(最新推荐),本文给大家介绍的非常详细,对大家的学习或工作具有... 目录Maven中引入 springboot 相关依赖的方式1. 不使用版本管理(不推荐)2、使用版本管理(推

Java 中的 @SneakyThrows 注解使用方法(简化异常处理的利与弊)

《Java中的@SneakyThrows注解使用方法(简化异常处理的利与弊)》为了简化异常处理,Lombok提供了一个强大的注解@SneakyThrows,本文将详细介绍@SneakyThro... 目录1. @SneakyThrows 简介 1.1 什么是 Lombok?2. @SneakyThrows

在 Spring Boot 中实现异常处理最佳实践

《在SpringBoot中实现异常处理最佳实践》本文介绍如何在SpringBoot中实现异常处理,涵盖核心概念、实现方法、与先前查询的集成、性能分析、常见问题和最佳实践,感兴趣的朋友一起看看吧... 目录一、Spring Boot 异常处理的背景与核心概念1.1 为什么需要异常处理?1.2 Spring B

如何在 Spring Boot 中实现 FreeMarker 模板

《如何在SpringBoot中实现FreeMarker模板》FreeMarker是一种功能强大、轻量级的模板引擎,用于在Java应用中生成动态文本输出(如HTML、XML、邮件内容等),本文... 目录什么是 FreeMarker 模板?在 Spring Boot 中实现 FreeMarker 模板1. 环

SpringMVC 通过ajax 前后端数据交互的实现方法

《SpringMVC通过ajax前后端数据交互的实现方法》:本文主要介绍SpringMVC通过ajax前后端数据交互的实现方法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价... 在前端的开发过程中,经常在html页面通过AJAX进行前后端数据的交互,SpringMVC的controll