
I have been looking at the Airflow documentation for creating a scheduled DAG run. My use case: I want to create and schedule a DAG run at a specified time, only once. I will get this time value from a datetime column in a database table.

For this there is the schedule/schedule_interval DAG parameter, but it accepts timedelta objects and cron expressions, which are recurring by nature. If I want to run only once, schedule/schedule_interval needs to be set to @once.

My ideal workflow would be:

  • A master DAG (DAG 1) is scheduled to run every 10 minutes.
  • The master DAG reads the datetime column values.
  • Using those datetime values, it creates other DAGs, each scheduled to run at its own specified time (the times can differ for each DAG).

DAG 1 (let's assume it runs at 9:00 AM) --> creates 2 DAGs which run at 9:11 and 9:15
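The selection step of the master DAG (second and third bullets above) can be sketched in plain Python. This is a hypothetical helper, not an Airflow API: `pending_run_times` and its arguments are illustrative names, and it assumes the column values are timezone-aware UTC datetimes and that the master DAG remembers which times it has already handled, so that its 9:10 run does not schedule 9:11 and 9:15 a second time.

```python
from datetime import datetime, timezone


def pending_run_times(db_values, now, already_scheduled):
    """Return future datetimes that have not been scheduled yet, sorted ascending.

    db_values: datetimes read from the database column (assumed tz-aware UTC)
    now: the master DAG's current run time
    already_scheduled: set of datetimes that an earlier master run already handled
    """
    return sorted(t for t in set(db_values) if t > now and t not in already_scheduled)


def utc(hour, minute):
    # Hypothetical helper for the example: a UTC datetime on a fixed day.
    return datetime(2023, 4, 6, hour, minute, tzinfo=timezone.utc)


# Master DAG runs at 9:00; the table holds 8:50 (already past), 9:11 and 9:15.
rows = [utc(9, 15), utc(8, 50), utc(9, 11)]
print(pending_run_times(rows, utc(9, 0), set()))
```

Keeping the "already scheduled" set outside the function leaves the choice of storage (a database flag, an Airflow Variable, deterministic run ids) open.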

Reference documentation

https://docs.astronomer.io/learn/dynamically-generating-dags
https://docs.astronomer.io/learn/scheduling-in-airflow#parameters

Looking for good suggestions, preferably in Python.

Olaf Kock
Rohit sai

1 Answer


For your purpose, there is TriggerDagRunOperator. In the DAG that generates the others, add a TriggerDagRunOperator and specify the target dag_id and the required execution time. The operator adds a new dag_run with that execution time to the queue. Don't forget to unpause (enable) the new DAGs and to set schedule_interval=None on them (schedule=None in Airflow 2.4+). An example DAG is below:

from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.operators.trigger_dagrun import TriggerDagRunOperator


@task
def start():
    pass


@task
def generate_dag1(ti=None):
    # create the DAG file from a template, etc.
    # then push the target dag_id and the required execution datetime to XCom
    ti.xcom_push(key="dag_id", value="dag1")
    ti.xcom_push(key="execution_date", value="2023-04-06 15:51:00+00:00")


@task
def generate_dag2(ti=None):
    # create the DAG file from a template, etc.
    # then push the target dag_id and the required execution datetime to XCom
    ti.xcom_push(key="dag_id", value="dag2")
    ti.xcom_push(key="execution_date", value="2023-04-06 16:00:00+00:00")


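def trigger_dag_operator(task_id, xcom_task_id):
    # trigger_dag_id and execution_date are templated fields, so the Jinja
    # expressions below are rendered at runtime from the XComs pushed above.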
    return TriggerDagRunOperator(
        task_id=task_id,
        trigger_dag_id=f"{{{{ ti.xcom_pull(task_ids='{xcom_task_id}', key='dag_id') }}}}",
        execution_date=f"{{{{ ti.xcom_pull(task_ids='{xcom_task_id}', key='execution_date') }}}}",
    )


with DAG(
    dag_id="example_trigger_dag_operator",
    schedule=None,
    start_date=datetime(2023, 4, 1),
    tags=["example"],
    catchup=False
) as dag:
    # use a separate name so the start() task function is not shadowed
    start_task = start()
    start_task >> generate_dag1() >> trigger_dag_operator("trigger_dag1", "generate_dag1")
    start_task >> generate_dag2() >> trigger_dag_operator("trigger_dag2", "generate_dag2")
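The `trigger_dag_operator` helper relies on the fact that `{{` inside a Python f-string produces a literal `{`, so `{{{{ ... }}}}` yields the `{{ ... }}` Jinja delimiters that Airflow renders when the task runs, while `{xcom_task_id}` is substituted immediately by Python. A small standalone check of that string building (the function name `xcom_pull_template` is only for illustration):

```python
def xcom_pull_template(xcom_task_id: str, key: str) -> str:
    # "{{{{" -> "{{" and "}}}}" -> "}}"; {xcom_task_id} and {key} are filled in now,
    # and the remaining Jinja expression is left for Airflow to render at runtime.
    return f"{{{{ ti.xcom_pull(task_ids='{xcom_task_id}', key='{key}') }}}}"


print(xcom_pull_template("generate_dag1", "dag_id"))
# {{ ti.xcom_pull(task_ids='generate_dag1', key='dag_id') }}
```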
  • Thanks, this was helpful. How can I achieve the same behaviour when triggering DAG runs dynamically with different execution dates in a loop? The loop has n iterations, so how do I chain the tasks then? – Rohit sai Apr 10 '23 at 19:27
  • When I call start >> generate_dag1() >> trigger_dag_operator(...) inside a loop, it says the other DAG has not been triggered yet. I believe some context of the current DAG is missing? – Rohit sai Apr 10 '23 at 19:41
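On the loop question in the comments: Airflow requires every task_id within a DAG to be unique, so each loop iteration needs its own id. A hedged sketch, assuming Airflow 2.x (where `TriggerDagRunOperator` accepts `task_id`, `trigger_dag_id` and `execution_date`) and assuming the rows are available as `(dag_id, datetime)` pairs, a hypothetical shape for the database table. The helper only builds keyword arguments, so it runs without Airflow installed:

```python
from datetime import datetime, timezone


def build_trigger_kwargs(runs):
    """Turn (target_dag_id, run_at) pairs into TriggerDagRunOperator kwargs.

    The loop index in each task_id keeps ids unique even if the same target
    DAG appears more than once. Naive datetimes are assumed to be UTC.
    """
    kwargs_list = []
    for i, (dag_id, run_at) in enumerate(runs):
        if run_at.tzinfo is None:
            run_at = run_at.replace(tzinfo=timezone.utc)  # assumption: DB stores UTC
        kwargs_list.append(
            {
                "task_id": f"trigger_{dag_id}_{i}",
                "trigger_dag_id": dag_id,
                "execution_date": run_at,
            }
        )
    return kwargs_list


rows = [("dag1", datetime(2023, 4, 6, 15, 51)), ("dag2", datetime(2023, 4, 6, 16, 0))]
for kw in build_trigger_kwargs(rows):
    print(kw["task_id"], kw["trigger_dag_id"], kw["execution_date"])
```

Inside the `with DAG(...)` block, each dict could then become a `TriggerDagRunOperator(**kw)` chained downstream of the start task, which gives n parallel branches. This reads the rows when the DAG file is parsed rather than at run time; if the list should be computed by a task at run time, Airflow 2.3+ dynamic task mapping (`.partial(...).expand(...)`) may be a better fit.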