Airflow

Airflow是apache基金会下的一个任务编排框架,底层基于python实现,支持多种DB,支持Celery,Mq.内置Web服务器及前端

指引

1. Operator

这里重点讲一下BashOperator和PythonOperator.

1.1 BashOperator(Bash指令算子)

使用BashOperator可以在Bash shell执行命令,例如

run_this = BashOperator(
    task_id='run_after_loop',
    bash_command='echo 1',
    dag=dag,
)

BashOperator支持Jinja模板,注意变量应该在env中声明

bash_task = BashOperator(
    task_id="bash_task",
    bash_command='echo "here is the message: \'$message\'"',
    env={'message': '{{ dag_run.conf["message"] if dag_run else "" }}'},
)

1.2 PythonOperator(Python算子)

Python算子是用于直接执行Python的函数。如

def print_context(ds, **kwargs):
    pprint(kwargs)
    print(ds)
    return 'Whatever you return gets printed in the logs'


run_this = PythonOperator(
    task_id='print_the_context',
    provide_context=True,
    python_callable=print_context,
    dag=dag,
)

传递参数

使用op_args和op_kwargs参数将附加参数传递给Python可调用。

def my_sleeping_function(random_base):
    """This is a function that will run within the DAG execution"""
    time.sleep(random_base)


# Generate 5 sleeping tasks, sleeping from 0.0 to 0.4 seconds respectively
for i in range(5):
    task = PythonOperator(
        task_id='sleep_for_' + str(i),
        python_callable=my_sleeping_function,
        op_kwargs={'random_base': float(i) / 10},
        dag=dag,
    )

    run_this >> task

模板化

Python算子同样支持Jinjia。当您将ProvidContext参数设置为True时,
airflow会传递一组额外的关键字参数:一个用于Jinja模板变量,另一个用于templates_dict参数。
template_dict参数是模板化的,因此字典中的每个值都被计算为Jinja模板。

1.3 自定义算子

airflow支持自定义算子的设计,如

from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults

class HelloOperator(BaseOperator):

    @apply_defaults
    def __init__(
            self,
            name: str,
            *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.name = name

    def execute(self, context):
        message = "Hello {}".format(self.name)
        print(message)
        return message

然后可以这样用

from custom_operator.hello_operator import HelloOperator

with dag:
    hello_task = HelloOperator(task_id='sample-task', name='foo_bar')

继承自BaseOperator

Hooks

由于Hooks的存在,可以实现DAG内的共享外部资源,如DB连接

class HelloDBOperator(BaseOperator):

        @apply_defaults
        def __init__(
                self,
                name: str,
                mysql_conn_id: str,
                database: str,
                *args, **kwargs) -> None:
            super().__init__(*args, **kwargs)
            self.name = name
            self.mysql_conn_id = mysql_conn_id
            self.database = database

        def execute(self, context):
            hook = MySqlHook(mysql_conn_id=self.mysql_conn_id,
                     schema=self.database)
            sql = "select name from user"
            result = hook.get_first(sql)
            message = "Hello {}".format(result['name'])
            print(message)
            return message

用户界面

airflow支持定制Operator在UI界面的颜色定制,如

class HelloOperator(BaseOperator):
    ui_color = '#ff0000'
    ui_fgcolor = '#000000'
    ....

模板

详情看https://airflow.apache.org/docs/stable/howto/custom-operator.html

2.1 管理内部连接

这里的内部连接是指DAG内部所共享使用的数据连接,这里可以在WebUI上的Admin/Connections里面编辑

详情https://airflow.apache.org/docs/stable/howto/connection/index.html

概念

1. DAGs有向无环图

Airflow中每个大的执行流都是一个有向无环图组成,其中包括了组成的算子,
不同算子间可以存在并行或依赖的关系,DAGs构成了整个调度模型的最外层任务。
1. DAG是使用Pythond定义,声明结构,表明依赖和执行顺序。
2. DAG可以声明任务的执行顺序,重试次数,依赖关系,
3. DAG是以Python文件形式,放置在项目目录/dags中,由框架动态生成
执行时间或方法等。
4. DAG只会从文件的全局代码中录入,而不会从local scope,如下列的dag2就不会被录入

dag_1 = DAG('this_dag_will_be_discovered')

def my_function():
    dag_2 = DAG('but_this_dag_will_not')

my_function()

默认配置

可以通过设置一个dict为默认配置初始化DAG,从而避免每次都需要一个一个字段设置,如

default_args = {
    'start_date': datetime(2016, 1, 1),
    'owner': 'airflow'
}

dag = DAG('my_dag', default_args=default_args)
op = DummyOperator(task_id='dummy', dag=dag)
print(op.owner) # Airflow

上下文管理器(类似与文件IO)

with DAG('my_dag', start_date=datetime(2016, 1, 1)) as dag:
    op = DummyOperator('op')

op.dag is dag # True

默认airflow只会搜索名字带airflow或者DAG的,
需要修改查找所有py文件,需要修改DAG_DISCOVERY_SAFE_MDOE配置项

2. DAG Runs(DAG 实例)

DAG实例是指一个物理上的DAG实例对象,包含了一系列的任务实例并运行在一个特定的执行时间。
DAG实例必然会有一个执行日期,无论是从调度器创建的,还是由额外的触发器创建。

实例化自同个DAG的多个DAG实例可以同时并发的执行,例如同时执行2016-10-01和2016-10-02。

execution_date(执行日期)

DAG实例及其任务实例都需要一个执行日期,但是这个执行日期是逻辑上的执行日期,而不是物理上的,
如我需要执行一个三个月前的任务,那么这是execution_date就是三个月前,而不是物理上的任务start_date。

3. Task(任务)

任务组成了DAG,一个任务代表了DAG中的一个节点,每个任务都是由一个Operator算子派生生成,如
PythonOperator执行Python代码,BashOperator执行bash指令等。

任务间的关系

下列代码创建了2个任务,并执行顺序为task_1 -> task_2

with DAG('my_dag', start_date=datetime(2016, 1, 1)) as dag:
    task_1 = DummyOperator('task_1')
    task_2 = DummyOperator('task_2')
    task_1 >> task_2 # Define dependencies
    >> 是用于建立任务关系的语法糖
    task_1是task_2的upstream上游
    task_2是task_1的downstream下游

任务实例

任务实例是任务的实例化体现,被赋予了一系列变量如execution_date,并且是拥有状态的物理实例。
状态包括如运行中,成功,失败,跳过,等待重试等等。

这其中的包含关系为,Tasks 组成DAG, Task instances组成DAG Run

4. 任务生命周期

详情https://airflow.apache.org/docs/stable/concepts.html#task-lifecycle

5. Operator算子

算子包括了BashOperator(Bash指令算子),DingdingOperator(阿里钉钉算子),GoogleCloudOperator(谷歌云算子),Papermill(执行JupterNotebook),PythonOperator(Python算子),
以及Cross-DAG Dependencies(用于跨DAG进行任务依赖使用),EmailOperator,SimpleHttpOperator,
MysqlOperator,Sesor…。

算子决定了一个任务该如何执行,算子通常是原子性的,只需要管好内部的执行,而无需关心与外部其他算子的关系,
而是由DAG去协调,并且通常一个DAG上的不同Task并不是运行在同一台机器上的。

通常如果两个Operator之间是不允许共享数据(这里的Operator更应该说是Task),
如果需要,建议合并为一个Operator,假如确实不能合并。可以参考XCom

DAG分配

Operator从1.8版本后支持延迟分配及推断分配。如

dag = DAG('my_dag', start_date=datetime(2016, 1, 1))

# 实例化算子时进行DAG分配,分配后则不能更换和解除分配
explicit_op = DummyOperator(task_id='op1', dag=dag)

# 延迟DAG分配
deferred_op = DummyOperator(task_id='op2')
deferred_op.dag = dag

# DAG推断分配
inferred_op = DummyOperator(task_id='op3')
inferred_op.set_upstream(deferred_op)

位移符配置依赖关系

下属写法是等效的

op1 >> op2
op1.set_downstream(op2)

op2 << op1
op2.set_upstream(op1)

多个算子可以通过这样创联关系

op1 >> op2 >> op3 << op4

# 等效于
op1.set_downstream(op2)
op2.set_downstream(op3)
op3.set_upstream(op4)

# 同时支持
op1 >> [op2, op3] >> op4
# 等同于
op1 >> op2 >> op4
op1 >> op3 >> op4
# 和
op1.set_downstream([op2, op3])

关系构建器

2.0版本将会从airflow.utils.helpers移动到airflow.models.baseoperator
用于复杂的关系构建,详情可看https://airflow.apache.org/docs/stable/concepts.html#relationship-builders

额外功能

6. Hook钩子

Hook是一系列外部平台或者数据的如Hive,S3,Mysql,HDFS,Pig等的接口,
可以作为算子构建的一部分,并独立于管道之外,独立存储在元数据库中。

7. Pool池子

pool是用于限制一系列任务同时并发执行的数量,是在Task声明时候设置,需要先于Task声明前存在,
可以在WebUI的admin/Pools中创建,同时pool的参数可以定义权重,使得任务在优先级列表中阻塞时,
可以根据权重安排执行顺序。

8. Connections连接

通过设置连接admin/connections,给与的con_id可以用于管道中的连接创建,同一个conn_id的不同连接
配置将会被随机的使用,从而实现负载均衡的效果。

9. Queues队列

任何Task都可以被声明赋予某个队列,而Worker也可以指定只关注某个队列。
详情https://airflow.apache.org/docs/stable/concepts.html#queues

10. XCom

用于Tasks间传数据,Python算子返回数据时默认会进行xcom_push,此时其他任务可以

# inside a PythonOperator called 'pushing_task'
def push_function():
    return value

# inside another PythonOperator where provide_context=True
def pull_function(**context):
    value = context['task_instance'].xcom_pull(task_ids='pushing_task')

详情https://airflow.apache.org/docs/stable/concepts.html#xcoms

11. Trigger Rules

用于设置每个operator的上游任务是如何状态,本身才会执行。如全部成功,全部失败,全部完成,
一个失败等等。默认的触发规则就是all_successed。

在all_successed和all_failed状态下,Skip任务会将任务状态级联到下游任务中,
详情https://airflow.apache.org/docs/stable/concepts.html#trigger-rules

KAI Airflow, 开源组件