Byte Ebi's Logo

Byte Ebi 🍤

每天一小口,蝦米變鯨魚

使用 Airflow DAG 觸發其他 DAG

透過 Airflow 內建的 Operator,在 DAG 中輕鬆觸發其他 DAG

Ray

在 DAG 中呼叫其他 DAG 作為上下游處理關係,除了直接利用 REST API 外
Airflow 也提供的內建的 Operator 可以使用

簡介

使用內建的 TriggerDagRunOperator 而不是呼叫 REST API 的好處是:

  1. 同一個 Airflow 內的 DAG 不需要傳入 Authorization Token 做身份驗證
  2. 不需要自己處理 retry、timeout、依賴管理等機制,以及預防網路傳輸造成的意外錯誤
  3. 容易釐清上下游 DAG 的關係

這個 Operator 不管是在 Airflow 2 中,還是最新的 Airflow 3 都可以使用,只是引入的方法有所差異

範例

以下使用 Airflow 2 做範例,流程就是使用 dag_a 呼叫 dag_b,非常簡單。
為了讓他不這麼簡單,會在 DAG 中傳遞兩個變數,盡可能模仿 REST API 的使用方法。

dag_a.py

先引入 TriggerDagRunOperator,然後定義要傳輸的變數 conf
在此範例中傳送姓名(name) 和年齡(age)。

使用 REST API 的時候也是在 request body 中定義 conf 發送
不同的點在於:使用內建 Operator 只需要指定要呼叫的下游 DAG trigger_dag_id
在這個範例中就是 dag_b

並且把剛剛決定要發送的變數 conf 放到 Operator 的 conf

# dag_a.py

import logging
from datetime import datetime

from airflow import DAG
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

logger = logging.getLogger(__name__)


default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2025, 7, 2),
}

with DAG(
    "dag_a",
    default_args=default_args,
    schedule_interval=None,
    dag_display_name="DAG A",
    tags=["test"],
) as dag:
    conf = {
        "name": "John Doe",
        "age": 20,
    }

    call_dag_b = TriggerDagRunOperator(
        task_id="call_dag_b",
        trigger_dag_id="dag_b",
        conf=conf,
    )

    (call_dag_b)

dag_b.py

接收兩個來自請求的傳入參數:name、age,並且在 log 印出

# dag_b.py

import logging
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

logger = logging.getLogger(__name__)


default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2025, 7, 2),
}

with DAG(
    "dag_b",
    default_args=default_args,
    schedule_interval=None,
    dag_display_name="DAG B",
    tags=["test"],
) as dag:

    def test_b(**kwargs):
        name = kwargs["params"].get("name")
        age = kwargs["params"].get("age")

        logger.info(f"name: {name}, age: {age}")
        return {"name": name, "age": age}

    test_b = PythonOperator(
        task_id="test_b",
        python_callable=test_b,
        dag=dag,
    )

    (test_b)

從畫面上可以看出 dag_a 使用了 TriggerDagRunOperator 作為 Operator

dag_a

dag_b 則是如說明一樣,接收兩個參數並且在 log 中印出

dag_b

以上就是使用 TriggerDagRunOperator 從一個 DAG 中呼叫另一個 DAG 的做法。

這個方法省去了 REST API 需要身份驗證的問題,以及避免網路傳輸途中的意外錯誤。
並且因為是 Operator,所以內建支援 Airflow 的 retry、timeout、依賴管理等機制,不需要自己處理。
也更容易釐清上下游 DAG 的觸發邏輯,維護起來輕鬆又愉快。

最新文章

Category

Tag