
External Event Driven Scheduling in Airflow

Zhe-You Liu

OpenSource4You is a community actively contributing to large-scale open source software projects in Taiwan.
I will be hosting the Airflow Mandarin Meeting at the following time and location:

Time: Every other Wednesday at 8:30 PM starting July 2nd (Taiwan time UTC+8)
Location: airflow.opensource4you.tw
Calendar: calendar.opensource4you.tw

If you’re interested in Airflow’s latest technical developments or in contributing to Airflow itself, you’re welcome to join our Airflow meetings!

What is Event-Driven Architecture?

Event-Driven Architecture can be defined simply as a system that “takes corresponding actions only when specific events occur.” It is particularly suitable for scenarios that require immediate responses.

[Figure: event-driven architecture]

The common understanding of Airflow’s scheduling capabilities is based on Cron & Time Intervals, such as triggering a DAG periodically every X hours, daily, or monthly to complete a workflow.
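
For contrast, here is a minimal sketch of conventional time-based scheduling (the dag_id and cron expression are illustrative):

from datetime import datetime

from airflow import DAG
from airflow.providers.standard.operators.empty import EmptyOperator

# Classic time-based scheduling: this DAG runs at 06:00 every day,
# whether or not any external event actually occurred.
with DAG(
    dag_id="daily_report",
    schedule="0 6 * * *",
    start_date=datetime(2025, 1, 1),
    catchup=False,
):
    EmptyOperator(task_id="run_report")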

However, Airflow can also be triggered by events!

Today, we’re going to explore a new feature in Airflow 3: External Event-Driven Scheduling. It is part of the AIP-82 External event driven scheduling in Airflow proposal and is closely related to the Common Message Queue.
In this article, we will delve into the implementation details and application scenarios of these features.

How to Write an Event-Driven DAG Before AIP-82

Before AIP-82, writing event-driven DAGs was possible, but it required a combination of Continuous Scheduling, max_active_runs=1, and a Sensor.

This approach involves continuously scheduling a DAG and using a Sensor in the first task to wait for an external event.

Here is an example of how to listen to specific Apache Kafka topics to trigger a DAG before AIP-82:

# adapted from https://www.astronomer.io/docs/learn/airflow-kafka
# (imports, constants, and stub functions added here so the sketch is self-contained)
from datetime import datetime

from airflow.decorators import dag
from airflow.providers.apache.kafka.sensors.kafka import AwaitMessageTriggerFunctionSensor

KAFKA_TOPIC = "my_topic"  # illustrative topic name
PET_MOODS_NEEDING_A_WALK = ["zoomy", "bouncy"]  # illustrative filter values

def listen_function(message, pet_moods_needing_a_walk=None):
    ...  # stub: return a value only for messages that should fire the event

def event_triggered_function(event, **context):
    ...  # stub: runs in the triggerer once listen_function returns a value

@dag(
    start_date=datetime(2023, 4, 1),
    schedule="@continuous",
    max_active_runs=1,
    catchup=False,
    render_template_as_native_obj=True,
)
def listen_to_the_stream():
    AwaitMessageTriggerFunctionSensor(
        task_id="listen_for_mood",
        kafka_config_id="kafka_listener",
        topics=[KAFKA_TOPIC],
        # the apply function runs inside the triggerer, which is
        # why it must be passed as a dot-notation string
        apply_function="listen_to_the_stream.listen_function",
        poll_interval=5,
        poll_timeout=1,
        apply_function_kwargs={"pet_moods_needing_a_walk": PET_MOODS_NEEDING_A_WALK},
        event_triggered_function=event_triggered_function,
    )

listen_to_the_stream()
  • When the use case involves only sporadic events, execution proceeds as follows:
    • Only one DagRun executes at any given time.
    • The first task of the DagRun is a Sensor that continuously polls for the external event.
      • When the external event occurs, the workflow proceeds to the next task.
    • For sporadic events, this approach works well.
  • However, when the use case involves a continuous stream of events, the picture changes:
    • Only one DagRun can still execute at a time.
    • Even as new events keep arriving, the system must wait for all tasks of the previous DagRun to complete before starting the next one.
    • An application that should provide real-time processing thus degrades into a batch-processing model!

Another point to consider: if the Operator or Sensor waiting for the external event is not implemented as a Deferrable Operator, it occupies a worker slot for the entire time it waits.
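
For a rough picture of what “deferrable” means here, the sketch below shows a hypothetical operator (not from the Airflow codebase) that hands its wait to the triggerer and frees the worker slot in the meantime:

from datetime import timedelta

from airflow.providers.standard.triggers.temporal import TimeDeltaTrigger
from airflow.sdk import BaseOperator

class WaitForSomething(BaseOperator):
    # Hypothetical sketch of a deferrable wait.
    def execute(self, context):
        # Hand the wait to the triggerer process and free this worker slot.
        self.defer(
            trigger=TimeDeltaTrigger(timedelta(minutes=5)),
            method_name="execute_complete",
        )

    def execute_complete(self, context, event=None):
        # Resumes on a worker only after the trigger has fired.
        self.log.info("Wait finished; resuming.")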

AIP-82 External event driven scheduling in Airflow

Simply put, this is achieved by combining Assets and Triggers to create a DAG-level event-driven architecture.

Since it uses Triggers, the event-driven mechanism here is poll-based, not push-based.

[Figure: external event-driven scheduling]

This approach offers the following benefits:

  • Reduces the overhead of scheduling an extra DagRun just to start polling.
    • It directly creates a Trigger, bypassing the DagRun.
      • This eliminates the scheduling overhead for the “polling” phase, making it more responsive.
    • A DagRun is scheduled only when the conditions are met and the AssetEvent is updated.
  • For the end-user, it eliminates the need to define a Sensor task in the DAG.

AIP-82 Implementation Details

Next, we will detail the implementation specifics of AIP-82 in the Airflow core.

AIP-82 Create references between triggers and assets #43666

This PR adds a many-to-many relationship between the asset and trigger tables (a many-to-many association table). It only adds foreign keys and indexes, so it doesn’t require an Association Object to define the relationship.
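
In SQLAlchemy terms, such an association table looks roughly like this (a simplified sketch, not the actual Airflow migration; table and column names are illustrative):

from sqlalchemy import Column, ForeignKey, Index, Integer, MetaData, Table

metadata = MetaData()

# Pure association table: only foreign keys and indexes, no extra columns,
# so no Association Object (mapped class) is needed.
asset_trigger = Table(
    "asset_trigger",
    metadata,
    Column("asset_id", Integer, ForeignKey("asset.id"), primary_key=True),
    Column("trigger_id", Integer, ForeignKey("trigger.id"), primary_key=True),
    Index("idx_asset_trigger_asset_id", "asset_id"),
    Index("idx_asset_trigger_trigger_id", "trigger_id"),
)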

AIP-82 Save references between assets and triggers #43826

There are three main parts to this PR:

  1. End-users now have a watchers parameter when defining an Asset:

# adapted from the https://github.com/apache/airflow/pull/43826 PR description
# (imports added here so the sketch is self-contained)
from airflow.models.dag import DAG
from airflow.providers.amazon.aws.triggers.sqs import SqsSensorTrigger
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sdk.definitions.asset import Asset

trigger = SqsSensorTrigger(sqs_queue="my_queue")
asset = Asset("example_asset_watchers", watchers=[trigger])  # new `watchers` parameter

with DAG(
    dag_id="example_dataset_watcher",
    schedule=[asset],
    catchup=False,
):
    EmptyOperator(task_id="task")
  2. During DAG processing, the newly added watchers must also be written to the asset model.
  • In bulk_write_to_db, when an asset is recorded,
    • add_asset_trigger_references records the newly defined watchers.
      • add_asset_trigger_references is not just about “adding triggers related to the asset.”
        • It is really a sync operation: it splits references into refs_to_add and refs_to_remove to update the trigger associations for that asset (see the sketch below).
    • The previous PR only defined the many-to-many relationship between asset and trigger.
      • The type of watchers is list[Trigger].
      • This PR handles the actual writing of the triggers into the asset table.

While reviewing the PR comments, I also learned that Airflow currently does not delete records from the asset table.
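
The diff step can be pictured like this (a heavily simplified, hypothetical sketch of the idea; names are illustrative):

def sync_trigger_refs(existing: set[str], desired: set[str]) -> tuple[set[str], set[str]]:
    # Hypothetical sketch of the diff inside add_asset_trigger_references.
    refs_to_add = desired - existing      # watchers newly defined in the DAG file
    refs_to_remove = existing - desired   # watchers no longer defined
    return refs_to_add, refs_to_remove

# Example: one trigger kept, one added, one removed.
add, remove = sync_trigger_refs({"trigger_a", "trigger_b"}, {"trigger_b", "trigger_c"})
assert add == {"trigger_c"} and remove == {"trigger_a"}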

  3. Serialization of the trigger

The remaining changes are minor adjustments to some methods, since repr was used to record the trigger in the asset.

However, repr was later replaced with hash, because a string representation is not a reliable unique identifier for an object. A trigger is identified by its classpath and kwargs; ultimately, the identifier is generated by hash((classpath, json.dumps(BaseSerialization.serialize(kwargs)).encode("utf-8"))).

AIP-82 Use hash instead of repr #44797
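
Conceptually, the identity computation works as below (a simplified sketch: Airflow serializes kwargs with BaseSerialization, replaced here by plain json so the example is self-contained):

import json

def trigger_identity(classpath: str, kwargs: dict) -> int:
    # Two trigger instances with the same classpath and kwargs hash
    # identically, which repr() could not guarantee.
    return hash((classpath, json.dumps(kwargs, sort_keys=True).encode("utf-8")))

queue = "https://sqs.us-east-1.amazonaws.com/0123456789/my-queue"
a = trigger_identity("airflow.providers.amazon.aws.triggers.sqs.SqsSensorTrigger", {"sqs_queue": queue})
b = trigger_identity("airflow.providers.amazon.aws.triggers.sqs.SqsSensorTrigger", {"sqs_queue": queue})
assert a == b  # stable identity for "the same" trigger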

AIP-82 Handle trigger serialization #45562

This PR mainly deals with integration and unit testing for the AIP-72 Task SDK. For AIP-82 itself, it introduces AssetWatcher to make the semantics of watchers clearer.

# adapted from the https://github.com/apache/airflow/pull/45562 PR description
# (imports added here; the FileTrigger parameters are elided in the original)
from airflow.models.dag import DAG
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.triggers.file import FileTrigger
from airflow.sdk.definitions.asset import Asset, AssetWatcher

DAG_ID = "example_file_watcher"  # illustrative; left undefined in the PR description

trigger = FileTrigger(...)  # parameters elided in the PR description
asset = Asset("<my_queue>", watchers=[AssetWatcher(name="my_file_watcher", trigger=trigger)])

with DAG(
    dag_id=DAG_ID,
    schedule=asset,
):
    EmptyOperator(task_id="empty_task")

AssetWatcher essentially wraps a trigger and adds a name field to identify the watcher.

AIP-82 Introduce BaseEventTrigger as base class for triggers used with event driven scheduling #46391
#

In the [LAZY CONSENSUS] AIP-82 - Create new interface BaseEventTrigger discussion thread, Vincent Beck clearly explains why an additional BaseEventTrigger interface is needed: it works with AssetWatcher to restrict which triggers are suitable for Event-Driven Scheduling.

  • The example in the discussion thread is as follows:
    • Suppose a DAG uses an S3KeyTrigger as a watcher.
    • Then, as long as the file <my-file> exists in the <my-bucket> bucket,
      trigger = S3KeyTrigger(bucket_name="<my-bucket>", bucket_key="<my-file>")
      asset = Asset("s3_asset", watchers=[
        AssetWatcher(name="s3_asset_watcher", trigger=trigger)
      ])

      with DAG(
        dag_id="example_dag",
        schedule=[asset],
        catchup=False,
      ):
        ...

    • this DAG will be triggered continuously!
  • This is not a bug but the expected behavior of the trigger.
    • However, without restricting which triggers are suitable for Event-Driven Scheduling, users could inadvertently write DAGs that trigger indefinitely, as shown above.

So, with the BaseEventTrigger interface introduced by this PR, AssetWatcher accepts only triggers that inherit from BaseEventTrigger, preventing this kind of infinite triggering.
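
For illustration, a compliant trigger could look like the hypothetical sketch below (HeartbeatEventTrigger is not a real Airflow trigger; only the BaseEventTrigger base class and TriggerEvent are real):

import asyncio

from airflow.triggers.base import BaseEventTrigger, TriggerEvent

class HeartbeatEventTrigger(BaseEventTrigger):
    # Hypothetical: emits an event every `interval` seconds.
    def __init__(self, interval: float = 60.0):
        super().__init__()
        self.interval = interval

    def serialize(self):
        # classpath + kwargs: the same pair the asset-trigger hash is built from
        return ("example_triggers.HeartbeatEventTrigger", {"interval": self.interval})

    async def run(self):
        # Every TriggerEvent yielded here produces an AssetEvent,
        # which in turn can schedule a DagRun.
        while True:
            await asyncio.sleep(self.interval)
            yield TriggerEvent({"status": "tick"})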

Common Message Queue Trigger

AIP-82 mentions the following event-driven scenarios:

Ideally, there should be an end-to-end solution in Airflow to trigger DAGs based on external events such as:

  • A message has been pushed to a message queue such as Google Pub/Sub, Amazon SQS, Azure Message Bus, Apache Kafka, …
  • A file has been created in a storage service
  • A database has been updated

Among these, it mentions triggering DAGs via a Message Queue. To that end, Vikram Koka proposed [DISCUSS]: Proposing a “common message queue” abstraction. The goal is a simpler, unified interface for users to write Event-Driven Scheduling DAGs.

Regardless of which Message Queue they use, users can rely on the same MessageQueueTrigger(queue=...) class and simply provide a different queue URI.

# copy from https://github.com/apache/airflow/pull/46694/files

from __future__ import annotations

from airflow.models.dag import DAG
from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sdk.definitions.asset import Asset, AssetWatcher

trigger = MessageQueueTrigger(queue="https://sqs.us-east-1.amazonaws.com/0123456789/my-queue")
asset = Asset("sqs_asset", watchers=[AssetWatcher(name="sqs_asset_watcher", trigger=trigger)])

with DAG(
    dag_id="example_msgq_watcher",
    schedule=[asset],
    catchup=False,
):
    EmptyOperator(task_id="task")

As explained in the section on AIP-82 Introduce BaseEventTrigger as base class for triggers used with event driven scheduling #46391, MessageQueueTrigger likewise inherits from BaseEventTrigger, marking it as suitable for Event-Driven Scheduling.

Currently, the providers that have implemented the Common Message Queue Trigger are:

  • Amazon SQS
  • Apache Kafka

Conclusion: Advantages and Limitations

AIP-82 External event driven scheduling in Airflow and the Common Message Queue allow Airflow to:

  1. Reduce unnecessary scheduling overhead
  2. Trigger DagRuns more promptly
  3. Simplify the writing of Event-Driven DAGs

The current implementation still has some limitations:

  1. Only two MessageQueueTrigger providers have been implemented so far, simply because no one has contributed the others yet.
  • These are the aforementioned Amazon SQS and Apache Kafka implementations.
  • This limits user choice and flexibility when adopting Event-Driven Scheduling.
  2. Database I/O can become a bottleneck.
  • DagRuns, TaskInstances, Assets, and Triggers all continuously update their status in the database.
  • The database I/O load must therefore be considered for each use case, and a reasonable polling interval must be set.

But the good news is: Anyone can be that “no one”!
It is precisely because “no one has implemented it” that everyone has the opportunity to contribute!

Adding a provider’s Trigger is not overly difficult, but it can have a significant impact on users. If you are interested, you can refer to AIP-82: Add KafkaMessageQueueProvider.

Let’s be “no one” together!

[Figure: be-nobody]
