Spring Cloud Stream如何屏蔽不同MQ带来的差异性?

2023-11-30 08:30

本文主要是介绍Spring Cloud Stream如何屏蔽不同MQ带来的差异性?,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

引言

在当前的微服务架构下,使用消息队列(MQ)技术是实现服务解耦和削峰填谷的重要策略。为了保证系统的灵活性和可替换性,我们需要避免对单一开源技术的依赖

市面上有多种消息队列技术,如 Kafka、RocketMQ、RabbitMQ 等。关键在于如何在微服务体系中实现这些MQ组件的无缝切换,以减少代码修改需求。

Spring Cloud Stream 通过其与主流消息中间件的灵活集成,实现了通过仅修改配置文件的方式来切换不同的MQ实现,从而提高了系统的适应性和可维护性。

什么是 Spring Cloud Stream

Spring Cloud Stream 是一个用于构建消息驱动的微服务应用程序的框架。

基于 Spring Boot 构建,用于创建独立的生产级 Spring 应用程序,并使用 Spring Integration 提供与消息代理的连接。它提供了来自多个供应商的中间件的固定配置,引入了持久发布-订阅语义、消费者组和分区的概念。

简单来说 Spring Cloud Stream 是对 Spring Integration 和 Spring Boot 的合并。

图一

图一

主要概念:

1. application model(应用模型)

图二.Spring Cloud Stream 应用程序

图二.Spring Cloud Stream 应用程序

由中间件提供的 Binder 来处理绑定。 应用程序通过绑定这个 Binder 与其建立联系,发送消息时应用程序通过 outputs 通道将消息传递给 BinderBinder 再把消息给消息中间件。接收消息时消息中间件将消息传递给 BinderBinder 再把消息通过 inputs 通道传递给应用程序。

比如 Kafka Binder 依赖如下图:

图三 spring cloud stream kafka依赖

图三 spring cloud stream kafka依赖

2. The Binder Abstraction(Binder抽象)

Binder 抽象使 Spring Cloud Stream 应用程序能够灵活地连接中间件。

Spring Cloud Stream 为 Kafka 和 RabbitMQ 提供了 Binder 实现。 RocketMQ Binder 已由 Spring Cloud Alibaba 实现。

Binder 抽象也是该框架的扩展点之一,我们可以在 Spring Cloud Stream 之上实现自定义 Binder。

3. Programming Model(编程模型)

核心概念

  • Destination Binders(目标绑定器):负责提供与外部消息传递系统集成的组件。

  • Bindings(绑定):外部消息系统和应用程序之间的桥梁,提供消息的生产者和消费者(由目标绑定器创建)。

  • Message(消息):生产者和消费者用于与目标绑定器(以及通过外部消息系统与其他应用程序)通信的规范数据结构。

图四

图四

环境搭建

本文环境:

  • Java:17

  • Spring Boot:3.0.2

  • Spring Cloud:2022.0.2

  • Spring Cloud Alibaba:2022.0.0.0

maven依赖配置

pom.xml依赖如下:

消息驱动jar,用哪个mq引入哪个即可。<dependencies><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-stream-rocketmq</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-kafka</artifactId></dependency>
</dependencies><dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-dependencies</artifactId><version>${spring-boot.version}</version><type>pom</type><scope>import</scope></dependency><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-alibaba-dependencies</artifactId><version>${spring-cloud-alibaba.version}</version><type>pom</type><scope>import</scope></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>${spring-cloud.version}</version><type>pom</type><scope>import</scope></dependency></dependencies>
</dependencyManagement>

配置文件

application.yml RocketMq 配置信息:

spring:cloud:stream:stream:rocketmq:binder:name-server: 127.0.0.1:9876;127.0.0.1:9877function:# 组装和绑定definition: myTopicCbinders:default:type: rocketmqbindings:## 生产者 新版本固定格式  函数名-{out/in}-{index}demoChannel-out-0:destination: boot-mq-topic## 消费者 新版本固定格式  函数名字-{out/in}-{index}demoChannel-in-0:destination: boot-mq-topic

application.yml Kafka 配置信息:

spring:cloud:stream:stream:kafka:binder:brokers: 127.0.0.1:9092function:# 组装和绑定definition: myTopicCbinders:default:type: kafkabindings:## 生产者 新版本固定格式  函数名-{out/in}-{index}demoChannel-out-0:destination: boot-mq-topic## 消费者 新版本固定格式  函数名字-{out/in}-{index}demoChannel-in-0:destination: boot-mq-topic

消息生产者

创建一个简单的消息生产者:

@RestController
@Slf4j
public class ProducerStream {@Autowiredprivate StreamBridge streamBridge;@GetMapping("/test-stream")public String testStream() {streamBridge.send("demoChannel-out-0",MessageBuilder.withPayload("消息体").build());return "success";}
}

消息消费者

创建一个消息消费者来接收消息:

@Slf4j
@Configuration
public class TestStreamConsumer {@Beanpublic Consumer<String> demoChannel() {return message -> {log.info("demoChannel接到消息:{}", message);};}
}

假如需要从 Kafka 替换成 RocketMq ,只需要修改pom文件和配置文件即可。

在之前的 Spring Cloud Stream 版本中是采用注解的方式来实现绑定,在新版本中是通过函数式编程模型来绑定名称。采用约定大于配置的思想,简化了应用程序配置。

具体可见官方文档:https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_functional_binding_names

Spring Cloud Stream 发送消息流程

图五 spring cloud stream消息流程图

图五 spring cloud stream消息流程图

消息模型

通过图三可以看到 Sping Cloud Stream 的依赖关系。

Sping Cloud Stream -> Spring Integration -> Spring Messaging

可以看出来 Sping Cloud Stream 是基于 Spring Integration 做了一层封装,是依赖于 Spring Integration 这个组件的,而 Spring Integration 则依赖于 Spring Messaging 组件来实现消息处理机制的基础设施。

Spring Integration 是对 Spring Messaging 的扩展,设计目标是系统集成,因此内部提供了大量的集成化端点方便应用程序直接使用。

各个异构系统相互集成时,Spring Integration 通过通道之间的消息传递,让我们可以在消息的入口和出口使用通道适配器和消息网关这两种典型的端点对消息进行同构化处理。

Spring MessagingSpring 框架中的一个底层模块,用于提供统一的消息编程模型。

消息 Message 接口定义:

public interface Message<T> {//消息体T getPayload();//消息头MessageHeaders getHeaders();
}

消息通道 MessageChannel 接口定义:

@FunctionalInterface
public interface MessageChannel {long INDEFINITE_TIMEOUT = -1;//发送消息,无限期阻塞default boolean send(Message<?> message) {return send(message, INDEFINITE_TIMEOUT);}//发送消息,阻塞直到到达指定超时时间boolean send(Message<?> message, long timeout);
}

消息通道 MessageChannel 接收消息,调用send()方法将消息发送至该消息通道。

消息通道可简单理解为对队列的一种抽象。通道的名称对应队列的名称。

Spring message 把通道抽象成两种基本表现形式

  • 支持轮询的 PollableChannel

  • 实现发布-订阅模式的 SubscribableChannel

这两个通道都继承自具有消息发送功能的 MessageChannel

public interface SubscribableChannel extends MessageChannel {//通过注册回调函数MessageHandler来实现事件响应//注册消息处理器boolean subscribe(MessageHandler handler);//取消注册消息处理器boolean unsubscribe(MessageHandler handler);
}
public interface PollableChannel extends MessageChannel {//通过轮询操作主动获取消息//从通道中接收消息@NullableMessage<?> receive();//指定超时时间,从通道中接收消息@NullableMessage<?> receive(long timeout);
}

MessageHandler接口定义:

@FunctionalInterface
public interface MessageHandler {//处理消息方法void handleMessage(Message<?> message) throws MessagingException;
}

再回到图五流程图中,我们最终可以看到 KafkaRocketMQ 通过继承 AbstractMessageHandler 抽象类( AbstractMessageHandler 抽象类是实现了 MessageHandler 接口)来实现不同中间件的消息发送操作。而这些都是封装在各自中间件对应的 Binder 代码中来实现。

结论

回到我们的主题,Spring Cloud Stream 如何屏蔽不同 MQ 带来的差异性?

  • 统一的编程模型:发送和接收代码一致,开发者专注于业务逻辑即可。不用管底层消息中间件的实现。

  • Binder 抽象:封装与消息队列的交互逻辑,每种队列有自己的 Binder 实现。

  • 自动配置和约定优于配置:采用约定大于配置的思想,极少的改动配置文件实现消息队列的切换,而代码不用变动。

  • 高级特性的抽象:如分区、消息分组、持久性订阅等高级特性,Spring Cloud Stream 提供了抽象层,由不同的消息队列去实现。

参考资料

  • 官方文档:Spring Cloud Stream Reference Guide

  • 《Spring核心技术和案例实战》

这篇关于Spring Cloud Stream如何屏蔽不同MQ带来的差异性?的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/436336

相关文章

分布式锁在Spring Boot应用中的实现过程

《分布式锁在SpringBoot应用中的实现过程》文章介绍在SpringBoot中通过自定义Lock注解、LockAspect切面和RedisLockUtils工具类实现分布式锁,确保多实例并发操作... 目录Lock注解LockASPect切面RedisLockUtils工具类总结在现代微服务架构中,分布

Java使用Thumbnailator库实现图片处理与压缩功能

《Java使用Thumbnailator库实现图片处理与压缩功能》Thumbnailator是高性能Java图像处理库,支持缩放、旋转、水印添加、裁剪及格式转换,提供易用API和性能优化,适合Web应... 目录1. 图片处理库Thumbnailator介绍2. 基本和指定大小图片缩放功能2.1 图片缩放的

Spring Boot集成/输出/日志级别控制/持久化开发实践

《SpringBoot集成/输出/日志级别控制/持久化开发实践》SpringBoot默认集成Logback,支持灵活日志级别配置(INFO/DEBUG等),输出包含时间戳、级别、类名等信息,并可通过... 目录一、日志概述1.1、Spring Boot日志简介1.2、日志框架与默认配置1.3、日志的核心作用

破茧 JDBC:MyBatis 在 Spring Boot 中的轻量实践指南

《破茧JDBC:MyBatis在SpringBoot中的轻量实践指南》MyBatis是持久层框架,简化JDBC开发,通过接口+XML/注解实现数据访问,动态代理生成实现类,支持增删改查及参数... 目录一、什么是 MyBATis二、 MyBatis 入门2.1、创建项目2.2、配置数据库连接字符串2.3、入

Springboot项目启动失败提示找不到dao类的解决

《Springboot项目启动失败提示找不到dao类的解决》SpringBoot启动失败,因ProductServiceImpl未正确注入ProductDao,原因:Dao未注册为Bean,解决:在启... 目录错误描述原因解决方法总结***************************APPLICA编

深度解析Spring Security 中的 SecurityFilterChain核心功能

《深度解析SpringSecurity中的SecurityFilterChain核心功能》SecurityFilterChain通过组件化配置、类型安全路径匹配、多链协同三大特性,重构了Spri... 目录Spring Security 中的SecurityFilterChain深度解析一、Security

SpringBoot多环境配置数据读取方式

《SpringBoot多环境配置数据读取方式》SpringBoot通过环境隔离机制,支持properties/yaml/yml多格式配置,结合@Value、Environment和@Configura... 目录一、多环境配置的核心思路二、3种配置文件格式详解2.1 properties格式(传统格式)1.

Apache Ignite 与 Spring Boot 集成详细指南

《ApacheIgnite与SpringBoot集成详细指南》ApacheIgnite官方指南详解如何通过SpringBootStarter扩展实现自动配置,支持厚/轻客户端模式,简化Ign... 目录 一、背景:为什么需要这个集成? 二、两种集成方式(对应两种客户端模型) 三、方式一:自动配置 Thick

Spring WebClient从入门到精通

《SpringWebClient从入门到精通》本文详解SpringWebClient非阻塞响应式特性及优势,涵盖核心API、实战应用与性能优化,对比RestTemplate,为微服务通信提供高效解决... 目录一、WebClient 概述1.1 为什么选择 WebClient?1.2 WebClient 与

Java.lang.InterruptedException被中止异常的原因及解决方案

《Java.lang.InterruptedException被中止异常的原因及解决方案》Java.lang.InterruptedException是线程被中断时抛出的异常,用于协作停止执行,常见于... 目录报错问题报错原因解决方法Java.lang.InterruptedException 是 Jav