使用動態任務讓 Airflow 效能大解放
用 Dynamic Task Mapping 釋放 Airflow 的平行處理能力
當任務之間有相依,但是彼此又是獨立事件時到底應該如何優雅的處裡呢?
使用迴圈?還是多一個 DAG?抑或是有其他更好的方案?
背景說明
在使用 Airflow 的時候常會遇到一個任務中有重複的任務需要處裡
例如說將訂單中的商品逐筆產生出貨單發送到倉庫
一開始最直覺就是想到「迴圈」,但是這樣會將全部的子任務綁訂為一個整體
當其中有一筆失敗,就必須人工介入處理
因為整筆重新執行的話會讓發送成功的出貨單被重複發送
第二個會想到的就是藉由 Airflow 的 RESTful API 功能將任務切分成:
- 接收訂單,在
DAG main做預處理,並且根據商品逐筆呼叫DAG sub - 每個 DAG 各別發送單一商品的出貨單
因為其特性所以稱為 Processor-Sender 模式,畫成流程圖大概長這樣:

這麼做可以解決上述迴圈的重試問題,成功將發送出貨單的事件原子化
既滿足了分開運算的需求,並且子任務是可觀測的,一旦出錯只需要個別去重試就好
原本用這個模式也用得很開心,直到發現 DAG 數量勢不可擋的一直增加
而且要檢查 DAG main 資料的執行結果時要手動在 DAG sub 紀錄中翻找
在上面出貨的例子中:
當訂單內的所有出貨單完成發送後,必須要有一個 DAG 定期去檢查相同 DAG main 的所有 DAG sub 的處理狀態
等到所有 DAG sub 都完成了,才能執行下一步
這時候我發現官方文件中有提到一個東西:Dynamic Task Mapping — Airflow Documentation
(以下範例配合公司使用的版本 2.10.5,不過在 Airflow 3 也有這個功能)
可以藉由將將資料集展開(expand),達成在同一個 DAG 內並行處理相同任務的目的
並且因為是在同一個 DAG 中,所以當所有任務處裡完之後是可以接續其他任務的!
預期的流程圖如下:

PoC (Proof of Concept,概念驗證)
著手開始做 PoC 前先確認目的:
驗證 Dynamic Task Mapping 是否可以替代 Processor-Sender 模式,節省 DAG 數量方便管理
以及分析評估需求:
- 可觀測性:能夠在畫面上看到個別的項目執行結果
- 原子性:任務可以被分別重試,避免耦合
- 效能:能同時執行多個 task,並且可調整並行數量
簡單寫出 Dynamic Task Mapping PoC 的程式碼
from datetime import datetime, timedelta
import random
import time
from airflow import DAG
from airflow.decorators import task
from airflow.exceptions import AirflowException
from airflow.models.xcom_arg import XComArg
default_args = {
"owner": "airflow",
"retry_delay": timedelta(minutes=1),
}
with DAG(
dag_id="dynamic_task_mapping_poc",
start_date=datetime(2025, 1, 13),
schedule=None,
catchup=False,
default_args=default_args,
tags=["poc"],
max_active_runs=1,
max_active_tasks=3,
) as dag:
@task
def fetch_items():
"""
模擬 DAG 處理資料
"""
return [
{"item_id": "A"},
{"item_id": "B"},
{"item_id": "C"},
{"item_id": "D"},
{"item_id": "E"},
{"item_id": "F"},
{"item_id": "G"},
{"item_id": "H"},
{"item_id": "I"},
{"item_id": "J"},
{"item_id": "K"},
{"item_id": "L"},
]
@task
def process_single_item(item: dict) -> str:
"""
每個 item 都是獨立的 task instance
"""
item_id = item["item_id"]
# 20% 機率失敗,驗證手動 retry 機制
if random.random() < 0.2:
raise AirflowException(f"Random failure for item {item_id}")
time.sleep(random.randint(5,10))
print(f"Successfully processed item {item_id}")
return f"done_{item_id}"
@task
def summarize(results: XComArg):
"""
收集所有 process 的結果
"""
print("Finished items:", results)
return results
# ===== Pipeline Flow =====
raw_items = fetch_items()
# expand 會根據 raw_items list 動態生成多個 process_single_item instances
processed = process_single_item.expand(item=raw_items)
# summary task 取回所有 mapped results
summarize(processed)
一、觸發 DAG
因為一開始偷懶沒有額外設定資料庫,所以機器上使用的資料庫是 SQLite
根據 Sequential Executor
說明,使用 SQLite 時只能將AIRFLOW__CORE__EXECUTOR設為 SequentialExecutor
而SequentialExecutor一次只能執行一個 task
不過在此時已經可以看到第二個 task 有一個分頁叫做「Mapped Tasks」
點擊後就會看到傳入的列表資料被展開為獨立任務!

二、平行處裡驗證
為了使用 SequentialExecutor,只好起個容器來跑 Airflow 和 PostgreSQL
此時要驗證的是:
- Dynamic Task Mapping 的平行處理能力
- 可以根據 DAG 的
max_active_tasks參數進行限制
以下是驗證使用的 docker-compose.yaml 檔案
因為作為本機的 PoC 用途,密碼什麼的就都先明碼了
version: "3.9"
services:
postgres:
image: postgres:13
container_name: airflow-postgres
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
volumes:
- postgres_data:/var/lib/postgresql/data
healthcheck:
test: ["CMD-SHELL", "pg_isready -U airflow"]
interval: 5s
timeout: 5s
retries: 5
airflow:
image: apache/airflow:2.10.5-python3.11
container_name: airflow
depends_on:
postgres:
condition: service_healthy
environment:
AIRFLOW__CORE__EXECUTOR: LocalExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres:5432/airflow
AIRFLOW__CORE__PARALLELISM: 32
AIRFLOW__CORE__MAX_ACTIVE_TASKS_PER_DAG: 16
AIRFLOW__CORE__LOAD_EXAMPLES: "false"
AIRFLOW__WEBSERVER__EXPOSE_CONFIG: "true"
volumes:
- ./dags:/opt/airflow/dags
- ./logs:/opt/airflow/logs
ports:
- "8080:8080"
command: >
bash -c "
airflow db migrate &&
airflow users create \\
--username admin \\
--password admin \\
--firstname admin \\
--lastname admin \\
--role Admin \\
--email [email protected] &&
airflow scheduler & airflow webserver
"
volumes:
postgres_data:
驗證流程
- 啟動容器:
docker compose up - 檢查 airflow 參數設定
- 進入容器:
docker exec -it airflow bash - 在容器中輸入:
airflow config get-value core executor - 必須要是:
LocalExecutor
- 進入容器:
驗證結果
在 max_active_tasks=3 的情況下,確實是同時恰好運行 3 個 task


三、重試機制
根據上面的 PoC 程式碼,當 DAG 執行時有 20% 機率拋出錯誤
此時後續任務「summarize」被阻塞,狀態為 「upstream_failed」

點擊畫面上第 11 個執行失敗任務「Map Index: 10」並且按下重新執行

重試後觀察到該任務成功被更新為 success

當全部失敗任務被清除後,後續任務「summarize」會自動執行
不需要手動清除重試

差異比較
| Processor-Sender | Dynamic Task Mapping | |
|---|---|---|
| 平行處裡 | 可以同時執行多個 | 可以,前提是不是使用 SQLite |
| Retry | 可以手動清除,甚至自行觸發 | 可以手動清除 |
| 後續任務 | 不使用 wait_for_completion=True 就不會阻塞 |
當有子任務失敗時會阻塞後續的任務 |
| 使用場景 | 有可能需要傳入參數手動執行的場景 | 根據主要資料 DAG 進行平行任務 |
結論
如果子任務輕量、不可切分、失敗一起重跑,選用 for 迴圈處理
而當子任務需要的不只是簡單的重試,而是根據條件手動輸入參數執行時用兩個 DAG 的 Processor-Sender 模式
應該使用 Dynamic Task Mapping 的情境則是:
- 主任務下的子任務間彼此沒有相依
- 每個 task 可獨立 retry、success/fail、log
如此當任意子任務發生錯誤時可以手動進行重試,達成子任務間解耦的目的
