
I have written a DAG with multiple PythonOperators:

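def Task1(**kwargs):
    # Return the 'file' entry from the conf the DAG run was triggered with
    return kwargs['dag_run'].conf.get('file')

# The callable must be defined before the operator that references it
task1 = af_op.PythonOperator(task_id='Data_Extraction_Environment',
                             provide_context=True,
                             python_callable=Task1, dag=dag1)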

The PythonOperator calls the Task1 function, which returns a value that I need to pass to the next PythonOperator. How can I get that value from the task1 variable, or otherwise retrieve what Task1 returned?

Update:

def Task1(**kwargs):
    file_name = kwargs['dag_run'].conf.get('file')
    task_instance = kwargs['task_instance']
    task_instance.xcom_push(key='file', value=file_name)
    return file_name

t1 = PythonOperator(task_id='Task1', provide_context=True, python_callable=Task1, dag=dag)

t2 = BashOperator(
    task_id='Moving_bucket',
    bash_command="python /home/raw.py {{ task_instance.xcom_pull(task_ids='Task1', key='file') }} ",
    dag=dag,
)

t2.set_upstream(t1)
  • Yes, I am executing it in the same fashion. I have updated the code. Commented May 3, 2018 at 11:29
  • At first I didn't add that manually and it gave "None"; then I added it. In both cases the value is "None". Commented May 3, 2018 at 12:02
  • Yes, I checked that. It prints the value in the log. Commented May 3, 2018 at 12:09
  • I am executing the DAG from a URL like "localhost:8080/admin/rest_api/…{%22file%22:%22data.csv%22}" Commented May 3, 2018 at 12:13

1 Answer


You might want to check out Airflow's XCOM: https://airflow.apache.org/docs/apache-airflow/stable/concepts/xcoms.html

If you return a value from the callable, Airflow stores it in XCom under the default key return_value. In your case, you can access it from other Python code like so:

task_instance = kwargs['task_instance']
task_instance.xcom_pull(task_ids='Task1')

or in a template like so:

{{ task_instance.xcom_pull(task_ids='Task1') }}

If you want to use a specific key, you can push to XCom explicitly from inside a task:

task_instance = kwargs['task_instance']
task_instance.xcom_push(key='the_key', value=my_str)

Then later on you can access it like so:

task_instance.xcom_pull(task_ids='my_task', key='the_key')
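
To see how the two access patterns behave side by side, here is a minimal sketch that runs without Airflow. `FakeTaskInstance` and `run_task` are hypothetical stand-ins written for this illustration, not Airflow classes; they only mimic the behaviour that a callable's return value is stored under the key `return_value`, which is also the default key for `xcom_pull`. The file name `data.csv` is an assumed value; in the question it comes from `dag_run.conf`.

```python
class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's TaskInstance (illustration only).

    XCom values live in a shared dict keyed by (task_id, key).
    """

    def __init__(self, task_id, store):
        self.task_id = task_id
        self.store = store

    def xcom_push(self, key, value):
        self.store[(self.task_id, key)] = value

    def xcom_pull(self, task_ids, key="return_value"):
        # Yields None when nothing was stored for that task and key.
        return self.store.get((task_ids, key))


def run_task(task_id, python_callable, store):
    """Hypothetical helper: run the callable and store its return value,
    imitating what PythonOperator does with the result."""
    ti = FakeTaskInstance(task_id, store)
    result = python_callable(task_instance=ti)
    if result is not None:
        ti.xcom_push(key="return_value", value=result)
    return result


def task1(**kwargs):
    file_name = "data.csv"  # assumed value for the sketch
    # Optional explicit push under a custom key.
    kwargs["task_instance"].xcom_push(key="file", value=file_name)
    return file_name  # stored under the default key


def task2(**kwargs):
    ti = kwargs["task_instance"]
    by_return = ti.xcom_pull(task_ids="Task1")           # default key
    by_key = ti.xcom_pull(task_ids="Task1", key="file")  # explicit key
    return (by_return, by_key)


store = {}
run_task("Task1", task1, store)
pulled = run_task("Task2", task2, store)
print(pulled)  # ('data.csv', 'data.csv')
```

Both pulls yield the same value here, which is why the explicit push is redundant when the callable already returns what you need.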

EDIT 1

Follow-up question: Instead of using the value in another function, how can I pass the value to another PythonOperator, like t2 = BashOperator(task_id='Moving_bucket', bash_command='python /home/raw.py "%s" ' % file_name, dag=dag)? I want to access the file_name returned by Task1. How can this be achieved?

First of all, it seems to me that the value is, in fact, not being passed to another PythonOperator but to a BashOperator.

Secondly, this is already covered in my answer above. The field bash_command is templated (see template_fields in the source: https://github.com/apache/incubator-airflow/blob/master/airflow/operators/bash_operator.py). Hence, we can use the templated version:

BashOperator(
  task_id='Moving_bucket',
  # Double quotes outside so the single quotes inside the template are valid Python
  bash_command="python /home/raw.py {{ task_instance.xcom_pull(task_ids='Task1') }} ",
  dag=dag,
)
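
One detail worth checking in a templated command like this is quoting: the template contains single quotes around the task id, so the surrounding Python string literal needs a different delimiter. The following is a small standard-library sketch (no Airflow involved) that compiles both spellings; the names `single_outer`, `double_outer` and `is_valid_python` are illustrative only.

```python
# Source text of two ways to write the bash_command string literal.
single_outer = "cmd = 'python /home/raw.py {{ task_instance.xcom_pull(task_ids='Task1') }}'"
double_outer = 'cmd = "python /home/raw.py {{ task_instance.xcom_pull(task_ids=\'Task1\') }}"'


def is_valid_python(source):
    """Return True if the source compiles, False on a SyntaxError."""
    try:
        compile(source, "<example>", "exec")
        return True
    except SyntaxError:
        return False


print(is_valid_python(single_outer))  # False: the inner quotes end the string early
print(is_valid_python(double_outer))  # True
```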

EDIT 2

Explanation: Airflow executes Task1, stores its result in XCom, and only then executes the next task. For your example to work, Task1 must run first, with Moving_bucket set downstream of it.

Since your function already returns the value, you could also omit key='file' from xcom_pull and skip the manual xcom_push in the function.


3 Comments

Instead of using the value in another function, how can I pass the value to another PythonOperator, like t2 = BashOperator(task_id='Moving_bucket', bash_command='python /home/raw.py "%s" ' % file_name, dag=dag)? I want to access the file_name returned by Task1. How can this be achieved?
I have updated the question. When I do that, I get "None", but when I print file_name, it prints the file name. Am I missing anything?
Your bash operator does not work for me. It works after replacing the single quotes with double quotes: task_ids="Task1" (note: my bash command is echo).
