一文带你搞懂Redis Stream的6种消息处理模式

2025-05-04 17:50

本文主要是介绍一文带你搞懂Redis Stream的6种消息处理模式,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

《一文带你搞懂RedisStream的6种消息处理模式》Redis5.0版本引入的Stream数据类型,为Redis生态带来了强大而灵活的消息队列功能,本文将为大家详细介绍RedisStream的6...

Redis 5.0版本引入的Stream数据类型,为Redis生态带来了强大而灵活的消息队列功能,弥补了之前发布/订阅模式的不足,如消息持久化、消费者组、消息确认等特性。

Redis Stream结合了传统消息队列和时序数据库的特点,适用于日志收集、事件驱动应用、实时分析等多种场景。

本文将介绍Redis Stream的6种消息处理模式。

1. 简单消费模式(Simple Consumption)

基本概念

简单消费模式是Redis Stream最基础的使用方式,不使用消费者组,直接读取流中的消息。生产者将消息追加到流中,消费者通过指定起始ID来读取消息。

核心命令

# 发布消息
XADD stream_name [ID] field value [field value ...]

# 读取消息
XREAD [COUNT count] [BLOCK milliseconds] STREAMS stream_name start_id

实现示例

Redis CLI

# 添加消息到stream
> XADD mystream * sensor_id 1234 temperature 19.8 humidity 56
"1647257548956-0"

# 从头开始读取所有消息
> XREAD STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) "1647257548956-0"
         2) 1) "sensor_id"
            2) "1234"
            3) "temperature"
            4) "19.8"
            5) "humidity"
            6) "56"

# 从指定ID开始读取
> XREAD STREAMS mystream 1647257548956-0
(empty list or set)

# 从最新的消息ID之后开始读取(阻塞等待新消息)
> XREAD BLOCK 5000 STREAMS mystream $
(nil)

Java Spring Boot示例

@Service
public class SimpleStreamService {
    
    @Autowired
    private StringRedisTemplate redisTemplate;
    
    /**
     * 发布消息到Stream
     */
    public String publishEvent(String streamKey, Map<String, Object> eventData) {
        StringRecord record = StreamRecords.string(eventData).withStreamKey(streamKey);
        return redisTemplate.opsForStream().add(record).getValue();
    }
    
    /**
     * 从指定位置开始读取消息
     */
    public List<MapRecord<String, Object, Object>> readEvents(String streamKey, String startId, int count) {
        StreamReadOptions readOptions = StreamReadOptions.empty().count(count);
        return redisTemplate.opsForStream().read(readOptions, StreamOffset.from(streamKey, ReadOffset.from(startId)));
    }
    
    /**
     * 阻塞式读取消息
     */
    public List<MapRecord<String, Object, Object>> readEventsBlocking(String streamKey, int timeoutMillis) {
        StreamReadOptions readOptions = StreamReadOptions.empty().count(10).block(Duration.ofMillis(timeoutMillis));
        return redisTemplate.opsForStream().read(readOptions, StreamOffset.latest(streamKey));
    }
}

使用场景

  • 简单的事件日志记录
  • 单一消费者场景
  • 时间序列数据收集
  • 开发和调试阶段

优缺点

优点

  • 实现简单,无需创建和管理消费者组
  • 直接控制从哪个位置开始消费消息
  • 适合单个消费者场景

缺点

  • 无法实现负载均衡
  • 无法追踪消息确认状态
  • 需要手动管理已读消息ID
  • 服务重启需自行记录上次读取位置

2. 消费者组模式(Consumer Groups)

基本概念

消费者组允许多个消费者共同处理一个流的消息,实现负载均衡,并提供消息确认机制,确保消息至少被处理一次。每个消费者组维护自己的消费位置,不同消费者组之间互不干扰。

核心命令

# 创建消费者组
XGROUP CREATE stream_name group_name [ID|$] [MKSTREAM]

# 从消费者组读取消息
XREADGROUP GROUP group_name consumer_name [COUNT count] [BLOCK milliseconds] STREAMS stream_name [>|ID]

# 确认消息处理完成
XACK stream_name group_name message_id [message_id ...]

实现示例

Redis CLI

# 创建消费者组
> XGROUP CREATE mystream processing-group $ MKSTREAM
OK

# 消费者1读取消息
> XREADGROUP GROUP processing-group consumer-1 COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) "1647257548956-0"
         2) 1) "sensor_id"
            2) "1234"
            3) "temperature"
            4) "19.8"
            5) "humidity"
            6) "56"

# 确认消息已处理
> XACK mystream processing-group 1647257548956-0
(integer) 1

# 消费者2读取消息(已无未处理消息)
> XREADGROUP GROUP processing-group consumer-2 COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) (empty list or set)

Java Spring Boot示例

@Service
public class ConsumerGroupService {
    
    @Autowired
    private StringRedisTemplate redisTemplate;
    
    /**
     * 创建消费者组
     */
    public void createGroup(String streamKey, String groupName) {
        try {
            redisTemplate.opsForStream().createGroup(streamKey, groupName);
        } catch (RedisSystemException e) {
            // 处理流不存在的情况
            if (e.getRootCause() instanceof RedisCommandExecutionException 
                    && e.getRootCause().getMessage().contains("NOGROUP")) {
                redisTemplate.opsForStream().createGroup(ReadOffset.from("0"), streamKey, groupName);
            } else {
                throw e;
            }
        }
    }
    
    /**
     * 从消费者组读取消息
     */
    public List<MapRecord<String, Object, Object>> readFromGroup(
            String streamKey, String groupName, String consumerName, int count) {
        
        StreamReadOptions options = StreamReadOptions.empty().count(count);
        return redisTemplate.opsForStream().read(
                Consumer.from(groupName, consumerName),
                options,
                StreamOffset.create(streamKey, ReadOffset.lastConsumed())
        );
    }
    
    /**
     * 阻塞式从消费者组读取消息
     */
    public List<MapRecord<String, Object, Object>> readFromGroupBlocking(
            String streamKey, String groupName, String consumerName, int count, Duration timeout) {
        
        StreamReadOptions options = StreamReadOptions.empty().count(count).block(timeout);
        return redisTemplate.opsForStream().read(
                Consumer.from(groupName, consumerName),
                options,
                StreamOffset.create(streamKey, ReadOffset.lastConsumed())
        );
    }
    
    /**
     * 确认消息已处理
     */
    public Long acknowledgeMessage(String streamKey, String groupName, String... messageIds) {
        return redisTemplate.opsForStream().acknowledge(streamKey, groupName, messageIds);
    }
}

使用场景

  • 需要横向扩展消息处理能力的系统
  • 要求消息可靠处理的业务场景
  • 实现消息工作队列
  • 微服务间的事件传递

优缺点

优点

  • 多个消费者可以并行处理消息
  • 提供消息确认机制,保证消息不丢失
  • 支持消费者崩溃后恢复处理
  • 每个消费者组维护独立的消费位置

缺点

  • 实现相对复杂
  • 需要妥善管理消费者组和消费者
  • 需要显式处理消息确认
  • 需要定期处理未确认的消息

3. 阻塞式消费模式(Blocking Consumption)

基本概念

阻塞式消费允许消费者在没有新消息时保持连接,等待新消息到达。这种模式减少了轮询开销,提高了实时性,适合对消息处理时效性要求高的场景。

核心命令

# 阻塞式简单消费
XREAD BLOCK milliseconds STREAMS stream_name ID

# 阻塞式消费者组消费
XREADGROUP GROUP group_name consumer_name BLOCK milliseconds STREAMS stream_name >

实现示例

Redis CLI

# 阻塞等待新消息(最多等待10秒)
> XREAD BLOCK 10000 STREAMS mystream $
(nil)  # 如果10秒内没有新消息

# 使用消费者组的阻塞式消费
> XREADGROUP GROUP processing-group consumer-1 BLOCK 10000 STREAMS mystream >
(nil)  # 如果10秒内没有新分配的消息

Java Spring Boot示例

@Service
public class BlockingStreamConsumerService {
    
    @Autowired
    private StringRedisTemplate redisTemplate;
    
    /**
     * 阻塞式消息消费者任务
     */
    @Async
    public void startBlockingConsumer(String streamKey, String lastId, Duration timeout) {
        StreamReadOptions options = StreamReadOptions.empty()
                .count(1)
                .block(timeout);
        
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // 阻塞读取消息
                List<MapRecord<String, Object, Object>> records = redisTemplate.opsForStream()
                        .read(options, StreamOffset.from(streamKey, ReadOffset.from(lastId)));
                
                if (records != null && !records.isEmpty()) {
                    for (MapRecord<String, Object, Object> record : records) {
                        // 处理消息
                        processMessage(record);
                        
                        // 更新最后读取的ID
                        lastId = record.getId().getValue();
                    }
                } else {
                    // 超时未读取到消息,可以执行一些其他逻辑
        www.chinasem.cn        }
            } catch (Exception e) {
                // 异常处理
                log.error("Error reading from stream: {}", e.getMessage(), e);
                try {
                    Thread.sleep(1000); // 出错后等待一段时间再重试
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }
    
    /**
     * 阻塞式消费者组消费
     */
    @Async
    public void startGroupBlockingConsumer(
            String streamKey, String groupName, String consumerName, Duration timeout) {
        
        StreamReadOptions options = StreamReadOptions.empty()
                .count(1)
                .block(timeout);
        
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // 阻塞读取消息
                List<MapRecord<String, Object, Object>> records = redisTemplate.opsForStream()
                        .read(Consumer.from(groupName, consumerName),
                               options,
                               StreamOffset.create(streamKey, ReadOffset.lastConsumed()));
                
                if (records != null && !records.isEmpty()) {
                    for (MapRecord<String, Object, Object> record : records) {
                        try {
    android                        // 处理消息
                            processMessage(record);
                            
                            // 确认消息
                            redisTemplate.opsForStream()
                                    .acknowledge(streamKey, groupName, record.getId().getValue());
                        } catch (Exception e) {
                            // 处理失败,记录日志
                            log.error("Error processing message: {}", e.getMessage(), e);
                        }
                    }
                }
            } catch (Exception e) {
                log.error("Error reading from stream group: {}", e.getMessage(), e);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }
    
    private void processMessage(MapRecord<String, Object, Object> record) {
        // 实际消息处理逻辑
        log.info("Processing message: {}", record);
        // ...处理消息的具体业务逻辑
    }
}

使用场景

  • 实时数据处理系统
  • 事件驱动的任务处理
  • 低延迟要求的应用
  • 即时通讯系统
  • 通知服务

优缺点

优点

  • 减少轮询带来的资源浪费
  • 实时性好,消息到达后立即处理
  • 降低Redis和客户端的负载
  • 节省CPU和网络资源

缺点

  • 长连接可能占用Redis连接资源
  • 需要合理设置超时时间
  • 可能需要处理网络中断后的重连
  • 消费者需要具备并发处理能力

4. 扇出模式(Fan-out Pattern)

基本概念

扇出模式允许多个独立的消费者组同时消费同一个流中的所有消息,类似于发布/订阅模式,但具有消息持久化和回溯能力。每个消费者组独立维护自己的消费位置。

核心命令

创建多个消费者组,它们都独立消费同一个流:

XGROUP CREATE stream_name group_name_1 $ MKSTREAM
XGROUP CREATE stream_name group_name_2 $ MKSTREAM
XGROUP CREATE stream_name group_name_3 $ MKSTREAM

实现示例

Redis CLI

# 创建多个消费者组
> XGROUP CREATE notifications analytics-group $ MKSTREAM
OK
> XGROUP CREATE notifications email-group $ MKSTREAM
OK
> XGROUP CREATE notifications mobile-group $ MKSTREAM
OK

# 添加一条消息
> XADD notifications * type user_signup user_id 1001 email "user@example.com"
"1647345678912-0"

# 从各个消费者组读取(每个组都能收到所有消息)
> XREADGROUP GROUP analytics-group analytics-1 COUNT 1 STREAMS notifications >
1) 1) "notifications"
   2) 1) 1) "1647345678912-0"
         2) 1) "type"
            2) "user_signup"
            3) "user_id"
            4) "1001"
            5) "email"
            6) "user@example.com"

> XREADGROUP GROUP email-group email-1 COUNT 1 STREAMS notifications >
1) 1) "notifications"
   2) 1) 1) "1647345678912-0"
         2) 1) "type"
            2) "user_signup"
            3) "user_id"
            4) "1001"
            5) "email"
            6) "user@example.com"

> XREADGROUP GROUP mobile-group mobile-1 COUNT 1 STREAMS notifications >
1) 1) "notifications"
   2) 1) 1) "1647345678912-0"
         2) 1) "type"
            2) "user_signup"
            3) "user_id"
            4) "1001"
            5) "email"
            6) "user@example.com"

Java Spring Boot示例

@Service
p编程ublic class FanOutService {
    
    @Autowired
    private StringRedisTemplate redisTemplate;
    
    /**
     * 初始化扇出消费者组
     */
    public void initializeFanOutGroups(String streamKey, List<String> groupNames) {
        // 确保流存在
        try {
            StreamInfo.XInfoStream info = redisTemplate.opsForStream().info(streamKey);
        } catch (Exception e) {
            // 流不存在,发送一个初始消息
            Map<String, Object> initialMessage = new HashMap<>();
            initialMessage.put("init", "true");
            redisTemplate.opsForStream().add(streamKey, initialMessage);
        }
        
        // 创建所有消费者组
        for (String groupName : groupNames) {
            try {
                redisTemplate.opsForStream().createGroup(streamKey, groupName);
            } catch (Exception e) {
                // 忽略组已存在的错误
                log.info("Group {} may already exist: {}", groupName, e.getMessage());
            }
        }
    }
    
    /**
     * 发布扇出消息
     */
    public String publishFanOutMessage(String streamKey, Map<String, Object> messageData) {
        StringRecord record = StreamRecords.string(messageData).withStreamKey(streamKey);
        return redisTemplate.opsForStream().add(record).getValue();
    }
    
    /**
     * 为特定组启动消费者
     */
    @Async
    public void startGroupConsumer(
            String streamKey, String groupName, String consumerName, 
            Consumer<MapRecord<String, Object, Object>> messageHandler) {
        
        StreamReadOptions options = StreamReadOptions.empty().count(10).block(Duration.ofSeconds(2));
        
        while (!Thread.currentThread().isInterrupted()) {
            try {
                List<MapRecord<String, Object, Object>> messages = redisTemplate.opsForStream().read(
                        Consumer.from(groupName, consumerName),
                        options,
                        StreamOffset.create(streamKey, ReadOffset.lastConsumed())
                );
                
                if (messages != null && !messages.isEmpty()) {
                    for (MapRecord<String, Object, Object> message : messages) {
                        try {
                            // 处理消息
                            messageHandler.accept(message);
                            
                            // 确认消息
                            redisTemplate.opsForStream().acknowledge(
                                    streamKey, groupName, message.getId().getValue());
                        } catch (Exception e) {
                            log.error("Error processing message in group {}: {}", 
                                    groupName, e.getMessage(), e);
                        }
                    }
                }
            } catch (Exception e) {
                log.error("Error reading from stream for group {}: {}", 
                        groupName, e.getMessage(), e);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }
}

使用示例

@Service
public class NotificationService {
    
    @Autowired
    private FanOutService fanOutService;
    
    @PostConstruct
    public void init() {
        // 初始化扇出组
        List<String> groups = Arrays.asList("email-group", "sms-group", "analytics-group");
        fanOutService.initializeFanOutGroups("user-events", groups);
        
        // 启动各个消费者组的处理器
        fanOutService.startGroupConsumer(
                "user-events", "email-group", "email-consumer", this::processEmailNotification);
        
        fanOutService.startGroupConsumer(
                "user-events", "sms-group", "sms-consumer", this::processSmsNotification);
        
        fanOutService.startGroupConsumer(
                "user-events", "analytics-group", "analytics-consumer", this::processAnalyticsEvent);
    }
    
    private void processEmailNotification(MapRecord<String, Object, Object> message) {
        Map<Object, Object> messageData = message.getValue();
        log.info("Processing email notification: {}", messageData);
        // 邮件发送逻辑
    }
    
    private void processSmsNotification(MapRecord<String, Object, Object> message) {
        Map<Object, Object> messageData = message.getValue();
        log.info("Processing SMS notification: {}", messageData);
        // 短信发送逻辑
    }
    
    private void processAnalyticsEvent(MapRecord<String, Object, Object> message) {
        Map<Object, Object> messageData = message.getValue();
        log.info("Processing analytics event: {}", messageData);
        // 分析事件处理逻辑
    }
    
    public void publishUserEvent(String eventType, Map<String, Object> eventData) {
        Map<String, Object> message = new HashMap<>(eventData);
        message.put("event_type", eventType);
        message.put("timestamp", System.currentTimeMillis());
        
        fanOutService.publishFanOutMessage("user-events", message);
    }
}

使用场景

  • 多个系统需要独立处理同一事件流
  • 实现事件广播机制
  • 系统集成:一个事件触发多个业务流程
  • 日志统一处理并分发到不同服务
  • 通知系统:一个事件需要通过多种方式通知用户

优缺点

优点

  • 实现一次发布多次消费
  • 各消费者组独立工作,互不影响
  • 新增消费者组可以从头开始消费所有历史消息
  • 可靠性高,消息持久化存储

缺点

  • 随着流数据增长,可能占用较多存储空间
  • 需要合理设置流的最大长度或过期策略
  • 消费者组数量过多可能增加Redis负载
  • 需要单独管理每个消费者组的状态

5. 重试与恢复模式(Retry and Recovery)

基本概念

这种模式关注处理失败消息的恢复和重试机制。Redis Stream消费者组会跟踪每个消息的处理状态,允许查看和管理未确认(PEL - Pending Entry List)的消息,实现可靠的消息处理。

核心命令

# 查看消费者组中未确认的消息
XPENDING stream_name group_name [start_id end_id count] [consumer_name]

# 查看消费者组中长时间未确认的消息详情
XPENDING stream_name group_name start_id end_id count [consumer_name]

# 认领处理超时的消息
XCLAIM stream_name group_name consumer_name min_idle_time message_id [message_id ...] [JUSTID]

实现示例

Redis CLI

# 查看未确认的消息数量
> XPENDING mystream processing-group
1) (integer) 2         # 未确认消息数量
2) "1647257548956-0"   # 最小ID
3) "1647257549123-0"   # 最大ID
4) 1) 1) "consumer-1"  # 各个消费者的未确认消息数
      2) (integer) 1
   2) 1) "consumer-2"
      2) (integer) 1

# 查看特定消费者的未确认消息
> XPENDING mystream processing-group - + 10 consumer-1
1) 1) "1647257548956-0"   # 消息ID
   2) "consumer-1"         # 当前持有的消费者
   3) (integer) 120000     # 空闲时间(毫秒)
   4) (integer) 2          # 传递次数

# 认领超过2分钟未处理的消息
> XCLAIM myandroidstream processing-group consumer-2 120000 1647257548956-0
1) 1) "1647257548956-0"
   2) 1) "sensor_id"
      2) "1234"
      3) "temperature"
      4) "19.8"
      5) "humidity"
      6) "56"

Java Spring Boot示例

@Service
public class MessageRecoveryService {
    
    @Autowired
    private StringRedisTemplate redisTemplate;
    
    /**
     * 获取消费者组中的未确认消息
     */
    public PendingMessagesSummary getPendingMessagesSummary(String streamKey, String groupName) {
        return redisTemplate.opsForStream().pending(streamKey, groupName);
    }
    
    /**
     * 获取指定消费者的详细未确认消息
     */
    public PendingMessages getPendingMessages(
            String streamKey, String groupName, String consumerName, 
            Range<String> idRange, long count) {
        
        return redisTemplate.opsForStream().pending(
                streamKey, 
                Consumer.from(groupName, consumerName), 
                idRange, 
                count);
    }
    
    /**
     * 认领长时间未处理的消息
     */
    public List<MapRecord<String, Object, Object>> claimMessages(
            String streamKey, String groupName, String newConsumerName, 
            Duration minIdleTime, String... messageIds) {
        
        return redisTemplate.opsForStream().claim(
                streamKey, 
                Consumer.from(groupName, newConsumerName), 
                minIdleTime, 
                messageIds);
    }
    
    /**
     * 定时检查和恢复未处理的消息
     */
    @Scheduled(fixedRate = 60000) // 每分钟执行一次
    public void recoverStaleMessages() {
        // 配置参数
        String streamKey = "mystream";
        String groupName = "processing-group";
        String recoveryConsumer = "recovery-consumer";
        Duration minIdleTime = Duration.ofMinutes(5); // 超过5分钟未处理的消息
        
        try {
            // 1. 获取所有未确认消息的摘要
            PendingMessagesSummary summary = getPendingMessagesSummary(streamKey, groupName);
            
            if (summary != null && summary.getTotalPendingMessages() > 0) {
                // 2. 遍历每个消费者的未确认消息
                for (Consumer consumer : summary.getPendingMessagesPerConsumer().keySet()) {
                    // 获取该消费者的详细未确认消息列表
                    PendingMessages pendingMessages = getPendingMessages(
                            streamKey, groupName, consumer.getName(), 
                            Range.unbounded(), 50); // 每次最多处理50条
                    
                    if (pendingMessages != null) {
                        // 3. 筛选出空闲时间超过阈值的消息
                        List<String> staleMessageIds = new ArrayList<>();
                        
                        for (PendingMessage message : pendingMessages) {
                            if (message.getElapsedTimeSinceLastDelivery().compareTo(minIdleTime) > 0) {
                                staleMessageIds.add(message.getIdAsString());
                            }
                        }
                        
                        // 4. 认领这些消息
                        if (!staleMessageIds.isEmpty()) {
                            log.info("Claiming {} stale messages from consumer {}", 
                                    staleMessageIds.size(), consumer.getName());
                            
                            List<MapRecord<String, Object, Object>> claimedMessages = claimMessages(
                                    streamKey, groupName, recoveryConsumer, minIdleTime, 
                                    staleMessageIds.toArray(new String[0]));
                            
                            // 5. 处理这些被认领的消息
                            processClaimedMessages(streamKey, groupName, claimedMessages);
                        }
                    }
                }
            }
        } catch (Exception e) {
            log.error("Error recovering stale messages: {}", e.getMessage(), e);
        }
    }
    
    /**
     * 处理被认领的消息
     */
    private void processClaimedMessages(
            String streamKey, String groupName, 
            List<MapRecord<String, Object, Object>> messages) {
        
        if (messages == null || messages.isEmpty()) {
            return;
        }
        
        for (MapRecord<String, Object, Object> message : messages) {
            try {
                // 执行消息处理逻辑
                processMessage(message);
                
                // 确认消息
                redisTemplate.opsForStream().acknowledge(
                        streamKey, groupName, message.getId().getValue());
                
                log.info("Successfully processed recovered message: {}", message.getId());
            } catch (Exception e) {
                log.error("Failed to process recovered message {}: {}", 
                        message.getId(), e.getMessage(), e);
                // 根据业务需求决定是否将消息加入死信队列
                moveToDeadLetterQueue(streamKey, message);
            }
        }
    }
    
    /**
     * 将消息移至死信队列
     */
    private void moveToDeadLetterQueue(String sourceStream, MapRecord<String, Object, Object> message) {
        String deadLetterStream = sourceStream + ":dead-letter";
        Map<Object, Object> messageData = message.getValue();
        
        Map<String, Object> dlqMessage = new HashMap<>();
        messageData.forEach((k, v) -> dlqMessage.put(k.toString(), v));
        
        // 添加元数据
        dlqMessage.put("original_id", message.getId().getValue());
        dlqMessage.put("error_time", System.currentTimeMillis());
        
        redisTemplate.opsForStream().add(deadLetterStream, dlqMessage);
        
        // 可选:从原消费者组确认该消息
        // redisTemplate.opsForStream().acknowledge(sourceStream, groupName, message.getId().getValue());
    }
    
    private void processMessage(MapRecord<String, Object, Object> message) {
        // 实际的消息处理逻辑
        log.info("Processing recovered message: {}", message);
        // ...
    }
}

使用场景

  • 需要可靠消息处理的关键业务系统
  • 处理时间较长的任务
  • 需要错误重试机制的工作流
  • 监控和诊断消息处理过程
  • 实现死信队列处理特定失败场景

优缺点

优点

  • 提高系统容错性和可靠性
  • 自动恢复因消费者崩溃导致的未处理消息
  • 可以识别和处理长时间未确认的消息
  • 支持实现复杂的重试策略和死信处理

缺点

  • 需要额外开发和维护恢复机制
  • 可能导致消息重复处理,需要确保业务逻辑幂等
  • 系统复杂度增加
  • 需要监控和管理PEL(未确认消息列表)的大小

6. 流处理窗口模式(Streaming Window Processing)

基本概念

流处理窗口模式基于时间或消息计数划分数据流,在每个窗口内执行聚合或分析操作。这种模式适用于实时分析、趋势监测和时间序列处理。虽然Redis Stream本身不直接提供窗口操作,但可以结合Redis的其他特性实现。

实现方式

主要通过以下几种方式实现:

1. 基于消息ID的时间范围(Redis消息ID包含毫秒时间戳)

2. 结合Redis的排序集合(SortedSet)存储窗口数据

3. 使用Redis的过期键实现滑动窗口

实现示例

Redis CLI

窗口数据收集与查询:

# 添加带时间戳的数据
> XADD temperature * sensor_id 1 value 21.5 timestamp 1647257548000
"1647257550123-0"
> XADD temperature * sensor_id 1 value 21.8 timestamp 1647257558000
"1647257560234-0"
> XADD temperature * sensor_id 1 value 22.1 timestamp 1647257568000
"1647257570345-0"

# 查询特定时间范围的数据
> XRANGE temperature 1647257550000-0 1647257570000-0
1) 1) "1647257550123-0"
   2) 1) "sensor_id"
      2) "1"
      3) "value"
      4) "21.5"
      5) "timestamp"
      6) "1647257548000"
2) 1) "1647257560234-0"
   2) 1) "sensor_id"
      2) "1"
      3) "value"
      4) "21.8"
      5) "timestamp"
      6) "1647257558000"

Java Spring Boot示例

@Service
public class TimeWindowprocessingService {
    
    @Autowired
    private StringRedisTemplate redisTemplate;
    
    /**
     * 添加数据点到流,并存储到相应的时间窗口
     */
    public String addDataPoint(String streamKey, String sensorId, double value) {
        long timestamp = System.currentTimeMillis();
        
        // 1. 添加到原始数据流
        Map<String, Object> dataPoint = new HashMap<>();
        dataPoint.put("sensor_id", sensorId);
        dataPoint.put("value", String.valueOf(value));
        dataPoint.put("timestamp", String.valueOf(timestamp));
        
        StringRecord record = StreamRecords.string(dataPoint).withStreamKey(streamKey);
        RecordId recordId = redisTemplate.opsForStream().add(record);
        
        // 2. 计算所属的窗口(这里以5分钟为一个窗口)
        long Windowstart = timestamp - (timestamp % (5 * 60 * 1000));
        String windowKey = streamKey + ":window:" + windowStart;
        
        // 3. 将数据点添加到窗口的有序集合中,分数为时间戳
        String dataPointjson = new ObjectMapper().writeValueAsString(dataPoint);
        redisTemplate.opsForZSet().add(windowKey, dataPointJson, timestamp);
        
        // 4. 设置窗口键的过期时间(保留24小时)
        redisTemplate.expire(windowKey, Duration.ofHours(24));
        
        return recordId.getValue();
    }
    
    /**
     * 获取指定时间窗口内的数据点
     */
    public List<Map<String, Object>> getWindowData(
            String streamKey, long windowStartTime, long windowEndTime) {
        
        // 计算可能的窗口键(每5分钟一个窗口)
        List<String> windowKeys = new ArrayList<>();
        long current = windowStartTime - (windowStartTime % (5 * 60 * 1000));
        
        while (current <= windowEndTime) {
            windowKeys.add(streamKey + ":window:" + current);
            current += (5 * 60 * 1000);
        }
        
        // 从各个窗口获取数据点
        List<Map<String, Object>> results = new ArrayList<>();
        ObjectMapper mapper = new ObjectMapper();
        
        for (String windowKey : windowKeys) {
            Set<String> dataPoints = redisTemplate.opsForZSet().rangeByScore(
                    windowKey, windowStartTime, windowEndTime);
            
            if (dataPoints != null) {
                for (String dataPointJson : dataPoints) {
                    try {
                        Map<String, Object> dataPoint = mapper.readValue(
                                dataPointJson, new TypeReference<Map<String, Object>>() {});
                        results.add(dataPoint);
                    } catch (Exception e) {
                        log.error("Error parsing data point: {}", e.getMessage(), e);
                    }
                }
            }
        }
        
        // 按时间戳排序
        results.sort(Comparator.comparing(dp -> Long.parseLong(dp.get("timestamp").toString())));
        
        return results;
    }
    
    /**
     * 计算窗口内数据的聚合统计
     */
    public Map<String, Object> getWindowStats(
            String streamKey, String sensorId, long windowStartTime, long windowEndTime) {
        
        List<Map<String, Object>> windowData = getWindowData(streamKey, windowStartTime, windowEndTime);
        
        // 过滤特定传感器的数据
        List<Double> values = windowData.stream()
                .filter(dp -> sensorId.equals(dp.get("sensor_id").toString()))
                .map(dp -> Double.parseDouble(dp.get("value").toString()))
                .collect(Collectors.toList());
        
        Map<String, Object> stats = new HashMap<>();
        stats.put("count", values.size());
        
        if (!values.isEmpty()) {
            DoubleSummaryStatistics summaryStats = values.stream().collect(Collectors.summarizingDouble(v -> v));
            stats.put("min", summaryStats.getMin());
            stats.put("max", summaryStats.getMax());
            stats.put("avg", summaryStats.getAverage());
            stats.put("sum", summaryStats.getSum());
        }
        
        stats.put("start_time", windowStartTime);
        stats.put("end_time", windowEndTime);
        stats.put("sensor_id", sensorId);
        
        return stats;
    }
    
    /**
     * 实现滑动窗口处理
     */
    @Scheduled(fixedRate = 60000) // 每分钟执行一次
    public void processSlidingWindows() {
        String streamKey = "temperature";
        long now = System.currentTimeMillis();
        
        // 处理过去10分钟窗口的数据
        long windowEndTime = now;
        long windowStartTime = now - (10 * 60 * 1000);
        
        List<String> sensorIds = Arrays.asList("1", "2", "3"); // 示例传感器ID
        
        for (String sensorId : sensorIds) {
            try {
                // 获取窗口统计
                Map<String, Object> stats = getWindowStats(streamKey, sensorId, windowStartTime, windowEndTime);
                
                // 根据统计结果执行业务逻辑
                if (stats.containsKey("avg")) {
                    double avgTemp = (double) stats.get("avg");
                    if (avgTemp > 25.0) {
                        // 触发高温警报
                        log.warn("High temperature alert for sensor {}: {} C", sensorId, avgTemp);
                        triggerAlert(sensorId, "HIGH_TEMP", avgTemp);
                    }
                }
                
                // 存储聚合结果用于历史趋势分析
                saveAggregatedResults(streamKey, sensorId, stats);
                
            } catch (Exception e) {
                log.error("Error processing sliding window for sensor {}: {}", 
                        sensorId, e.getMessage(), e);
            }
        }
    }
    
    /**
     * 触发警报
     */
    private void triggerAlert(String sensorId, String alertType, double value) {
        Map<String, Object> alertData = new HashMap<>();
        alertData.put("sensor_id", sensorId);
        alertData.put("alert_type", alertType);
        alertData.put("value", value);
        alertData.put("timestamp", System.currentTimeMillis());
        
        redisTemplate.opsForStream().add("alerts", alertData);
    }
    
    /**
     * 保存聚合结果
     */
    private void saveAggregatedResults(String streamKey, String sensorId, Map<String, Object> stats) {
        long windowTime = www.chinasem.cn(long) stats.get("end_time");
        String aggregateKey = streamKey + ":aggregate:" + sensorId;
        
        // 使用时间作为分数存储聚合结果
        redisTemplate.opsForZSet().add(
                aggregateKey, 
                new ObjectMapper().writeValueAsString(stats),
                windowTime);
        
        // 保留30天的聚合数据
        redisTemplate.expire(aggregateKey, Duration.ofDays(30));
    }
}

使用场景

  • 实时数据分析与统计
  • 趋势检测和预测
  • 异常值和阈值监控
  • 时间序列数据处理
  • IoT数据流处理和聚合
  • 用户行为分析

优缺点

优点

  • 支持基于时间的数据分析
  • 可以实现实时聚合和计算
  • 灵活的窗口定义(滑动窗口、滚动窗口)
  • 可扩展以支持复杂的分析场景

缺点

  • 实现复杂度较高
  • 可能需要额外的数据结构和存储空间
  • 对于大数据量的窗口计算可能影响性能
  • 需要小心管理内存使用和数据过期策略

结论

Redis Stream提供了强大而灵活的消息处理功能,通过组合这些模式,可以构建出高性能、可靠且灵活的消息处理系统,满足从简单的任务队列到复杂的实时数据处理等各种应用需求。

在选择和实现这些模式时,应充分考虑业务特性、性能需求、可靠性要求以及系统规模,结合Redis Stream的特性,打造最适合自己应用场景的消息处理解决方案。

以上就是一文带你搞懂Redis Stream的6种消息处理模式的详细内容,更多关于Redis Stream消息处理模式的资料请关注China编程(www.chinasem.cn)其它相关文章!

这篇关于一文带你搞懂Redis Stream的6种消息处理模式的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1154482

相关文章

Redis中6种缓存更新策略详解

《Redis中6种缓存更新策略详解》Redis作为一款高性能的内存数据库,已经成为缓存层的首选解决方案,然而,使用缓存时最大的挑战在于保证缓存数据与底层数据源的一致性,本文将介绍Redis中6种缓存更... 目录引言策略一:Cache-Aside(旁路缓存)策略工作原理代码示例优缺点分析适用场景策略二:Re

redis中使用lua脚本的原理与基本使用详解

《redis中使用lua脚本的原理与基本使用详解》在Redis中使用Lua脚本可以实现原子性操作、减少网络开销以及提高执行效率,下面小编就来和大家详细介绍一下在redis中使用lua脚本的原理... 目录Redis 执行 Lua 脚本的原理基本使用方法使用EVAL命令执行 Lua 脚本使用EVALSHA命令

Java 中的 @SneakyThrows 注解使用方法(简化异常处理的利与弊)

《Java中的@SneakyThrows注解使用方法(简化异常处理的利与弊)》为了简化异常处理,Lombok提供了一个强大的注解@SneakyThrows,本文将详细介绍@SneakyThro... 目录1. @SneakyThrows 简介 1.1 什么是 Lombok?2. @SneakyThrows

在 Spring Boot 中实现异常处理最佳实践

《在SpringBoot中实现异常处理最佳实践》本文介绍如何在SpringBoot中实现异常处理,涵盖核心概念、实现方法、与先前查询的集成、性能分析、常见问题和最佳实践,感兴趣的朋友一起看看吧... 目录一、Spring Boot 异常处理的背景与核心概念1.1 为什么需要异常处理?1.2 Spring B

python处理带有时区的日期和时间数据

《python处理带有时区的日期和时间数据》这篇文章主要为大家详细介绍了如何在Python中使用pytz库处理时区信息,包括获取当前UTC时间,转换为特定时区等,有需要的小伙伴可以参考一下... 目录时区基本信息python datetime使用timezonepandas处理时区数据知识延展时区基本信息

Redis 热 key 和大 key 问题小结

《Redis热key和大key问题小结》:本文主要介绍Redis热key和大key问题小结,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录一、什么是 Redis 热 key?热 key(Hot Key)定义: 热 key 常见表现:热 key 的风险:二、

Java Stream流使用案例深入详解

《JavaStream流使用案例深入详解》:本文主要介绍JavaStream流使用案例详解,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录前言1. Lambda1.1 语法1.2 没参数只有一条语句或者多条语句1.3 一个参数只有一条语句或者多

C#使用StackExchange.Redis实现分布式锁的两种方式介绍

《C#使用StackExchange.Redis实现分布式锁的两种方式介绍》分布式锁在集群的架构中发挥着重要的作用,:本文主要介绍C#使用StackExchange.Redis实现分布式锁的... 目录自定义分布式锁获取锁释放锁自动续期StackExchange.Redis分布式锁获取锁释放锁自动续期分布式

Python Transformers库(NLP处理库)案例代码讲解

《PythonTransformers库(NLP处理库)案例代码讲解》本文介绍transformers库的全面讲解,包含基础知识、高级用法、案例代码及学习路径,内容经过组织,适合不同阶段的学习者,对... 目录一、基础知识1. Transformers 库简介2. 安装与环境配置3. 快速上手示例二、核心模

一文详解Java异常处理你都了解哪些知识

《一文详解Java异常处理你都了解哪些知识》:本文主要介绍Java异常处理的相关资料,包括异常的分类、捕获和处理异常的语法、常见的异常类型以及自定义异常的实现,文中通过代码介绍的非常详细,需要的朋... 目录前言一、什么是异常二、异常的分类2.1 受检异常2.2 非受检异常三、异常处理的语法3.1 try-