使用 Airflow DAG 觸發其他 DAG
透過 Airflow 內建的 Operator,在 DAG 中輕鬆觸發其他 DAG

在 DAG 中呼叫其他 DAG 作為上下游處理關係,除了直接利用 REST API 外
Airflow 也提供的內建的 Operator 可以使用
簡介
使用內建的 TriggerDagRunOperator 而不是呼叫 REST API 的好處是:
- 同一個 Airflow 內的 DAG 不需要傳入 Authorization Token 做身份驗證
- 不需要自己處理 retry、timeout、依賴管理等機制,以及預防網路傳輸造成的意外錯誤
- 容易釐清上下游 DAG 的關係
這個 Operator 不管是在 Airflow 2 中,還是最新的 Airflow 3 都可以使用,只是引入的方法有所差異
範例
以下使用 Airflow 2 做範例,流程就是使用 dag_a
呼叫 dag_b
,非常簡單。
為了讓他不這麼簡單,會在 DAG 中傳遞兩個變數,盡可能模仿 REST API 的使用方法。
dag_a.py
先引入 TriggerDagRunOperator
,然後定義要傳輸的變數 conf。
在此範例中傳送姓名(name) 和年齡(age)。
使用 REST API 的時候也是在 request body 中定義 conf 發送
不同的點在於:使用內建 Operator 只需要指定要呼叫的下游 DAG trigger_dag_id
。
在這個範例中就是 dag_b
。
並且把剛剛決定要發送的變數 conf 放到 Operator 的 conf
中
# dag_a.py
import logging
from datetime import datetime
from airflow import DAG
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
logger = logging.getLogger(__name__)
default_args = {
"owner": "airflow",
"depends_on_past": False,
"start_date": datetime(2025, 7, 2),
}
with DAG(
"dag_a",
default_args=default_args,
schedule_interval=None,
dag_display_name="DAG A",
tags=["test"],
) as dag:
conf = {
"name": "John Doe",
"age": 20,
}
call_dag_b = TriggerDagRunOperator(
task_id="call_dag_b",
trigger_dag_id="dag_b",
conf=conf,
)
(call_dag_b)
dag_b.py
接收兩個來自請求的傳入參數:name、age,並且在 log 印出
# dag_b.py
import logging
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
logger = logging.getLogger(__name__)
default_args = {
"owner": "airflow",
"depends_on_past": False,
"start_date": datetime(2025, 7, 2),
}
with DAG(
"dag_b",
default_args=default_args,
schedule_interval=None,
dag_display_name="DAG B",
tags=["test"],
) as dag:
def test_b(**kwargs):
name = kwargs["params"].get("name")
age = kwargs["params"].get("age")
logger.info(f"name: {name}, age: {age}")
return {"name": name, "age": age}
test_b = PythonOperator(
task_id="test_b",
python_callable=test_b,
dag=dag,
)
(test_b)
從畫面上可以看出 dag_a 使用了 TriggerDagRunOperator
作為 Operator
而 dag_b 則是如說明一樣,接收兩個參數並且在 log 中印出
以上就是使用 TriggerDagRunOperator
從一個 DAG 中呼叫另一個 DAG 的做法。
這個方法省去了 REST API 需要身份驗證的問題,以及避免網路傳輸途中的意外錯誤。
並且因為是 Operator,所以內建支援 Airflow 的 retry、timeout、依賴管理等機制,不需要自己處理。
也更容易釐清上下游 DAG 的觸發邏輯,維護起來輕鬆又愉快。