Spring Boot Messaging Chapter 6 Messaging with Redis

2024-03-02 23:08

本文主要是介绍Spring Boot Messaging Chapter 6 Messaging with Redis,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

摘要:本章向您展示了如何使用Redis集成Spring Boot作为消息中间件。Redis是一个内存数据库,它被用作数据库、缓存和消息中间件。它不仅可以存储键值对,还可以用来存储复杂的数据类型,如散列、列表、集合、排序集、位图、超日志和地理空间索引。

Spring Boot使用Spring Data模块,特别是Redis one。换句话说,为了在您的项目中使用Redis,您必须将spring-boot-starer-redis依赖于您的pom.xml文件或Gradle。然后,您将拥有连接到Redis服务器的所有必要依赖项。

一:Redis as a Message Broker(Redis作为一个消息中间件)

Redis不仅提供了存储数据结构的方法,而且还实现了发布/订阅消息传递范型。前面的章节用JMS解释了这种范例。

        这里的重要部分向您展示了如何与Redis交互并启用消息中间件。Redis拥有一个通道的标识,其中一个消息将由生成者发送。它将被那些对一个或多个频道感兴趣的订阅者使用。正如您所看到的,通道关键字在Redis中使用。Redis中的通道是JMS世界中的主题。

Redis有几个命令允许你与发布/订阅功能进行交互:

• SUBSCRIBE: 告诉Redis订阅一个特定的频道或频道。例如:

127.0.0.1:6379> SUBSCRIBE spring-boot-chat

您可以一次订阅多个通道,将它们按空格分隔开。

• UNSUBSCRIBE: 取消订阅的频道。这个命令不需要参数。

• PUBLISH: 通过指定通道(作为第一个参数)和实际消息来发布一条消息。例如:

127.0.0.1:6379> PUBLISH spring-boot-chat "Hi there"

• PSUBSCRIBE: 这个命令与SUBSCRIBE相同,但接受多个通道的模式。例如:

127.0.0.1:6379> PSUBSCRIBE currency.*

这个例子将订阅任何以货币开头的频道,比如currency.us,currency.asia.jp,currency.eu.gb,等等。

•PUNSUBSCRIBE: 取消订阅使用模式匹配。例如:

127.0.0.1:6379> PUNSUBSCRIBE currency.asia.*

•PING: 如果没有提供任何参数,则返回一个PONG。通常情况下,您可以使用这个来测试连接是否仍然存在。


为了试一试,确保你已经安装了Redis,它已经启动并运行了。(您可以从https://redis.io/download下载它。)打开一个新的终端窗口,并使用redis-cli命令与Redis进行交互。如图6 - 1所示。

Figure 6-1. Redis interaction with the publish/subscribe commands

图6-1向您展示了一种与Redis交互的简单方法,并确定订阅和发布一条消息是多么容易。

本章不讨论Redis的集群,也不讨论哨兵,分片,等等。只讨论消息传递。许多客户使用Redis作为消息传递代理,并作为实时数据分析的web会话管理工具。本章的开始是使用Spring Boot和Redis来进行消息传递。

二:Publish/Subscribe Messaging with Redis(使用Redis发布/订阅消息)

正如前面提到的,Spring Boot将使用Spring Data Redis的力量,它与您所熟悉的Spring JMS非常相似。Spring Boot将配置必要的组件,例如连接、使用的数据库(默认情况下是0索引DB)、集群节点、池、哨兵、超时等等。记住简单地添加 spring-boot-starter-redis可以启用Redis自动配置。

Spring Data Redis模块有两个主要的领域,用于消息的生产或发布,以及消息的消费或订阅。对于消息发布,它使用RedisTemplate<K,V>类(它使用模板设计模式),对于消息订阅,它有一个专用的异步消息侦听器容器(一个MDP消息驱动pojo)。对于同步消息,它使用一个RedisConnection接口约定。下面的部分只讨论异步订阅。

在这一章中,我们将使用两个项目:redis-demo和rest-api-redis。redis-demo项目拥有完成和补充货币项目的所有必要代码。

2.1.Subscriber(订阅者)

作为Redis订阅者,您可以通过使用固定名称或使用模式匹配来订阅一个或多个通道(或主题)。Spring Data Redis模块提供了一种进行低级操作的方法通过RedisConnection订阅。这包括subscribe和pSubscribe方法。

低级订阅需要一种方法来处理简单侦听器的连接和线程管理。现在想象有多个侦听器。您可能认为实现此功能将是一件麻烦的事情,但是Spring Data Redis包括RedisMessageListenerContainer类,它负责所有的繁重工作,并支持消息驱动pojo(MDPs)。换句话说,你可以创建自己的类和方法来接收消息并处理它。这意味着您需要使用MessageListenerAdapter类来使用这个特性。不要太担心,这是我接下来要给你们看的。

打开redis-demo项目,并检查com.micai.spring.messaging.config.RedisConfig类,如清单6-1所示。

Listing 6-1. com.micai.spring.messaging.config.RedisConfig.java

@Configuration
@EnableConfigurationProperties(SimpleRedisProperties.class)
public class RedisConfig {// Simple Message Listener@Beanpublic RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,MessageListenerAdapter listenerAdapter,@Value("${micai.redis.topic}") String topic) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.addMessageListener(listenerAdapter, new PatternTopic(topic));return container;}@BeanMessageListenerAdapter listenerAdapter(Subscriber subscriber) {return new MessageListenerAdapter(subscriber);}
}

清单6-1向您展示了我们将要使用的配置,以便订阅一个频道。让我们更详细地了解代码:

•RedisMessageListenerContainer: 这是一个完成所有繁重工作的类,并充当一个消息侦听器容器,它将接收来自Redis通道(topic)的消息。您需要设置连接工厂和将要处理接收到的消息的消息侦听器。请记住,这个侦听器容器负责所有线程和消息分派。

•RedisConnectionFactory: 这个接口对于RedisMessageListenerContainer是必需的,它包含关于Redis连接的所有信息。因为它是这个方法的一部分,Spring会自动将它连接起来,所以您不需要手工创建它。在幕后,Spring Boot会为您处理这种配置。

•MessageListenerAdapter: 该类是一个适配器,它将收到的消息委托给一个已声明的类,该类是通过MessageListener签名进行投诉的。您可以看到,这是通过调用容器来设置的。addMessageListener方法并作为参数传递订阅者(listenerAdapter-MessageListenerAdapter)和它将订阅的主题(PatternTopic)。

•PatternTopic: 这是一个类,也是消息侦听器需要的参数之一。它通常保存主题的名称或用于订阅正确频道的名称模式。

接下来,让我们看一下订阅者类。打开com.micai.spring.messaging.redis.Subscriber类,如清单6-2所示。

 

Listing 6-2. com.micai.spring.messaging.redis.Subscriber.java

@Component
public class Subscriber {private static final Logger LOGGER = LoggerFactory.getLogger(Subscriber.class);// If only one method defined, it must be named: handleMessagepublic void handleMessage(String message) {// Process message here ...LOGGER.info("消费Redis的消息内容为:{}", message);}
}

 清单6-2显示了只有一个必需方法(handleMessage)的订阅者类。该订阅者类是消息委托的适配器。换句话说,MessageListenerAdapter符合以下签名:

void handleMessage(String message);
void handleMessage(Map message);
void handleMessage(byte[] message);
void handleMessage(Serializable message);//You can get the channel or the pattern used
void handleMessage(Serializable message, String channel);
void handleMessage(byte[] bytes, String pattern);//You can have your own object
void handleMessage(MyOwnDomainObject obj);

有时,您的适配器类会有更多的方法进行额外的处理,或者从外部调用。在这些情况下,您可以让MessageListenerAdapter类通过在其构造函数中添加额外的参数来知道您想要使用什么方法。例如:

@Component
public class Subscriber {
public void shipping(Order order){
// Process order here ...
}
//This method is used as listener for Redis topics
public void processTicket(String message){
// Process message here ...
}
// ... more methods
}
// RedisConfig.java
@Bean
MessageListenerAdapter listenerAdapter(Subscriber subscriber) {
return new MessageListenerAdapter(subscriber,"processTicket");
}

 添加您将用来处理从Redis主题到构造器的传入消息的方法的名称;在这个例子中,这是processTicket方法。

正如您所看到的,使用MessageListenerAdapter的好处是您在POJO类中没有依赖关系,使您的应用程序更具可扩展性。

2.2.Publisher(发布者)

本节将介绍向通道(主题)发布消息。要在Redis中发布消息,您有两个选择。您可以使用 low-level RedisConnection类或高级RedisTemplate类(记住,它非常类似于JmsTemplate和RabbitTemplate)。两个接口都提供发布方法连接。发布(msg,通道)。您还必须确定通道(主题)。

使用RedisTemplate的好处是,您有一种方法来定义序列化/反序列化策略。这种方法隐藏了调用原始方法的复杂性,并且是线程安全的。

让我们打开代码com.micai.spring.messaging.RedisDemoApplication类如清单6-3所示。

 

Listing 6-3. com.micai.spring.messaging.RedisDemoApplication.java

@SpringBootApplication
public class RedisDemoApplication {private static final Logger LOGGER = LoggerFactory.getLogger(RedisDemoApplication.class);public static void main(String [] args) {SpringApplication.run(RedisDemoApplication.class, args);}@BeanCommandLineRunner sendMessage(StringRedisTemplate template, @Value("${micai.redis.topic}")String topic){return args -> {String message = "Hello Redis with Spring Boot!";template.convertAndSend(topic, message);LOGGER.info("发布Redis的消息内容为: {}", message);};}
}

清单6-3显示了主应用程序类。正如您所知道的,一旦Spring Boot最终确定了自动配置,它将执行sendMessage方法。该方法将自动在应用程序中自动连接StringRedisTemplate和主题。在application.properties文件中micai.redis.topic=spring-boot-chat的key。然后,它将使用模板通过模板来发送一条消息。接受主题和消息作为其参数的convertAndSend方法。

您通常需要使用RedisTemplate,但是在本例中,我们使用的是StringRedisTemplate。我们这样做是因为RedisTemplate被定义为RedisTemplate<K,V>,当key是Redis的key时(通常是一个字符串),而V是Redis值类型(它将是消息)。然后,StringRedisTemplate是一个使用字符串的子类。换句话说,它就像创建一个RedisTemplate<String,String>对象。

这里的有趣之处在于,StringRedisTemplate为不同的操作定义了多个字符串序列化器,它们适用于不同的数据结构,比如Set和Hash键/值。

现在,如果您运行这个项目(请记住让redis-server启动并运行),您将看到如图6-2、6-3和6-4所示的订阅者日志。

Figure 6-2. Project logs

图6-2显示了日志。具有Around AOP建议的RedisAudit类会生成这些日志。正如您所看到的,它使用订阅者类(侦听器适配器)和接收字符串消息的handleMessage方法。

图6-3显示了带有Redis客户端的终端。监视器在执行代码之前显示,只是为了确定Redis是否接受这些消息。正如您所看到的,Redis展示了在redis-server中执行的命令,PSUBSCRIBE和PUBLISH,当然还有一个PING命令来检查客户端和服务器之间的连接。

Figure 6-3. The redis-cli monitor command

图6-4显示了另一个终端窗口,在那里我们订阅了spring-boot-chat channel/topic。运行这个项目之后,它将打印消息。这是另一种确保您的redis-server正在运行的方法,并且您可以有多个订阅者到一个通道/主题。

Figure 6-4. redis-cli subscribe

2.3.JSON Serialization(JSON序列化)

 现在,回到序列化/反序列化,回想一下我们正在使用JSON格式。您需要做些什么才能使发布/订阅使用JSON和序列化/反序列化为自定义对象?

如果您遵循前面的模块(JMS和RabbitMQ)的相同想法,那么回答这个问题就更容易了,因为您可以在这里应用相同的概念。

让我们先修改RedisConfig类,如清单6-4所示。

 

Listing 6-4. com.micai.spring.messaging.config.RedisConfig.java

@Configuration
@EnableConfigurationProperties(SimpleRedisProperties.class)
public class RedisConfig {// Simple Message Listener@Beanpublic RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,MessageListenerAdapter listenerAdapter,@Value("${micai.redis.topic}") String topic) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.addMessageListener(listenerAdapter, new PatternTopic(topic));return container;}@BeanMessageListenerAdapter rateListenerAdapter(RateSubscriber subscriber) {MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(subscriber);messageListenerAdapter.setSerializer(new Jackson2JsonRedisSerializer<>(Rate.class));return messageListenerAdapter;}@BeanRedisTemplate<String, Rate> redisTemplate(RedisConnectionFactory connectionFactory){RedisTemplate<String,Rate> redisTemplate = new RedisTemplate<String,Rate>();redisTemplate.setConnectionFactory(connectionFactory);redisTemplate.setDefaultSerializer(new Jackson2JsonRedisSerializer<>(Rate.class));redisTemplate.afterPropertiesSet();return redisTemplate;}
}

清单6-4展示了修改后的RedisConfig类。与之前的版本相比,有什么区别呢?RedisMessageListenerContainer bean是一样的,只不过现在我们也在使用rateListenerAdapter bean。让我们来回顾一下这个清单:

• MessageListenerAdapter: 这和以前是一样的,但是这里我们设置了一个新的类适配器,在这个例子中是RateSubscriber。需要注意的是,我们通过调用setSerializer方法来设置序列化器,并且我们正在使用Rate类作为对象映射器来实例化一个Jackson2JsonRedisSerializer对象。

• RedisTemplate<String,Rate>:正如您所看到的,我们定义这个bean来返回一个RedisTemplate,其中键是一个字符串,值是利率类。还要注意的是,我们通过调用setDefaultSerializer方法来设置一个序列化器,我们使用的是与以前相同的类,Jackson2JsonRedisSerializer。

查看RateSubscriber类,如清单6-5所示。

 

Listing 6-5. com.micai.spring.messaging.redis.RateSubscriber.java

/*** @Auther: zhaoxinguo* @Date: 2018/8/9 19:27* @Description:*/
@Component
public class RateSubscriber {private static final Logger LOGGER = LoggerFactory.getLogger(RateSubscriber.class);// If only one method defined, it must be named: handleMessagepublic void handleMessage(Rate rate){// Process message here ...LOGGER.info("消费Redis的消息内容为Rate:{}", rate);}
}

清单6-5显示了RateSubscriber类。与前面的例子没有任何改变。handleMessage方法将接收一条利率消息。现在,让我们看一下发布者,如清单6-6所示。

 

Listing 6-6. com.micai.spring.messaging.RedisDemoApplication.java

@SpringBootApplication
public class RedisDemoApplication {private static final Logger LOGGER = LoggerFactory.getLogger(RedisDemoApplication.class);public static void main(String [] args) {SpringApplication.run(RedisDemoApplication.class, args);}@BeanCommandLineRunner sendMessage(RedisTemplate<String, Rate> template,@Value("${micai.redis.topic}")String topic){return args -> {Rate rate = new Rate("MX", 21.17F, new Date());template.convertAndSend(topic, rate);LOGGER.info("发布Redis的消息内容为Rate: {}", rate);};}
}

 清单6-6显示了主应用程序。将这个类与前一个版本进行比较;改变了什么?我们使用RedisTemplate类并使用利率作为值。我们还通过创建一个新利率来使用Rate类作为消息。

现在您可以运行您的项目并查看日志了。参见图6-5、6-6和6-7。

图6-5显示了RateSubscriber正在处理消息的日志。请记住,在幕后,为了得到对象,序列化/反序列化正在发生。

Figure 6-5. RateSubscriber logs

图6-6显示了带有Redis客户端监视器的终端。我们的想法是看到发布的方法是JSON格式的字符串,这意味着Jackson2JsonRedisSerializer执行了速率类的序列化。

Figure 6-6. redis-cli monitor

图6-7向您展示了一个终端,它的订阅者利率通道/主题。请注意,此订阅者以JSON字符串格式接收利率消息。

Figure 6-7. redis-cli subscriber

如果您将这些结果与之前的Spring模块(JMS和AMQP)进行比较,您将看到我们正在做同样的事情。尽管Spring Data Redis模块没有注解来简化发布/订阅模式,但是很容易就能很快地启动并运行它。

三:The Currency Project(货币项目)

您现在拥有了完成货币项目所需的所有信息。看一下RateRedisSubscriber、RateRedisConfig和RateRedisProperties类开始。您可以重用演示项目来将消息发布到channel/topic。

四:总结

本章讨论了发布/订阅消息模式,并指出Redis提供了开箱即用的功能。它很容易使用。本章向您展示了Spring Boot如何帮助您轻松地配置您的发布者和订阅者,只需添加spring-boot-starer-redis。

您看到了如何发布和监听传入的消息,您看到Spring Data Redis模块使用RedisTemplate(与Spring JMS和Spring AMQP模块相同的行为)使用与生产者和订阅者类似的方式。

尽管这一章很短,但它为您提供了一个使用Redis作为内存消息中间件的起点。

下一章将介绍WebSockets,这是另一种使用Spring Boot进行消息传递的方式。

五:源代码

https://gitee.com/micai/micai-spring-message/tree/master/redis-demo

这篇关于Spring Boot Messaging Chapter 6 Messaging with Redis的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/767713

相关文章

Spring Boot集成/输出/日志级别控制/持久化开发实践

《SpringBoot集成/输出/日志级别控制/持久化开发实践》SpringBoot默认集成Logback,支持灵活日志级别配置(INFO/DEBUG等),输出包含时间戳、级别、类名等信息,并可通过... 目录一、日志概述1.1、Spring Boot日志简介1.2、日志框架与默认配置1.3、日志的核心作用

破茧 JDBC:MyBatis 在 Spring Boot 中的轻量实践指南

《破茧JDBC:MyBatis在SpringBoot中的轻量实践指南》MyBatis是持久层框架,简化JDBC开发,通过接口+XML/注解实现数据访问,动态代理生成实现类,支持增删改查及参数... 目录一、什么是 MyBATis二、 MyBatis 入门2.1、创建项目2.2、配置数据库连接字符串2.3、入

Springboot项目启动失败提示找不到dao类的解决

《Springboot项目启动失败提示找不到dao类的解决》SpringBoot启动失败,因ProductServiceImpl未正确注入ProductDao,原因:Dao未注册为Bean,解决:在启... 目录错误描述原因解决方法总结***************************APPLICA编

深度解析Spring Security 中的 SecurityFilterChain核心功能

《深度解析SpringSecurity中的SecurityFilterChain核心功能》SecurityFilterChain通过组件化配置、类型安全路径匹配、多链协同三大特性,重构了Spri... 目录Spring Security 中的SecurityFilterChain深度解析一、Security

Redis客户端连接机制的实现方案

《Redis客户端连接机制的实现方案》本文主要介绍了Redis客户端连接机制的实现方案,包括事件驱动模型、非阻塞I/O处理、连接池应用及配置优化,具有一定的参考价值,感兴趣的可以了解一下... 目录1. Redis连接模型概述2. 连接建立过程详解2.1 连php接初始化流程2.2 关键配置参数3. 最大连

SpringBoot多环境配置数据读取方式

《SpringBoot多环境配置数据读取方式》SpringBoot通过环境隔离机制,支持properties/yaml/yml多格式配置,结合@Value、Environment和@Configura... 目录一、多环境配置的核心思路二、3种配置文件格式详解2.1 properties格式(传统格式)1.

Apache Ignite 与 Spring Boot 集成详细指南

《ApacheIgnite与SpringBoot集成详细指南》ApacheIgnite官方指南详解如何通过SpringBootStarter扩展实现自动配置,支持厚/轻客户端模式,简化Ign... 目录 一、背景:为什么需要这个集成? 二、两种集成方式(对应两种客户端模型) 三、方式一:自动配置 Thick

Spring WebClient从入门到精通

《SpringWebClient从入门到精通》本文详解SpringWebClient非阻塞响应式特性及优势,涵盖核心API、实战应用与性能优化,对比RestTemplate,为微服务通信提供高效解决... 目录一、WebClient 概述1.1 为什么选择 WebClient?1.2 WebClient 与

Java.lang.InterruptedException被中止异常的原因及解决方案

《Java.lang.InterruptedException被中止异常的原因及解决方案》Java.lang.InterruptedException是线程被中断时抛出的异常,用于协作停止执行,常见于... 目录报错问题报错原因解决方法Java.lang.InterruptedException 是 Jav

深入浅出SpringBoot WebSocket构建实时应用全面指南

《深入浅出SpringBootWebSocket构建实时应用全面指南》WebSocket是一种在单个TCP连接上进行全双工通信的协议,这篇文章主要为大家详细介绍了SpringBoot如何集成WebS... 目录前言为什么需要 WebSocketWebSocket 是什么Spring Boot 如何简化 We