How to pass parameters from a PythonOperator task to a SimpleHttpOperator task in an Airflow DAG?
I want to trigger a SimpleHttpOperator like this: airflow trigger_dag test_trigger --conf '{"name":"something"}'
Then, in a PythonOperator, the python_callable reads the parameters from kwargs['dag_run'].conf. I want to pass that dag_run.conf on to the SimpleHttpOperator. How can I do that? Can anyone help?
cc_ = {}

def run_this_func(ds, **kwargs):
    cc_ = kwargs['dag_run'].conf
    logging.info(cc_)
    return cc_

run_this = PythonOperator(
    task_id='run_this',
    provide_context=True,
    python_callable=run_this_func,
    dag=dag)

http_task = SimpleHttpOperator(
    task_id='http_task',
    http_conn_id='test_http',
    method='POST',
    endpoint='/api/v1/function',
    data=cc_,
    headers={"Authorization": "Basic YWRtaW46MTIzNDU2", "Accept": "application/json, text/plain, */*"},
    response_check=lambda response: True if "10000" in response.content else False,
    dag=dag)

http_task.set_upstream(run_this)
For communication between tasks, you might want to check XCom: https://airflow.incubator.apache.org/concepts.html#xcoms
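As a rough sketch of the producing side: a PythonOperator pushes whatever its python_callable returns to XCom under the key return_value, so the callable only needs to return the conf. The version below returns it as a JSON string, which is an assumption about what the HTTP endpoint expects. The fake_dag_run object is a hypothetical stand-in for Airflow's dag_run so the function can be tried outside a DAG.

```python
import json
from types import SimpleNamespace


def run_this_func(**kwargs):
    # dag_run.conf holds the dict passed with --conf; it may be None
    # when the DAG is triggered without --conf.
    conf = kwargs["dag_run"].conf or {}
    # Returning a JSON string (assumption: the API wants a JSON body).
    # In a PythonOperator this return value is stored in XCom.
    return json.dumps(conf)


# Hypothetical stand-in for the dag_run object Airflow would provide.
fake_dag_run = SimpleNamespace(conf={"name": "something"})
payload = run_this_func(dag_run=fake_dag_run)
```

The function accepts only **kwargs here, so it works whether or not Airflow also passes ds and the other context values as keyword arguments.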
*****UPDATE*****
(Thanks to Daniel for more detail.) You can give the code below a try; the SimpleHttpOperator pulls the return value via XCom:
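# `data` is a templated field, so pass the Jinja string itself. Wrapping it in
# json.loads() would run at DAG-parse time, before rendering, and fail.
# The rendered value is the string form of what run_this returned, so return
# a JSON string from run_this_func if the API expects a JSON body.
http_task = SimpleHttpOperator(
    task_id='http_task',
    http_conn_id='test_http',
    method='POST',
    endpoint='/api/v1/function',
    data="{{ task_instance.xcom_pull(task_ids='run_this', key='return_value') }}",
    headers={"Authorization": "Basic YWRtaW46MTIzNDU2", "Accept": "application/json, text/plain, */*"},
    response_check=lambda response: True if "10000" in response.content else False,
    dag=dag)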
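To see why the form of the XCom value matters, here is a small stand-alone check using only the standard library. It assumes a template expression renders a value roughly as str(value); the helper is_valid_json is a hypothetical name introduced for illustration.

```python
import json

conf = {"name": "something"}

# Approximation of what the template renders when XCom holds a dict.
rendered_dict = str(conf)
# What it renders when the upstream callable returned json.dumps(conf).
rendered_json = str(json.dumps(conf))
# The raw, unrendered template string as it exists at DAG-parse time.
raw_template = "{{ task_instance.xcom_pull(task_ids='run_this', key='return_value') }}"


def is_valid_json(text):
    """Return True if text parses as JSON, False otherwise."""
    try:
        json.loads(text)
        return True
    except ValueError:
        return False


print(is_valid_json(rendered_dict))
print(is_valid_json(rendered_json))
print(is_valid_json(raw_template))
```

Only the json.dumps variant parses as JSON: the dict's string form uses single quotes, and the raw template is not JSON at all, which is why calling json.loads on it while the DAG file is being parsed raises an error.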