Airflow
Airflow是apache基金会下的一个任务编排框架,底层基于python实现,支持多种DB,支持Celery,Mq.内置Web服务器及前端
指引
1. Operator
这里重点讲一下BashOperator和PythonOperator.
1.1 BashOperator(Bash指令算子)
使用BashOperator可以在Bash shell执行命令,例如
run_this = BashOperator(
task_id='run_after_loop',
bash_command='echo 1',
dag=dag,
)
BashOperator支持Jinja模板,注意变量应该在env中声明
bash_task = BashOperator(
task_id="bash_task",
bash_command='echo "here is the message: \'$message\'"',
env={'message': '{{ dag_run.conf["message"] if dag_run else "" }}'},
)
1.2 PythonOperator(Python算子)
Python算子是用于直接执行Python的函数。如
def print_context(ds, **kwargs):
pprint(kwargs)
print(ds)
return 'Whatever you return gets printed in the logs'
run_this = PythonOperator(
task_id='print_the_context',
provide_context=True,
python_callable=print_context,
dag=dag,
)
传递参数
使用op_args和op_kwargs参数将附加参数传递给Python可调用。
def my_sleeping_function(random_base):
"""This is a function that will run within the DAG execution"""
time.sleep(random_base)
# Generate 5 sleeping tasks, sleeping from 0.0 to 0.4 seconds respectively
for i in range(5):
task = PythonOperator(
task_id='sleep_for_' + str(i),
python_callable=my_sleeping_function,
op_kwargs={'random_base': float(i) / 10},
dag=dag,
)
run_this >> task
模板化
Python算子同样支持Jinjia。当您将ProvidContext参数设置为True时,
airflow会传递一组额外的关键字参数:一个用于Jinja模板变量,另一个用于templates_dict参数。
template_dict参数是模板化的,因此字典中的每个值都被计算为Jinja模板。
1.3 自定义算子
airflow支持自定义算子的设计,如
from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults
class HelloOperator(BaseOperator):
@apply_defaults
def __init__(
self,
name: str,
*args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.name = name
def execute(self, context):
message = "Hello {}".format(self.name)
print(message)
return message
然后可以这样用
from custom_operator.hello_operator import HelloOperator
with dag:
hello_task = HelloOperator(task_id='sample-task', name='foo_bar')
继承自BaseOperator
Hooks
由于Hooks的存在,可以实现DAG内的共享外部资源,如DB连接
class HelloDBOperator(BaseOperator):
@apply_defaults
def __init__(
self,
name: str,
mysql_conn_id: str,
database: str,
*args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.name = name
self.mysql_conn_id = mysql_conn_id
self.database = database
def execute(self, context):
hook = MySqlHook(mysql_conn_id=self.mysql_conn_id,
schema=self.database)
sql = "select name from user"
result = hook.get_first(sql)
message = "Hello {}".format(result['name'])
print(message)
return message
用户界面
airflow支持定制Operator在UI界面的颜色定制,如
class HelloOperator(BaseOperator):
ui_color = '#ff0000'
ui_fgcolor = '#000000'
....
模板
详情看https://airflow.apache.org/docs/stable/howto/custom-operator.html
2.1 管理内部连接
这里的内部连接是指DAG内部所共享使用的数据连接,这里可以在WebUI上的Admin/Connections里面编辑
详情https://airflow.apache.org/docs/stable/howto/connection/index.html
概念
1. DAGs有向无环图
Airflow中每个大的执行流都是一个有向无环图组成,其中包括了组成的算子,
不同算子间可以存在并行或依赖的关系,DAGs构成了整个调度模型的最外层任务。
1. DAG是使用Pythond定义,声明结构,表明依赖和执行顺序。
2. DAG可以声明任务的执行顺序,重试次数,依赖关系,
3. DAG是以Python文件形式,放置在项目目录/dags中,由框架动态生成
执行时间或方法等。
4. DAG只会从文件的全局代码中录入,而不会从local scope,如下列的dag2就不会被录入
dag_1 = DAG('this_dag_will_be_discovered')
def my_function():
dag_2 = DAG('but_this_dag_will_not')
my_function()
默认配置
可以通过设置一个dict为默认配置初始化DAG,从而避免每次都需要一个一个字段设置,如
default_args = {
'start_date': datetime(2016, 1, 1),
'owner': 'airflow'
}
dag = DAG('my_dag', default_args=default_args)
op = DummyOperator(task_id='dummy', dag=dag)
print(op.owner) # Airflow
上下文管理器(类似与文件IO)
with DAG('my_dag', start_date=datetime(2016, 1, 1)) as dag:
op = DummyOperator('op')
op.dag is dag # True
默认airflow只会搜索名字带airflow或者DAG的,
需要修改查找所有py文件,需要修改DAG_DISCOVERY_SAFE_MDOE配置项
2. DAG Runs(DAG 实例)
DAG实例是指一个物理上的DAG实例对象,包含了一系列的任务实例并运行在一个特定的执行时间。
DAG实例必然会有一个执行日期,无论是从调度器创建的,还是由额外的触发器创建。
实例化自同个DAG的多个DAG实例可以同时并发的执行,例如同时执行2016-10-01和2016-10-02。
execution_date(执行日期)
DAG实例及其任务实例都需要一个执行日期,但是这个执行日期是逻辑上的执行日期,而不是物理上的,
如我需要执行一个三个月前的任务,那么这是execution_date就是三个月前,而不是物理上的任务start_date。
3. Task(任务)
任务组成了DAG,一个任务代表了DAG中的一个节点,每个任务都是由一个Operator算子派生生成,如
PythonOperator执行Python代码,BashOperator执行bash指令等。
任务间的关系
下列代码创建了2个任务,并执行顺序为task_1 -> task_2
with DAG('my_dag', start_date=datetime(2016, 1, 1)) as dag:
task_1 = DummyOperator('task_1')
task_2 = DummyOperator('task_2')
task_1 >> task_2 # Define dependencies
>> 是用于建立任务关系的语法糖
task_1是task_2的upstream上游
task_2是task_1的downstream下游
任务实例
任务实例是任务的实例化体现,被赋予了一系列变量如execution_date,并且是拥有状态的物理实例。
状态包括如运行中,成功,失败,跳过,等待重试等等。
这其中的包含关系为,Tasks 组成DAG, Task instances组成DAG Run
4. 任务生命周期
详情https://airflow.apache.org/docs/stable/concepts.html#task-lifecycle
5. Operator算子
算子包括了BashOperator(Bash指令算子),DingdingOperator(阿里钉钉算子),GoogleCloudOperator(谷歌云算子),Papermill(执行JupterNotebook),PythonOperator(Python算子),
以及Cross-DAG Dependencies(用于跨DAG进行任务依赖使用),EmailOperator,SimpleHttpOperator,
MysqlOperator,Sesor…。
算子决定了一个任务该如何执行,算子通常是原子性的,只需要管好内部的执行,而无需关心与外部其他算子的关系,
而是由DAG去协调,并且通常一个DAG上的不同Task并不是运行在同一台机器上的。
通常如果两个Operator之间是不允许共享数据(这里的Operator更应该说是Task),
如果需要,建议合并为一个Operator,假如确实不能合并。可以参考XCom
DAG分配
Operator从1.8版本后支持延迟分配及推断分配。如
dag = DAG('my_dag', start_date=datetime(2016, 1, 1))
# 实例化算子时进行DAG分配,分配后则不能更换和解除分配
explicit_op = DummyOperator(task_id='op1', dag=dag)
# 延迟DAG分配
deferred_op = DummyOperator(task_id='op2')
deferred_op.dag = dag
# DAG推断分配
inferred_op = DummyOperator(task_id='op3')
inferred_op.set_upstream(deferred_op)
位移符配置依赖关系
下属写法是等效的
op1 >> op2
op1.set_downstream(op2)
op2 << op1
op2.set_upstream(op1)
多个算子可以通过这样创联关系
op1 >> op2 >> op3 << op4
# 等效于
op1.set_downstream(op2)
op2.set_downstream(op3)
op3.set_upstream(op4)
# 同时支持
op1 >> [op2, op3] >> op4
# 等同于
op1 >> op2 >> op4
op1 >> op3 >> op4
# 和
op1.set_downstream([op2, op3])
关系构建器
2.0版本将会从airflow.utils.helpers移动到airflow.models.baseoperator
用于复杂的关系构建,详情可看https://airflow.apache.org/docs/stable/concepts.html#relationship-builders
额外功能
6. Hook钩子
Hook是一系列外部平台或者数据的如Hive,S3,Mysql,HDFS,Pig等的接口,
可以作为算子构建的一部分,并独立于管道之外,独立存储在元数据库中。
7. Pool池子
pool是用于限制一系列任务同时并发执行的数量,是在Task声明时候设置,需要先于Task声明前存在,
可以在WebUI的admin/Pools中创建,同时pool的参数可以定义权重,使得任务在优先级列表中阻塞时,
可以根据权重安排执行顺序。
8. Connections连接
通过设置连接admin/connections,给与的con_id可以用于管道中的连接创建,同一个conn_id的不同连接
配置将会被随机的使用,从而实现负载均衡的效果。
9. Queues队列
任何Task都可以被声明赋予某个队列,而Worker也可以指定只关注某个队列。
详情https://airflow.apache.org/docs/stable/concepts.html#queues
10. XCom
用于Tasks间传数据,Python算子返回数据时默认会进行xcom_push,此时其他任务可以
# inside a PythonOperator called 'pushing_task'
def push_function():
return value
# inside another PythonOperator where provide_context=True
def pull_function(**context):
value = context['task_instance'].xcom_pull(task_ids='pushing_task')
详情https://airflow.apache.org/docs/stable/concepts.html#xcoms
11. Trigger Rules
用于设置每个operator的上游任务是如何状态,本身才会执行。如全部成功,全部失败,全部完成,
一个失败等等。默认的触发规则就是all_successed。
在all_successed和all_failed状态下,Skip任务会将任务状态级联到下游任务中,
详情https://airflow.apache.org/docs/stable/concepts.html#trigger-rules