本文主要是介绍一文带你搞懂Redis Stream的6种消息处理模式,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
《一文带你搞懂RedisStream的6种消息处理模式》Redis5.0版本引入的Stream数据类型,为Redis生态带来了强大而灵活的消息队列功能,本文将为大家详细介绍RedisStream的6...
Redis 5.0版本引入的Stream数据类型,为Redis生态带来了强大而灵活的消息队列功能,弥补了之前发布/订阅模式的不足,如消息持久化、消费者组、消息确认等特性。
Redis Stream结合了传统消息队列和时序数据库的特点,适用于日志收集、事件驱动应用、实时分析等多种场景。
本文将介绍Redis Stream的6种消息处理模式。
1. 简单消费模式(Simple Consumption)
基本概念
简单消费模式是Redis Stream最基础的使用方式,不使用消费者组,直接读取流中的消息。生产者将消息追加到流中,消费者通过指定起始ID来读取消息。
核心命令
# 发布消息 XADD stream_name [ID] field value [field value ...] # 读取消息 XREAD [COUNT count] [BLOCK milliseconds] STREAMS stream_name start_id
实现示例
Redis CLI
# 添加消息到stream > XADD mystream * sensor_id 1234 temperature 19.8 humidity 56 "1647257548956-0" # 从头开始读取所有消息 > XREAD STREAMS mystream 0 1) 1) "mystream" 2) 1) 1) "1647257548956-0" 2) 1) "sensor_id" 2) "1234" 3) "temperature" 4) "19.8" 5) "humidity" 6) "56" # 从指定ID开始读取 > XREAD STREAMS mystream 1647257548956-0 (empty list or set) # 从最新的消息ID之后开始读取(阻塞等待新消息) > XREAD BLOCK 5000 STREAMS mystream $ (nil)
Java Spring Boot示例
@Service public class SimpleStreamService { @Autowired private StringRedisTemplate redisTemplate; /** * 发布消息到Stream */ public String publishEvent(String streamKey, Map<String, Object> eventData) { StringRecord record = StreamRecords.string(eventData).withStreamKey(streamKey); return redisTemplate.opsForStream().add(record).getValue(); } /** * 从指定位置开始读取消息 */ public List<MapRecord<String, Object, Object>> readEvents(String streamKey, String startId, int count) { StreamReadOptions readOptions = StreamReadOptions.empty().count(count); return redisTemplate.opsForStream().read(readOptions, StreamOffset.from(streamKey, ReadOffset.from(startId))); } /** * 阻塞式读取消息 */ public List<MapRecord<String, Object, Object>> readEventsBlocking(String streamKey, int timeoutMillis) { StreamReadOptions readOptions = StreamReadOptions.empty().count(10).block(Duration.ofMillis(timeoutMillis)); return redisTemplate.opsForStream().read(readOptions, StreamOffset.latest(streamKey)); } }
使用场景
- 简单的事件日志记录
- 单一消费者场景
- 时间序列数据收集
- 开发和调试阶段
优缺点
优点
- 实现简单,无需创建和管理消费者组
- 直接控制从哪个位置开始消费消息
- 适合单个消费者场景
缺点
- 无法实现负载均衡
- 无法追踪消息确认状态
- 需要手动管理已读消息ID
- 服务重启需自行记录上次读取位置
2. 消费者组模式(Consumer Groups)
基本概念
消费者组允许多个消费者共同处理一个流的消息,实现负载均衡,并提供消息确认机制,确保消息至少被处理一次。每个消费者组维护自己的消费位置,不同消费者组之间互不干扰。
核心命令
# 创建消费者组 XGROUP CREATE stream_name group_name [ID|$] [MKSTREAM] # 从消费者组读取消息 XREADGROUP GROUP group_name consumer_name [COUNT count] [BLOCK milliseconds] STREAMS stream_name [>|ID] # 确认消息处理完成 XACK stream_name group_name message_id [message_id ...]
实现示例
Redis CLI
# 创建消费者组 > XGROUP CREATE mystream processing-group $ MKSTREAM OK # 消费者1读取消息 > XREADGROUP GROUP processing-group consumer-1 COUNT 1 STREAMS mystream > 1) 1) "mystream" 2) 1) 1) "1647257548956-0" 2) 1) "sensor_id" 2) "1234" 3) "temperature" 4) "19.8" 5) "humidity" 6) "56" # 确认消息已处理 > XACK mystream processing-group 1647257548956-0 (integer) 1 # 消费者2读取消息(已无未处理消息) > XREADGROUP GROUP processing-group consumer-2 COUNT 1 STREAMS mystream > 1) 1) "mystream" 2) (empty list or set)
Java Spring Boot示例
@Service public class ConsumerGroupService { @Autowired private StringRedisTemplate redisTemplate; /** * 创建消费者组 */ public void createGroup(String streamKey, String groupName) { try { redisTemplate.opsForStream().createGroup(streamKey, groupName); } catch (RedisSystemException e) { // 处理流不存在的情况 if (e.getRootCause() instanceof RedisCommandExecutionException && e.getRootCause().getMessage().contains("NOGROUP")) { redisTemplate.opsForStream().createGroup(ReadOffset.from("0"), streamKey, groupName); } else { throw e; } } } /** * 从消费者组读取消息 */ public List<MapRecord<String, Object, Object>> readFromGroup( String streamKey, String groupName, String consumerName, int count) { StreamReadOptions options = StreamReadOptions.empty().count(count); return redisTemplate.opsForStream().read( Consumer.from(groupName, consumerName), options, StreamOffset.create(streamKey, ReadOffset.lastConsumed()) ); } /** * 阻塞式从消费者组读取消息 */ public List<MapRecord<String, Object, Object>> readFromGroupBlocking( String streamKey, String groupName, String consumerName, int count, Duration timeout) { StreamReadOptions options = StreamReadOptions.empty().count(count).block(timeout); return redisTemplate.opsForStream().read( Consumer.from(groupName, consumerName), options, StreamOffset.create(streamKey, ReadOffset.lastConsumed()) ); } /** * 确认消息已处理 */ public Long acknowledgeMessage(String streamKey, String groupName, String... messageIds) { return redisTemplate.opsForStream().acknowledge(streamKey, groupName, messageIds); } }
使用场景
- 需要横向扩展消息处理能力的系统
- 要求消息可靠处理的业务场景
- 实现消息工作队列
- 微服务间的事件传递
优缺点
优点
- 多个消费者可以并行处理消息
- 提供消息确认机制,保证消息不丢失
- 支持消费者崩溃后恢复处理
- 每个消费者组维护独立的消费位置
缺点
- 实现相对复杂
- 需要妥善管理消费者组和消费者
- 需要显式处理消息确认
- 需要定期处理未确认的消息
3. 阻塞式消费模式(Blocking Consumption)
基本概念
阻塞式消费允许消费者在没有新消息时保持连接,等待新消息到达。这种模式减少了轮询开销,提高了实时性,适合对消息处理时效性要求高的场景。
核心命令
# 阻塞式简单消费 XREAD BLOCK milliseconds STREAMS stream_name ID # 阻塞式消费者组消费 XREADGROUP GROUP group_name consumer_name BLOCK milliseconds STREAMS stream_name >
实现示例
Redis CLI
# 阻塞等待新消息(最多等待10秒) > XREAD BLOCK 10000 STREAMS mystream $ (nil) # 如果10秒内没有新消息 # 使用消费者组的阻塞式消费 > XREADGROUP GROUP processing-group consumer-1 BLOCK 10000 STREAMS mystream > (nil) # 如果10秒内没有新分配的消息
Java Spring Boot示例
@Service public class BlockingStreamConsumerService { @Autowired private StringRedisTemplate redisTemplate; /** * 阻塞式消息消费者任务 */ @Async public void startBlockingConsumer(String streamKey, String lastId, Duration timeout) { StreamReadOptions options = StreamReadOptions.empty() .count(1) .block(timeout); while (!Thread.currentThread().isInterrupted()) { try { // 阻塞读取消息 List<MapRecord<String, Object, Object>> records = redisTemplate.opsForStream() .read(options, StreamOffset.from(streamKey, ReadOffset.from(lastId))); if (records != null && !records.isEmpty()) { for (MapRecord<String, Object, Object> record : records) { // 处理消息 processMessage(record); // 更新最后读取的ID lastId = record.getId().getValue(); } } else { // 超时未读取到消息,可以执行一些其他逻辑 www.chinasem.cn } } catch (Exception e) { // 异常处理 log.error("Error reading from stream: {}", e.getMessage(), e); try { Thread.sleep(1000); // 出错后等待一段时间再重试 } catch (InterruptedException ie) { Thread.currentThread().interrupt(); break; } } } } /** * 阻塞式消费者组消费 */ @Async public void startGroupBlockingConsumer( String streamKey, String groupName, String consumerName, Duration timeout) { StreamReadOptions options = StreamReadOptions.empty() .count(1) .block(timeout); while (!Thread.currentThread().isInterrupted()) { try { // 阻塞读取消息 List<MapRecord<String, Object, Object>> records = redisTemplate.opsForStream() .read(Consumer.from(groupName, consumerName), options, StreamOffset.create(streamKey, ReadOffset.lastConsumed())); if (records != null && !records.isEmpty()) { for (MapRecord<String, Object, Object> record : records) { try { android // 处理消息 processMessage(record); // 确认消息 redisTemplate.opsForStream() .acknowledge(streamKey, groupName, record.getId().getValue()); } catch (Exception e) { // 处理失败,记录日志 log.error("Error processing message: {}", e.getMessage(), e); } } } } catch (Exception e) { log.error("Error reading from stream group: {}", e.getMessage(), e); try { Thread.sleep(1000); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); break; } } } } private void processMessage(MapRecord<String, Object, Object> record) { // 实际消息处理逻辑 log.info("Processing message: {}", record); // ...处理消息的具体业务逻辑 } }
使用场景
- 实时数据处理系统
- 事件驱动的任务处理
- 低延迟要求的应用
- 即时通讯系统
- 通知服务
优缺点
优点
缺点
- 长连接可能占用Redis连接资源
- 需要合理设置超时时间
- 可能需要处理网络中断后的重连
- 消费者需要具备并发处理能力
4. 扇出模式(Fan-out Pattern)
基本概念
扇出模式允许多个独立的消费者组同时消费同一个流中的所有消息,类似于发布/订阅模式,但具有消息持久化和回溯能力。每个消费者组独立维护自己的消费位置。
核心命令
创建多个消费者组,它们都独立消费同一个流:
XGROUP CREATE stream_name group_name_1 $ MKSTREAM XGROUP CREATE stream_name group_name_2 $ MKSTREAM XGROUP CREATE stream_name group_name_3 $ MKSTREAM
实现示例
Redis CLI
# 创建多个消费者组 > XGROUP CREATE notifications analytics-group $ MKSTREAM OK > XGROUP CREATE notifications email-group $ MKSTREAM OK > XGROUP CREATE notifications mobile-group $ MKSTREAM OK # 添加一条消息 > XADD notifications * type user_signup user_id 1001 email "user@example.com" "1647345678912-0" # 从各个消费者组读取(每个组都能收到所有消息) > XREADGROUP GROUP analytics-group analytics-1 COUNT 1 STREAMS notifications > 1) 1) "notifications" 2) 1) 1) "1647345678912-0" 2) 1) "type" 2) "user_signup" 3) "user_id" 4) "1001" 5) "email" 6) "user@example.com" > XREADGROUP GROUP email-group email-1 COUNT 1 STREAMS notifications > 1) 1) "notifications" 2) 1) 1) "1647345678912-0" 2) 1) "type" 2) "user_signup" 3) "user_id" 4) "1001" 5) "email" 6) "user@example.com" > XREADGROUP GROUP mobile-group mobile-1 COUNT 1 STREAMS notifications > 1) 1) "notifications" 2) 1) 1) "1647345678912-0" 2) 1) "type" 2) "user_signup" 3) "user_id" 4) "1001" 5) "email" 6) "user@example.com"
Java Spring Boot示例
@Service p编程ublic class FanOutService { @Autowired private StringRedisTemplate redisTemplate; /** * 初始化扇出消费者组 */ public void initializeFanOutGroups(String streamKey, List<String> groupNames) { // 确保流存在 try { StreamInfo.XInfoStream info = redisTemplate.opsForStream().info(streamKey); } catch (Exception e) { // 流不存在,发送一个初始消息 Map<String, Object> initialMessage = new HashMap<>(); initialMessage.put("init", "true"); redisTemplate.opsForStream().add(streamKey, initialMessage); } // 创建所有消费者组 for (String groupName : groupNames) { try { redisTemplate.opsForStream().createGroup(streamKey, groupName); } catch (Exception e) { // 忽略组已存在的错误 log.info("Group {} may already exist: {}", groupName, e.getMessage()); } } } /** * 发布扇出消息 */ public String publishFanOutMessage(String streamKey, Map<String, Object> messageData) { StringRecord record = StreamRecords.string(messageData).withStreamKey(streamKey); return redisTemplate.opsForStream().add(record).getValue(); } /** * 为特定组启动消费者 */ @Async public void startGroupConsumer( String streamKey, String groupName, String consumerName, Consumer<MapRecord<String, Object, Object>> messageHandler) { StreamReadOptions options = StreamReadOptions.empty().count(10).block(Duration.ofSeconds(2)); while (!Thread.currentThread().isInterrupted()) { try { List<MapRecord<String, Object, Object>> messages = redisTemplate.opsForStream().read( Consumer.from(groupName, consumerName), options, StreamOffset.create(streamKey, ReadOffset.lastConsumed()) ); if (messages != null && !messages.isEmpty()) { for (MapRecord<String, Object, Object> message : messages) { try { // 处理消息 messageHandler.accept(message); // 确认消息 redisTemplate.opsForStream().acknowledge( streamKey, groupName, message.getId().getValue()); } catch (Exception e) { log.error("Error processing message in group {}: {}", groupName, e.getMessage(), e); } } } } catch (Exception e) { log.error("Error reading from stream for group {}: {}", groupName, e.getMessage(), e); try { Thread.sleep(1000); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); break; } } } } }
使用示例
@Service public class NotificationService { @Autowired private FanOutService fanOutService; @PostConstruct public void init() { // 初始化扇出组 List<String> groups = Arrays.asList("email-group", "sms-group", "analytics-group"); fanOutService.initializeFanOutGroups("user-events", groups); // 启动各个消费者组的处理器 fanOutService.startGroupConsumer( "user-events", "email-group", "email-consumer", this::processEmailNotification); fanOutService.startGroupConsumer( "user-events", "sms-group", "sms-consumer", this::processSmsNotification); fanOutService.startGroupConsumer( "user-events", "analytics-group", "analytics-consumer", this::processAnalyticsEvent); } private void processEmailNotification(MapRecord<String, Object, Object> message) { Map<Object, Object> messageData = message.getValue(); log.info("Processing email notification: {}", messageData); // 邮件发送逻辑 } private void processSmsNotification(MapRecord<String, Object, Object> message) { Map<Object, Object> messageData = message.getValue(); log.info("Processing SMS notification: {}", messageData); // 短信发送逻辑 } private void processAnalyticsEvent(MapRecord<String, Object, Object> message) { Map<Object, Object> messageData = message.getValue(); log.info("Processing analytics event: {}", messageData); // 分析事件处理逻辑 } public void publishUserEvent(String eventType, Map<String, Object> eventData) { Map<String, Object> message = new HashMap<>(eventData); message.put("event_type", eventType); message.put("timestamp", System.currentTimeMillis()); fanOutService.publishFanOutMessage("user-events", message); } }
使用场景
- 多个系统需要独立处理同一事件流
- 实现事件广播机制
- 系统集成:一个事件触发多个业务流程
- 日志统一处理并分发到不同服务
- 通知系统:一个事件需要通过多种方式通知用户
优缺点
优点
- 实现一次发布多次消费
- 各消费者组独立工作,互不影响
- 新增消费者组可以从头开始消费所有历史消息
- 可靠性高,消息持久化存储
缺点
- 随着流数据增长,可能占用较多存储空间
- 需要合理设置流的最大长度或过期策略
- 消费者组数量过多可能增加Redis负载
- 需要单独管理每个消费者组的状态
5. 重试与恢复模式(Retry and Recovery)
基本概念
这种模式关注处理失败消息的恢复和重试机制。Redis Stream消费者组会跟踪每个消息的处理状态,允许查看和管理未确认(PEL - Pending Entry List)的消息,实现可靠的消息处理。
核心命令
# 查看消费者组中未确认的消息 XPENDING stream_name group_name [start_id end_id count] [consumer_name] # 查看消费者组中长时间未确认的消息详情 XPENDING stream_name group_name start_id end_id count [consumer_name] # 认领处理超时的消息 XCLAIM stream_name group_name consumer_name min_idle_time message_id [message_id ...] [JUSTID]
实现示例
Redis CLI
# 查看未确认的消息数量 > XPENDING mystream processing-group 1) (integer) 2 # 未确认消息数量 2) "1647257548956-0" # 最小ID 3) "1647257549123-0" # 最大ID 4) 1) 1) "consumer-1" # 各个消费者的未确认消息数 2) (integer) 1 2) 1) "consumer-2" 2) (integer) 1 # 查看特定消费者的未确认消息 > XPENDING mystream processing-group - + 10 consumer-1 1) 1) "1647257548956-0" # 消息ID 2) "consumer-1" # 当前持有的消费者 3) (integer) 120000 # 空闲时间(毫秒) 4) (integer) 2 # 传递次数 # 认领超过2分钟未处理的消息 > XCLAIM myandroidstream processing-group consumer-2 120000 1647257548956-0 1) 1) "1647257548956-0" 2) 1) "sensor_id" 2) "1234" 3) "temperature" 4) "19.8" 5) "humidity" 6) "56"
Java Spring Boot示例
@Service public class MessageRecoveryService { @Autowired private StringRedisTemplate redisTemplate; /** * 获取消费者组中的未确认消息 */ public PendingMessagesSummary getPendingMessagesSummary(String streamKey, String groupName) { return redisTemplate.opsForStream().pending(streamKey, groupName); } /** * 获取指定消费者的详细未确认消息 */ public PendingMessages getPendingMessages( String streamKey, String groupName, String consumerName, Range<String> idRange, long count) { return redisTemplate.opsForStream().pending( streamKey, Consumer.from(groupName, consumerName), idRange, count); } /** * 认领长时间未处理的消息 */ public List<MapRecord<String, Object, Object>> claimMessages( String streamKey, String groupName, String newConsumerName, Duration minIdleTime, String... messageIds) { return redisTemplate.opsForStream().claim( streamKey, Consumer.from(groupName, newConsumerName), minIdleTime, messageIds); } /** * 定时检查和恢复未处理的消息 */ @Scheduled(fixedRate = 60000) // 每分钟执行一次 public void recoverStaleMessages() { // 配置参数 String streamKey = "mystream"; String groupName = "processing-group"; String recoveryConsumer = "recovery-consumer"; Duration minIdleTime = Duration.ofMinutes(5); // 超过5分钟未处理的消息 try { // 1. 获取所有未确认消息的摘要 PendingMessagesSummary summary = getPendingMessagesSummary(streamKey, groupName); if (summary != null && summary.getTotalPendingMessages() > 0) { // 2. 遍历每个消费者的未确认消息 for (Consumer consumer : summary.getPendingMessagesPerConsumer().keySet()) { // 获取该消费者的详细未确认消息列表 PendingMessages pendingMessages = getPendingMessages( streamKey, groupName, consumer.getName(), Range.unbounded(), 50); // 每次最多处理50条 if (pendingMessages != null) { // 3. 筛选出空闲时间超过阈值的消息 List<String> staleMessageIds = new ArrayList<>(); for (PendingMessage message : pendingMessages) { if (message.getElapsedTimeSinceLastDelivery().compareTo(minIdleTime) > 0) { staleMessageIds.add(message.getIdAsString()); } } // 4. 认领这些消息 if (!staleMessageIds.isEmpty()) { log.info("Claiming {} stale messages from consumer {}", staleMessageIds.size(), consumer.getName()); List<MapRecord<String, Object, Object>> claimedMessages = claimMessages( streamKey, groupName, recoveryConsumer, minIdleTime, staleMessageIds.toArray(new String[0])); // 5. 处理这些被认领的消息 processClaimedMessages(streamKey, groupName, claimedMessages); } } } } } catch (Exception e) { log.error("Error recovering stale messages: {}", e.getMessage(), e); } } /** * 处理被认领的消息 */ private void processClaimedMessages( String streamKey, String groupName, List<MapRecord<String, Object, Object>> messages) { if (messages == null || messages.isEmpty()) { return; } for (MapRecord<String, Object, Object> message : messages) { try { // 执行消息处理逻辑 processMessage(message); // 确认消息 redisTemplate.opsForStream().acknowledge( streamKey, groupName, message.getId().getValue()); log.info("Successfully processed recovered message: {}", message.getId()); } catch (Exception e) { log.error("Failed to process recovered message {}: {}", message.getId(), e.getMessage(), e); // 根据业务需求决定是否将消息加入死信队列 moveToDeadLetterQueue(streamKey, message); } } } /** * 将消息移至死信队列 */ private void moveToDeadLetterQueue(String sourceStream, MapRecord<String, Object, Object> message) { String deadLetterStream = sourceStream + ":dead-letter"; Map<Object, Object> messageData = message.getValue(); Map<String, Object> dlqMessage = new HashMap<>(); messageData.forEach((k, v) -> dlqMessage.put(k.toString(), v)); // 添加元数据 dlqMessage.put("original_id", message.getId().getValue()); dlqMessage.put("error_time", System.currentTimeMillis()); redisTemplate.opsForStream().add(deadLetterStream, dlqMessage); // 可选:从原消费者组确认该消息 // redisTemplate.opsForStream().acknowledge(sourceStream, groupName, message.getId().getValue()); } private void processMessage(MapRecord<String, Object, Object> message) { // 实际的消息处理逻辑 log.info("Processing recovered message: {}", message); // ... } }
使用场景
- 需要可靠消息处理的关键业务系统
- 处理时间较长的任务
- 需要错误重试机制的工作流
- 监控和诊断消息处理过程
- 实现死信队列处理特定失败场景
优缺点
优点
- 提高系统容错性和可靠性
- 自动恢复因消费者崩溃导致的未处理消息
- 可以识别和处理长时间未确认的消息
- 支持实现复杂的重试策略和死信处理
缺点
- 需要额外开发和维护恢复机制
- 可能导致消息重复处理,需要确保业务逻辑幂等
- 系统复杂度增加
- 需要监控和管理PEL(未确认消息列表)的大小
6. 流处理窗口模式(Streaming Window Processing)
基本概念
流处理窗口模式基于时间或消息计数划分数据流,在每个窗口内执行聚合或分析操作。这种模式适用于实时分析、趋势监测和时间序列处理。虽然Redis Stream本身不直接提供窗口操作,但可以结合Redis的其他特性实现。
实现方式
主要通过以下几种方式实现:
1. 基于消息ID的时间范围(Redis消息ID包含毫秒时间戳)
2. 结合Redis的排序集合(SortedSet)存储窗口数据
3. 使用Redis的过期键实现滑动窗口
实现示例
Redis CLI
窗口数据收集与查询:
# 添加带时间戳的数据 > XADD temperature * sensor_id 1 value 21.5 timestamp 1647257548000 "1647257550123-0" > XADD temperature * sensor_id 1 value 21.8 timestamp 1647257558000 "1647257560234-0" > XADD temperature * sensor_id 1 value 22.1 timestamp 1647257568000 "1647257570345-0" # 查询特定时间范围的数据 > XRANGE temperature 1647257550000-0 1647257570000-0 1) 1) "1647257550123-0" 2) 1) "sensor_id" 2) "1" 3) "value" 4) "21.5" 5) "timestamp" 6) "1647257548000" 2) 1) "1647257560234-0" 2) 1) "sensor_id" 2) "1" 3) "value" 4) "21.8" 5) "timestamp" 6) "1647257558000"
Java Spring Boot示例
@Service public class TimeWindowprocessingService { @Autowired private StringRedisTemplate redisTemplate; /** * 添加数据点到流,并存储到相应的时间窗口 */ public String addDataPoint(String streamKey, String sensorId, double value) { long timestamp = System.currentTimeMillis(); // 1. 添加到原始数据流 Map<String, Object> dataPoint = new HashMap<>(); dataPoint.put("sensor_id", sensorId); dataPoint.put("value", String.valueOf(value)); dataPoint.put("timestamp", String.valueOf(timestamp)); StringRecord record = StreamRecords.string(dataPoint).withStreamKey(streamKey); RecordId recordId = redisTemplate.opsForStream().add(record); // 2. 计算所属的窗口(这里以5分钟为一个窗口) long Windowstart = timestamp - (timestamp % (5 * 60 * 1000)); String windowKey = streamKey + ":window:" + windowStart; // 3. 将数据点添加到窗口的有序集合中,分数为时间戳 String dataPointjson = new ObjectMapper().writeValueAsString(dataPoint); redisTemplate.opsForZSet().add(windowKey, dataPointJson, timestamp); // 4. 设置窗口键的过期时间(保留24小时) redisTemplate.expire(windowKey, Duration.ofHours(24)); return recordId.getValue(); } /** * 获取指定时间窗口内的数据点 */ public List<Map<String, Object>> getWindowData( String streamKey, long windowStartTime, long windowEndTime) { // 计算可能的窗口键(每5分钟一个窗口) List<String> windowKeys = new ArrayList<>(); long current = windowStartTime - (windowStartTime % (5 * 60 * 1000)); while (current <= windowEndTime) { windowKeys.add(streamKey + ":window:" + current); current += (5 * 60 * 1000); } // 从各个窗口获取数据点 List<Map<String, Object>> results = new ArrayList<>(); ObjectMapper mapper = new ObjectMapper(); for (String windowKey : windowKeys) { Set<String> dataPoints = redisTemplate.opsForZSet().rangeByScore( windowKey, windowStartTime, windowEndTime); if (dataPoints != null) { for (String dataPointJson : dataPoints) { try { Map<String, Object> dataPoint = mapper.readValue( dataPointJson, new TypeReference<Map<String, Object>>() {}); results.add(dataPoint); } catch (Exception e) { log.error("Error parsing data point: {}", e.getMessage(), e); } } } } // 按时间戳排序 results.sort(Comparator.comparing(dp -> Long.parseLong(dp.get("timestamp").toString()))); return results; } /** * 计算窗口内数据的聚合统计 */ public Map<String, Object> getWindowStats( String streamKey, String sensorId, long windowStartTime, long windowEndTime) { List<Map<String, Object>> windowData = getWindowData(streamKey, windowStartTime, windowEndTime); // 过滤特定传感器的数据 List<Double> values = windowData.stream() .filter(dp -> sensorId.equals(dp.get("sensor_id").toString())) .map(dp -> Double.parseDouble(dp.get("value").toString())) .collect(Collectors.toList()); Map<String, Object> stats = new HashMap<>(); stats.put("count", values.size()); if (!values.isEmpty()) { DoubleSummaryStatistics summaryStats = values.stream().collect(Collectors.summarizingDouble(v -> v)); stats.put("min", summaryStats.getMin()); stats.put("max", summaryStats.getMax()); stats.put("avg", summaryStats.getAverage()); stats.put("sum", summaryStats.getSum()); } stats.put("start_time", windowStartTime); stats.put("end_time", windowEndTime); stats.put("sensor_id", sensorId); return stats; } /** * 实现滑动窗口处理 */ @Scheduled(fixedRate = 60000) // 每分钟执行一次 public void processSlidingWindows() { String streamKey = "temperature"; long now = System.currentTimeMillis(); // 处理过去10分钟窗口的数据 long windowEndTime = now; long windowStartTime = now - (10 * 60 * 1000); List<String> sensorIds = Arrays.asList("1", "2", "3"); // 示例传感器ID for (String sensorId : sensorIds) { try { // 获取窗口统计 Map<String, Object> stats = getWindowStats(streamKey, sensorId, windowStartTime, windowEndTime); // 根据统计结果执行业务逻辑 if (stats.containsKey("avg")) { double avgTemp = (double) stats.get("avg"); if (avgTemp > 25.0) { // 触发高温警报 log.warn("High temperature alert for sensor {}: {} C", sensorId, avgTemp); triggerAlert(sensorId, "HIGH_TEMP", avgTemp); } } // 存储聚合结果用于历史趋势分析 saveAggregatedResults(streamKey, sensorId, stats); } catch (Exception e) { log.error("Error processing sliding window for sensor {}: {}", sensorId, e.getMessage(), e); } } } /** * 触发警报 */ private void triggerAlert(String sensorId, String alertType, double value) { Map<String, Object> alertData = new HashMap<>(); alertData.put("sensor_id", sensorId); alertData.put("alert_type", alertType); alertData.put("value", value); alertData.put("timestamp", System.currentTimeMillis()); redisTemplate.opsForStream().add("alerts", alertData); } /** * 保存聚合结果 */ private void saveAggregatedResults(String streamKey, String sensorId, Map<String, Object> stats) { long windowTime = www.chinasem.cn(long) stats.get("end_time"); String aggregateKey = streamKey + ":aggregate:" + sensorId; // 使用时间作为分数存储聚合结果 redisTemplate.opsForZSet().add( aggregateKey, new ObjectMapper().writeValueAsString(stats), windowTime); // 保留30天的聚合数据 redisTemplate.expire(aggregateKey, Duration.ofDays(30)); } }
使用场景
- 实时数据分析与统计
- 趋势检测和预测
- 异常值和阈值监控
- 时间序列数据处理
- IoT数据流处理和聚合
- 用户行为分析
优缺点
优点
- 支持基于时间的数据分析
- 可以实现实时聚合和计算
- 灵活的窗口定义(滑动窗口、滚动窗口)
- 可扩展以支持复杂的分析场景
缺点
- 实现复杂度较高
- 可能需要额外的数据结构和存储空间
- 对于大数据量的窗口计算可能影响性能
- 需要小心管理内存使用和数据过期策略
结论
Redis Stream提供了强大而灵活的消息处理功能,通过组合这些模式,可以构建出高性能、可靠且灵活的消息处理系统,满足从简单的任务队列到复杂的实时数据处理等各种应用需求。
在选择和实现这些模式时,应充分考虑业务特性、性能需求、可靠性要求以及系统规模,结合Redis Stream的特性,打造最适合自己应用场景的消息处理解决方案。
以上就是一文带你搞懂Redis Stream的6种消息处理模式的详细内容,更多关于Redis Stream消息处理模式的资料请关注China编程(www.chinasem.cn)其它相关文章!
这篇关于一文带你搞懂Redis Stream的6种消息处理模式的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!