I created a DAG that calculates yesterday's date and uses it in all of the DAG's tasks. Is there any way to pass the DAG a parameter (in my case a date) and use it in all of my operator calls? (I can't change the operators' code, I can only use them.) And if my DAG doesn't get any date from outside, can it calculate yesterday's date itself and use that throughout the flow?
So basically I want to ask: can we trigger the DAG manually with a parameter, and if the DAG doesn't get any parameter, have it use/calculate a default value (in my case, yesterday's date)?
Need your help :)
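In plain Python, the default-value logic I have in mind is just this (a sketch; passed_date is a made-up name for whatever the DAG might receive from outside):

from datetime import datetime, timedelta

def resolve_date(passed_date=None):
    # Use the date passed from outside if there is one,
    # otherwise fall back to yesterday's date.
    if passed_date:
        return passed_date
    return (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")

Here is my current DAG, where yesterday's date is computed at parse time and repeated in every task: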
import airflow
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.http_operator import SimpleHttpOperator

dag_args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(1),
    'email': ['lalala@example.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'email_on_success': True,
    'retries': 1,
}

dag = DAG(
    dag_id='example_dag',
    default_args=dag_args,
    schedule_interval=None
)

t1 = SimpleHttpOperator(
    task_id='check_if_daily_report_ready',
    method='GET',
    endpoint="/bla/bla?date={date}".format(
        date=(datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")),
    http_conn_id="conn1",
    headers={"Content-Type": "application/json"},
    response_check=lambda response: response.status_code == 200,
    dag=dag,
)

# Queryperator and ExportOperator are our internal operators
# (I can only use them, I can't change their code).
t2 = Queryperator(
    task_id='cal',
    query_file='ca.sql',
    query_folder='include/sql_files/bla',
    token='Token',
    default_param_dict={"date": (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")},
    dag=dag
)

export_file_from_S3_to_HDFS = ExportOperator(
    task_id='export_file_from_s3_to_hdfs',
    source_dict=s3_source_properties,  # defined elsewhere in our code
    target_dict='{user}/bla_{yesterday}_bla_raw_data.csv'.format(
        user=s3_source_bucket_path,
        yesterday=(datetime.now() - timedelta(days=1)).strftime("%Y%m%d")),
    token='Token',
    dag=dag
)
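What I would like is for every task to pick up the date from a manual trigger when one is supplied. For example, I imagine t1 looking roughly like this (just a sketch: I am assuming that endpoint is a templated field and that the trigger's conf and the yesterday_ds macro are available in the template; the "date" key is my own naming):

t1 = SimpleHttpOperator(
    task_id='check_if_daily_report_ready',
    method='GET',
    # Take 'date' from the trigger's conf if it was passed,
    # otherwise fall back to yesterday's date via the yesterday_ds macro.
    endpoint="/bla/bla?date={{ dag_run.conf.get('date', yesterday_ds) if dag_run and dag_run.conf else yesterday_ds }}",
    http_conn_id="conn1",
    headers={"Content-Type": "application/json"},
    response_check=lambda response: response.status_code == 200,
    dag=dag,
)

so that I could trigger it manually with something like:

airflow trigger_dag -c '{"date": "2019-06-01"}' example_dag

and have it fall back to yesterday's date when no conf is passed. Is this (or something like it) possible, and can the same be done for the custom operators' parameters?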