OpenSource4You is a community actively contributing to large-scale open source software projects in Taiwan.
I will be hosting the Airflow Mandarin Meeting at the following time and location:
Time: Every other Wednesday at 8:30 PM starting July 2nd (Taiwan time UTC+8)
Location: https://opensource4you.tw/airflow/meeting
Calendar: https://opensource4you.tw/readme/calendar
If you’re interested in Airflow’s latest technical developments or in contributing to Airflow itself, you’re welcome to join our Airflow meetings!
What is Event-Driven Architecture?#
Event-Driven Architecture can be simply defined as a system that “takes corresponding actions only when specific events occur.”
It is particularly suitable for scenarios requiring more immediate responses.
The common understanding of Airflow’s scheduling capabilities is based on Cron & Time Intervals, such as triggering a DAG periodically every X hours, daily, or monthly to complete a workflow.
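For instance, a purely time-based schedule looks like this (a minimal sketch; the dag_id and dates are illustrative):
from datetime import datetime
from airflow.models.dag import DAG
from airflow.providers.standard.operators.empty import EmptyOperator

# Runs once a day at midnight, regardless of what happens in the outside world.
with DAG(dag_id="daily_report", schedule="@daily", start_date=datetime(2025, 1, 1), catchup=False):
    EmptyOperator(task_id="task")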
However, Airflow can also be triggered by events!
Today, we’re going to explore a new feature in Airflow 3: External Event-Driven Scheduling.
This is part of the AIP-82 External event driven scheduling in Airflow proposal, and it is also highly related to the Common Message Queue.
In this article, we will delve into the implementation details and application scenarios of these features.
How to Write an Event-Driven DAG Before AIP-82#
Before AIP-82, writing event-driven DAGs was possible, but it required a combination of Continuous Scheduling, max_active_runs=1, and a Sensor.
This approach involves continuously scheduling a DAG and using a Sensor in the first task to wait for an external event.
Here is an example of how to listen to specific Apache Kafka topics to trigger a DAG before AIP-82:
# copy from https://www.astronomer.io/docs/learn/airflow-kafka
# (imports, constants, and function stubs added for completeness;
#  see the linked page for the full example)
from datetime import datetime

from airflow.decorators import dag
from airflow.providers.apache.kafka.sensors.kafka import AwaitMessageTriggerFunctionSensor

KAFKA_TOPIC = "my_topic"
PET_MOODS_NEEDING_A_WALK = ["zoomy", "bouncy"]


def listen_function(message, pet_moods_needing_a_walk=None):
    # runs inside the triggerer for every polled message;
    # returning a truthy value fires event_triggered_function
    ...


def event_triggered_function(event, **context):
    # called with the TriggerEvent once listen_function returns a value
    ...


@dag(
    start_date=datetime(2023, 4, 1),
    schedule="@continuous",
    max_active_runs=1,
    catchup=False,
    render_template_as_native_obj=True,
)
def listen_to_the_stream():
    listen_for_mood = AwaitMessageTriggerFunctionSensor(
        task_id="listen_for_mood",
        kafka_config_id="kafka_listener",
        topics=[KAFKA_TOPIC],
        # the apply function runs inside the triggerer, which is why it must
        # be given as a dot-notation string (assumes this file is named
        # listen_to_the_stream.py)
        apply_function="listen_to_the_stream.listen_function",
        poll_interval=5,
        poll_timeout=1,
        apply_function_kwargs={"pet_moods_needing_a_walk": PET_MOODS_NEEDING_A_WALK},
        event_triggered_function=event_triggered_function,
    )


listen_to_the_stream()
- When the use case involves only sporadic events, the execution proceeds as follows:
  - Only one DagRun executes at any given time.
  - The first task of the DagRun is a Sensor that continuously polls for an external event.
  - When the external event occurs, the workflow proceeds to the next task.
  - This approach works well for sporadic events.
- However, when the use case involves a continuous stream of many events, the execution changes:
  - Still, only one DagRun can execute at a time.
  - Even if subsequent events arrive, the system must wait for all tasks of the previous DagRun to complete before starting the next one.
  - This means that an application that should provide real-time processing degrades into a batch-processing model!
Another point to consider: if the Operator or Sensor waiting for the external event does not support Deferrable Operators, it occupies a worker slot for the entire time it waits, as illustrated below.
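For example, many sensors accept a deferrable flag that hands the waiting over to the triggerer instead of blocking a worker (a minimal sketch using S3KeySensor; the bucket and key names are illustrative):
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

# With deferrable=True the sensor releases its worker slot and waits in the
# triggerer; in the default poke mode it would hold the slot while polling.
wait_for_file = S3KeySensor(
    task_id="wait_for_file",
    bucket_name="my-bucket",
    bucket_key="incoming/report.csv",
    deferrable=True,
)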
AIP-82 External event driven scheduling in Airflow#
Simply put, this is achieved by combining Assets and Triggers to create a DAG-level event-driven architecture.
Since it uses Triggers, the event-driven mechanism here is poll-based, not push-based.

This approach offers the following benefits:
- Reduces the overhead of scheduling an extra DagRun just to start polling.
  - It directly creates a Trigger, bypassing the DagRun.
  - This eliminates the scheduling overhead for the “polling” phase, making it more responsive.
- A DagRun is scheduled only when the conditions are met and the AssetEvent is updated.
- For the end-user, it eliminates the need to define a Sensor task in the DAG.
AIP-82 Implementation Details#
Next, we will detail the implementation specifics of AIP-82 in the Airflow core.
AIP-82 Create references between triggers and assets #43666#
This PR adds a many-to-many relationship between the asset and trigger tables (a Many-to-Many Association Table). It only adds Foreign Keys and Indexes, so it doesn’t require an Association Object to define the many-to-many relationship.
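In SQLAlchemy terms, the result is a plain association table rather than a mapped class. A minimal sketch of the pattern (table and column names are illustrative, not Airflow’s actual schema):
from sqlalchemy import Column, ForeignKey, Integer, Table
from sqlalchemy.orm import declarative_base

Base = declarative_base()

# Pure association table: just foreign keys and a composite primary key.
# No Association Object is mapped on top, because the relationship itself
# carries no extra data.
asset_trigger = Table(
    "asset_trigger",
    Base.metadata,
    Column("asset_id", Integer, ForeignKey("asset.id"), primary_key=True),
    Column("trigger_id", Integer, ForeignKey("trigger.id"), primary_key=True),
)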
AIP-82 Save references between assets and triggers #43826#
There are three main parts to this PR:
- End-users now have a watchers parameter when defining an Asset:
# copy from https://github.com/apache/airflow/pull/43826 PR description
# (imports added for completeness; exact import paths may differ by Airflow version)
from airflow.models.baseoperator import chain
from airflow.models.dag import DAG
from airflow.providers.amazon.aws.triggers.sqs import SqsSensorTrigger
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sdk.definitions.asset import Asset

trigger = SqsSensorTrigger(sqs_queue="my_queue")
asset = Asset("example_asset_watchers", watchers=[trigger])  # new `watchers` parameter

with DAG(
    dag_id="example_dataset_watcher",
    schedule=[asset],
    catchup=False,
):
    task = EmptyOperator(task_id="task")
    chain(task)
- During DAG processing, the newly added watchers must also be added to the asset model.
  - In bulk_write_to_db, when an asset is recorded, it uses add_asset_trigger_references to record the newly defined watchers.
    - add_asset_trigger_references is not just about “adding triggers related to the asset.”
    - It’s more of a sync operation, dividing references into refs_to_add and refs_to_remove to update the trigger associations for that asset (see the sketch below).
  - The previous PR only defined the many-to-many relationship between asset and trigger.
    - The type of watchers is list[Trigger].
    - This PR handles the actual writing of the trigger into the asset table.
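A minimal sketch of that sync behavior, assuming simplified ORM objects with a triggers relationship (an illustration of the idea, not the actual Airflow implementation):
def add_asset_trigger_references(asset, watchers, session):
    # Treat the watchers defined on the DAG as the desired state and
    # diff it against what is already recorded in the database.
    existing = set(asset.triggers)
    desired = set(watchers)
    refs_to_add = desired - existing
    refs_to_remove = existing - desired
    for trigger in refs_to_add:
        asset.triggers.append(trigger)
    for trigger in refs_to_remove:
        asset.triggers.remove(trigger)
    session.merge(asset)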
While reviewing the PR comments, I also learned that Airflow currently does not delete records from the asset table.
- Serialization of trigger
  - The remaining changes are minor adjustments to some methods, as repr is used to record the trigger in the asset.
  - However, the use of repr was later replaced with hash, because a string representation is not suitable as a unique identifier for an object. A trigger is composed of a classpath and kwargs. Ultimately, the identifier is generated by hash((classpath, json.dumps(BaseSerialization.serialize(kwargs)).encode("utf-8"))).
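To see why repr fails as an identity: two equivalent trigger objects usually have different reprs (the default repr embeds the instance’s memory address), while a hash over (classpath, serialized kwargs) is stable. A small self-contained illustration, using plain json in place of Airflow’s BaseSerialization:
import json

def trigger_identity(classpath: str, kwargs: dict) -> int:
    # Stand-in for the real formula: hash over the classpath plus the
    # serialized keyword arguments, so equal definitions hash equally.
    return hash((classpath, json.dumps(kwargs, sort_keys=True).encode("utf-8")))

a = trigger_identity("airflow.triggers.file.FileTrigger", {"filepath": "/tmp/x"})
b = trigger_identity("airflow.triggers.file.FileTrigger", {"filepath": "/tmp/x"})
assert a == b  # same classpath + kwargs, same identity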
AIP-82 Handle trigger serialization #45562#
This PR mainly deals with integration and unit testing for the AIP-72 Task SDK.
For AIP-82 itself, it introduces AssetWatcher to make the semantics of watchers clearer.
# copy from https://github.com/apache/airflow/pull/45562 PR description
# (imports and DAG_ID added for completeness; import paths may differ by version)
from airflow.models.baseoperator import chain
from airflow.models.dag import DAG
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.triggers.file import FileTrigger
from airflow.sdk.definitions.asset import Asset, AssetWatcher

DAG_ID = "example_asset_watcher"
trigger = FileTrigger(...)  # arguments elided in the PR description
asset = Asset("<my_queue>", watchers=[AssetWatcher(name="my_file_watcher", trigger=trigger)])

with DAG(
    dag_id=DAG_ID,
    schedule=asset,
):
    empty_task = EmptyOperator(task_id="empty_task")
    chain(empty_task)
AssetWatcher is a thin wrapper that pairs a trigger with a name field identifying the watcher.
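Roughly, its shape looks like this (a simplified sketch, not the exact Task SDK definition):
import attrs
from airflow.triggers.base import BaseTrigger

@attrs.define
class AssetWatcher:
    # a named handle on a trigger; after DAG serialization the trigger
    # may be stored in its serialized (dict) form
    name: str
    trigger: BaseTrigger | dict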
AIP-82 Introduce BaseEventTrigger as base class for triggers used with event driven scheduling #46391#
In the [LAZY CONSENSUS] AIP-82 - Create new interface BaseEventTrigger discussion thread,
@Vincent Beck clearly explains why an additional BaseEventTrigger interface is needed to work with AssetWatcher and restrict which triggers are suitable for Event-Driven Scheduling.
- The example in the discussion thread is as follows:
  - If a DAG uses an S3KeyTrigger as a watcher, then as long as the file <my-file> exists in the <my-bucket> bucket, this DAG will be triggered continuously!
trigger = S3KeyTrigger(bucket_name="<my-bucket>", bucket_key="<my-file>")
asset = Asset(
    "s3_asset",
    watchers=[AssetWatcher(name="s3_asset_watcher", trigger=trigger)],
)

with DAG(
    dag_id="example_dag",
    schedule=[asset],
    catchup=False,
):
    ...
- This is not a bug but the expected behavior of the trigger.
- However, without special restrictions on which triggers are suitable for Event-Driven Scheduling, users could inadvertently write DAGs that are triggered indefinitely, as shown above.
So, after this PR introduced the BaseEventTrigger interface, AssetWatcher only accepts triggers that inherit from BaseEventTrigger, preventing such infinite-triggering situations.
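In practice, an event-ready trigger simply subclasses BaseEventTrigger instead of BaseTrigger. A minimal sketch (the class name, module path, and polling logic are illustrative, not a real provider implementation):
import asyncio

from airflow.triggers.base import BaseEventTrigger, TriggerEvent


class MyQueueEventTrigger(BaseEventTrigger):
    """Fires an event each time something new appears on an external queue."""

    def __init__(self, queue_uri: str, poll_interval: float = 5.0):
        super().__init__()
        self.queue_uri = queue_uri
        self.poll_interval = poll_interval

    def serialize(self):
        # classpath + kwargs: the same pair Airflow hashes to identify the trigger
        return (
            "my_provider.triggers.MyQueueEventTrigger",
            {"queue_uri": self.queue_uri, "poll_interval": self.poll_interval},
        )

    async def run(self):
        while True:
            # Poll the external system; yield only when there is a *new*
            # message, so an unchanged state does not re-trigger the DAG.
            message = await self._poll_once()
            if message is not None:
                yield TriggerEvent({"queue_uri": self.queue_uri, "message": message})
            await asyncio.sleep(self.poll_interval)

    async def _poll_once(self):
        ...  # provider-specific polling goes here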
Common Message Queue Trigger#
AIP-82 mentions the following event-driven scenarios:
Ideally, there should be an end-to-end solution in Airflow to trigger DAGs based on external events such as:
- A message has been pushed to a message queue such as Google Pub/Sub, Amazon SQS, Azure Message Bus, Apache Kafka, …
- A file has been created in a storage service
- A database has been updated
Among these, it mentions triggering DAGs via a Message Queue. So, Vikram Koka proposed [DISCUSS]: Proposing a “common message queue” abstraction. The goal is to have a simpler, unified interface for users to write Event-Driven Scheduling DAGs.
Regardless of which Message Queue the user uses, they can use the same MessageQueueTrigger(queue=...) class and provide different queue URIs.
# copy from https://github.com/apache/airflow/pull/46694/files
from __future__ import annotations

from airflow.models.dag import DAG
from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sdk.definitions.asset import Asset, AssetWatcher

trigger = MessageQueueTrigger(queue="https://sqs.us-east-1.amazonaws.com/0123456789/my-queue")
asset = Asset("sqs_asset", watchers=[AssetWatcher(name="sqs_asset_watcher", trigger=trigger)])

with DAG(
    dag_id="example_msgq_watcher",
    schedule=[asset],
    catchup=False,
):
    EmptyOperator(task_id="task")
As explained in the section on AIP-82 Introduce BaseEventTrigger as base class for triggers used with event driven scheduling #46391, MessageQueueTrigger also inherits from BaseEventTrigger to restrict its use to Event-Driven Scheduling scenarios.
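A quick way to convince yourself of this relationship (assuming the common.messaging provider is installed):
from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger
from airflow.triggers.base import BaseEventTrigger

# MessageQueueTrigger is usable for event-driven scheduling precisely
# because it sits under the BaseEventTrigger interface.
assert issubclass(MessageQueueTrigger, BaseEventTrigger)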
Currently, the providers that have implemented the Common Message Queue Trigger are:
- Amazon SQS
- Apache Kafka
Conclusion: Advantages and Limitations#
AIP-82 External event driven scheduling in Airflow and the Common Message Queue allow Airflow to:
- Reduce unnecessary scheduling overhead
- Trigger DagRuns more promptly
- Simplify the writing of Event-Driven DAGs
The current implementation still has some limitations:
- Only two Common Message Queue Trigger providers have been implemented, simply because no one has contributed the others yet.
  - These are the aforementioned Amazon SQS and Apache Kafka implementations.
  - This limits user choice and flexibility when using Event-Driven Scheduling.
- Database I/O bottleneck
  - DagRuns, TaskInstances, Assets, and Triggers all need to continuously update their status in the database.
  - Therefore, the database I/O bottleneck must be considered for the use case, and a reasonable polling interval must be set (see the sketch below).
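For example, most pollable triggers expose an interval knob; raising it trades responsiveness for less database and API pressure (shown here with FileTrigger’s poke_interval; parameter names vary by trigger, and the path is illustrative):
from airflow.providers.standard.triggers.file import FileTrigger

# Poll once a minute instead of every few seconds: fewer state updates
# hitting the metadata database, at the cost of slower reaction to events.
trigger = FileTrigger(filepath="/data/incoming/report.csv", poke_interval=60.0)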
But the good news is:
Anyone can be that “no one”!
It is precisely because “no one has implemented it” that everyone has the opportunity to contribute!
Adding a provider’s Trigger is not overly difficult, but it can have a significant impact on users. If you are interested, you can refer to AIP-82: Add KafkaMessageQueueProvider.
References#
- Apache Airflow
- Airflow Improvement Proposal
- PRs related to AIP-82
  - https://github.com/apache/airflow/pulls?q=%22AIP-82%22
  - https://github.com/apache/airflow/pull/43666/files
  - https://github.com/apache/airflow/pull/43826/files
  - https://github.com/apache/airflow/pull/45562/files
  - https://github.com/apache/airflow/pull/46912/files
  - https://github.com/apache/airflow/pull/44369/files
  - https://github.com/apache/airflow/pull/44664/files
- AIP-82: Add KafkaMessageQueueProvider
- SQLAlchemy

