airflow调度时间详解

2024-08-24 03:04
文章标签 详解 时间 调度 airflow

本文主要是介绍airflow调度时间详解,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

⭐️ airflow调度概述

Apache Airflow 是一个开源的工作流调度和监控平台,广泛用于数据工程、ETL(提取、转换、加载)管道以及各种自动化任务。下面我将详细说明 Airflow 的调度算法。

1. DAG(有向无环图)

Airflow 的核心是 DAG(Directed Acyclic Graph),它定义了一组有序的任务,其中每个任务称为一个 “task”。DAG 是调度的基本单位,Airflow 通过 DAG 定义任务的依赖关系和执行顺序。

在这里插入图片描述

2. DAG调度器

Airflow 中的调度器(Scheduler)负责监控 DAG,决定何时触发 DAG 中的任务。

3. 调度策略(Scheduling Policy)

在 Airflow 中,调度策略定义了 DAG 应该何时被触发。调度策略通常基于以下两个因素:

  • 时间表(Schedule Interval): 这是一个时间间隔,定义了 DAG 被调度的频率。例如,每小时调度一次、每天调度一次等。时间表可以通过 Cron 表达式、timedelta 对象等来定义。

  • 开始时间(Start Date): DAG 从这个时间开始调度。start_dateschedule_interval 一起决定了 DAG 的首次执行时间和后续的执行时间点。

4. Catchup

默认情况下,Airflow 会自动执行所有未执行的 DAG 实例。如果你设置了一个 DAG 的 start_date 为过去的某个时间,且 schedule_interval 为每天执行一次,Airflow 会在下次调度器运行时尝试执行所有中间日期的任务,直到赶上当前日期。

如果 DAG 不是为了处理其追赶而编写的,那么可以关闭catchup。这可以通过在DAG中设置catchup=False或在配置文件中设置catchup_by_default=False来实现。关闭catchup时,调度程序仅为最新间隔创建DAG运行。

5. External Triggers

Airflow 还支持外部事件触发,例如通过 REST API 或 Sensor 任务来触发 DAG 或特定任务的执行。这个机制使得 Airflow 能够适应动态和复杂的调度需求,而不仅仅依赖于固定的时间表。

在这里插入图片描述

⭐️ airflow内部是怎样计算调度时间的

Airflow 的开始时间(start_date)和调度时间(schedule_interval)是密切相关的,理解它们之间的关系对正确配置 DAG 至关重要。

1. 开始时间(start_date)

start_date 是指 DAG 或任务首次被调度的参考时间点。它表示任务计划从何时开始执行。重要的是,Airflow 将在 start_date 后的第一个调度间隔完成时实际开始执行任务。换句话说,start_date 指的是任务应该开始考虑执行的时间,而不是实际执行的时间。

2. 调度时间(schedule_interval)

schedule_interval 是指 DAG 定期运行的时间间隔。例如,如果你设置了 schedule_interval=‘@daily’,Airflow 会每天触发一次 DAG。Airflow 调度任务的实际时间通常是 start_date 加上 schedule_interval 的长度。

3. 两者的关系

Airflow 的调度器总是会在 start_date 之后的调度间隔结束时调度任务,这意味着任务实际运行的时间通常是 start_date 加上一个 schedule_interval。

举例说明,假设你有一个 DAG,其 start_date 设置为 2024-08-14 00:00:00,schedule_interval 设置为 @daily(每天运行一次):start_date 是 2024-08-14 00:00:00。
第一次调度的实际执行时间(即第一个 DAG run)会是 2024-08-15 00:00:00。

为什么会这样? 因为 Airflow 认为每个 DAG run 是在上一个时间区间的末尾处理数据。例如,在这个例子中,2024-08-15 00:00:00 运行的是 2024-08-14 的数据。

4. 调度时间的具体计算

Airflow 采用了如下逻辑计算调度时间:

初始调度时间计算:
初始调度时间是基于 start_date 和 schedule_interval 计算得出的第一个调度时间。例如,如果 start_date 是 2024-08-14 00:00:00,schedule_interval 是 1 小时,那么第一个调度时间是 2024-08-14 01:00:00。

后续调度时间计算:
Airflow 会从 start_date 开始逐个计算调度时间,直到当前时间。例如,若 schedule_interval 是 1 小时,Airflow 会逐小时计算每个调度时间点。

Missed Schedules(错过的调度):
如果一个 DAG 处于 paused 状态或因为某些原因未被调度,Airflow 会在 DAG 恢复为 active 状态时补齐所有错过的调度时间点。

5. 实际运行中的情况

在实际应用中,start_date 和 schedule_interval 的组合可能会导致以下情况:
迟到执行:由于 schedule_interval 的存在,第一次 DAG 任务通常在 start_date 后一个完整的调度间隔结束时执行。
调度延迟:如果 DAG 配置或系统资源不够,可能会出现调度延迟。

6. 示例说明

假设你有一个 DAG:

start_date: 2024-08-14 00:00:00
schedule_interval: 每 1 小时一次 (timedelta(hours=1))

那么 Airflow 会按如下时间点调度任务:

2024-08-14 01:00:00 调度 2024-08-14 00:00:00 到 01:00:00 这一小时的数据
2024-08-14 02:00:00 调度 2024-08-14 01:00:00 到 02:00:00 这一小时的数据
以此类推

7. 具体代码实现

在 Airflow 的源码中,调度时间的计算主要依赖于以下几个组件:
DagRun: 每次 DAG 运行都会创建一个 DagRun 对象,其中记录了调度时间、开始时间、结束时间等信息。
next_dagrun_info: 这个函数会基于当前的 schedule_interval 计算下一个调度时间。
catchup: 如果 catchup=True,Airflow 会尝试补齐所有错过的调度时间。

在这里插入图片描述

⭐️ 一个场景

假如开始时间为当天0点,每隔1小时执行一次,而且开始时DAG是被paused的,当在凌晨3点手动从paused状态变成acitve状态,那么后面的运行是怎么调度的,分析如下:

在 Airflow 中,DAG 被 paused 状态解除并变为 active 状态时,调度的逻辑将根据 start_dateschedule_interval 来决定接下来的任务调度时间。让我们详细讨论一下这种情况下的调度行为。

1. 情景描述

  • start_date: 当天的凌晨 00:00(例如 2024-08-14 00:00)。
  • schedule_interval: 每 1 小时运行一次('0 * * * *'timedelta(hours=1))。
  • DAG 状态: 初始时 DAG 处于 paused 状态,凌晨 3 点手动将其设置为 active 状态。

2. 关键点

  1. DAG 的调度逻辑

    • Airflow 的调度是基于 start_dateschedule_interval 的,并且会计算所有可能的调度时间点。
    • 当 DAG 处于 paused 状态时,Airflow 不会调度新的任务实例,但会“记住”错过的调度窗口。
    • 当 DAG 被从 paused 变为 active 时,Airflow 会立即尝试补齐所有错过的调度实例,除非你手动跳过这些实例。
  2. 调度行为

    • 你在凌晨 3 点将 DAG 从 paused 变为 active 状态后,Airflow 将立即调度凌晨 1 点、2 点和 3 点的实例,因为它们都是基于 start_dateschedule_interval 计算出来的调度点。
    • 这意味着,当你在凌晨 3 点将 DAG 设为 active,Airflow 会依次调度并执行 00:00-01:00、01:00-02:00、02:00-03:00 这几个时间段的 DAG 任务。

3. 示例说明

假设 start_date 是 2024-08-14 00:00,DAG 设定为每 1 小时执行一次,调度时间点依次是:

  • 2024-08-14 01:00:00
  • 2024-08-14 02:00:00
  • 2024-08-14 03:00:00
  • 2024-08-14 04:00:00

如果 DAG 在 2024-08-14 03:00:00 从 paused 状态变为 active,Airflow 会立即调度前面错过的任务:

  • 2024-08-14 01:00:00(任务处理 00:00 到 01:00 的数据)
  • 2024-08-14 02:00:00(任务处理 01:00 到 02:00 的数据)
  • 2024-08-14 03:00:00(任务处理 02:00 到 03:00 的数据)

Airflow 会按照这些顺序来执行这些任务,确保所有错过的调度时间点都得到处理。

下图是一个在实际生产中的案例,开始时间是凌晨0点5分,每5个小时调度一次,但开始时DAG是paused的,上午9点多手动将DAG恢复,此图是从airflow ui 中截取的,

在这里插入图片描述

笔者水平有限,若有不对的地方欢迎评论指正!

这篇关于airflow调度时间详解的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1101263

相关文章

HTML5 搜索框Search Box详解

《HTML5搜索框SearchBox详解》HTML5的搜索框是一个强大的工具,能够有效提升用户体验,通过结合自动补全功能和适当的样式,可以创建出既美观又实用的搜索界面,这篇文章给大家介绍HTML5... html5 搜索框(Search Box)详解搜索框是一个用于输入查询内容的控件,通常用于网站或应用程

Python中使用uv创建环境及原理举例详解

《Python中使用uv创建环境及原理举例详解》uv是Astral团队开发的高性能Python工具,整合包管理、虚拟环境、Python版本控制等功能,:本文主要介绍Python中使用uv创建环境及... 目录一、uv工具简介核心特点:二、安装uv1. 通过pip安装2. 通过脚本安装验证安装:配置镜像源(可

C++ 函数 strftime 和时间格式示例详解

《C++函数strftime和时间格式示例详解》strftime是C/C++标准库中用于格式化日期和时间的函数,定义在ctime头文件中,它将tm结构体中的时间信息转换为指定格式的字符串,是处理... 目录C++ 函数 strftipythonme 详解一、函数原型二、功能描述三、格式字符串说明四、返回值五

LiteFlow轻量级工作流引擎使用示例详解

《LiteFlow轻量级工作流引擎使用示例详解》:本文主要介绍LiteFlow是一个灵活、简洁且轻量的工作流引擎,适合用于中小型项目和微服务架构中的流程编排,本文给大家介绍LiteFlow轻量级工... 目录1. LiteFlow 主要特点2. 工作流定义方式3. LiteFlow 流程示例4. LiteF

CSS3中的字体及相关属性详解

《CSS3中的字体及相关属性详解》:本文主要介绍了CSS3中的字体及相关属性,详细内容请阅读本文,希望能对你有所帮助... 字体网页字体的三个来源:用户机器上安装的字体,放心使用。保存在第三方网站上的字体,例如Typekit和Google,可以link标签链接到你的页面上。保存在你自己Web服务器上的字

MySQL存储过程之循环遍历查询的结果集详解

《MySQL存储过程之循环遍历查询的结果集详解》:本文主要介绍MySQL存储过程之循环遍历查询的结果集,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录前言1. 表结构2. 存储过程3. 关于存储过程的SQL补充总结前言近来碰到这样一个问题:在生产上导入的数据发现

MyBatis ResultMap 的基本用法示例详解

《MyBatisResultMap的基本用法示例详解》在MyBatis中,resultMap用于定义数据库查询结果到Java对象属性的映射关系,本文给大家介绍MyBatisResultMap的基本... 目录MyBATis 中的 resultMap1. resultMap 的基本语法2. 简单的 resul

从基础到进阶详解Pandas时间数据处理指南

《从基础到进阶详解Pandas时间数据处理指南》Pandas构建了完整的时间数据处理生态,核心由四个基础类构成,Timestamp,DatetimeIndex,Period和Timedelta,下面我... 目录1. 时间数据类型与基础操作1.1 核心时间对象体系1.2 时间数据生成技巧2. 时间索引与数据

Mybatis Plus Join使用方法示例详解

《MybatisPlusJoin使用方法示例详解》:本文主要介绍MybatisPlusJoin使用方法示例详解,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,... 目录1、pom文件2、yaml配置文件3、分页插件4、示例代码:5、测试代码6、和PageHelper结合6

一文全面详解Python变量作用域

《一文全面详解Python变量作用域》变量作用域是Python中非常重要的概念,它决定了在哪里可以访问变量,下面我将用通俗易懂的方式,结合代码示例和图表,带你全面了解Python变量作用域,需要的朋友... 目录一、什么是变量作用域?二、python的四种作用域作用域查找顺序图示三、各作用域详解1. 局部作