
I have a PythonVirtualenvOperator which reads some data from a database. If there is no new data, the DAG should end there; otherwise it should call additional tasks, e.g.:

# dag.py

load_data >> [if_data, if_no_data] >> another_task >> last_task

I understand that this can be done with BranchPythonOperator, but I can't see how to combine the virtualenv operator with the branch operator.

Is it doable?

  • Is the PythonVirtualenvOperator the load_data task? Have you considered returning True/False via XCom? Commented May 20, 2021 at 13:07

2 Answers


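This can be solved using XCom: load_data can push the number of new records it processed. A PythonVirtualenvOperator pushes its callable's return value to XCom by default, so returning the count is enough.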
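A minimal sketch of a load_data callable that returns the count, assuming a hypothetical sqlite3 database with an `events(id INTEGER)` table and hypothetical `db_path`/`since_id` parameters standing in for the real query. Because PythonVirtualenvOperator runs the function in a separate interpreter, all imports live inside the function body:

```python
def load_new_rows(db_path: str, since_id: int) -> int:
    """Hypothetical python_callable for the load_data PythonVirtualenvOperator.

    Runs in a separate interpreter, so its imports live inside the body.
    Its return value is what ends up in XCom.
    """
    import sqlite3  # stand-in for the real database driver in the venv

    conn = sqlite3.connect(db_path)
    try:
        # Hypothetical schema: events(id INTEGER); "new" means id > since_id
        (count,) = conn.execute(
            "SELECT COUNT(*) FROM events WHERE id > ?", (since_id,)
        ).fetchone()
    finally:
        conn.close()
    return count
```

It would be wired up as `PythonVirtualenvOperator(task_id="load_data", python_callable=load_new_rows, op_kwargs={...})`, after which `xcom_pull(task_ids='load_data')` in a downstream task returns the count.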

Your pipeline can be:

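from airflow.operators.python import BranchPythonOperator  # Airflow 1.10: airflow.operators.python_operator

def choose(**context):
    # load_data's return value is pushed to XCom automatically
    value = context['ti'].xcom_pull(task_ids='load_data')
    if value is not None and int(value) > 0:
        return 'if_data'  # task_id of the branch to follow
    return 'if_no_data'

branch = BranchPythonOperator(
    task_id='branch_task',
    provide_context=True, # Remove this line if Airflow>=2.0.0
    python_callable=choose)

# With the default trigger_rule ("all_success"), another_task is skipped
# whenever one of its upstream branches is skipped.
load_data >> branch >> [if_data, if_no_data] >> another_task >> last_task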
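Because `choose` only touches `context['ti']`, the branch decision can be checked without a running Airflow instance. A minimal sketch, assuming a hypothetical `FakeTaskInstance` stub that mimics only `xcom_pull(task_ids=...)`; the `None` guard is an addition for the case where load_data pushed nothing:

```python
from typing import Any, Dict, Optional


class FakeTaskInstance:
    """Hypothetical stub: mimics only TaskInstance.xcom_pull(task_ids=...)."""

    def __init__(self, xcoms: Dict[str, Any]) -> None:
        self._xcoms = xcoms  # task_id -> value that task returned

    def xcom_pull(self, task_ids: Optional[str] = None) -> Any:
        return self._xcoms.get(task_ids)


def choose(**context: Any) -> str:
    # Follow if_data only when load_data reported at least one new record.
    value = context["ti"].xcom_pull(task_ids="load_data")
    if value is not None and int(value) > 0:
        return "if_data"
    return "if_no_data"


print(choose(ti=FakeTaskInstance({"load_data": 5})))  # if_data
print(choose(ti=FakeTaskInstance({"load_data": 0})))  # if_no_data
print(choose(ti=FakeTaskInstance({})))                # if_no_data
```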

2 Comments

What if I want to stop when if_no_data is chosen, i.e. another_task and last_task should run only after if_data?
Never mind, I figured it out: adding another_task.set_upstream(if_data) and last_task.set_upstream(another_task) before load_data >> branch >> [if_data, if_no_data] did the trick.

This can be solved with BranchPythonVirtualenvOperator, which runs its callable in a virtualenv and follows the branch whose task_id the callable returns:

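    from airflow.operators.python import BranchPythonVirtualenvOperator  # Airflow 2.x import path

    branching_venv = BranchPythonVirtualenvOperator(
        task_id="branching_venv",
        requirements=["numpy~=1.24.4"],
        python_callable=branch_with_venv  # must return the task_id(s) to follow
    )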
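A minimal sketch of what `branch_with_venv` could look like, assuming the database read moves into the branching task itself so that one venv task both checks for new data and picks the branch. The sqlite3 backend, the `events` table, and the `db_path`/`since_id` parameters are hypothetical stand-ins for the real query:

```python
def branch_with_venv(db_path: str, since_id: int) -> str:
    """Hypothetical python_callable for BranchPythonVirtualenvOperator.

    Runs inside the virtualenv, so imports stay inside the function. It
    returns the task_id of the directly downstream task to follow; the
    other directly downstream tasks are skipped.
    """
    import sqlite3  # stand-in for the real DB driver installed in the venv

    conn = sqlite3.connect(db_path)
    try:
        # Hypothetical schema: events(id INTEGER); "new" means id > since_id
        (count,) = conn.execute(
            "SELECT COUNT(*) FROM events WHERE id > ?", (since_id,)
        ).fetchone()
    finally:
        conn.close()
    return "if_data" if count > 0 else "if_no_data"
```

The arguments would be passed through the operator's `op_kwargs`, and the task wired as `branching_venv >> [if_data, if_no_data]`.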

