airflow调度时间详解

2024-08-24 03:04
文章标签 详解 时间 调度 airflow

本文主要是介绍airflow调度时间详解,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

⭐️ airflow调度概述

Apache Airflow 是一个开源的工作流调度和监控平台,广泛用于数据工程、ETL(提取、转换、加载)管道以及各种自动化任务。下面我将详细说明 Airflow 的调度算法。

1. DAG(有向无环图)

Airflow 的核心是 DAG(Directed Acyclic Graph),它定义了一组有序的任务,其中每个任务称为一个 “task”。DAG 是调度的基本单位,Airflow 通过 DAG 定义任务的依赖关系和执行顺序。

在这里插入图片描述

2. DAG调度器

Airflow 中的调度器(Scheduler)负责监控 DAG,决定何时触发 DAG 中的任务。

3. 调度策略(Scheduling Policy)

在 Airflow 中,调度策略定义了 DAG 应该何时被触发。调度策略通常基于以下两个因素:

  • 时间表(Schedule Interval): 这是一个时间间隔,定义了 DAG 被调度的频率。例如,每小时调度一次、每天调度一次等。时间表可以通过 Cron 表达式、timedelta 对象等来定义。

  • 开始时间(Start Date): DAG 从这个时间开始调度。start_dateschedule_interval 一起决定了 DAG 的首次执行时间和后续的执行时间点。

4. Catchup

默认情况下,Airflow 会自动执行所有未执行的 DAG 实例。如果你设置了一个 DAG 的 start_date 为过去的某个时间,且 schedule_interval 为每天执行一次,Airflow 会在下次调度器运行时尝试执行所有中间日期的任务,直到赶上当前日期。

如果 DAG 不是为了处理其追赶而编写的,那么可以关闭catchup。这可以通过在DAG中设置catchup=False或在配置文件中设置catchup_by_default=False来实现。关闭catchup时,调度程序仅为最新间隔创建DAG运行。

5. External Triggers

Airflow 还支持外部事件触发,例如通过 REST API 或 Sensor 任务来触发 DAG 或特定任务的执行。这个机制使得 Airflow 能够适应动态和复杂的调度需求,而不仅仅依赖于固定的时间表。

在这里插入图片描述

⭐️ airflow内部是怎样计算调度时间的

Airflow 的开始时间(start_date)和调度时间(schedule_interval)是密切相关的,理解它们之间的关系对正确配置 DAG 至关重要。

1. 开始时间(start_date)

start_date 是指 DAG 或任务首次被调度的参考时间点。它表示任务计划从何时开始执行。重要的是,Airflow 将在 start_date 后的第一个调度间隔完成时实际开始执行任务。换句话说,start_date 指的是任务应该开始考虑执行的时间,而不是实际执行的时间。

2. 调度时间(schedule_interval)

schedule_interval 是指 DAG 定期运行的时间间隔。例如,如果你设置了 schedule_interval=‘@daily’,Airflow 会每天触发一次 DAG。Airflow 调度任务的实际时间通常是 start_date 加上 schedule_interval 的长度。

3. 两者的关系

Airflow 的调度器总是会在 start_date 之后的调度间隔结束时调度任务,这意味着任务实际运行的时间通常是 start_date 加上一个 schedule_interval。

举例说明,假设你有一个 DAG,其 start_date 设置为 2024-08-14 00:00:00,schedule_interval 设置为 @daily(每天运行一次):start_date 是 2024-08-14 00:00:00。
第一次调度的实际执行时间(即第一个 DAG run)会是 2024-08-15 00:00:00。

为什么会这样? 因为 Airflow 认为每个 DAG run 是在上一个时间区间的末尾处理数据。例如,在这个例子中,2024-08-15 00:00:00 运行的是 2024-08-14 的数据。

4. 调度时间的具体计算

Airflow 采用了如下逻辑计算调度时间:

初始调度时间计算:
初始调度时间是基于 start_date 和 schedule_interval 计算得出的第一个调度时间。例如,如果 start_date 是 2024-08-14 00:00:00,schedule_interval 是 1 小时,那么第一个调度时间是 2024-08-14 01:00:00。

后续调度时间计算:
Airflow 会从 start_date 开始逐个计算调度时间,直到当前时间。例如,若 schedule_interval 是 1 小时,Airflow 会逐小时计算每个调度时间点。

Missed Schedules(错过的调度):
如果一个 DAG 处于 paused 状态或因为某些原因未被调度,Airflow 会在 DAG 恢复为 active 状态时补齐所有错过的调度时间点。

5. 实际运行中的情况

在实际应用中,start_date 和 schedule_interval 的组合可能会导致以下情况:
迟到执行:由于 schedule_interval 的存在,第一次 DAG 任务通常在 start_date 后一个完整的调度间隔结束时执行。
调度延迟:如果 DAG 配置或系统资源不够,可能会出现调度延迟。

6. 示例说明

假设你有一个 DAG:

start_date: 2024-08-14 00:00:00
schedule_interval: 每 1 小时一次 (timedelta(hours=1))

那么 Airflow 会按如下时间点调度任务:

2024-08-14 01:00:00 调度 2024-08-14 00:00:00 到 01:00:00 这一小时的数据
2024-08-14 02:00:00 调度 2024-08-14 01:00:00 到 02:00:00 这一小时的数据
以此类推

7. 具体代码实现

在 Airflow 的源码中,调度时间的计算主要依赖于以下几个组件:
DagRun: 每次 DAG 运行都会创建一个 DagRun 对象,其中记录了调度时间、开始时间、结束时间等信息。
next_dagrun_info: 这个函数会基于当前的 schedule_interval 计算下一个调度时间。
catchup: 如果 catchup=True,Airflow 会尝试补齐所有错过的调度时间。

在这里插入图片描述

⭐️ 一个场景

假如开始时间为当天0点,每隔1小时执行一次,而且开始时DAG是被paused的,当在凌晨3点手动从paused状态变成acitve状态,那么后面的运行是怎么调度的,分析如下:

在 Airflow 中,DAG 被 paused 状态解除并变为 active 状态时,调度的逻辑将根据 start_dateschedule_interval 来决定接下来的任务调度时间。让我们详细讨论一下这种情况下的调度行为。

1. 情景描述

  • start_date: 当天的凌晨 00:00(例如 2024-08-14 00:00)。
  • schedule_interval: 每 1 小时运行一次('0 * * * *'timedelta(hours=1))。
  • DAG 状态: 初始时 DAG 处于 paused 状态,凌晨 3 点手动将其设置为 active 状态。

2. 关键点

  1. DAG 的调度逻辑

    • Airflow 的调度是基于 start_dateschedule_interval 的,并且会计算所有可能的调度时间点。
    • 当 DAG 处于 paused 状态时,Airflow 不会调度新的任务实例,但会“记住”错过的调度窗口。
    • 当 DAG 被从 paused 变为 active 时,Airflow 会立即尝试补齐所有错过的调度实例,除非你手动跳过这些实例。
  2. 调度行为

    • 你在凌晨 3 点将 DAG 从 paused 变为 active 状态后,Airflow 将立即调度凌晨 1 点、2 点和 3 点的实例,因为它们都是基于 start_dateschedule_interval 计算出来的调度点。
    • 这意味着,当你在凌晨 3 点将 DAG 设为 active,Airflow 会依次调度并执行 00:00-01:00、01:00-02:00、02:00-03:00 这几个时间段的 DAG 任务。

3. 示例说明

假设 start_date 是 2024-08-14 00:00,DAG 设定为每 1 小时执行一次,调度时间点依次是:

  • 2024-08-14 01:00:00
  • 2024-08-14 02:00:00
  • 2024-08-14 03:00:00
  • 2024-08-14 04:00:00

如果 DAG 在 2024-08-14 03:00:00 从 paused 状态变为 active,Airflow 会立即调度前面错过的任务:

  • 2024-08-14 01:00:00(任务处理 00:00 到 01:00 的数据)
  • 2024-08-14 02:00:00(任务处理 01:00 到 02:00 的数据)
  • 2024-08-14 03:00:00(任务处理 02:00 到 03:00 的数据)

Airflow 会按照这些顺序来执行这些任务,确保所有错过的调度时间点都得到处理。

下图是一个在实际生产中的案例,开始时间是凌晨0点5分,每5个小时调度一次,但开始时DAG是paused的,上午9点多手动将DAG恢复,此图是从airflow ui 中截取的,

在这里插入图片描述

笔者水平有限,若有不对的地方欢迎评论指正!

这篇关于airflow调度时间详解的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1101263

相关文章

MySQL中的分组和多表连接详解

《MySQL中的分组和多表连接详解》:本文主要介绍MySQL中的分组和多表连接的相关操作,本文通过实例代码给大家介绍的非常详细,感兴趣的朋友一起看看吧... 目录mysql中的分组和多表连接一、MySQL的分组(group javascriptby )二、多表连接(表连接会产生大量的数据垃圾)MySQL中的

Java 实用工具类Spring 的 AnnotationUtils详解

《Java实用工具类Spring的AnnotationUtils详解》Spring框架提供了一个强大的注解工具类org.springframework.core.annotation.Annot... 目录前言一、AnnotationUtils 的常用方法二、常见应用场景三、与 JDK 原生注解 API 的

Java controller接口出入参时间序列化转换操作方法(两种)

《Javacontroller接口出入参时间序列化转换操作方法(两种)》:本文主要介绍Javacontroller接口出入参时间序列化转换操作方法,本文给大家列举两种简单方法,感兴趣的朋友一起看... 目录方式一、使用注解方式二、统一配置场景:在controller编写的接口,在前后端交互过程中一般都会涉及

redis中使用lua脚本的原理与基本使用详解

《redis中使用lua脚本的原理与基本使用详解》在Redis中使用Lua脚本可以实现原子性操作、减少网络开销以及提高执行效率,下面小编就来和大家详细介绍一下在redis中使用lua脚本的原理... 目录Redis 执行 Lua 脚本的原理基本使用方法使用EVAL命令执行 Lua 脚本使用EVALSHA命令

SpringBoot3.4配置校验新特性的用法详解

《SpringBoot3.4配置校验新特性的用法详解》SpringBoot3.4对配置校验支持进行了全面升级,这篇文章为大家详细介绍了一下它们的具体使用,文中的示例代码讲解详细,感兴趣的小伙伴可以参考... 目录基本用法示例定义配置类配置 application.yml注入使用嵌套对象与集合元素深度校验开发

Python中的Walrus运算符分析示例详解

《Python中的Walrus运算符分析示例详解》Python中的Walrus运算符(:=)是Python3.8引入的一个新特性,允许在表达式中同时赋值和返回值,它的核心作用是减少重复计算,提升代码简... 目录1. 在循环中避免重复计算2. 在条件判断中同时赋值变量3. 在列表推导式或字典推导式中简化逻辑

python处理带有时区的日期和时间数据

《python处理带有时区的日期和时间数据》这篇文章主要为大家详细介绍了如何在Python中使用pytz库处理时区信息,包括获取当前UTC时间,转换为特定时区等,有需要的小伙伴可以参考一下... 目录时区基本信息python datetime使用timezonepandas处理时区数据知识延展时区基本信息

Java Stream流使用案例深入详解

《JavaStream流使用案例深入详解》:本文主要介绍JavaStream流使用案例详解,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录前言1. Lambda1.1 语法1.2 没参数只有一条语句或者多条语句1.3 一个参数只有一条语句或者多

SpringBoot整合mybatisPlus实现批量插入并获取ID详解

《SpringBoot整合mybatisPlus实现批量插入并获取ID详解》这篇文章主要为大家详细介绍了SpringBoot如何整合mybatisPlus实现批量插入并获取ID,文中的示例代码讲解详细... 目录【1】saveBATch(一万条数据总耗时:2478ms)【2】集合方式foreach(一万条数

Python装饰器之类装饰器详解

《Python装饰器之类装饰器详解》本文将详细介绍Python中类装饰器的概念、使用方法以及应用场景,并通过一个综合详细的例子展示如何使用类装饰器,希望对大家有所帮助,如有错误或未考虑完全的地方,望不... 目录1. 引言2. 装饰器的基本概念2.1. 函数装饰器复习2.2 类装饰器的定义和使用3. 类装饰