{"id":534,"date":"2020-11-02T03:32:18","date_gmt":"2020-11-02T03:32:18","guid":{"rendered":"https:\/\/blog.kaispace.cn\/?p=534"},"modified":"2020-11-02T03:32:18","modified_gmt":"2020-11-02T03:32:18","slug":"airflow%e9%9a%8f%e7%ac%94","status":"publish","type":"post","link":"https:\/\/blog.kaispace.cn\/?p=534","title":{"rendered":"Airflow\u968f\u7b14"},"content":{"rendered":"<h1>Airflow<\/h1>\n<p>Airflow\u662fapache\u57fa\u91d1\u4f1a\u4e0b\u7684\u4e00\u4e2a\u4efb\u52a1\u7f16\u6392\u6846\u67b6\uff0c\u5e95\u5c42\u57fa\u4e8epython\u5b9e\u73b0\uff0c\u652f\u6301\u591a\u79cdDB\uff0c\u652f\u6301Celery,Mq.\u5185\u7f6eWeb\u670d\u52a1\u5668\u53ca\u524d\u7aef<\/p>\n<h2>\u6307\u5f15<\/h2>\n<h2>1. Operator<\/h2>\n<p>\u8fd9\u91cc\u91cd\u70b9\u8bb2\u4e00\u4e0bBashOperator\u548cPythonOperator.<\/p>\n<h3>1.1 BashOperator(Bash\u6307\u4ee4\u7b97\u5b50)<\/h3>\n<p>\u4f7f\u7528BashOperator\u53ef\u4ee5\u5728Bash shell\u6267\u884c\u547d\u4ee4\uff0c\u4f8b\u5982<\/p>\n<pre><code class=\"language-python line-numbers\">run_this = BashOperator(\n    task_id='run_after_loop',\n    bash_command='echo 1',\n    dag=dag,\n)\n<\/code><\/pre>\n<p>BashOperator\u652f\u6301Jinja\u6a21\u677f\uff0c<font color='red'>\u6ce8\u610f\u53d8\u91cf\u5e94\u8be5\u5728env\u4e2d\u58f0\u660e<\/font><\/p>\n<pre><code class=\"language-python line-numbers\">bash_task = BashOperator(\n    task_id=\"bash_task\",\n    bash_command='echo \"here is the message: \\'$message\\'\"',\n    env={'message': '{{ dag_run.conf[\"message\"] if dag_run else \"\" }}'},\n)\n<\/code><\/pre>\n<h3>1.2 PythonOperator(Python\u7b97\u5b50)<\/h3>\n<p>Python\u7b97\u5b50\u662f\u7528\u4e8e\u76f4\u63a5\u6267\u884cPython\u7684\u51fd\u6570\u3002\u5982<\/p>\n<pre><code class=\"language-Python line-numbers\">def print_context(ds, **kwargs):\n    pprint(kwargs)\n    print(ds)\n    return 'Whatever you return gets printed in the logs'\n\n\nrun_this = PythonOperator(\n    task_id='print_the_context',\n    provide_context=True,\n    
python_callable=print_context,\n    dag=dag,\n)\n<\/code><\/pre>\n<h4>\u4f20\u9012\u53c2\u6570<\/h4>\n<p>\u4f7f\u7528op_args\u548cop_kwargs\u53c2\u6570\u5c06\u9644\u52a0\u53c2\u6570\u4f20\u9012\u7ed9Python\u53ef\u8c03\u7528\u3002<\/p>\n<pre><code class=\"language-Python line-numbers\">def my_sleeping_function(random_base):\n    \"\"\"This is a function that will run within the DAG execution\"\"\"\n    time.sleep(random_base)\n\n\n# Generate 5 sleeping tasks, sleeping from 0.0 to 0.4 seconds respectively\nfor i in range(5):\n    task = PythonOperator(\n        task_id='sleep_for_' + str(i),\n        python_callable=my_sleeping_function,\n        op_kwargs={'random_base': float(i) \/ 10},\n        dag=dag,\n    )\n\n    run_this &gt;&gt; task\n<\/code><\/pre>\n<h4>\u6a21\u677f\u5316<\/h4>\n<p>Python\u7b97\u5b50\u540c\u6837\u652f\u6301Jinja\u3002\u5f53\u60a8\u5c06provide_context\u53c2\u6570\u8bbe\u7f6e\u4e3aTrue\u65f6\uff0c<br \/>\nairflow\u4f1a\u4f20\u9012\u4e00\u7ec4\u989d\u5916\u7684\u5173\u952e\u5b57\u53c2\u6570\uff1a\u4e00\u4e2a\u7528\u4e8eJinja\u6a21\u677f\u53d8\u91cf\uff0c\u53e6\u4e00\u4e2a\u7528\u4e8etemplates_dict\u53c2\u6570\u3002<br \/>\ntemplates_dict\u53c2\u6570\u662f\u6a21\u677f\u5316\u7684\uff0c\u56e0\u6b64\u5b57\u5178\u4e2d\u7684\u6bcf\u4e2a\u503c\u90fd\u88ab\u8ba1\u7b97\u4e3aJinja\u6a21\u677f\u3002<\/p>\n<h3>1.3 \u81ea\u5b9a\u4e49\u7b97\u5b50<\/h3>\n<p>airflow\u652f\u6301\u81ea\u5b9a\u4e49\u7b97\u5b50\u7684\u8bbe\u8ba1\uff0c\u5982<\/p>\n<pre><code class=\"language-Python line-numbers\">from airflow.models.baseoperator import BaseOperator\nfrom airflow.utils.decorators import apply_defaults\n\nclass HelloOperator(BaseOperator):\n\n    @apply_defaults\n    def __init__(\n            self,\n            name: str,\n            *args, **kwargs) -&gt; None:\n        super().__init__(*args, **kwargs)\n        self.name = name\n\n    def execute(self, context):\n        message = \"Hello {}\".format(self.name)\n        print(message)\n        return 
message\n\n\u7136\u540e\u53ef\u4ee5\u8fd9\u6837\u7528\n\nfrom custom_operator.hello_operator import HelloOperator\n\nwith dag:\n    hello_task = HelloOperator(task_id='sample-task', name='foo_bar')\n<\/code><\/pre>\n<p>\u7ee7\u627f\u81eaBaseOperator<\/p>\n<h4>Hooks<\/h4>\n<p>\u7531\u4e8eHooks\u7684\u5b58\u5728\uff0c\u53ef\u4ee5\u5b9e\u73b0DAG\u5185\u7684\u5171\u4eab\u5916\u90e8\u8d44\u6e90\uff0c\u5982DB\u8fde\u63a5<\/p>\n<pre><code class=\"language-Python line-numbers\">class HelloDBOperator(BaseOperator):\n\n        @apply_defaults\n        def __init__(\n                self,\n                name: str,\n                mysql_conn_id: str,\n                database: str,\n                *args, **kwargs) -&gt; None:\n            super().__init__(*args, **kwargs)\n            self.name = name\n            self.mysql_conn_id = mysql_conn_id\n            self.database = database\n\n        def execute(self, context):\n            hook = MySqlHook(mysql_conn_id=self.mysql_conn_id,\n                     schema=self.database)\n            sql = \"select name from user\"\n            result = hook.get_first(sql)\n            message = \"Hello {}\".format(result['name'])\n            print(message)\n            return message\n<\/code><\/pre>\n<h4>\u7528\u6237\u754c\u9762<\/h4>\n<p>airflow\u652f\u6301\u5b9a\u5236Operator\u5728UI\u754c\u9762\u7684\u989c\u8272\u5b9a\u5236\uff0c\u5982<\/p>\n<pre><code class=\"language-Python line-numbers\">class HelloOperator(BaseOperator):\n    ui_color = '#ff0000'\n    ui_fgcolor = '#000000'\n    ....\n<\/code><\/pre>\n<h4>\u6a21\u677f<\/h4>\n<p>\u8be6\u60c5\u770bhttps:\/\/airflow.apache.org\/docs\/stable\/howto\/custom-operator.html<\/p>\n<h3>2.1 
\u7ba1\u7406\u5185\u90e8\u8fde\u63a5<\/h3>\n<p>\u8fd9\u91cc\u7684\u5185\u90e8\u8fde\u63a5\u662f\u6307DAG\u5185\u90e8\u6240\u5171\u4eab\u4f7f\u7528\u7684\u6570\u636e\u8fde\u63a5\uff0c\u8fd9\u91cc\u53ef\u4ee5\u5728WebUI\u4e0a\u7684Admin\/Connections\u91cc\u9762\u7f16\u8f91<\/p>\n<p>\u8be6\u60c5https:\/\/airflow.apache.org\/docs\/stable\/howto\/connection\/index.html<\/p>\n<h2>\u6982\u5ff5<\/h2>\n<h3>1. DAGs\u6709\u5411\u65e0\u73af\u56fe<\/h3>\n<p>Airflow\u4e2d\u6bcf\u4e2a\u5927\u7684\u6267\u884c\u6d41\u90fd\u662f\u4e00\u4e2a\u6709\u5411\u65e0\u73af\u56fe\u7ec4\u6210\uff0c\u5176\u4e2d\u5305\u62ec\u4e86\u7ec4\u6210\u7684\u7b97\u5b50\uff0c<br \/>\n\u4e0d\u540c\u7b97\u5b50\u95f4\u53ef\u4ee5\u5b58\u5728\u5e76\u884c\u6216\u4f9d\u8d56\u7684\u5173\u7cfb\uff0cDAGs\u6784\u6210\u4e86\u6574\u4e2a\u8c03\u5ea6\u6a21\u578b\u7684\u6700\u5916\u5c42\u4efb\u52a1\u3002<br \/>\n1. DAG\u662f\u4f7f\u7528Python\u5b9a\u4e49\uff0c\u58f0\u660e\u7ed3\u6784\uff0c\u8868\u660e\u4f9d\u8d56\u548c\u6267\u884c\u987a\u5e8f\u3002<br \/>\n2. DAG\u53ef\u4ee5\u58f0\u660e\u4efb\u52a1\u7684\u6267\u884c\u987a\u5e8f\uff0c\u91cd\u8bd5\u6b21\u6570\uff0c\u4f9d\u8d56\u5173\u7cfb\uff0c<br \/>\n3. DAG\u662f\u4ee5Python\u6587\u4ef6\u5f62\u5f0f\uff0c\u653e\u7f6e\u5728\u9879\u76ee\u76ee\u5f55\/dags\u4e2d\uff0c\u7531\u6846\u67b6\u52a8\u6001\u751f\u6210<br \/>\n\u6267\u884c\u65f6\u95f4\u6216\u65b9\u6cd5\u7b49\u3002<br \/>\n4. 
DAG\u53ea\u4f1a\u4ece\u6587\u4ef6\u7684\u5168\u5c40\u4ee3\u7801\u4e2d\u5f55\u5165\uff0c\u800c\u4e0d\u4f1a\u4ecelocal scope,\u5982\u4e0b\u5217\u7684dag_2\u5c31\u4e0d\u4f1a\u88ab\u5f55\u5165<\/p>\n<pre><code class=\"language-Python line-numbers\">dag_1 = DAG('this_dag_will_be_discovered')\n\ndef my_function():\n    dag_2 = DAG('but_this_dag_will_not')\n\nmy_function()\n<\/code><\/pre>\n<h4>\u9ed8\u8ba4\u914d\u7f6e<\/h4>\n<p>\u53ef\u4ee5\u901a\u8fc7\u8bbe\u7f6e\u4e00\u4e2adict\u4e3a\u9ed8\u8ba4\u914d\u7f6e\u521d\u59cb\u5316DAG\uff0c\u4ece\u800c\u907f\u514d\u6bcf\u6b21\u90fd\u9700\u8981\u4e00\u4e2a\u4e00\u4e2a\u5b57\u6bb5\u8bbe\u7f6e\uff0c\u5982<\/p>\n<pre><code class=\"language-Python line-numbers\">default_args = {\n    'start_date': datetime(2016, 1, 1),\n    'owner': 'airflow'\n}\n\ndag = DAG('my_dag', default_args=default_args)\nop = DummyOperator(task_id='dummy', dag=dag)\nprint(op.owner) # Airflow\n<\/code><\/pre>\n<h4>\u4e0a\u4e0b\u6587\u7ba1\u7406\u5668\uff08\u7c7b\u4f3c\u4e8e\u6587\u4ef6IO\uff09<\/h4>\n<pre><code class=\"language-python line-numbers\">with DAG('my_dag', start_date=datetime(2016, 1, 1)) as dag:\n    op = DummyOperator('op')\n\nop.dag is dag # True\n<\/code><\/pre>\n<p><font color='red'>\u9ed8\u8ba4airflow\u53ea\u4f1a\u641c\u7d22\u5185\u5bb9\u4e2d\u540c\u65f6\u5305\u542b\u5b57\u7b26\u4e32airflow\u548cDAG\u7684py\u6587\u4ef6\uff0c<br \/>\n\u5982\u9700\u67e5\u627e\u6240\u6709py\u6587\u4ef6\uff0c\u9700\u8981\u4fee\u6539DAG_DISCOVERY_SAFE_MODE\u914d\u7f6e\u9879<\/font><\/p>\n<h3>2. 
DAG Runs(DAG \u5b9e\u4f8b)<\/h3>\n<p>DAG\u5b9e\u4f8b\u662f\u6307\u4e00\u4e2a\u7269\u7406\u4e0a\u7684DAG\u5b9e\u4f8b\u5bf9\u8c61\uff0c\u5305\u542b\u4e86\u4e00\u7cfb\u5217\u7684\u4efb\u52a1\u5b9e\u4f8b\u5e76\u8fd0\u884c\u5728\u4e00\u4e2a\u7279\u5b9a\u7684\u6267\u884c\u65f6\u95f4\u3002<br \/>\nDAG\u5b9e\u4f8b\u5fc5\u7136\u4f1a\u6709\u4e00\u4e2a\u6267\u884c\u65e5\u671f\uff0c\u65e0\u8bba\u662f\u4ece\u8c03\u5ea6\u5668\u521b\u5efa\u7684\uff0c\u8fd8\u662f\u7531\u989d\u5916\u7684\u89e6\u53d1\u5668\u521b\u5efa\u3002<\/p>\n<p>\u5b9e\u4f8b\u5316\u81ea\u540c\u4e2aDAG\u7684\u591a\u4e2aDAG\u5b9e\u4f8b\u53ef\u4ee5\u540c\u65f6\u5e76\u53d1\u7684\u6267\u884c\uff0c\u4f8b\u5982\u540c\u65f6\u6267\u884c2016-10-01\u548c2016-10-02\u3002<\/p>\n<h4>execution_date(\u6267\u884c\u65e5\u671f)<\/h4>\n<p>DAG\u5b9e\u4f8b\u53ca\u5176\u4efb\u52a1\u5b9e\u4f8b\u90fd\u9700\u8981\u4e00\u4e2a\u6267\u884c\u65e5\u671f\uff0c\u4f46\u662f\u8fd9\u4e2a\u6267\u884c\u65e5\u671f\u662f\u903b\u8f91\u4e0a\u7684\u6267\u884c\u65e5\u671f\uff0c\u800c\u4e0d\u662f\u7269\u7406\u4e0a\u7684\uff0c<br \/>\n\u5982\u6211\u9700\u8981\u6267\u884c\u4e00\u4e2a\u4e09\u4e2a\u6708\u524d\u7684\u4efb\u52a1\uff0c\u90a3\u4e48\u8fd9\u662fexecution_date\u5c31\u662f\u4e09\u4e2a\u6708\u524d\uff0c\u800c\u4e0d\u662f\u7269\u7406\u4e0a\u7684\u4efb\u52a1start_date\u3002<\/p>\n<h3>3. 
Task(\u4efb\u52a1)<\/h3>\n<p>\u4efb\u52a1\u7ec4\u6210\u4e86DAG\uff0c\u4e00\u4e2a\u4efb\u52a1\u4ee3\u8868\u4e86DAG\u4e2d\u7684\u4e00\u4e2a\u8282\u70b9\uff0c\u6bcf\u4e2a\u4efb\u52a1\u90fd\u662f\u7531\u4e00\u4e2aOperator\u7b97\u5b50\u6d3e\u751f\u751f\u6210\uff0c\u5982<br \/>\nPythonOperator\u6267\u884cPython\u4ee3\u7801\uff0cBashOperator\u6267\u884cbash\u6307\u4ee4\u7b49\u3002<\/p>\n<h4>\u4efb\u52a1\u95f4\u7684\u5173\u7cfb<\/h4>\n<p>\u4e0b\u5217\u4ee3\u7801\u521b\u5efa\u4e862\u4e2a\u4efb\u52a1\uff0c\u5e76\u6267\u884c\u987a\u5e8f\u4e3atask_1 -> task_2<\/p>\n<pre><code class=\"language-python line-numbers\">with DAG('my_dag', start_date=datetime(2016, 1, 1)) as dag:\n    task_1 = DummyOperator('task_1')\n    task_2 = DummyOperator('task_2')\n    task_1 &gt;&gt; task_2 # Define dependencies\n    &gt;&gt; \u662f\u7528\u4e8e\u5efa\u7acb\u4efb\u52a1\u5173\u7cfb\u7684\u8bed\u6cd5\u7cd6\n    task_1\u662ftask_2\u7684upstream\u4e0a\u6e38\n    task_2\u662ftask_1\u7684downstream\u4e0b\u6e38\n<\/code><\/pre>\n<h4>\u4efb\u52a1\u5b9e\u4f8b<\/h4>\n<p>\u4efb\u52a1\u5b9e\u4f8b\u662f\u4efb\u52a1\u7684\u5b9e\u4f8b\u5316\u4f53\u73b0\uff0c\u88ab\u8d4b\u4e88\u4e86\u4e00\u7cfb\u5217\u53d8\u91cf\u5982execution_date,\u5e76\u4e14\u662f\u62e5\u6709\u72b6\u6001\u7684\u7269\u7406\u5b9e\u4f8b\u3002<br \/>\n\u72b6\u6001\u5305\u62ec\u5982\u8fd0\u884c\u4e2d\uff0c\u6210\u529f\uff0c\u5931\u8d25\uff0c\u8df3\u8fc7\uff0c\u7b49\u5f85\u91cd\u8bd5\u7b49\u7b49\u3002<\/p>\n<p>\u8fd9\u5176\u4e2d\u7684\u5305\u542b\u5173\u7cfb\u4e3a\uff0cTasks \u7ec4\u6210DAG, Task instances\u7ec4\u6210DAG Run<\/p>\n<h3>4. \u4efb\u52a1\u751f\u547d\u5468\u671f<\/h3>\n<p>\u8be6\u60c5https:\/\/airflow.apache.org\/docs\/stable\/concepts.html#task-lifecycle<\/p>\n<h3>5. 
Operator\u7b97\u5b50<\/h3>\n<p>\u7b97\u5b50\u5305\u62ec\u4e86BashOperator(Bash\u6307\u4ee4\u7b97\u5b50),DingdingOperator(\u963f\u91cc\u9489\u9489\u7b97\u5b50),GoogleCloudOperator(\u8c37\u6b4c\u4e91\u7b97\u5b50),Papermill(\u6267\u884cJupyterNotebook),PythonOperator(Python\u7b97\u5b50),<br \/>\n\u4ee5\u53caCross-DAG Dependencies(\u7528\u4e8e\u8de8DAG\u8fdb\u884c\u4efb\u52a1\u4f9d\u8d56\u4f7f\u7528)\uff0cEmailOperator,SimpleHttpOperator,<br \/>\nMySqlOperator,Sensor&#8230;\u3002<\/p>\n<p>\u7b97\u5b50\u51b3\u5b9a\u4e86\u4e00\u4e2a\u4efb\u52a1\u8be5\u5982\u4f55\u6267\u884c\uff0c\u7b97\u5b50\u901a\u5e38\u662f\u539f\u5b50\u6027\u7684\uff0c\u53ea\u9700\u8981\u7ba1\u597d\u5185\u90e8\u7684\u6267\u884c\uff0c\u800c\u65e0\u9700\u5173\u5fc3\u4e0e\u5916\u90e8\u5176\u4ed6\u7b97\u5b50\u7684\u5173\u7cfb\uff0c<br \/>\n\u800c\u662f\u7531DAG\u53bb\u534f\u8c03\uff0c\u5e76\u4e14\u901a\u5e38\u4e00\u4e2aDAG\u4e0a\u7684\u4e0d\u540cTask\u5e76\u4e0d\u662f\u8fd0\u884c\u5728\u540c\u4e00\u53f0\u673a\u5668\u4e0a\u7684\u3002<\/p>\n<p>\u901a\u5e38\u4e24\u4e2aOperator\u4e4b\u95f4\u662f\u4e0d\u5141\u8bb8\u5171\u4eab\u6570\u636e\u7684(\u8fd9\u91cc\u7684Operator\u66f4\u5e94\u8be5\u8bf4\u662fTask),<br \/>\n\u5982\u679c\u9700\u8981\uff0c\u5efa\u8bae\u5408\u5e76\u4e3a\u4e00\u4e2aOperator\uff0c\u5047\u5982\u786e\u5b9e\u4e0d\u80fd\u5408\u5e76\uff0c\u53ef\u4ee5\u53c2\u8003XCom<\/p>\n<h4>DAG\u5206\u914d<\/h4>\n<p>Operator\u4ece1.8\u7248\u672c\u540e\u652f\u6301\u5ef6\u8fdf\u5206\u914d\u53ca\u63a8\u65ad\u5206\u914d\u3002\u5982<\/p>\n<pre><code class=\"language-python line-numbers\">dag = DAG('my_dag', start_date=datetime(2016, 1, 1))\n\n# \u5b9e\u4f8b\u5316\u7b97\u5b50\u65f6\u8fdb\u884cDAG\u5206\u914d\uff0c\u5206\u914d\u540e\u5219\u4e0d\u80fd\u66f4\u6362\u548c\u89e3\u9664\u5206\u914d\nexplicit_op = DummyOperator(task_id='op1', dag=dag)\n\n# \u5ef6\u8fdfDAG\u5206\u914d\ndeferred_op = DummyOperator(task_id='op2')\ndeferred_op.dag = dag\n\n# DAG\u63a8\u65ad\u5206\u914d\ninferred_op = 
DummyOperator(task_id='op3')\ninferred_op.set_upstream(deferred_op)\n<\/code><\/pre>\n<h4>\u4f4d\u79fb\u7b26\u914d\u7f6e\u4f9d\u8d56\u5173\u7cfb<\/h4>\n<p>\u4e0b\u8ff0\u5199\u6cd5\u662f\u7b49\u6548\u7684<\/p>\n<pre><code class=\"language-python line-numbers\">op1 &gt;&gt; op2\nop1.set_downstream(op2)\n\nop2 &lt;&lt; op1\nop2.set_upstream(op1)\n<\/code><\/pre>\n<p>\u591a\u4e2a\u7b97\u5b50\u53ef\u4ee5\u901a\u8fc7\u8fd9\u6837\u4e32\u8054\u5173\u7cfb<\/p>\n<pre><code class=\"language-Python line-numbers\">op1 &gt;&gt; op2 &gt;&gt; op3 &lt;&lt; op4\n\n# \u7b49\u6548\u4e8e\nop1.set_downstream(op2)\nop2.set_downstream(op3)\nop3.set_upstream(op4)\n\n# \u540c\u65f6\u652f\u6301\nop1 &gt;&gt; [op2, op3] &gt;&gt; op4\n# \u7b49\u540c\u4e8e\nop1 &gt;&gt; op2 &gt;&gt; op4\nop1 &gt;&gt; op3 &gt;&gt; op4\n# \u548c\nop1.set_downstream([op2, op3])\n<\/code><\/pre>\n<h4>\u5173\u7cfb\u6784\u5efa\u5668<\/h4>\n<p>2.0\u7248\u672c\u5c06\u4f1a\u4eceairflow.utils.helpers\u79fb\u52a8\u5230airflow.models.baseoperator<br \/>\n\u7528\u4e8e\u590d\u6742\u7684\u5173\u7cfb\u6784\u5efa\uff0c\u8be6\u60c5\u53ef\u770bhttps:\/\/airflow.apache.org\/docs\/stable\/concepts.html#relationship-builders<\/p>\n<h3>\u989d\u5916\u529f\u80fd<\/h3>\n<h3>6. Hook\u94a9\u5b50<\/h3>\n<p>Hook\u662f\u4e00\u7cfb\u5217\u5916\u90e8\u5e73\u53f0\u6216\u8005\u6570\u636e\u5e93\uff08\u5982Hive,S3,Mysql,HDFS,Pig\u7b49\uff09\u7684\u63a5\u53e3\uff0c<br \/>\n\u53ef\u4ee5\u4f5c\u4e3a\u7b97\u5b50\u6784\u5efa\u7684\u4e00\u90e8\u5206\uff0c\u5e76\u72ec\u7acb\u4e8e\u7ba1\u9053\u4e4b\u5916\uff0c\u72ec\u7acb\u5b58\u50a8\u5728\u5143\u6570\u636e\u5e93\u4e2d\u3002<\/p>\n<h3>7. 
Pool\u6c60\u5b50<\/h3>\n<p>pool\u662f\u7528\u4e8e\u9650\u5236\u4e00\u7cfb\u5217\u4efb\u52a1\u540c\u65f6\u5e76\u53d1\u6267\u884c\u7684\u6570\u91cf\uff0c\u662f\u5728Task\u58f0\u660e\u65f6\u5019\u8bbe\u7f6e\uff0c\u9700\u8981\u5148\u4e8eTask\u58f0\u660e\u524d\u5b58\u5728\uff0c<br \/>\n\u53ef\u4ee5\u5728WebUI\u7684admin\/Pools\u4e2d\u521b\u5efa\uff0c\u540c\u65f6pool\u7684\u53c2\u6570\u53ef\u4ee5\u5b9a\u4e49\u6743\u91cd\uff0c\u4f7f\u5f97\u4efb\u52a1\u5728\u4f18\u5148\u7ea7\u5217\u8868\u4e2d\u963b\u585e\u65f6\uff0c<br \/>\n\u53ef\u4ee5\u6839\u636e\u6743\u91cd\u5b89\u6392\u6267\u884c\u987a\u5e8f\u3002<\/p>\n<h3>8. Connections\u8fde\u63a5<\/h3>\n<p>\u901a\u8fc7\u8bbe\u7f6e\u8fde\u63a5admin\/connections\uff0c\u7ed9\u4e88\u7684conn_id\u53ef\u4ee5\u7528\u4e8e\u7ba1\u9053\u4e2d\u7684\u8fde\u63a5\u521b\u5efa\uff0c\u540c\u4e00\u4e2aconn_id\u7684\u4e0d\u540c\u8fde\u63a5<br \/>\n\u914d\u7f6e\u5c06\u4f1a\u88ab\u968f\u673a\u5730\u4f7f\u7528\uff0c\u4ece\u800c\u5b9e\u73b0\u8d1f\u8f7d\u5747\u8861\u7684\u6548\u679c\u3002<\/p>\n<h3>9. Queues\u961f\u5217<\/h3>\n<p>\u4efb\u4f55Task\u90fd\u53ef\u4ee5\u88ab\u58f0\u660e\u8d4b\u4e88\u67d0\u4e2a\u961f\u5217\uff0c\u800cWorker\u4e5f\u53ef\u4ee5\u6307\u5b9a\u53ea\u5173\u6ce8\u67d0\u4e2a\u961f\u5217\u3002<br \/>\n\u8be6\u60c5https:\/\/airflow.apache.org\/docs\/stable\/concepts.html#queues<\/p>\n<h3>10. XCom<\/h3>\n<p>\u7528\u4e8eTasks\u95f4\u4f20\u6570\u636e\uff0cPython\u7b97\u5b50\u8fd4\u56de\u6570\u636e\u65f6\u9ed8\u8ba4\u4f1a\u8fdb\u884cxcom_push\uff0c\u6b64\u65f6\u5176\u4ed6\u4efb\u52a1\u53ef\u4ee5\u901a\u8fc7xcom_pull\u83b7\u53d6\u8be5\u6570\u636e\uff0c\u5982<\/p>\n<pre><code class=\"language-Python line-numbers\"># inside a PythonOperator called 'pushing_task'\ndef push_function():\n    return value\n\n# inside another PythonOperator where provide_context=True\ndef pull_function(**context):\n    value = context['task_instance'].xcom_pull(task_ids='pushing_task')\n<\/code><\/pre>\n<p>\u8be6\u60c5https:\/\/airflow.apache.org\/docs\/stable\/concepts.html#xcoms<\/p>\n<h3>11. 
Trigger Rules<\/h3>\n<p>\u7528\u4e8e\u8bbe\u7f6e\u6bcf\u4e2aoperator\u7684\u4e0a\u6e38\u4efb\u52a1\u662f\u5982\u4f55\u72b6\u6001\uff0c\u672c\u8eab\u624d\u4f1a\u6267\u884c\u3002\u5982\u5168\u90e8\u6210\u529f\uff0c\u5168\u90e8\u5931\u8d25\uff0c\u5168\u90e8\u5b8c\u6210\uff0c<br \/>\n\u4e00\u4e2a\u5931\u8d25\u7b49\u7b49\u3002\u9ed8\u8ba4\u7684\u89e6\u53d1\u89c4\u5219\u5c31\u662fall_success\u3002<\/p>\n<p>\u5728all_success\u548call_failed\u72b6\u6001\u4e0b\uff0cSkip\u4efb\u52a1\u4f1a\u5c06\u4efb\u52a1\u72b6\u6001\u7ea7\u8054\u5230\u4e0b\u6e38\u4efb\u52a1\u4e2d\uff0c<br \/>\n\u8be6\u60c5https:\/\/airflow.apache.org\/docs\/stable\/concepts.html#trigger-rules<\/p>\n","protected":false},"excerpt":{"rendered":"<p>Airflow Airflow\u662fapache\u57fa\u91d1\u4f1a\u4e0b\u7684\u4e00\u4e2a\u4efb\u52a1\u7f16\u6392\u6846\u67b6\uff0c\u5e95\u5c42\u57fa\u4e8epython\u5b9e\u73b0\uff0c\u652f\u6301\u591a\u79cdDB&#8230;<\/p>\n","protected":false},"author":1,"featured_media":156,"comment_status":"closed","ping_status":"open","sticky":false,"template":"","format":"standard","meta":[],"categories":[41,40],"tags":[],"_links":{"self":[{"href":"https:\/\/blog.kaispace.cn\/index.php?rest_route=\/wp\/v2\/posts\/534"}],"collection":[{"href":"https:\/\/blog.kaispace.cn\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/blog.kaispace.cn\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/blog.kaispace.cn\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/blog.kaispace.cn\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=534"}],"version-history":[{"count":1,"href":"https:\/\/blog.kaispace.cn\/index.php?rest_route=\/wp\/v2\/posts\/534\/revisions"}],"predecessor-version":[{"id":535,"href":"https:\/\/blog.kaispace.cn\/index.php?rest_route=\/wp\/v2\/posts\/534\/revisions\/535"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/blog.kaispace.cn\/index.php?rest_route=\/wp\/v2\/media\/156"}],"wp:attach
ment":[{"href":"https:\/\/blog.kaispace.cn\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=534"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/blog.kaispace.cn\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=534"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/blog.kaispace.cn\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=534"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}