延时任务通知服务的设计及实现(三)-- JDK的延迟队列DelayQueue

2024-05-07 09:36

本文主要是介绍延时任务通知服务的设计及实现(三)-- JDK的延迟队列DelayQueue,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、接着上文

上文我们讲述了使用redisson的RDelayedQueue实现分布式延迟队列,本文我们将自己JDK的延迟队列DelayQueue实现。

相比前者的实现,作为进程内的延迟队列,它会遇到许多技术难点:

  • 如何支持分布式的多个节点部署场景
  • 应用重启会恢复延时队列
  • 冷数据如何转换为热数据
  • 如何删除延迟队列中的任务

随后,我们也将提及:

  • 保存任务至延迟队列(生产者)
  • 读取延迟队列中的任务(消费者)

二、设计概要

在这里插入图片描述

  • 冷数据:mysql表中的任务数据

  • 热数据:jdk 延迟队列中的任务

  • 广播事件:删除延迟队列中的任务,发布的是广播事件,可以使用redis topic实现。

  • 本地事件:分布式多节点部署的时候,每个任务只保存在其中一个节点的延迟队列中,可以使用spring事件驱动实现。

  • 延迟队列 DelayQueueJob, 它实现了接口Delayed

包括任务的交易流水号和过期时间(即任务的回调时间)

import lombok.Builder;
import lombok.Data;import java.util.Date;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;/*** @author xxx*/
@Builder
@Data
public class DelayQueueJob implements Delayed {/*** 交易流水号*/private String transNo;/*** 到期时间*/private Date expireDate;public DelayQueueJob(String transNo, Date expireDate) {super();this.transNo = transNo;this.expireDate = expireDate;}/*** 用于队列中排序过期时间** @param o* @return*/@Overridepublic int compareTo(Delayed o) {return Long.valueOf(this.expireDate.getTime()).compareTo(Long.valueOf(((DelayQueueJob) o).expireDate.getTime()));}/*** 用于获取过期时间* 延迟关闭时间 = 过期时间 - 当前时间** @param unit* @return*/@Overridepublic long getDelay(TimeUnit unit) {return this.expireDate.getTime() - System.currentTimeMillis();}
}

三、应用启动流程

解决恢复延迟队列的问题。因为DelayQueue是进程内的,一旦重启,将被销毁。

在这里插入图片描述

import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;import java.util.List;
import java.util.concurrent.TimeUnit;@Slf4j
@Service
@RequiredArgsConstructor
public class ApplicationStartupListener implements ApplicationListener<ApplicationReadyEvent> {@Overridepublic void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {// 实现代码参考上面的流程图}
}

四、定时任务流程

解决冷数据如何转换为热数据的问题,防止延时任务过多导致消耗过多的jvm内存,所以只有回调时间将近的任务才放入延迟队列。

在这里插入图片描述

五、如何删除延迟队列中的任务

删除延迟队列的任务:发送广播消息通知所有的节点,当不是当前节点的时候,执行删除。

if (!NetUtil.getLocalhostStr().equals(ipAddress)) {DelayQueueSingleton.getDelayQueue().remove(transNo);
}

DelayQueueSingletons是一个单例类,详见下:

public class DelayQueueSingleton {private static volatile CustomDelayQueue<DelayQueueJob> delayQueue;private DelayQueueSingleton() {}public static CustomDelayQueue<DelayQueueJob> getDelayQueue() {if (delayQueue == null) {synchronized (DelayQueueSingleton.class) {if (delayQueue == null) {delayQueue = new CustomDelayQueue<>();}}}return delayQueue;}}

这里为了删除延迟队列的任务,我们对DelayQueue进行了重写。


import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;public class CustomDelayQueue<T extends Delayed> {private final DelayQueue<T> queue = new DelayQueue<>();private final Map<String, T> map = new ConcurrentHashMap<>();public boolean put(T task, String taskId) {// 如果任务已存在,则删除旧任务,防止重复添加this.remove(taskId);map.put(taskId, task);return queue.add(task);}public boolean remove(String taskId) {// 先删除map,再删除queueT task = map.remove(taskId);if (task != null) {return queue.remove(task);}return false;}public T take() throws InterruptedException {return queue.take();}
}

六、保存任务至延迟队列(生产者)


// 如果通知时间在一定时间范围内
if (DateUtil.offsetMinute(new DateTime(), commonConfig.getHotDataTimeLine()).after(event.getNotifyDate())) {DelayQueueSingleton.getDelayQueue().put(DelayQueueJob.builder().transNo(event.getTransNo()).expireDate(event.getNotifyDate()).build(), event.getTransNo());}

七、读取延迟队列中的任务(消费者)

作为延迟队列的消费者,它的实现和上一篇文章实现类似。不同的是take()获取任务不一样。

String transNo = null;
Date notifyDate = null;DelayQueueJob job = DelayQueueSingleton.getDelayQueue().take();
if (null != job) {transNo = job.getTransNo();notifyDate = job.getExpireDate();
}if (null == transNo) {return;
}if (log.isInfoEnabled()) {log.info("开始执行延迟队列中的任务,transNo={},notifyDate={}", transNo, notifyDate);
}// 异步执行你的操作
notifyTaskService.handleTask(transNo, notifyDate);

八、总结

作为进程内的延迟队列,在多点部署的分布式集群环境下, 代码明显比上一篇要复杂得多。

它们都需要的步骤是:

  • 任务的生产
  • 任务的消费
  • 移除任务

DelayQueue额外多出来的步骤是:

  • 应用启动的时候拉取回调时间将近的未完成任务(更新marked标记为true,防止重复拉取冷数据)
  • 定时拉取未标记且回调时间将近的未完成任务(和上面必须是互斥,等待上一步执行完成,否则会导致重复拉取)
  • 删除延迟队列DelayQueue的任务,必须发布广播消息给全部节点。(引入广播消息机制)

由此可见,任务表的字段marked仅供DelayQueue使用,防止重复拉取数据库的任务到热数据区。

    @Column(name = "marked", nullable = false, columnDefinition = "TINYINT(1) default 0 COMMENT '是否已标记为热数据'")private Boolean marked;

附:相关系列文章链接

延时任务通知服务的设计及实现(一)-- 设计方案

延时任务通知服务的设计及实现(二)-- redisson的延迟队列RDelayedQueue

延时任务通知服务的设计及实现(三)-- JDK的延迟队列DelayQueue

延时任务通知服务的设计及实现(四)-- webhook执行任务

延时任务通知服务的设计及实现(五)-- Netty时间轮HashedWheelTimer

这篇关于延时任务通知服务的设计及实现(三)-- JDK的延迟队列DelayQueue的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!


原文地址:
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.chinasem.cn/article/966976

相关文章

基于Python实现一个图片拆分工具

《基于Python实现一个图片拆分工具》这篇文章主要为大家详细介绍了如何基于Python实现一个图片拆分工具,可以根据需要的行数和列数进行拆分,感兴趣的小伙伴可以跟随小编一起学习一下... 简单介绍先自己选择输入的图片,默认是输出到项目文件夹中,可以自己选择其他的文件夹,选择需要拆分的行数和列数,可以通过

Python中将嵌套列表扁平化的多种实现方法

《Python中将嵌套列表扁平化的多种实现方法》在Python编程中,我们常常会遇到需要将嵌套列表(即列表中包含列表)转换为一个一维的扁平列表的需求,本文将给大家介绍了多种实现这一目标的方法,需要的朋... 目录python中将嵌套列表扁平化的方法技术背景实现步骤1. 使用嵌套列表推导式2. 使用itert

Python使用pip工具实现包自动更新的多种方法

《Python使用pip工具实现包自动更新的多种方法》本文深入探讨了使用Python的pip工具实现包自动更新的各种方法和技术,我们将从基础概念开始,逐步介绍手动更新方法、自动化脚本编写、结合CI/C... 目录1. 背景介绍1.1 目的和范围1.2 预期读者1.3 文档结构概述1.4 术语表1.4.1 核

在Linux中改变echo输出颜色的实现方法

《在Linux中改变echo输出颜色的实现方法》在Linux系统的命令行环境下,为了使输出信息更加清晰、突出,便于用户快速识别和区分不同类型的信息,常常需要改变echo命令的输出颜色,所以本文给大家介... 目python录在linux中改变echo输出颜色的方法技术背景实现步骤使用ANSI转义码使用tpu

关于DNS域名解析服务

《关于DNS域名解析服务》:本文主要介绍关于DNS域名解析服务,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录DNS系统的作用及类型DNS使用的协议及端口号DNS系统的分布式数据结构DNS的分布式互联网解析库域名体系结构两种查询方式DNS服务器类型统计构建DNS域

Python使用python-can实现合并BLF文件

《Python使用python-can实现合并BLF文件》python-can库是Python生态中专注于CAN总线通信与数据处理的强大工具,本文将使用python-can为BLF文件合并提供高效灵活... 目录一、python-can 库:CAN 数据处理的利器二、BLF 文件合并核心代码解析1. 基础合

Python使用OpenCV实现获取视频时长的小工具

《Python使用OpenCV实现获取视频时长的小工具》在处理视频数据时,获取视频的时长是一项常见且基础的需求,本文将详细介绍如何使用Python和OpenCV获取视频时长,并对每一行代码进行深入解析... 目录一、代码实现二、代码解析1. 导入 OpenCV 库2. 定义获取视频时长的函数3. 打开视频文

golang版本升级如何实现

《golang版本升级如何实现》:本文主要介绍golang版本升级如何实现问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录golanwww.chinasem.cng版本升级linux上golang版本升级删除golang旧版本安装golang最新版本总结gola

SpringBoot中SM2公钥加密、私钥解密的实现示例详解

《SpringBoot中SM2公钥加密、私钥解密的实现示例详解》本文介绍了如何在SpringBoot项目中实现SM2公钥加密和私钥解密的功能,通过使用Hutool库和BouncyCastle依赖,简化... 目录一、前言1、加密信息(示例)2、加密结果(示例)二、实现代码1、yml文件配置2、创建SM2工具

Mysql实现范围分区表(新增、删除、重组、查看)

《Mysql实现范围分区表(新增、删除、重组、查看)》MySQL分区表的四种类型(范围、哈希、列表、键值),主要介绍了范围分区的创建、查询、添加、删除及重组织操作,具有一定的参考价值,感兴趣的可以了解... 目录一、mysql分区表分类二、范围分区(Range Partitioning1、新建分区表:2、分