快轉到主要內容
  1. Data-Engineerings/

Apache Airflow 101

·5 分鐘· ·
Blog Zh-Tw Data-Engineering
Zhe-You Liu
作者
Zhe-You Liu
專注在貢獻開源、Distributed System、Data Engineering
目錄

什麼是 Apache Airflow?
#

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows

airflow-logo

Apache Airflow 是一個開源的 Workflow 管理平台
讓開發者以 Workflow as Code 去定義 Data Pipeline

在 Airflow 中,會把一整個 Workflow 叫做 Dag
同時也代表 Directed Acyclic Graph(有向無環圖)

Dag 是由多個 Task(任務) 組成的
這些 DagTask 之間可以有各種依賴關係,只要不形成環(Cycle)就可以

支援多種排程方式或是讓開發者定義複雜的 Dag
也提供 UI 介面來監控、管理 Dag

是目前最流行的 Workflow Orchestration Tool 之一

  • 在 GitHub 上有超過 40,000顆星星
  • 是 Apache Software Foundation 第 5 大的專案
  • 超過 20,000 個企業使用 Apache Airflow

    According to HG Insight

Apache Airflow 的 Feature
#

Apache Airflow 在 Data Pipeline 中的角色
#

Airflow 也會說是「Data Orchestration Tool」

以 Data Platform 架構來說
Airflow 是 Orchestration Layer,可以說是整個平台的大腦
負責精確的調度、規劃、並詳細記錄和監控所有的 Workflow 各個 Task 的狀態

orchestration-layer
Reference from chaossearch.io / cloud-data-platform-architecture-guide

以大型的 Data Platform 架構來說
Airflow 會是在最上層規劃一路從 Data Source 到最終給 End User 的 Data Product 的所有步驟
但每個 Task 實際執行通常是交給其他專項工具來執行

例如:

  • Airflow Dag 負責定期開始 Data Ingestion, 再接著跑 ETL, 最後根據 Data 類別決定要 Land 到哪個 Table、View
    • 而其中的
      • Data Ingestion 可能是交給 Airbyte
      • ETL 可能是交給 Spark、Flink、DuckDB 等其他 Compute 來執行
    • Airflow 只專注於調度(Orchestration)

    但以中小型的 Data Platform 來說
    拿 Airflow 的 LocalExecutor 或搭配 CeleryExecutor 來執行 Dag 是非常夠用的 !

Airflow 與其他常見的 Data Pipeline 的 Component 更詳細的關係可以參考下圖

Airflow in Data Pipeline
Reference from ml4devs.com / scalable-efficient-big-data-analytics-machine-learning-pipeline-architecture-on-cloud

Airflow 解決了什麼問題?
#

以我在 Data Team 的經驗來說
Airflow 解決以下問題:

  • Workflow 的 Observability
    • 尤其是其中的 Grid View (DagRun/TaskInstance records) 也可以當做整體 Pipeline 執行的指標(Metrics)
    • 例如:
        1. 在 UI 看到(或是收到 Failure 通知) xxx Task 最近很常失敗
        • 負責 xxx Task 的 component/ service 可能有問題需要修復
        1. 看到近幾次 DagRun 的執行時間變長
        • 再針對最近的 DagRun 和 TaskInstance 進行排查
        • 有可能是最近的資料量變多了,或是某個 Task 的效能變差了
  • Workflow 的可重試性
    • 可以直接在 UI 針對 Dag Level 或 Task Level 進行人工重試
    • 可以減輕 operation 時的負擔
    • 常見的使用場境可能是:
  • 根據 config 動態生成 Dag
    • 可以在 Dag Code 中動態根據 config 來生成 Dag
    • 例如:
      • 有根據目前有的客戶 id 設定檔來生成 client_<client_id>_dag 的 Dag
      • 達到動態生成、動態更新 Dag 的目的
  • 自動化 onboarding
    • backfilling 不一定會是每個 Data Team 都會需要的功能
    • 常見的使用場境可能是:
      • 例如:
        • 有一個 @daily 的 Dag 會產每天的 snapshot view
        • 如果有新的客戶 onboard 時,只需要設好 catchup 或是透過 backfill 就可以自動把新客戶的資料補齊

總結來說
使用 Airflow 可以很大程度的減少 Operation 的負擔

Airflow 本身的架構
#

airflow-architecture
Reference from airflow.apache.org / 2.10.5/core-concepts

  • Meta Database
    • 所有 Dag, Task 的狀態都是存在 Meta Database
    • 也建議使用 如: PgBouncer 這種 connection pool proxy
      • 因為所有 components 幾乎無時無刻都需要存取 Meta Database
  • Scheduler
    • 負責監控所有 tasks 和 dags
      • 檢查是否有任何 dag 需要新的 DagRun
      • 檢查 DagRun 中是否有需要 schedule 的 TaskInstances 或完整的 DagRun
      • 選擇需要 schedule 的 TaskInstance,並在 Execution Pool 和其他 Concurrency 限制下把 TaskInstances 加到
    • 可以參考 如何 fine-tune Scheduler
  • Worker
    • 負責實際執行 TaskInstance 的 callable
  • Trigger
  • Dag Processor(Dag Parser)
    • 定期把 Dag Files (Python 檔) parse 成在 Meta Database 的 Dag records
  • Web Server(API Server)
    • 提供 UI 和 Rest API

常見的 Airflow 的 Use Cases
#

Airflow 適合的場景
#

  • 週期性的 Workflow
    • 需要固定的時間間隔執行
    • 例如:每 x 小時、每 x 天、每 x 週執行的
  • Workflow 需要拆成多個可人工重試的 Task
    • 例如:需要經過多個不同 Services 的 ETL
  • Workflow 有複雜的 Task 依賴關係
    • 例如:需要根據某些條件來決定 Task 的執行順序
  • Workflow 需要根據外部狀態擴展
    • Dynamic Dag Rendering
    • 例如:需要根據某些 config, query 來決定有哪些 Dag、Tasks 需要執行

Airflow 實際的 Use Cases
#

  • Data Pipeline / ETL / Business Intelligence
  • Infrastructure Automation
  • MLOps

主要也是因為這些 Use Cases 都符合上一節所說的場景

可以參考 astronomer.io / use-cases 的更多實際案例

Airflow 本身的限制
#

根據上方 Airflow 的架構圖其實就可以猜到:

Meta Database 的 I/O 會是 Airflow 的瓶頸 !

每個 Component 都隨時需要從 Meta Database 讀取、寫入狀態
例如: 每個 DagRun 都是一個新的 record、每個 TaskInstance 都是一個新的 record
根據執行狀態來更新 DagRun、TaskInstance 的狀態

所以 Airflow 並不適合作為:

  • low latency 的 Event Driven Workflow
    • 這邊的 low latency 是指ms 級別的
        1. 在 Airflow 的架構有提到 Trigger,內部實作其實是每 x 秒去 poll外部系統狀態
        1. 就算是直接透過 Rest API 去 Trigger Dag 還是需要經過 Scheduler 的排程才會真的被 Worker 執行
  • 每秒數千、數萬 messages 級別的 consumer

目前 Airflow 最大的 Throughput 大約是 每秒 100 多個 DagRun

這邊是根據:
Airflow Submit 2024: How we Tuned our Airflow to Make 1.2 million DAG Runs - per day! 我們把 12000000 / 86400 = 138.89
就算是 fine tune 過的 Airflow 設定
因為 Database 的 I/O Bottleneck 還是只能處理每ㄧ百出頭級別的 message throughput

Anti Pattern: 拿 單個 Airflow Cluster 作為 RabbitMQ 和 Kafka 下游的 consumer 當然 單個 Airflow Cluster 就不適合作為處理每秒數千、數萬 messages 級別的 consumer

這邊只說 單個 Airflow Cluster 是因為
我們確實可以用partition的概念不同的 Airflow Cluster 來處理不同的 Kafka Topic

例如:
airflow-cluster-a Cluster 都處理 topic-group-a-* 的 messages
airflow-cluster-b Cluster 都處理 topic-group-b-* 的 messages

透過分不同的 partition 來達到 Kafka 級別的 Message Throughput
同時保有容易監控、方便針對 Dag Level 或 Task Level 人工重試的特性

總結
#

透過本篇文章
應該可以了解 Apache Airflow 整體架構和適合的場景

  • 對於中小型的 Data Pipeline
  • 對於大型的 Data Pipeline
    • 讓 Airflow 專注於Orchestration的角色
      • 實際 Task 的執行就交給該子領域的專業工具
    • 並 fine-tune Airflow 的 Scheduler 和 Meta Database 設定
  • 利用 Airflow 本身的特性來打造靈活可觀察可重試的 Data Pipeline

同時了解 Airflow 的限制

例如:
low-latency(ms level)或是每秒數千、數萬 messages 級別的 consumer的情境
Apache Airflow 可能不是最好的選擇

相關文章

MacOS 上的 Push-based CLI 工作流
·2 分鐘
Blog Zh-Tw
如何在 Mac 透過 Push-based 通知上減少 CLI 工作流中的 context switching
從 0 成為 Apache Airflow Committer
·8 分鐘
Blog Zh-Tw Open-Source-Contribution
從未接觸 Data Engineering 的小白,如何在 5 個月,以 70+ PR,300 小時的貢獻成為 Apache Airflow Committer
從 0 開始貢獻 Apache Airflow
·5 分鐘
Blog Zh-Tw Open-Source-Contribution
從未接觸 Data Engineering 的小白,如何在 3 個月內貢獻超過 50 個 PR
什麼是 Embedded Database? 簡介 RocksDB
·2 分鐘
Blog Zh-Tw Database
簡介 Embedded Database 和 RocksDB 的基本概念
2024 Dcard 暑期實習面試
·2 分鐘
Blog Zh-Tw Intern
2024 Dcard 暑期實習面試
2024 Appier 暑期實習面試
·2 分鐘
Blog Zh-Tw Intern
2024 Appier 暑期實習面試