【RocketMQ系列九】SpringCloudStream整合RocketMQ

2023-10-22 08:44

本文主要是介绍【RocketMQ系列九】SpringCloudStream整合RocketMQ,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

您好,我是码农飞哥(wei158556),感谢您阅读本文,欢迎一键三连哦
💪🏻 1. Python基础专栏,基础知识一网打尽,9.9元买不了吃亏,买不了上当。 Python从入门到精通
😁 2. 毕业设计专栏,毕业季咱们不慌忙,几百款毕业设计等你选。
❤️ 3. Python爬虫专栏,系统性的学习爬虫的知识点。9.9元买不了吃亏,买不了上当 。python爬虫入门进阶
❤️ 4. Ceph实战,从原理到实战应有尽有。 Ceph实战
❤️ 5. Java高并发编程入门,打卡学习Java高并发。 Java高并发编程入门

文章目录

    • 1. Spring Cloud Stream是什么?
    • 2. Spring Cloud Stream的执行流程
    • 3. 注解代码实现
      • 在 my-springcloud-rocketmq-producer 上的操作
      • 3.1. 引入依赖
      • 3.2 . 属性文件配置
      • 3.3. 定义生产者
      • 在 my-springcloud-rocketmq-consumer上的操作
      • 3.4. 引入依赖同生产者
      • 3.5. 配置文件修改
      • 3.6. 定义消费者

1. Spring Cloud Stream是什么?

Spring Cloud Stream是一个框架,用于构建与共享消息系统连接的高度可扩展的事件驱动微服务。

官网:https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/

官网概述:https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#spring-cloud-stream-preface-notable-deprecations

该框架提供了一个灵活的编程模型,该模型基于已经建立和熟悉的Spring习惯用法和最佳实践,包括对持久pub/sub 语义、消费者组和有状态分区的支持。

简单的理解就是Spring Cloud Stream 通过在上层定义统一消息的编程模型,屏蔽了底层消息中间件的差异,降低了使用成本。下图展示了Spring Cloud Stream的处理架构

带粘合剂的 SCSt

image-20231006092654679

Spring Cloud Stream的核心构建块(编程模型)是:

  1. Destination Binders: 负责提供与外部消息传递系统集成的组件。Binders 可以生成Bindings。
  2. **Bindings: ** 外部消息系统和应用程序之间的桥梁,提供消息的生产者和消费者(由目标绑定器创建)。即用来绑定消息生产者和消息消费者。它有两种类型,INPUT和OUTPUT,INPUT对应消费者,OUTPUT对应生产者。
  3. Message: 生产者和消费者用于与目标绑定器(以及通过外部消息系统与其他应用程序)通信的规范的数据结构。

2. Spring Cloud Stream的执行流程

SpringCloudStream

3. 注解代码实现

首先创建一个生产者项目 my-springcloud-rocketmq-producer 和一个消费者项目 my-springcloud-rocketmq-consumer。

本demo使用的 版本号是 cloud 2021.0.5.0 +springboot 2.6.13

在 my-springcloud-rocketmq-producer 上的操作

3.1. 引入依赖

  <dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-stream-rocketmq</artifactId></dependency>

3.2 . 属性文件配置

spring:cloud:stream:bindings:output:destination: my-springcloud-stream-topicrocketmq:binder:name-server: 172.31.184.89:9876

3.3. 定义生产者

在MySpringcloudRocketmqProducerApplication 添加 @EnableBinding(Source.class) 注解。然后创建生产者。

@Component
public class MyProducer {@Resourceprivate Source source;public void sendMessage(String msg) {// 封装消息头Map<String, Object> headers = new HashMap<>();headers.put(MessageConst.PROPERTY_TAGS, "tagA");// 创建消息对象Message<String> message = MessageBuilder.createMessage(msg, new MessageHeaders(headers));// 发送消息source.output().send(message);}
}

在 my-springcloud-rocketmq-consumer上的操作

3.4. 引入依赖同生产者

3.5. 配置文件修改

spring.cloud.stream.rocketmq.binder.name-server=172.31.184.89:9876
spring.cloud.stream.bindings.input.destination=my-springcloud-stream-topic
spring.cloud.stream.bindings.input.group=my-springcloud-stream-consume-group

3.6. 定义消费者

在MySpringcloudRocketmqConsumerApplication 类上添加 @EnableBinding(Sink.class)注解。

@Component
public class MyConsumer {@StreamListener(Sink.INPUT)public void processMessage(String message) {System.out.println("收到的消息=" + message);}
}

这篇关于【RocketMQ系列九】SpringCloudStream整合RocketMQ的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/260352

相关文章

Spring Boot 整合 SSE(Server-Sent Events)实战案例(全网最全)

《SpringBoot整合SSE(Server-SentEvents)实战案例(全网最全)》本文通过实战案例讲解SpringBoot整合SSE技术,涵盖实现原理、代码配置、异常处理及前端交互,... 目录Spring Boot 整合 SSE(Server-Sent Events)1、简述SSE与其他技术的对

Java整合Protocol Buffers实现高效数据序列化实践

《Java整合ProtocolBuffers实现高效数据序列化实践》ProtocolBuffers是Google开发的一种语言中立、平台中立、可扩展的结构化数据序列化机制,类似于XML但更小、更快... 目录一、Protocol Buffers简介1.1 什么是Protocol Buffers1.2 Pro

springboot整合mqtt的步骤示例详解

《springboot整合mqtt的步骤示例详解》MQTT(MessageQueuingTelemetryTransport)是一种轻量级的消息传输协议,适用于物联网设备之间的通信,本文介绍Sprin... 目录1、引入依赖包2、yml配置3、创建配置4、自定义注解6、使用示例使用场景:mqtt可用于消息发

解决RocketMQ的幂等性问题

《解决RocketMQ的幂等性问题》重复消费因调用链路长、消息发送超时或消费者故障导致,通过生产者消息查询、Redis缓存及消费者唯一主键可以确保幂等性,避免重复处理,本文主要介绍了解决RocketM... 目录造成重复消费的原因解决方法生产者端消费者端代码实现造成重复消费的原因当系统的调用链路比较长的时

SpringBoot整合Dubbo+ZK注册失败的坑及解决

《SpringBoot整合Dubbo+ZK注册失败的坑及解决》使用Dubbo框架时,需在公共pom添加依赖,启动类加@EnableDubbo,实现类用@DubboService替代@Service,配... 目录1.先看下公共的pom(maven创建的pom工程)2.启动类上加@EnableDubbo3.实

SpringBoot整合(ES)ElasticSearch7.8实践

《SpringBoot整合(ES)ElasticSearch7.8实践》本文详细介绍了SpringBoot整合ElasticSearch7.8的教程,涵盖依赖添加、客户端初始化、索引创建与获取、批量插... 目录SpringBoot整合ElasticSearch7.8添加依赖初始化创建SpringBoot项

SpringSecurity整合redission序列化问题小结(最新整理)

《SpringSecurity整合redission序列化问题小结(最新整理)》文章详解SpringSecurity整合Redisson时的序列化问题,指出需排除官方Jackson依赖,通过自定义反序... 目录1. 前言2. Redission配置2.1 RedissonProperties2.2 Red

Spring boot整合dubbo+zookeeper的详细过程

《Springboot整合dubbo+zookeeper的详细过程》本文讲解SpringBoot整合Dubbo与Zookeeper实现API、Provider、Consumer模式,包含依赖配置、... 目录Spring boot整合dubbo+zookeeper1.创建父工程2.父工程引入依赖3.创建ap

SpringBoot3.X 整合 MinIO 存储原生方案

《SpringBoot3.X整合MinIO存储原生方案》本文详细介绍了SpringBoot3.X整合MinIO的原生方案,从环境搭建到核心功能实现,涵盖了文件上传、下载、删除等常用操作,并补充了... 目录SpringBoot3.X整合MinIO存储原生方案:从环境搭建到实战开发一、前言:为什么选择MinI

SpringBoot整合liteflow的详细过程

《SpringBoot整合liteflow的详细过程》:本文主要介绍SpringBoot整合liteflow的详细过程,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋...  liteflow 是什么? 能做什么?总之一句话:能帮你规范写代码逻辑 ,编排并解耦业务逻辑,代码