环形定时任务 原理

2024-09-08 01:18
文章标签 原理 定时 任务 环形

本文主要是介绍环形定时任务 原理,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

业务背景

在稍微复杂点业务系统中,不可避免会碰到做定时任务的需求,比如淘宝的交易超时自动关闭订单、超时自动确认收货等等。对于一些定时作业比较多的系统,通常都会搭建专门的调度平台来管理,通过创建定时器来周期性执行任务。如刚才所说的场景,我们可以给订单创建一个专门的任务来处理交易状态,每秒轮询一次订单表,找出那些符合超时条件的订单然后标记状态。这是最简单粗暴的做法,但明显也很low,自己都下不去手写这样的代码,所有必须要找个更好的方案。

回到真实项目中的场景,系统中某个活动上线后要给目标用户发送短信通知,这些通知需要按时间点批量发送。虽然已经基于quartz.net给系统搭建了任务调度平台,但着实不想用上述方案来实现。在网上各种搜索和思考,找到一篇文章让我眼前一亮,稍加分析发现里面的思路完全符合现在的场景,于是决定在自己项目中实现出来。

 

原理分析

 这种方案的核心就是构造一种数据结构,称之为环形队列,但实际上还是一个数组,加上对它的循环遍历,达到一种环状的假象。然后再配合定时器,就可以实现按需延时的效果。上面提到的文章中也介绍了实现思路,这里我采用我的理解再更加详细的解释一下。

我们先为这个数组分配一个固定大小的空间,比如60,每个数组的元素用来存放任务的集合。然后开启一个定时器每隔一秒来扫描这个数组,扫完一圈刚好是一分钟。如果提前设置好任务被扫描的圈数(CycleNum)和在数组中的位置(Slot),在刚好扫到数组的Slot位置时,集合里那些CycleNum为0的任务就是达到触发条件的任务,拉出来做业务操作然后移除掉,其他的把圈数减掉一次,然后留到下次继续扫描,这样就实现了延时的效果。原理如下图所示:

可以看出中间的重点是计算出每个任务所在的位置以及需要循环的圈数。假设当前时间为15:20:08,当前扫描位置是2,我的任务要在15:22:35这个时刻触发,也就是147秒后。那么我需要循环的圈数就是147/60=2圈,需要被扫描的位置就是(147+2)%60=29的地方。计算好任务的坐标后塞到数组中属于它的位置,然后静静等待被消费就好啦。

 

撸码实现

光讲原理不上代码怎么能行呢,根据上面的思路,下面一步步在.net平台下实现出来。

先做一些基础封装。

首先构造任务参数的基类,用来记录任务的位置信息和定义业务回调方法:

复制代码

    public class DelayQueueParam{internal int Slot { get; set; }internal int CycleNum { get; set; }public Action<object> Callback { get; set; }}

复制代码

接下来是核心地方。再构造队列的泛型类,真实类型必须派生自上面的基类,用来扩展一些业务字段方便消费时使用。队列的主要属性有当前位置指针以及数组容器,主要的操作有插入、移除和消费。插入任务时需要传入执行时间,用来计算这个任务的坐标。

复制代码

    public class DelayQueue<T> where T : DelayQueueParam{private List<T>[] queue;private int currentIndex = 1;public DelayQueue(int length){queue = new List<T>[length];}public void Insert(T item, DateTime time){//根据消费时间计算消息应该放入的位置var second = (int)(time - DateTime.Now).TotalSeconds;item.CycleNum = second / queue.Length;item.Slot = (second + currentIndex) % queue.Length;//加入到延时队列中if (queue[item.Slot] == null){queue[item.Slot] = new List<T>();}queue[item.Slot].Add(item);}public void Remove(T item){if (queue[item.Slot] != null){queue[item.Slot].Remove(item);}}public void Read(){if (queue.Length >= currentIndex){var list = queue[currentIndex - 1];if (list != null){List<T> target = new List<T>();foreach (var item in list){if (item.CycleNum == 0){//在本轮命中,用单独线程去执行业务操作Task.Run(()=> { item.Callback(item); });target.Add(item);}else{//等下一轮item.CycleNum--;System.Diagnostics.Debug.WriteLine($"@@@@@索引:{item.Slot},剩余:{item.CycleNum}");}}//把已过期的移除掉foreach (var item in target){list.Remove(item);}}currentIndex++;//下一遍从头开始if (currentIndex > queue.Length){currentIndex = 1;}}}}

复制代码

接下来是使用方法。

创建一个管理队列实例的静态类,里面封装对队列的操作:

复制代码

    public static class NotifyPlanManager{private static DelayQueue<NotifyPlan> _queue = new DelayQueue<NotifyPlan>(60);public static void Insert(NotifyPlan plan, DateTime time){_queue.Insert(plan, time);}public static void Read(){_queue.Read();}}

复制代码

构建我们的实际业务参数类,派生自DelayQueueParam:

复制代码

    public class NotifyPlan : DelayQueueParam{public Guid CamId { get; set; }public int PreviousTotal { get; set; }public int Amount { get; set; }}

复制代码

生产端往队列中插入数据:

复制代码

    Action<object> callback = (result) =>{var np = result as NotifyPlan;//这里做自己的业务操作//举个例子:Debug.WriteLine($"活动ID:{np.CamId},已发送数量:{np.PreviousTotal},本次发送数量:{np.Amount}");};NotifyPlanManager.Insert(new NotifyPlan{Amount = set.MainAmount,CamId = camId,PreviousTotal = 0,Callback = callback}, smsTemplate.SendDate);

复制代码

再创建一个每秒执行一次的定时器用做消费端,我这里使用的是FluentScheduler,核心代码:

复制代码

    internal class NotifyPlanJob : IJob{/// <summary>/// 执行计划/// </summary>public void Execute(){NotifyPlanManager.Read();}}internal class JobFactory : Registry{public JobFactory(){//每秒运行一次Schedule<NotifyPlanJob >().ToRunEvery(1).Seconds();}}JobManager.Initialize(new JobFactory());

复制代码

然后开启调试运行,打开本机的系统时间面板,对着时间看输出结果。亲测有效。

 

总结

 这种方案的好处是避免了频繁地扫描数据库和不必要的业务操作,另外也很方便控制时间精度。带来的问题是如果web服务异常或重启可能会发生任务丢失的情况,我目前的处理方法是在数据库中标记任务状态,服务启动时把状态为“排队中”的任务重新加载到队列中等待消费。

以上方案在单机环境测试没问题,多节点情况下暂时没有深究。若有设计实现上的缺陷,欢迎讨论与指正,要是有更好的方案,那就当抛砖引玉,再好不过了~

这篇关于环形定时任务 原理的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1146702

相关文章

Linux下MySQL数据库定时备份脚本与Crontab配置教学

《Linux下MySQL数据库定时备份脚本与Crontab配置教学》在生产环境中,数据库是核心资产之一,定期备份数据库可以有效防止意外数据丢失,本文将分享一份MySQL定时备份脚本,并讲解如何通过cr... 目录备份脚本详解脚本功能说明授权与可执行权限使用 Crontab 定时执行编辑 Crontab添加定

ShardingProxy读写分离之原理、配置与实践过程

《ShardingProxy读写分离之原理、配置与实践过程》ShardingProxy是ApacheShardingSphere的数据库中间件,通过三层架构实现读写分离,解决高并发场景下数据库性能瓶... 目录一、ShardingProxy技术定位与读写分离核心价值1.1 技术定位1.2 读写分离核心价值二

深度解析Python中递归下降解析器的原理与实现

《深度解析Python中递归下降解析器的原理与实现》在编译器设计、配置文件处理和数据转换领域,递归下降解析器是最常用且最直观的解析技术,本文将详细介绍递归下降解析器的原理与实现,感兴趣的小伙伴可以跟随... 目录引言:解析器的核心价值一、递归下降解析器基础1.1 核心概念解析1.2 基本架构二、简单算术表达

深入浅出Spring中的@Autowired自动注入的工作原理及实践应用

《深入浅出Spring中的@Autowired自动注入的工作原理及实践应用》在Spring框架的学习旅程中,@Autowired无疑是一个高频出现却又让初学者头疼的注解,它看似简单,却蕴含着Sprin... 目录深入浅出Spring中的@Autowired:自动注入的奥秘什么是依赖注入?@Autowired

从原理到实战解析Java Stream 的并行流性能优化

《从原理到实战解析JavaStream的并行流性能优化》本文给大家介绍JavaStream的并行流性能优化:从原理到实战的全攻略,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的... 目录一、并行流的核心原理与适用场景二、性能优化的核心策略1. 合理设置并行度:打破默认阈值2. 避免装箱

SpringBoot集成XXL-JOB实现任务管理全流程

《SpringBoot集成XXL-JOB实现任务管理全流程》XXL-JOB是一款轻量级分布式任务调度平台,功能丰富、界面简洁、易于扩展,本文介绍如何通过SpringBoot项目,使用RestTempl... 目录一、前言二、项目结构简述三、Maven 依赖四、Controller 代码详解五、Service

Linux系统管理与进程任务管理方式

《Linux系统管理与进程任务管理方式》本文系统讲解Linux管理核心技能,涵盖引导流程、服务控制(Systemd与GRUB2)、进程管理(前台/后台运行、工具使用)、计划任务(at/cron)及常用... 目录引言一、linux系统引导过程与服务控制1.1 系统引导的五个关键阶段1.2 GRUB2的进化优

Python中的filter() 函数的工作原理及应用技巧

《Python中的filter()函数的工作原理及应用技巧》Python的filter()函数用于筛选序列元素,返回迭代器,适合函数式编程,相比列表推导式,内存更优,尤其适用于大数据集,结合lamb... 目录前言一、基本概念基本语法二、使用方式1. 使用 lambda 函数2. 使用普通函数3. 使用 N

MyBatis-Plus 与 Spring Boot 集成原理实战示例

《MyBatis-Plus与SpringBoot集成原理实战示例》MyBatis-Plus通过自动配置与核心组件集成SpringBoot实现零配置,提供分页、逻辑删除等插件化功能,增强MyBa... 目录 一、MyBATis-Plus 简介 二、集成方式(Spring Boot)1. 引入依赖 三、核心机制

Python Flask实现定时任务的不同方法详解

《PythonFlask实现定时任务的不同方法详解》在Flask中实现定时任务,最常用的方法是使用APScheduler库,本文将提供一个完整的解决方案,有需要的小伙伴可以跟随小编一起学习一下... 目录完js整实现方案代码解释1. 依赖安装2. 核心组件3. 任务类型4. 任务管理5. 持久化存储生产环境