
Deep Dive into Apache Airflow's Dag Processing Mechanism

Zhe-You Liu

OpenSource4You is a community actively contributing to large-scale open source software projects in Taiwan.
I will be hosting the Airflow Mandarin Meeting at the following time and location:

Time: Every other Wednesday at 8:30 PM starting July 2nd (Taiwan time UTC+8)
Location: https://opensource4you.tw/airflow/meeting
Calendar: https://opensource4you.tw/readme/calendar

If you’re interested in Airflow’s latest technical developments or in contributing to Airflow itself,
you’re welcome to join our Airflow meetings!

How Are Python-Written Dag Files Parsed by Airflow?
#

This article provides an in-depth exploration of Airflow 2’s Dag Processing mechanism.
Airflow 3’s implementation involves core details of the AIP-72 TaskSDK,
so for simplicity we’ll focus on Airflow 2’s Dag Processing mechanism!

Whether it’s Airflow 2 or 3, both involve the following core concepts:

  • DagFileProcessorManager
  • DagFileProcessorProcess
  • DagBag

Next, we’ll explain these concepts from an overall perspective,
then dive deeper into the details of each core concept,
and finally summarize the Airflow Dag Processing workflow and best practices.

Overview of Dag Processing Mechanism
#

[Figure: dag-parsing-in-high-level]
The diagram above shows the high-level architecture of Airflow 2’s Dag Processing mechanism

Here’s an introduction to each component:

  • DagProcessor:
    • This is the component name from the perspective of the entire Airflow system
  • DagFileProcessorManager:
    • This is the code-level implementation of DagProcessor and serves as the core manager of Dag Processing
    • Uses a queue to manage Dag File parsing
  • DagFileProcessorProcess:
    • This is a worker process forked by DagFileProcessorManager
    • Responsible for parsing individual file paths
    • And synchronizing Dags and error messages to the Metadata DB
  • DagBag:
    • The actual interface for interacting with Dag Files
    • Returns actual Dag instances

The overall Dag Processing workflow is as follows:

  1. Dag File paths to be parsed are placed into the DagFileProcessorManager’s queue
  2. DagFileProcessorManager forks DagFileProcessorProcess subprocesses and stores them within DagFileProcessorManager
  3. DagFileProcessorProcess obtains Dag instances and error messages through DagBag
  4. DagFileProcessorProcess synchronizes Dag instances and error messages to the Metadata DB

DagFileProcessorManager
#

[Figure: dag-file-processor-manager]

_run_parsing_loop is the core of the entire Manager

In each loop iteration, the following tasks are performed (listing only the steps most relevant to Dag parsing; a simplified sketch of the loop follows the list):

  1. self._refresh_dag_dir()
    1. Updates the time elapsed since the last refresh, and only re-lists the Dag Directory if it is greater than the [scheduler] dag_dir_list_interval setting: dag_filelocs = {full_loc for path in self._file_paths for full_loc in _iter_dag_filelocs(path)}
    2. Finally removes the Dags of deleted files from the Metadata DB via SerializedDagModel.remove_deleted_dags
  2. self.prepare_file_path_queue()
    1. If it detects that files have been deleted, it also updates self._file_paths, self._file_path_queue, and self._file_stats through set_file_paths
    2. Then adds to self._file_path_queue the Dag File paths that are not in the union of file_paths_in_progress (= set(self._processors)), file_paths_recently_processed, and files_paths_at_run_limit
  3. self.start_new_processes()
  4. self.collect_results()
    • Waits for results from multiple DagFileProcessorProcess instances and updates them to self._file_stats
    • Also cleans up completed processes from self.waitables and self._processors
      ready = multiprocessing.connection.wait(
          self.waitables.keys() - [self._direct_scheduler_conn], timeout=0
      )

      for sentinel in ready:
          if sentinel is not self._direct_scheduler_conn:
              processor = cast(DagFileProcessorProcess, self.waitables[sentinel])
              self.waitables.pop(processor.waitable_handle)
              self._processors.pop(processor.file_path)
              self._collect_results_from_processor(processor)
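
Putting these steps together, here is a simplified, illustrative sketch of what one iteration of _run_parsing_loop does (not the literal Airflow source; the real loop also handles timers, signals, callbacks, and scheduler heartbeats):

def _run_parsing_loop(self):
    while True:
        self._refresh_dag_dir()         # re-list the Dag Directory once dag_dir_list_interval has elapsed
        self.prepare_file_path_queue()  # enqueue paths not in progress / recently processed / at run limit
        self.start_new_processes()      # fork DagFileProcessorProcess workers for queued file paths
        self.collect_results()          # harvest finished workers and update self._file_stats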

DagFileProcessorProcess
#

[Figure: dag-file-processor-process]

DagFileProcessorProcess uses DagBag to obtain the parsed Dag results for that file path
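
Conceptually, this amounts to building a DagBag for a single file path and reading its dags and import_errors. A minimal sketch (the file path is only an example; the real process additionally runs callbacks and serializes the results to the Metadata DB):

from airflow.models.dagbag import DagBag

# parse a single Dag File, without loading the bundled example Dags
dag_bag = DagBag(dag_folder="/opt/airflow/dags/my_dag_file.py", include_examples=False)

dags = list(dag_bag.dags.values())   # Dag instances found in that file
errors = dag_bag.import_errors       # {file_path: stack trace} for files that failed to parse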

DagBag
#

[Figure: dag-bag]

  • DagBag is Airflow’s internal interface for finally reading Dag Files
  • collect_dags is the entry point for getting Dag instances from Dag Files
    • Reads from .py files self._load_modules_from_file
    • Or reads from .zip archives self._load_modules_from_zip
    • self._process_modules
      • This is the most time and resource-intensive part of the entire Dag Processing
        • Not only does it need to load modules, but it also performs various processing and checks on Dag objects
        • This is why I believe subprocess handling is necessary
      • After actually reading modules from Python files, it gets Dag instances with isinstance(o, (DAG, SDKDAG))
      • top_level_dags = {(o, m) for m in mods for o in m.__dict__.values() if isinstance(o, (DAG, SDKDAG))}
      • And uses self.bag_dag to check if the Dag is valid
        1. Whether there are cycles
        2. Dag policies
        3. Whether there are duplicate Dag IDs
        4. Only adds it to the DagBag if everything is fine: self.dags[dag.dag_id] = dag

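Since DagBag exposes both the parsed Dags and the import errors, you can run the same parsing ahead of time, for example as a DAG validation test in CI. A minimal sketch, assuming your Dag Files live in a dags/ folder:

from airflow.models.dagbag import DagBag

def test_dag_files_parse_cleanly():
    # parse the whole Dag folder the same way the DagProcessor would
    dag_bag = DagBag(dag_folder="dags/", include_examples=False)
    assert dag_bag.import_errors == {}, f"Dag import errors: {dag_bag.import_errors}"
    assert len(dag_bag.dags) > 0
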
Summary: Best Practices Related to Dag Processing
#

Through this article, we can understand the relationship between DagFileProcessorManager, DagFileProcessorProcess, and DagBag.
Here are several key points I think are worth noting:

  1. DagFileProcessorManager manages all Dag File paths to be parsed using a queue
    • It acts as a dispatcher and manages all DagFileProcessorProcess subprocesses
  2. Each file path is handled by one DagFileProcessorProcess:
    • The process gets Dag instances and error messages through DagBag
      • This step is the most time and resource-intensive part!
        • A single Python file might contain just 1 Dag instance or potentially 1,000 Dag instances
        • And each Dag instance needs to undergo various checks!
    • It then synchronizes this information to the Metadata DB
  3. DagBag is the final interface for getting Dag instances: whether Read-Only or Read-Write, everything goes through DagBag to obtain them

Best Practices for Generating Large Numbers of Dags Through Dynamic Dags
#

Suppose there is one Python file containing only one Dynamic Dag that reads config.yaml,
and through config.yaml it generates 1,000 different Dags.

config.yaml can be any external state, such as an xxx.json file,
or a list generated by code, such as a for loop.

config.yaml:

customer_list:
- name: Facebook
  config: 300
- name: Apple
  config: 200
... 

single_dynamic_dag.py:

with open("config.yaml", 'r') as f:
  customers = yaml.safe_load(f)

for customer in customers:

    @dag(
      dag_id=f"{customer}_dag",
      start_date=datetime(2022, 2, 1)
    )
    def dynamic_generated_dag():
        @task
        def print_message(message):
            print(message)
    
    dynamic_generated_dag()

This approach causes
these 1,000 Dag instances to be processed by only 1 DagFileProcessorProcess.

In other words,
this single process has to handle 1,000 Dag instances alone!

Because 1 Python file is handled by 1 DagFileProcessorProcess,
Airflow cannot predict in advance how many Dag instances that Python file will generate.

This is one of the reasons why some Dags occasionally disappear suddenly when Dynamic Dags are used to generate large numbers of Dags in a single Python file.

For example:
while loading the 800th Dag instance,
DagFileProcessorManager decides that the DagFileProcessorProcess has been running too long and has hit the timeout,
so it terminates the process directly.

It then moves on to the next round of Dag Processing,
causing the remaining 200 Dags to be judged as “deleted”!
So you see 200 fewer Dags in the UI.

But it is possible that the next round of Dag Processing finishes within the timeout,
and you can see all 1,000 Dags in the UI again.

This creates the phenomenon of “Dags occasionally disappearing suddenly”.
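
The knobs involved in this behavior are regular Airflow settings. The values below are the Airflow 2 defaults at the time of writing; double-check them against your version:

[core]
# how long a single DagFileProcessorProcess may run before it is killed
dag_file_processor_timeout = 50

[scheduler]
# how often the Dag Directory is re-listed for new or deleted files
dag_dir_list_interval = 300
# minimum interval between two parses of the same file
min_file_process_interval = 30
# number of DagFileProcessorProcess workers parsing files in parallel
parsing_processes = 2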

[Figure: partition-parsing]

To avoid this problem,
simply use a partition approach.

For example:
add a grouping concept to config.yaml and add several identical Python files,
so that different DagFileProcessorProcess instances handle different groups.

Of course, you will want a separate source_dag.py and a partition.yaml configuration to maintain a single source of truth,
and let the CI/CD pipeline automatically generate these grouped Dag Files
(a sketch of such a generation step appears after the example files below).

config.yaml with partition:

group_0:
- name: Facebook
  config: 300
group_1:
- name: Apple
  config: 200

group_0.py:

import os
from datetime import datetime

import yaml
from airflow.decorators import dag, task

# get the current group name from the file path, e.g. "group_0" from "group_0.py"
current_file_name = os.path.basename(__file__)
current_group = current_file_name.split('.')[0]

# only load the customers for the current group
with open("config.yaml", "r") as f:
    customers = yaml.safe_load(f)[current_group]

# generate the Dags for the current group
for customer in customers:

    @dag(
        dag_id=f"{current_group}_{customer['name']}_dag",
        start_date=datetime(2022, 2, 1),
    )
    def dynamic_generated_dag():
        @task
        def print_message(message):
            print(message)

        print_message(customer["config"])

    dynamic_generated_dag()
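
For completeness, here is a minimal sketch of the kind of generation step the CI/CD pipeline could run. The partition.yaml layout, the source_dag.py template name, and the output paths are illustrative assumptions, not anything Airflow prescribes:

# hypothetical CI/CD step: emit one group_<name>.py per group in partition.yaml,
# copying a shared template so the Dag logic keeps a single source of truth
import shutil

import yaml

with open("partition.yaml") as f:
    groups = yaml.safe_load(f)   # e.g. {"group_0": [...], "group_1": [...]}

for group_name in groups:
    # each generated file is an identical copy of the template; at parse time it
    # derives its group from its own file name (as group_0.py does above)
    shutil.copyfile("source_dag.py", f"dags/{group_name}.py")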
