SpringCloud Stream 快速入门实例教程

2025-11-26 19:50

本文主要是介绍SpringCloud Stream 快速入门实例教程,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

《SpringCloudStream快速入门实例教程》本文介绍了SpringCloudStream(SCS)组件在分布式系统中的作用,以及如何集成到SpringBoot项目中,通过SCS,可...

1.SCS 组件的出现的背景和作用

在分布式系统中,可能使用到的消息队列让人眼花缭乱,可能有使用(RabbitMq RroketMQ Kafka....),他们提供的客户端各不相同,使用的方式也让人眼花缭乱,此时就需要一个能够统一消息队列的客户端,通过更高级的抽象来实现更通用和更简单的集成不同的消息队列中间件,此时也就诞生了这个SCS 组件

2.SCS 集成srping Boot项目

我们在这个演示项目中所使用的Sprphping Boot版本为 2.7.18、SpringCloud Alibaba版本为 2021.0.6.0

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>com.alibaba.cloud</groupId>
                <artifactandroidId>spring-cloud-alibaba-dependencies</artifactId>
                <version>2021.0.6.0</version>
                <type>pom</type>
                <scope> import</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>2.7.18</version>
                <type>pom</type>
                <scope> import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

使用的SCS 组件版本为 3.2.10

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
            <version>3.2.10</version>
        </dependency>

3.Yml 配置

scs 的使用难点主要就是在yml 的配置上,配置完成使用很方便

spring:
  cloud:
    function:
      definition: myTaskConsumer;ackConsumer #你注册的 Consumer 方法名 或者 Function 方法名 中间使用 ;分割 (生产者一般是动态发送消息 不需要注册)
    stream:
      binders:
        kafka-binder-1: # 绑定器名称
          type: kafka  # 消息队列的类型类型
          environment: #绑定器环境配置
            spring:
              kafka:
                bootstrap-servers: 172.22.134.135:9092 # kafka地址 可以设置多个
                properties:
                  security.protocol: PLAINTEXT # kafka协议
        #rabbit-binde-1r:
         # type: rabbit
          # ... rabbitmq配置
      # 全局生产者可靠性配置(推荐)
      binder:
        producer-properties:
          acks: all                #  生产者 ACK = all 所有副本同步完成才ack;ACK=1写入leader副本返回ack;ACK=0 生产者发送消息立马ack
          retries: 100          # 最大重试 当发送失败时(如网络抖动、Leader 切换),Producer 自动重试的最大次数。
          retry.backoff.ms: 100http://www.chinasem.cn0 #每次重试之间的等待时间(毫秒)。
          enable-idempotence: true      # 幂等生产者(防重复)
      bindings:
        myTaskConsumer-in-0: #命名规则 ${方法名}-${消费者:in/生产者:out}-${数字:不能与其他相同}
          destination: test-kraft # topic
          group: my-consumer-group #消费者组
          binder: kafka-binder-1 # 绑定器 <--上面配置的绑定名称
          consumer: # 消费者配置
            autoStartup: true # 是否自动启动
            concurrency: 1  #启动消费者实例数  (同属于一个消费者组)
        myTaskProducer-out-0:
          destination: test-kraft # topic
          binder: kafka-binder-1 # 绑定器 <--上面配置的绑定名称
          producer: # 生产者配置
              partitionCount: 1 # 应与目标 Topic 的实际分区数一致。
                                # - 若小于实际分区数:仅使用部分分区,浪费并行能力;
                                # - 若大于实际分区数:发送时会因访问不存在的分区而失败!
              #使用消息头中的 headers的 partitionKey 作为 key进行分区
              partition-key-expression: headers.partitionKey # 分区键(分区规则根据key进行hash落到分区 有助于落到指定分区顺序消费)

4.SpringCloud Stream 3.X新特性函数编程

4.1.编写 消费者

        Mesage 的包别导错

import org.springframework.messaging.Message;
@Configuration
public class kafkaConsumer
{
	@Bean
	public Consumer<Message<String>> myTaskConsumer ()
	{
		System.out.println ("[初始化] myTaskConsumer Bean 已创建");
		return message -> System.out.println ("[myTaskConsumer] 收到消息: " + message.getPayload ());
	}
}

4.2.编写动态生产者

@RestController
public class SendController
{
	@Autowired
	StreamBridge streamBridge;
	@GetMapping ("/sendMyTaskProducer/{msg}")
	public String send (@PathVariable ("msg") String msg)
	{
		//构建消息
		Message<String> message = MessageBuilder.withPayload (mspythong)
				.setHeader ("partitionKey", msg) // 添加分区键partitionKey 作为分区键
				.build ();
		//参数1为发送的通道名称(在yml中配置),参数2为消息
		boolean myTaskProducer = streamBridge.send ("myTaskProducer-out-0", message);
		System.out.println ("发送结果:" + myTaskProducer);
		return "发送结果:" + myTaskProducer;
	}
}

5.进行测试

访问发送消息的接口,发送成功,并且消费者进行了消费

SpringCloud Stream 快速入门实例教程

SpringCloud Stream 快速入门实例教程

6.进行消费者手动ACK

消费者手动ACK 比自动ACK 要安全得多,默认scs 是实行自动ack,自动ack只要消息被投递到消费者,不论是否消费成功或者失败,都会被视为消费成功

6.1yml 配置

        China编程#========================================消费者ACK Kafka 专属配置========================================
        #演示消费者ACK机制
        ackConsumer-in-0: #命名规则 ${方法名}-${消费者:in/生产者:out}-${数字:不能与其他相同}
          destination: topicOne # topic
          group: ack-consumer-group #消费者组 (修改为独立的消费者组,避免与myTaskConsumer冲突)
          binder: kafka-binder-1 # 绑定器 <--上面配置的绑定名称
          consumer: # 消费者配置
            autoStartup: true # 是否自动启动
            concurrency: 1  #启动消费者实例数  (同属于一个消费者组)
        ackProducer-out-0:
          destination: topicOne # topic
          binder: kafka-binder-1 # 绑定器 <--上面配置的绑定名称
          producer: # 生产者配置
            partitionCount: 1 # 应与目标 Topic 的实际分区数一致。
              # - 若小于实际分区数:仅使用部分分区,浪费并行能力;
            # - 若大于实际分区数:发送时会因访问不存在的分区而失败!
            #使用消息头中的 headers的 partitionKey 作为 key进行分区
            partition-key-expression: headers.partitionKey # 分区键(分区规则根据key进行hash落到分区 有助于落到指定分区顺序消费)
      # Kafka 专属配置
      kafka:
        bindings:
          ackConsumer-in-0: # 指定哪个消费者使用ACK
            consumer:
              ack-mode: MANUAL  #  关键!手动 ACK 模式
                                #RECORD	每条消息处理完自动提交 offset(默认)	简单场景
                                #BATCH	批量提交(一批 poll 的消息处理完后提交)	默认行为(等价于 auto-commit=true)
                                #TIME	定时提交	较少用
                                #COUNT	每 N 条提交一次	较少用
                                #MANUAL	手动调用 acknowledge() 才提交	✅ 需要精确控制(推荐)
                                #MANUAL_IMMEDIATE	手动调用立即提交(不等批次)	高可靠性要求

6.2编写消费者

@Bean
	public Consumer<Message<String>> ackConsumer(){
		System.out.println ("[初始化] ackConsumer Bean 已创建");
		return message  -> {
			System.out.println ("[ackConsumer] ========== 开始处理消息 ==========");
			System.out.println ("[ackConsumer] 消息内容: " + message.getPayload());
			System.out.println ("[ackConsumer] 消息Headers: " + message.getHeaders());
			//获取Acknowledgment
			Acknowledgment ack = message.getHeaders ()
					.get (KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
			if (ack != null) {
				//进行手动ack
				ack.acknowledge ();
				System.out.println ("[ackConsumer] ✅ 已手动确认消息");
			} else {
				System.out.println ("[ackConsumer] ⚠️ 警告: Acknowledgment为null,无法手动确认");
			}
			System.out.println ("[ackConsumer] ========== 消息处理完成 ==========\n");
		};
	}

6.3编写生产者

@GetMapping ("/sendAckProducer/{msg}")
	public String send2 (@PathVariable ("msg") String msg)
	{
		//构建消息
		Message<String> message = MessageBuilder.withPayload (msg)
				.setHeader ("partitionKey", msg) // 添加分区键partitionKey 作为分区键
				.build ();
		//参数1为发送的通道名称(在yml中配置),参数2为消息
		boolean ackProducer = streamBridge.send ("ackProducer-out-0", message);
		System.out.println ("发送结果:" + ackProducer);
		return "发送结果:" + ackProducer;
	}

6.4测试结果

SpringCloud Stream 快速入门实例教程

到此这篇关于SpringCloud Stream 快速入门的文章就介绍到这了,更多相关SpringCloud Stream 入门内容请搜索China编程(www.chinasem.cn)以前的文章或继续浏览下面的相关文章希望大家以后多多支持China编程(www.chinasem.cn)!

这篇关于SpringCloud Stream 快速入门实例教程的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1156280

相关文章

Java方法重载与重写之同名方法的双面魔法(最新整理)

《Java方法重载与重写之同名方法的双面魔法(最新整理)》文章介绍了Java中的方法重载Overloading和方法重写Overriding的区别联系,方法重载是指在同一个类中,允许存在多个方法名相同... 目录Java方法重载与重写:同名方法的双面魔法方法重载(Overloading):同门师兄弟的不同绝

Spring配置扩展之JavaConfig的使用小结

《Spring配置扩展之JavaConfig的使用小结》JavaConfig是Spring框架中基于纯Java代码的配置方式,用于替代传统的XML配置,通过注解(如@Bean)定义Spring容器的组... 目录JavaConfig 的概念什么是JavaConfig?为什么使用 JavaConfig?Jav

Java数组动态扩容的实现示例

《Java数组动态扩容的实现示例》本文主要介绍了Java数组动态扩容的实现示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧... 目录1 问题2 方法3 结语1 问题实现动态的给数组添加元素效果,实现对数组扩容,原始数组使用静态分配

Java中ArrayList与顺序表示例详解

《Java中ArrayList与顺序表示例详解》顺序表是在计算机内存中以数组的形式保存的线性表,是指用一组地址连续的存储单元依次存储数据元素的线性结构,:本文主要介绍Java中ArrayList与... 目录前言一、Java集合框架核心接口与分类ArrayList二、顺序表数据结构中的顺序表三、常用代码手动

JAVA项目swing转javafx语法规则以及示例代码

《JAVA项目swing转javafx语法规则以及示例代码》:本文主要介绍JAVA项目swing转javafx语法规则以及示例代码的相关资料,文中详细讲解了主类继承、窗口创建、布局管理、控件替换、... 目录最常用的“一行换一行”速查表(直接全局替换)实际转换示例(JFramejs → JavaFX)迁移建

Spring Boot Interceptor的原理、配置、顺序控制及与Filter的关键区别对比分析

《SpringBootInterceptor的原理、配置、顺序控制及与Filter的关键区别对比分析》本文主要介绍了SpringBoot中的拦截器(Interceptor)及其与过滤器(Filt... 目录前言一、核心功能二、拦截器的实现2.1 定义自定义拦截器2.2 注册拦截器三、多拦截器的执行顺序四、过

Python实现快速扫描目标主机的开放端口和服务

《Python实现快速扫描目标主机的开放端口和服务》这篇文章主要为大家详细介绍了如何使用Python编写一个功能强大的端口扫描器脚本,实现快速扫描目标主机的开放端口和服务,感兴趣的小伙伴可以了解下... 目录功能介绍场景应用1. 网络安全审计2. 系统管理维护3. 网络故障排查4. 合规性检查报错处理1.

MySQL快速复制一张表的四种核心方法(包括表结构和数据)

《MySQL快速复制一张表的四种核心方法(包括表结构和数据)》本文详细介绍了四种复制MySQL表(结构+数据)的方法,并对每种方法进行了对比分析,适用于不同场景和数据量的复制需求,特别是针对超大表(1... 目录一、mysql 复制表(结构+数据)的 4 种核心方法(面试结构化回答)方法 1:CREATE

JAVA线程的周期及调度机制详解

《JAVA线程的周期及调度机制详解》Java线程的生命周期包括NEW、RUNNABLE、BLOCKED、WAITING、TIMED_WAITING和TERMINATED,线程调度依赖操作系统,采用抢占... 目录Java线程的生命周期线程状态转换示例代码JAVA线程调度机制优先级设置示例注意事项JAVA线程

JavaWeb项目创建、部署、连接数据库保姆级教程(tomcat)

《JavaWeb项目创建、部署、连接数据库保姆级教程(tomcat)》:本文主要介绍如何在IntelliJIDEA2020.1中创建和部署一个JavaWeb项目,包括创建项目、配置Tomcat服务... 目录简介:一、创建项目二、tomcat部署1、将tomcat解压在一个自己找得到路径2、在idea中添加