快轉到主要內容
  1. Open-Source-Contributions/

深入研究 Apache Airflow 的 Dag Processing 機制

·5 分鐘· ·
Blog Zh-Tw Open-Source-Contribution Data-Engineering
Zhe-You Liu
作者
Zhe-You Liu
專注在貢獻開源、Distributed System、Data Engineering
目錄

源來是你 是在台灣積極貢獻大型開與軟體的社群
接下來我會在以下時間地點主持 Airflow 中文會議

時間7 月 2 日隔週三晚上 20:30 (台灣時區 UTC+8)
地點https://opensource4you.tw/airflow/meeting
行事曆https://opensource4you.tw/readme/calendar

如果你也對 Airflow 的技術新知或對貢獻 Airflow 本身感興趣
歡迎來 Airflow 會議共襄盛舉!

Python 撰寫的 Dag File 是如何被 Airflow 解析的?
#

本篇文章會深入研究 Airflow 2 的 Dag Processing 機制
Airflow 3 會牽涉 AIP-72 TaskSDK 的核心細節
所以這邊會以 Airflow 2 的 Dag Processing 機制來說明比較簡單!

無論是 Airflow 2 或 3 都會涉及以下核心概念

  • DagFileProcessorManager
  • DagFileProcessorProcess
  • DagBag

接下來會以整體的角度來說明這些概念
再進一步深入到各個核心概念的細節
最後總結 Airflow Dag Processing 流程與最佳實踐

概述 Dag Processing 機制
#

dag-parsing-in-high-level
附圖是 Airflow 2 的 Dag Processing 機制 high level 的架構圖

以下是各個 component 的簡介:

  • DagProcessor:
    • 是以 整個 Airflow 系統 的角度出發的 component 名稱
  • DagFileProcessorManager:
    • 是 DagProcessor 在 code level 的實作,也是管理 Dag Processing 的核心
    • 使用 queue 來管理 Dag File 的解析
  • DagFileProcessorProcess:
    • 是 DagFileProcessorManager fork 出來的 worker process
    • 負責解析單一 file path
    • 和同步 Dag 與錯誤訊息到 Metadata DB
  • DagBag:
    • 實際與 Dag File 互動的介面
    • 回傳實際的 Dag instances

整個 Dag Processing 大致的流程如下:

  1. 把待解析的 Dag File 路徑放到 DagFileProcessorManager 的 queue 中
  2. & 3. DagFileProcessorManager 會 fork 出 DagFileProcessorProcess subprocess 並存在 DagFileProcessorManager
  3. DagFileProcessorProcess 透過 DagBag 拿到 Dag instances 和錯誤訊息
  4. DagFileProcessorProcess 會把 Dag instances 和錯誤訊息同步到 Metadata DB

DagFileProcessorManager
#

dag-file-processor-manager

_run_parsing_loop 是整個 Manager 的核心

在每次 loop 中會做以下幾件事: 這邊只有列出跟與解析 Dag 最相關的步驟

  1. self._refresh_dag_dir()
    1. 更新距離上次 refresh 的時間差,如果大於 [scheduler/dag_dir_list_interval] 設定才會重新讀取 Dag Directory dag_filelocs = {full_loc for path in self._file_paths for full_loc in _iter_dag_filelocs(path)}]
    2. 在最後把 “被刪除檔案的 Dag” 從 Metadata DB 中刪除 SerializedDagModel.remove_deleted_dags
  2. self.prepare_file_path_queue()
    1. 如果發現 “有檔案被刪除” 的情況,也會透過剛剛所提的 set_file_paths 來更新 self._file_paths, self._file_path_queue, self._file_stats
    2. 再把 file_paths_in_progress = set(self._processors), file_paths_recently_processedfiles_paths_at_run_limit 這 3 者取聯集完 以外 的 Dag File 路徑加入到 self._file_path_queue
  3. self.start_new_processes()
  4. self.collect_results()
    • 等待多個 DagFileProcessorProcess 的結果並更新到 self._file_stats
    • 也清理 self.waitablesself._processors 中已經完成的 process
    • ready = multiprocessing.connection.wait(
          self.waitables.keys() - [self._direct_scheduler_conn], timeout=0
      )
      
      for sentinel in ready:
          if sentinel is not self._direct_scheduler_conn:
              processor = cast(DagFileProcessorProcess, self.waitables[sentinel])
              self.waitables.pop(processor.waitable_handle)
              self._processors.pop(processor.file_path)
              self._collect_results_from_processor(processor)
      

DagFileProcessorProcess
#

dag-file-processor-process

DagFileProcessorProcess 會以 DagBag 去拿到該路徑下解析完 Dags 的結果

DagBag
#

dag-bag

  • DagBag 是 Airflow 內部最終讀取 Dag File 的介面
  • collect_dags 是從 Dag File 中取得 Dag instance 的 entry point
    • .py 檔案讀取 self._load_modules_from_file
    • .zip 壓縮檔讀取 self._load_modules_from_zip
    • self._process_modules
      • 是整個 Dag Processing 中最花時間與資源的部分
        • 不只要 load modules 也會對 Dag objects 做各種處理與檢查
        • 這也是我認為 為什麼需要用 subprocess 來處理的原因
      • 實際從 Python 檔案讀 modules 後,以 isinstance(o, (DAG, SDKDAG)) 拿到 Dags 的 instance
      • top_level_dags = {(o, m) for m in mods for o in m.__dict__.values() if isinstance(o, (DAG, SDKDAG))}
      • 並以 self.bag_dag 檢查該 Dag 是否合法
        1. 是否有 cycle
        2. Dag policies
        3. 是否有重複的 Dag ID
        4. 都沒問題的話才加入到該 DagBag 中 self.dags[dag.dag_id] = dag

總結:與 Dag Processing 有關的最佳實踐
#

透過文章可以了解 DagFileProcessorManager, DagFileProcessorProcess, DagBag 三者之間的關係
以下是幾個我覺得值得注意的重點

  1. DagFileProcessorManager 會以 queue 來管理所有待解析的 Dag File 路徑
  • 擔任 dispatcher 的角色,並管理所有的 DagFileProcessorProcess subprocesses
  1. 每一個 file path 都是由一個 DagFileProcessorProcess 來處理:
  • 這個 process 會透過 DagBag 來拿到 Dag instance 和錯誤訊息
    • 這個步驟會是 最花時間與資源的部分 !
      • 單一個 python file 內,可能只有 1 個 Dag instance 也有可能有 1,000 個 Dag instances
      • 而且需要對每個 Dag instance 去做各種檢查!
  • 並把這些資訊同步到 Metadata DB 中
  1. DagBag 是最後拿到 Dag instance 的介面: 無論是 Read-Only 還是 Read-Write 都會透過 DagBag 來取得

透過 Dynamic Dag 生成大量 Dags 的最佳實踐
#

假設現在有一個 Python 檔 裡面只有一個讀取 config.yaml 的 Dynamic Dag
並且透過 config.yaml 會產生 1,000 個不同的 Dags

config.yaml 可以是任何外部狀態,如 xxx.json
或用 code 生成的 list,如 for loop

config.yaml:

customer_list:
- name: Facebook
  config: 300
- name: Apple
  config: 200
... 

single_dynamic_dag.py:

with open("config.yaml", 'r') as f:
  customers = yaml.safe_load(f)

for customer in customers:

    @dag(
      dag_id=f"{customer}_dag",
      start_date=datetime(2022, 2, 1)
    )
    def dynamic_generated_dag():
        @task
        def print_message(message):
            print(message)
    
    dynamic_generated_dag()

這樣的寫法會導致
1,000 個 Dag instances 只被 1 個 DagFileProcessorProcess 來處理

這也代表
這 1 個 process 要單獨處理 1,000 個 Dag instances!

因為 1 個 Python 檔 是由 1 個 DagFileProcessorProcess 處理的
Airflow 沒有辦法預先知道這個 Python 檔會產生多少個 Dag instances

這也是在 單一 Python 檔 使用 Dynamic Dag 去產生大量 Dags
有些 Dags 會偶爾突然消失的原因之一

例如:
在 load 到第 800 個 Dag instance 時
DagFileProcessorManager 認為 DagFileProcessorProcess 跑太久 timeout 了
就直接 terminate 這個 process

又進行下一輪的 Dag Processing
導致剩下的 200 個 Dag 被判斷為 “被刪除”!
所以就在 UI 看到怎麼少了 200 個 Dag

但有可能再下一輪的 Dag Processing 沒有超過 timeout
又可以在 UI 上看到 1,000 個 Dags

就會有這種「 Dags 偶爾突然消失」的現象發生

partition-parsing

如果要避免這個問題
只需要使用 partition 的方式來處理

例如:
config.yaml 多加分組的概念,並多加一些相同的 Python 檔
來讓不同的 DagFileProcessorProcess 處理不同的分組

當然需要有另一個 source_dag.pypartition.yaml 設定來維護好 single source of truth
並且讓 CI/CD pipeline 自動生成這些分組的 Dag Files

config.yaml with partition:

group_0:
- name: Facebook
  config: 300
group_1:
- name: Apple
  config: 200

group_0.py:

# get the current group name from the file path
current_file_name = os.path.basename(__file__)
current_group = current_file_name.split('.')[0]

# only import the customers for the current group
with open("config.yaml", 'r') as f:
  customers = yaml.safe_load(f)[current_group]

# generate the Dags for the current group
for customer in customers:
    @dag(
      dag_id=f"{current_group}_{customer}_dag",
      start_date=datetime(2022, 2, 1)
    )
    def dynamic_generated_dag():
        @task
        def print_message(message):
            print(message)
    dynamic_generated_dag()

相關文章

Airflow 的外部事件驅動排程 (External Event Driven Scheduling)
·5 分鐘
Blog Zh-Tw Open-Source-Contribution Data-Engineering
探討 Airflow 3 的 AIP-82 External Event Driven Scheduling 提案與 Common Message Queue 的實作細節和應用場景,如何讓 Airflow 支援 Real-Time Event Driven 工作流排程。
Apache Airflow 101
·6 分鐘
Blog Zh-Tw Data-Engineering
什麼是 Apache Airflow?解決了什麼問題?了解常見的 Use Cases、本身的架構與限制
從 0 成為 Apache Airflow Committer
·8 分鐘
Blog Zh-Tw Open-Source-Contribution
從未接觸 Data Engineering 的小白,如何在 5 個月,以 70+ PR,300 小時的貢獻成為 Apache Airflow Committer
從 0 開始貢獻 Apache Airflow
·5 分鐘
Blog Zh-Tw Open-Source-Contribution
從未接觸 Data Engineering 的小白,如何在 3 個月內貢獻超過 50 個 PR
MacOS 上的 Push-based CLI 工作流
·2 分鐘
Blog Zh-Tw
如何在 Mac 透過 Push-based 通知上減少 CLI 工作流中的 context switching
什麼是 Embedded Database? 簡介 RocksDB
·2 分鐘
Blog Zh-Tw Database
簡介 Embedded Database 和 RocksDB 的基本概念