I created a DAG that calculates yesterday's date and uses it in all of the DAG's tasks. Is there any way to pass the DAG a parameter (in my case a date) and use it in all of my operator calls? (I can't change the operators' code; I can only use them.) And if the DAG doesn't receive a date from outside, can it calculate yesterday's date and use that throughout the flow?

So basically, I want to know whether we can trigger the DAG manually with a parameter and, if the DAG doesn't receive one, fall back to a default value (in my case, yesterday's date).

Need your help :)


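import airflow
from datetime import datetime, timedelta
from airflow import DAG
# Operator imports are not shown in the original post.

dag_args = {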
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(1),
    'email': ['lalala@example.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'email_on_success': True,
    'retries': 1,
}

dag = DAG(
    dag_id='example_dag',
    default_args=dag_args,
    schedule_interval=None
)

t1 = SimpleHttpOperator(
    task_id='check_if_daily_report_ready',
    method='GET',
    endpoint="/bla/bla?date={date}".format(
        date=(datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")),
    http_conn_id="conn1",
    headers={"Content-Type": "application/json"},
    response_check=lambda response: response.status_code == 200,
    dag=dag,
)



t2 = Queryperator(
    task_id='cal',
    query_file='ca.sql',
    query_folder='include/sql_files/bla',
    token='Token',
    default_param_dict={"date": (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")},
    dag=dag
)

export_file_from_S3_to_HDFS = ExportOperator(
    task_id='export_file_from_s3_to_hdfs',
    source_dict=s3_source_properties,
    target_dict='{user}/bla_{yesterday}_bla_raw_data.csv'.format(
        user=s3_source_bucket_path,
        yesterday=(datetime.now() - timedelta(days=1)).strftime("%Y%m%d")),
    token='Token',
    dag=dag
)

lobliba

1 Answer

To pass a parameter into your DAG, you can use the method described here.

Then just use a template to define the operator's parameter. I guess the template would be something like this (inside a template, datetime and timedelta are reached through Airflow's macros namespace):

'{{ dag_run.conf.get("your_parameter_key", (macros.datetime.now() - macros.timedelta(days=1)).strftime("%Y-%m-%d")) }}'
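
The fallback that the template expresses can also be sketched in plain Python, which makes it easy to check outside Airflow. This is only a rough sketch: resolve_report_date, key and fmt are hypothetical names I made up for illustration, not part of Airflow, and it assumes the run's conf is either a dict or None.

```python
from datetime import datetime, timedelta


def resolve_report_date(conf=None, now=None, key="date", fmt="%Y-%m-%d"):
    """Return conf[key] if it was supplied, otherwise yesterday's date as a string.

    Hypothetical helper, not an Airflow API. `conf` stands in for dag_run.conf.
    """
    conf = conf or {}            # treat a missing payload (None) as an empty dict
    now = now or datetime.now()  # allow a fixed "now" so the result is testable
    default = (now - timedelta(days=1)).strftime(fmt)
    return conf.get(key, default)


# Triggered with a payload such as {"date": "2019-03-01"}: the payload wins.
# (In Airflow 2.x a payload can be sent with
#  `airflow dags trigger --conf '{"date": "2019-03-01"}' example_dag`.)
print(resolve_report_date({"date": "2019-03-01"}))           # 2019-03-01

# Triggered without a payload: falls back to the day before `now`.
print(resolve_report_date(None, now=datetime(2019, 3, 10)))  # 2019-03-09
```

One possible way to use it, if you prefer this over the inline expression, is to pass the function to the DAG through its user_defined_macros argument and call it as {{ resolve_report_date(dag_run.conf) }}. Either way, the value is only rendered for arguments that the operator lists in its template_fields, so for operators you can't modify you would need to check which of their arguments are templated.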
FraDel