【Redis笔记】一起学习Redis | 从消息队列到PubSub模型

2024-01-14 12:32

本文主要是介绍【Redis笔记】一起学习Redis | 从消息队列到PubSub模型,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一起学习Redis | 从消息队列到发布订阅模型


如果觉得对你有帮助,能否点个赞或关个注,以示鼓励笔者呢?!博客目录 | 先点这里

  • Redis的消息队列

    • Redis中的消息队列怎么实现?
    • 怎么使用Redis实现延时队列?
  • Redis的发布订阅模型

    • 为什么还要发布订阅模型?
    • 发布订阅模型的缺点
    • 发布/订阅模型的命令
    • 消息结构
  • 代码实践

    • Redis消息队列 | Java
    • Redis PubSub | Python
    • Redis PubSub | Java

Redis的消息队列


Redis中的消息队列怎么实现?
  • 普通FIFO队列
    • lpush + rpop
    • rpush + lpop
  • 双端队列
    • lpush + rpush + lpop + rpop
  • 阻塞FIFO队列
    • lpush + blpop
    • rpush + brpop
  • 阻塞双端队列
    • lpush + rpush + blpop + brpop

怎么使用Redis实现延时队列?

为什么需要延时队列?
如果你了解分布式锁就会知道,当我们多个客户端的请求对同一把分布式锁产生了冲突时,我们就要讨论以什么样的策略去解决这个冲突。一般情况有三种

  • 抛异常,不管
  • while + sleep,阻塞线程,休眠一段时间,重试
  • 将请求转移到消息队列中,过一段时间再重试

而我们的Redis就可以实现第三种策略所需的延时队列。

Redis怎么实现延时队列?
Redis可以通过sorted set实现,既有序集合。我们将消息序列化成一个字符串,作为zsetvalue, 而这个value对应的score就是该消息的到期处理时间。然后可以使用定时任务每隔一小段时间就轮询一次这个zset, 判断zset的排名中最前(默认从小到大)的score是否小于当前时间,如果小于就说明该消息要重新拿出来处理了。

# python锁冲突后,将消息转移到延迟队列的伪代码
def delay(message):"""将消息转移到延迟队列中,延迟队列由zset实现"""message.id = str(uuid.uuid4()) # UUID, 消息的唯一标识data = json.dumps(message) 	   # 将消息序列化成json字符串retry_time = time.time() + 60  # 该消息在60s后被拿出来重新处理redis.zadd('delay-queue',retry_time, data)def loop():"""无限轮询,可单/多线程轮询到期要被处理的消息交给handler方法处理"""while True:# 从zset中获取一条score位于0到当前时间戳的消息# 如果有消息的到期处理时间超过了当前时间,就有数据data = redis.zrangebyscore('delay-queue',0,time.time(),start = 0, num = 1)if not data: # 没有消息到期,就等多1stime.sleep(1)continuedata = data[0]success = redis.zrem('delay-queue', data)if not success: # 因为可能是并发轮询,所以只有zrem返回的是成功删除,才说明消息是本线程有权利处理message = json.loads(success)handler(message) #重新处理消息

当然,如果你觉得查询zset的频率不够,有很多种改进手段,比如一次拿zset的前10的到期处理时间与当前时间比较,或是多线程的方式轮询,甚至是多节点一起轮询。当然只要涉及多线程或多节点,就一定要注意线程安全问题~

代码优化空间
当然以上的python伪代码还是可以有进一步优化空间的,既同一个任务可能会被多个线程争抢,而没有争抢到的线程都白干活了。我们可以考虑通过lua脚本,将zrangebyscore的流程和zrem的流程合并成一个原子操作。这样就可以避免多线程或多进程在争抢同一个任务时造成的资源浪费。


Redis的发布/订阅模型


为什么还要发布订阅模型?

前面我们讲了Redis消息队列的使用方法,但是没有提到Redis消息队列的不足。Redis消息队列的一个很大的不足就是无法支持消息的多播机制,正因为如此,Redis才提出了发布订阅模型!

消息多播
消息多播允许生产者值生成一次消息,由中间件负责将消息复制到多个消息队列中,每个消息队列由相应的消费组进行消费。

PubSub
为了支持消息多播,Redis不能再依赖基本的数据类型实现了,它单独的使用了一个模块来支持消息多播的功能,既PubSub模块, 也就是通常我们所说的发布/订阅模型


发布订阅模型的缺点

离线节点彻底丢失消息
Redis的PubSub模块的生产者发送了一个消息给订阅者,Redis会直接找到订阅的消费者发送。如果一个消费者都没有,那么生产者的消息就相当石沉大海,直接被丢弃。

  • 所以这就会造成一个很大的弊端。当有三个订阅者订阅了一个生产者,突然某个消费者节点离线了,过了一会又上线了。那么在该消费者离线期间,生成者所推送的内容,相对该掉线消费者就是彻底丢失了。

没有消息确认机制

  • 只要消息从Redis的队列中pop出去后,就再也不跟Redis有任何关系了,业务逻辑执行错了,消息也不会回到Redis中。对比RabbitMQ就可以做到,业务执行失败,no-ack,消息会回到Rabbit队列中
  • 无法保证消息真实到达中间件,既消息从发布者客户端发送出现后,是否有到达Redis服务器,是无法知道的。相比RabbitMQ就可以有Comfirm机制

消息不能持久化

  • 因为PubSub的消息并非是Redis的一种数据类型,所以PubSub中数据是没有得到aof等持久化机制的支持的,既一旦节点崩溃,重新上线后,消息是会丢失的。这一点在Redis 5.0的Stream新数据结构中得到了改善

也正因为PubSub模块的这些缺点,在严谨的消息队列领域,Redis的PubSub模型上不了主舞台,只能做一下简单的消息推送功能。总之Redis的发布订阅模块并不能作为严格意义的消息中间件,只能算是一个轻量级消息队列,可以在一些无伤大雅的低要求场景上使用。

所以Redis的作者单独开启了一个叫Disque的项目,专门做多播消息队列,但是目前该项目还未成熟。但是更让人劲爆的消息是,Redis 5.0新增了一个Stream数据结构,它是一个强大的支持多播的可持久化的消息队列。所以在可预见的未来,PubSub会逐渐被淘汰,Disque项目也可能不会有发行版本了。


发布/订阅模型的命令
  • publish
    将信息 message 发送到指定的频道 channel,返回频道订阅者数量
    publish channel message

  • subscribe
    订阅一个或多个频道
    subscribe channel [channel...]

  • psubscribe
    订阅符合一个或多个匹配模式的所有频道,psubscribe new.* 则是订阅所有new.开头的频道(new.log,new.studnet,etc…)
    psubscribe pattern [pattern …]

  • unsubscribe
    退订一个或多个的指定频道
    unsubscribe channel [channel...]

  • punsubscribe
    退订符合一个或多个匹配模式的所有频道
    punsubcribe pattern [pattern]

  • pubsub
    pubsub是一个查看订阅与发布系统状态的内省命令, 它由数个不同格式的子命令组成pubsub

    • pubsub channels [pattern]查询系统中符合模式的频道信息,pattern为空,则查询系统中所有存在的频道
    • pubsub numsub [channel] 查询一个或多个频道的订阅数
    • pubsub numpat 查询当前客户端订阅了多少频道

我们可以看到本质就是由四种命令组成,发布命令,订阅命令,退订命令和查询命令


消息结构
{'type': 'message', 	'pattern': None, 'channel': 'python.new','data': 'hello python?'}

我们可以看到Redis的发布订阅模型中,消息传递的数据结构是有四个固定字段的

  • type
    表示消息的类型
    如果是一个普通的消息,那么类型就是message
    如果是控制消息,比如订阅命令的反馈,那么类型就是subscribe
    如果是模式订阅的返回,它的类型就是psubscribe
    如果是取消订阅的指令,那么就可以是unsubscribe或punsubscribe
  • pattern
    表示当前消息是使用那种模式订阅的,如果是通过subscribe命令订阅的,那么该字段就为空
  • channel
    表示当前订阅的频道名称,其实就相当于kafka的主题,rabbitmq的routing-key
  • data
    这个就很明显的,就是数据体

代码实践


Redis消息队列 | Java

基于Spring Data Redis客户端实现

RedisConfig.java

package com.snailmann.redis.queue.config;import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.stereotype.Component;/*** Redis配置*/
@Component
public class RedisConfig {/*** RedisTemplate配置** @param connectionFactory* @return*/@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();redisTemplate.setKeySerializer(new StringRedisSerializer());redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());redisTemplate.setHashKeySerializer(new StringRedisSerializer());redisTemplate.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());redisTemplate.setConnectionFactory(connectionFactory);return redisTemplate;}}

RedisService.java

package com.snailmann.redis.queue.service;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;@Component
public class RedisNativeMsgQueueService {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;private static final String KEY = "test:list";/*** 模拟消息队列入队操作,5批入队,每批入队5个消息*/public void lpush() {List<Integer> nums = Stream.of(1, 2, 3, 4, 5).collect(Collectors.toList());for (int i = 0; i < 5; i++) {redisTemplate.opsForList().leftPushAll(KEY, nums.toArray());System.out.println("rpush :" + nums);//每隔5秒执行一次try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}}}/*** 模拟阻塞队列的弹出操作,当没有消息时,将线程阻塞* 1. 但是Spring Data Redis, 并没有完全的阻塞api ,可是给了一个超时时间。如果超时,会返回null*/public void rpop() {while (true) {Integer result = (Integer) redisTemplate.opsForList().rightPop(KEY, 5, TimeUnit.SECONDS);if (null == result) {continue;}System.out.println(result);}}}

Test类

package com.snailmann.redis.queue.service;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;/*** 1. 先模拟push* 2. 再模拟pop的阻塞队列操作*/
@RunWith(SpringRunner.class)
@SpringBootTest
public class RedisNativeMsgQueueServiceTest {@AutowiredRedisNativeMsgQueueService redisNativeMsgQueueService;/*** 模拟消息队列的生产者操作*/@Testpublic void lpush() {redisNativeMsgQueueService.lpush();}/*** 模拟消息队列的消费者操作*/@Testpublic void rpop() {redisNativeMsgQueueService.rpop();}
}

分别在测试代码确定测试类的lpush和lpop方法测试即可


Redis PubSub | Python

发布者

import redisclient = redis.StrictRedis(host='127.0.0.1', port=6379)# 朝python.new频道发送三条消息
client.publish('python.new', 'hello python?')
client.publish('python.new', "let's learn pythoneveryday!!")
client.publish('python.new', "easy python3.7")

订阅者

# 普通版本,利用sleep来阻塞
import redis
import timeclient = redis.StrictRedis(host='127.0.0.1', port=6379)p = client.pubsub()
p.subscribe('python.new')
while True:message = p.get_message()if not message:time.sleep(1)continueprint(message)
# 改良版本,利用listen()来阻塞
import redis
import timeclient = redis.StrictRedis(host='127.0.0.1', port=6379)p = client.pubsub()
p.subscribe('python.new')
for message in p.listen():print(message)

先启动发布者,再启动订阅者

{'type': 'subscribe', 'pattern': None, 'channel': b'python.new', 'data': 1}
{'type': 'message', 'pattern': None, 'channel': b'python.new', 'data': b'hello python?'}
{'type': 'message', 'pattern': None, 'channel': b'python.new', 'data': b"let's learn pythoneveryday!!"}
{'type': 'message', 'pattern': None, 'channel': b'python.new', 'data': b'easy python3.7'}

Redis PubSub | Java

基于Spring Data Redis客户端实现

RedisConfig

package com.snailmann.redis.queue.config;import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.stereotype.Component;/*** Redis配置*/
@Component
public class RedisConfig {/*** RedisTemplate配置** @param connectionFactory* @return*/@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();redisTemplate.setKeySerializer(new StringRedisSerializer());redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());redisTemplate.setHashKeySerializer(new StringRedisSerializer());redisTemplate.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());redisTemplate.setConnectionFactory(connectionFactory);return redisTemplate;}/*** Redis PubSub 配置** @param connectionFactory* @param listenerAdapter* @return*/@BeanRedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,MessageListenerAdapter listenerAdapter) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);//配置要订阅的订阅项container.addMessageListener(listenerAdapter, new PatternTopic("java.news"));return container;}}

RedisPubliscer.java

package com.snailmann.redis.queue.pubsub;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class RedisPubliscer {@Autowiredprivate RedisTemplate<String, String> redisTemplate;/*** Redis PubSub 生产者** @return*/public void publish() {redisTemplate.convertAndSend("java.news", "hello java?");redisTemplate.convertAndSend("java.news", "let's learn java everyday!!");redisTemplate.convertAndSend("java.news", "easy java 3.7");}}

RedisSubscriber.java

package com.snailmann.redis.queue.pubsub;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.stereotype.Component;/**
* Redis PubSub 消费者
* 
*/
@Component
public class RedisSubscriber extends MessageListenerAdapter {@Overridepublic void onMessage(Message message, byte[] bytes) {System.out.println("监听到生产者发送的消息: " + message);}}

RedisPubliscerTest.java 测试类

package com.snailmann.redis.queue.pubsub;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;import static org.junit.Assert.*;@RunWith(SpringRunner.class)
@SpringBootTest
public class RedisPubliscerTest {@AutowiredRedisPubliscer redisPubliscer;@Testpublic void publish() {redisPubliscer.publish();}
}

直接运行测试类即可,因为测试类启动时,肯定会运行Spring容器,所以消费者listener肯定会在监听的

监听到生产者发送的消息: hello java?
监听到生产者发送的消息: let's learn java everyday!!
监听到生产者发送的消息: easy java 3.7

参考资料


  • 《Redis深度历险-核心原理与应用实践》
  • Redis命名参考
  • 如果觉得对你有帮助,能否点个赞或关个注,以示鼓励笔者呢?!

这篇关于【Redis笔记】一起学习Redis | 从消息队列到PubSub模型的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/605171

相关文章

Knife4j+Axios+Redis前后端分离架构下的 API 管理与会话方案(最新推荐)

《Knife4j+Axios+Redis前后端分离架构下的API管理与会话方案(最新推荐)》本文主要介绍了Swagger与Knife4j的配置要点、前后端对接方法以及分布式Session实现原理,... 目录一、Swagger 与 Knife4j 的深度理解及配置要点Knife4j 配置关键要点1.Spri

Redis出现中文乱码的问题及解决

《Redis出现中文乱码的问题及解决》:本文主要介绍Redis出现中文乱码的问题及解决,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1. 问题的产生2China编程. 问题的解决redihttp://www.chinasem.cns数据进制问题的解决中文乱码问题解决总结

java向微信服务号发送消息的完整步骤实例

《java向微信服务号发送消息的完整步骤实例》:本文主要介绍java向微信服务号发送消息的相关资料,包括申请测试号获取appID/appsecret、关注公众号获取openID、配置消息模板及代码... 目录步骤1. 申请测试系统2. 公众号账号信息3. 关注测试号二维码4. 消息模板接口5. Java测试

Redis的持久化之RDB和AOF机制详解

《Redis的持久化之RDB和AOF机制详解》:本文主要介绍Redis的持久化之RDB和AOF机制,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录概述RDB(Redis Database)核心原理触发方式手动触发自动触发AOF(Append-Only File)核

Redis分片集群、数据读写规则问题小结

《Redis分片集群、数据读写规则问题小结》本文介绍了Redis分片集群的原理,通过数据分片和哈希槽机制解决单机内存限制与写瓶颈问题,实现分布式存储和高并发处理,但存在通信开销大、维护复杂及对事务支持... 目录一、分片集群解android决的问题二、分片集群图解 分片集群特征如何解决的上述问题?(与哨兵模

SpringBoot连接Redis集群教程

《SpringBoot连接Redis集群教程》:本文主要介绍SpringBoot连接Redis集群教程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1. 依赖2. 修改配置文件3. 创建RedisClusterConfig4. 测试总结1. 依赖 <de

SpringBoot+Redis防止接口重复提交问题

《SpringBoot+Redis防止接口重复提交问题》:本文主要介绍SpringBoot+Redis防止接口重复提交问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不... 目录前言实现思路代码示例测试总结前言在项目的使用使用过程中,经常会出现某些操作在短时间内频繁提交。例

Redis 配置文件使用建议redis.conf 从入门到实战

《Redis配置文件使用建议redis.conf从入门到实战》Redis配置方式包括配置文件、命令行参数、运行时CONFIG命令,支持动态修改参数及持久化,常用项涉及端口、绑定、内存策略等,版本8... 目录一、Redis.conf 是什么?二、命令行方式传参(适用于测试)三、运行时动态修改配置(不重启服务

浅析如何保证MySQL与Redis数据一致性

《浅析如何保证MySQL与Redis数据一致性》在互联网应用中,MySQL作为持久化存储引擎,Redis作为高性能缓存层,两者的组合能有效提升系统性能,下面我们来看看如何保证两者的数据一致性吧... 目录一、数据不一致性的根源1.1 典型不一致场景1.2 关键矛盾点二、一致性保障策略2.1 基础策略:更新数

Redis Cluster模式配置

《RedisCluster模式配置》:本文主要介绍RedisCluster模式配置,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录分片 一、分片的本质与核心价值二、分片实现方案对比 ‌三、分片算法详解1. ‌范围分片(顺序分片)‌2. ‌哈希分片3. ‌虚