2

I am still fairly new to Airflow, and trying to figure out the logistics of passing arguments between tasks/dags. My question is - is it possible to pass arguments from a BranchPythonOperator task, into the task_id's that it calls.

ie:

@task
def task_a():
    ***print(a)***
    return {}

def get_task_run(**kwargs):
    a = 'Pass-Argument'
    return 'task_a'

tasks = BranchPythonOperator(
        task_id='get_task_run',
        python_callable=get_task_run,
    )

In the code above for example, is it possible to somehow get the variable 'a' inside the 'task_a' that is called from the BranchPythonOperator?

1
  • If value of the variable is constant then you can use Admin > Variables. There is also XCom option available here Admin > XComs to share the data between tasks. Commented Jun 2, 2021 at 9:08

1 Answer 1

2

One way of doing this could be by doing an xcom_push from withing the get_task_run function and then pulling it from task_a using get_current_context.

from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from airflow.operators.python import get_current_context, BranchPythonOperator

default_args = {
    'owner': 'airflow',
}

@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(1),
     catchup=False, tags=['example'])
def decorated_dag():

    @task
    def task_a():
        context = get_current_context()
        var_from_branch_task = context['ti'].xcom_pull(
            task_ids='branch_task', key='a')
        print(f"Result: {var_from_branch_task}")

    @task
    def task_b():
        print('task_b')

    def _get_task_run(ti):
        if 'something':
            ti.xcom_push(key='a', value='var_pushed_from_branch task')
            return 'task_a'
        else:
            return 'task_b'

    branch_task = BranchPythonOperator(
        task_id='branch_task',
        python_callable=_get_task_run,
    )
    task_a_exec = task_a()
    task_b_exec = task_b()
    branch_task >> [task_a_exec, task_b_exec]

example_decorated_dag = decorated_dag()

Keep in mind that that BranchPythonOperator should return a single task_id or a list of task_ids to follow. Thats why you just can't return a dict or list or tuple to use it as XcomArg with the other decorated tasks. Let me know if that worked for you!

Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.