工作流管理框架airflow-安装部署教程

2024-01-19 18:44

本文主要是介绍工作流管理框架airflow-安装部署教程,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1 概述

Airflow是一个以编程方式编写,用于管理和调度工作流的平台。可以帮助你定义复杂的工作流程,然后在集群上执行和监控这些工作流。

Airflow计划程序在遵循指定的依赖项,同时在一组工作线程上执行任务。丰富的命令实用程序使在DAG上执行复杂的调度变的轻而易举。Airflow的可扩展Python框架可以让你构建连接几乎任何技术的工作流程。丰富的用户界面可以随时查看生产中正在运行的管道,帮助你管理工作流程的状态,监视进度以及需要时对问题进行故障排除。

Airflow的主要组件有:

DAG(有向无环图):使用Airflow将工作流编写任务的有向无环图(DAG)。一个DAG定义了一个工作流,它包含所有任务、任务的依赖关系和时间表。

任务(Task):一个任务定义了一个单独的单元工作,有一个确定的开始和结束。一个任务可以依赖于其他任务。

运算符(Operator):一个运算符封装了一个任务,并定义了它的执行逻辑。Airflow内置了许多运算符,如BashOperator、PythonOperator、EmailOperator等。你也可以自定义运算符。

时间轴(Timeline):时间轴让你以图形方式查看 DAG 的运行情况和状态。

调度器(Scheduler):调度器监视时间轴并触发需要运行的任务。

执行器(Executor):executor负责实际运行任务。Airflow支持多种executor,如LocalExecutor, CeleryExecutor, KubernetesExecutor 等。

2 名词

(1)Dynamic:Airflow管道是用Python代码配置的,允许动态生成管道。Airflow配置需要使用Python,这允许编写可动态实例化管道的代码。

(2)Extensible:Airflow框架包含许多运算符来连接各种技术。Airflow的所有组件都是可扩展的。轻松定义自己的运算符,执行程序并扩展库,使其适合于您的环境。

(3)Elegant:Airlfow是精简灵活的,使用功能强大的Jinja模板引擎,将脚本参数化内置于Airflow的核心中。

(4)Scalable:Airflow具有模板块架构,并使用消息队列来安排任意数量的工作任务。

3 airflow优缺点

优点:

Python脚本实现DAG,非常容易扩展;

可实现复杂的依赖规则;

外部依赖较少,搭建容易,仅依赖DB和rabbitmq;

工作流依赖可视化。有一套完整的UI,可视化展现所有任务的状态及历史信息;(本人刚开始主要看重这点)

完全支持crontab定时任务格式,可以通过crontab格式指定任务何时进行;

业务代码和调度系统解耦,每个业务的流程代码以独立的Python脚本描述,里面定义了流程化的节点来执行业务逻辑,支持任务的热加载.

缺点:

Airflow是为有限的批处理工作流构建的。虽然CLI和REST API确实允许触发工作流,但Airflow不是为无限运行的基于事件的工作流构建的。Airflow不是流解决方案。然而,像Apache Kafka这样的流系统通常与Apache Airflow一起使用。Kafka可以用于实时接收和处理事件数据,事件数据被写入存储位置,Airflow定期启动处理一批数据的工作流。

如果你更喜欢点击而不是编码,Airflow可能不是正确的解决方案。Web界面旨在最大限度地简化工作流的管理,Airflow框架不断改进以最大限度地简化开发人员体验。然而,Airflow的理念是将工作流定义为代码,所以代码始终是必需的。

4 Airflow安装

airflow官网地址:https://airflow.apache.org。

1)先安装并配置好python环境(可以参考Anaconda安装即可,如果项目不需要依赖太多工具包,可选择更简洁的MiniConda)并激活。

2)安装airflow

pip install apache-airflow

3)初始化airflow

airflow db init

4)查看版本

airflow version

5)启动airflow web服务,启动后浏览器访问http://ip_address:12025(如果不知道ip地址的就用ifconfig命令去linux下获取)

airflow webserver -p 12025 -D

6)启动airflow调度

airflow scheduler -D

7)创建账号(斜杠别忘记了)

airflow users create \

  --username admin \

  --firstname trisyp \

  --lastname trisyp \

  --role Admin \

  --email trisyp@email.com

回车之后会让你输入两次password,我们就用123456

8)启动停止脚本

vim af.sh

#!/bin/bash

case $1 in

"start"){

    echo " --------启动 airflow-------"

    ssh ip_address "conda activate airflow;airflow webserver -p 12025 -D;airflow scheduler -D; conda deactivate"

};;

"stop"){

    echo " --------关闭 airflow-------"

    ps -ef|egrep 'scheduler|airflow-webserver'|grep -v grep|awk '{print $2}'|xargs kill -15

};;

esac

添加权限即可使用。

trisyp@ip_address bin]$ chmod +x af.sh

5 修改数据库为MySQL

1)先在MySQL中建库

mysql> CREATE DATABASE airflow_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;

2)如果报错Linux error:1425F102:SSL routines:ssl_choose_client_version:unsupported protocol,可以关闭MySQLSSL证书

查看SSL是否开启  YES为开启

mysql> SHOW VARIABLES LIKE '%ssl%';

+---------------+-----------------+

| Variable_name | Value           |

+---------------+-----------------+

| have_openssl  | YES             |

| have_ssl      | YES             |

| ssl_ca        | ca.pem          |

| ssl_capath    |                 |

| ssl_cert      | server-cert.pem |

| ssl_cipher    |                 |

| ssl_crl       |                 |

| ssl_crlpath   |                 |

| ssl_key       | server-key.pem  |

+---------------+-----------------+

3)修改配置文件my.cnf(注意:直接数据库修改值不起作用),加入以下内容:

# disable_ssl

skip_ssl

4)添加python连接的依赖,官网介绍的方法有两种:

这里我们选择mysql+mysqlconnector。

pip install mysql-connector-python

5)修改airflow的配置文件(vim ~/airflow/airflow.cfg):

[database]

# The SqlAlchemy connection string to the metadata database.

# SqlAlchemy supports many different database engines.

# More information here:

# http://airflow.apache.org/docs/apache-airflow/stable/howto/set-up-database.html#database-uri

#sql_alchemy_conn = sqlite:home/trisyp/airflow/airflow.db

sql_alchemy_conn = mysql+mysqlconnector://root:123456@ip_address:3306/airflow_db

6)关闭airflow,初始化后重启:

af.sh stop

airflow db init

af.sh start

7)若初始化报错1067 - Invalid default value for ‘update_at’:

原因:字段 'update_at' timestamp类型,取值范围是:1970-01-01 00:00:00 2037-12-31 23:59:59UTC +8 北京时间从1970-01-01 08:00:00 开始),而这里默认给了空值,所以导致失败。

推荐修改mysql存储时间戳格式:

mysql> set GLOBAL sql_mode ='STRICT_TRANS_TABLES,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION'

重启MySQL会造成参数失效(注意:这样就需要重新创建账号),推荐将参数写入到配置文件my.cnf中。

sql_mode = STRICT_TRANS_TABLES,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION

6 修改执行器

官网不推荐在开发中使用顺序执行器,会造成任务调度阻塞。

1)修改airflow的配置文件(vim ~/airflow/airflow.cfg)

[core]

# The executor class that airflow should use. Choices include

# ``SequentialExecutor``, ``LocalExecutor``, ``CeleryExecutor``, ``DaskExecutor``,

# ``KubernetesExecutor``, ``CeleryKubernetesExecutor`` or the

# full import path to the class when using a custom executor.

executor = LocalExecutor

可以使用官方推荐的几种执行器,也可以自定义。这里我们选择本地执行器即可。

7 部署使用

1)测试环境启动

本次测试使用的是spark的官方案例,所有需要启动hadoop和spark的历史服务器。

myhadoop.sh start

cd /opt/module/spark-yarn/sbin/start-history-server.sh

2)查看Airflow配置文件

vim ~/airflow/airflow.cfg

3)编写.py脚本,创建work-py目录用于存放python调度脚本

mkdir ~/airflow/dags

cd dags/

然后把脚本文件放到dags文件夹,代码如下:

from airflow import DAG

from airflow.operators.bash import BashOperator

from datetime import datetime, timedelta

default_args = {  # 设置默认参数。

    # 用户

    'owner': 'test_owner',

    # 是否开启任务依赖

    'depends_on_past': True,

    # 邮箱

    'email': ['trisyp@email.com'],

    # 启动时间

    'start_date':datetime(2022,11,28),

    # 出错是否发邮件报警

    'email_on_failure': False,

    # 重试是否发邮件报警

    'email_on_retry': False,

    # 重试次数

    'retries': 3,

    # 重试时间间隔

    'retry_delay': timedelta(minutes=5),

}

# 声明任务图,schedule_interval:调度频率。

dag = DAG('test', default_args=default_args, schedule_interval=timedelta(days=1))

 

# 创建单个任务

t1 = BashOperator(  # BashOperator:具体执行任务,如果为true前置任务必须成功完成才会走下一个依赖任务,如果为false则忽略是否成功完成。

    # 任务id:任务唯一标识(必填)。

    task_id='dwd',

    # 具体任务执行命令。

    bash_command='ssh ip_address "/opt/module/spark-yarn/bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn /opt/module/spark-yarn/examples/jars/spark-examples_2.12-3.1.3.jar 10 "',

    # 重试次数

    retries=3,

    # 把任务添加进图中

    dag=dag)

t2 = BashOperator(

    task_id='dws',

    bash_command='ssh ip_address "/opt/module/spark-yarn/bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn /opt/module/spark-yarn/examples/jars/spark-examples_2.12-3.1.3.jar 10 "',

    retries=3,

    dag=dag)

t3 = BashOperator(

    task_id='ads',

    bash_command='ssh ip_address "/opt/module/spark-yarn/bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn /opt/module/spark-yarn/examples/jars/spark-examples_2.12-3.1.3.jar 10 "',

    retries=3,

    dag=dag)

# 设置任务依赖:ads任务依赖dws任务依赖dwd任务。

t2.set_upstream(t1)

t3.set_upstream(t2)

4)等待一段时间,刷新任务列表

airflow dags list

5)已出现myairflow_execute_bash任务(刷新页面)

6)点击运行

7)查看dag图、甘特图,点击成功任务,查看日志

8)查看脚本代码

9)Dag任务操作

9.1 删除Dag任务

主要删除DAG任务不会删除底层文件,过一会还会自动加载回来。

9.2 查看当前所有dag任务

# 查看所有任务

airflow dags list

# 查看单个任务

airflow tasks list test --tree

8 配置邮件服务器

1)保证邮箱已开SMTP服务

2)修改airflow配置文件,用stmps服务对应587端口

vim ~/airflow/airflow.cfg 

smtp_host = smtp.qq.com

smtp_starttls = True

smtp_ssl = False

smtp_user = trisyp@email.com

# smtp_user =

smtp_password = qluxdbuhgrhgbigi

# smtp_password =

smtp_port = 587

smtp_mail_from = trisyp@email.com

3)重启airflow

af.sh stop

af.sh start

4)编辑test.py脚本,加入emailOperator

from airflow.operators.email_operator import EmailOperator

email=EmailOperator(

    task_id="email",

    to="yaohm163@163.com ",

    subject="test-subject",

    html_content="<h1>test-content</h1>",

    cc="trisyp@email.com ",

    dag=dag)

t2.set_upstream(t1)

t3.set_upstream(t2)

email.set_upstream(t3)

5)查看页面是否生效

6)运行测试

9 避坑指南

1)Exception rendering Jinja template for task

2)Intel MKL FATAL ERROR: Cannot load ../numexpr/../../../libmkl_rt.so.1.

强制更新airflow到最新版

3)error: subprocess-exited-with-error

解决方案:

错误有明确的提示,缺少pkg-config,所以就先安装这个包,然后在安装mysqlclient。

sudo apt-get install pkg-config

4)Can't connect to local MySQL server through socket '/tmp/mysql.sock' (2)

解决方案:

先用命令“find / -name ‘mysql.sock”来查看下这个文件所在目录,如果有就建立软连接(不要想着拷贝复制,无效的),命令是“ln -s /tmp/mysql.sock”。如果没有就找my.cnf文件,一般文件地址为/etc/mysql/my.cnf,然后通过vim加上socket路径信息,一定要加mysqld这个分组,不然会报找不到分组这个错;Found option without preceding group

5)Segmentation fault (core dumped)

解决方案:

在配置mysql存储的时候要加上mysqlconnector就解决了。这个坑非常恶心,你参照某些教程直接只配mysql,忽视了connector,碰到了还找不到解决方案,因为核心存储转移你不知道怎么搞。

cd /etc

vim profile

加入:

export AIRFLOW_HOME=/root/airflow

sudo mysql

create database airflow_db;

create user 'airflow'@'%' identified by '123456';

grant all on airflow_db .* to 'airflow'@'%';

sql_alchemy_conn = mysql://airflow:123456@10.0.0.22:3306/airflow_db

10 参考链接

https://yuchaoshui.com/1bd10cc/

这篇关于工作流管理框架airflow-安装部署教程的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/623329

相关文章

Python pandas库自学超详细教程

《Pythonpandas库自学超详细教程》文章介绍了Pandas库的基本功能、安装方法及核心操作,涵盖数据导入(CSV/Excel等)、数据结构(Series、DataFrame)、数据清洗、转换... 目录一、什么是Pandas库(1)、Pandas 应用(2)、Pandas 功能(3)、数据结构二、安

Python安装Pandas库的两种方法

《Python安装Pandas库的两种方法》本文介绍了三种安装PythonPandas库的方法,通过cmd命令行安装并解决版本冲突,手动下载whl文件安装,更换国内镜像源加速下载,最后建议用pipli... 目录方法一:cmd命令行执行pip install pandas方法二:找到pandas下载库,然后

使用IDEA部署Docker应用指南分享

《使用IDEA部署Docker应用指南分享》本文介绍了使用IDEA部署Docker应用的四步流程:创建Dockerfile、配置IDEADocker连接、设置运行调试环境、构建运行镜像,并强调需准备本... 目录一、创建 dockerfile 配置文件二、配置 IDEA 的 Docker 连接三、配置 Do

Linux系统中查询JDK安装目录的几种常用方法

《Linux系统中查询JDK安装目录的几种常用方法》:本文主要介绍Linux系统中查询JDK安装目录的几种常用方法,方法分别是通过update-alternatives、Java命令、环境变量及目... 目录方法 1:通过update-alternatives查询(推荐)方法 2:检查所有已安装的 JDK方

SQL Server安装时候没有中文选项的解决方法

《SQLServer安装时候没有中文选项的解决方法》用户安装SQLServer时界面全英文,无中文选项,通过修改安装设置中的国家或地区为中文中国,重启安装程序后界面恢复中文,解决了问题,对SQLSe... 你是不是在安装SQL Server时候发现安装界面和别人不同,并且无论如何都没有中文选项?这个问题也

2025版mysql8.0.41 winx64 手动安装详细教程

《2025版mysql8.0.41winx64手动安装详细教程》本文指导Windows系统下MySQL安装配置,包含解压、设置环境变量、my.ini配置、初始化密码获取、服务安装与手动启动等步骤,... 目录一、下载安装包二、配置环境变量三、安装配置四、启动 mysql 服务,修改密码一、下载安装包安装地

Redis MCP 安装与配置指南

《RedisMCP安装与配置指南》本文将详细介绍如何安装和配置RedisMCP,包括快速启动、源码安装、Docker安装、以及相关的配置参数和环境变量设置,感兴趣的朋友一起看看吧... 目录一、Redis MCP 简介二、安www.chinasem.cn装 Redis MCP 服务2.1 快速启动(推荐)2.

在macOS上安装jenv管理JDK版本的详细步骤

《在macOS上安装jenv管理JDK版本的详细步骤》jEnv是一个命令行工具,正如它的官网所宣称的那样,它是来让你忘记怎么配置JAVA_HOME环境变量的神队友,:本文主要介绍在macOS上安装... 目录前言安装 jenv添加 JDK 版本到 jenv切换 JDK 版本总结前言China编程在开发 Java

Spring Boot Actuator应用监控与管理的详细步骤

《SpringBootActuator应用监控与管理的详细步骤》SpringBootActuator是SpringBoot的监控工具,提供健康检查、性能指标、日志管理等核心功能,支持自定义和扩展端... 目录一、 Spring Boot Actuator 概述二、 集成 Spring Boot Actuat

电脑提示d3dx11_43.dll缺失怎么办? DLL文件丢失的多种修复教程

《电脑提示d3dx11_43.dll缺失怎么办?DLL文件丢失的多种修复教程》在使用电脑玩游戏或运行某些图形处理软件时,有时会遇到系统提示“d3dx11_43.dll缺失”的错误,下面我们就来分享超... 在计算机使用过程中,我们可能会遇到一些错误提示,其中之一就是缺失某个dll文件。其中,d3dx11_4