-1

I'm working on this airflow dag file to do some test with XCOM, but not sure how to use it between python operators. Can someone please help how to write the logic to pass a message between the python operators using XCOM push and pull functions. Below is the dag file that i'm working on...

The question is how to pass a message from each task to another task

@dag_factory
def create_dag():
    with DAG(
        dag_id="DSStest",
        default_args=default_args,
        schedule_interval=timedelta(1),
    ) as dag:
        # Define operators here, for example:

        output_file = path_in_workspace("testout")

        rscript_file = path_in_workspace("rtest2.R ")

        bcmd1 = "downloading some file here..."

        t0 = PythonOperator(
            task_id="start",
            python_callable=my_func2,
            provide_context=True,
            op_args=[output_file, 0],
        )

        t1 = PythonOperator(
            task_id="job1",
            python_callable=my_func1,
            provide_context=True,
            op_args=[output_file, 1],
        )

        t2 = PythonOperator(
            task_id="job2",
            python_callable=my_func1,
            provide_context=True,
            op_args=[output_file, 2],
        )

        t10 = PythonOperator(
            task_id="job10",
            python_callable=my_func2,
            provide_context=True,
            op_args=[output_file, 10],
        )

        t20 = BashOperator(
            task_id="job20",
            bash_command=bcmd1,
            queue={
                "worker_type": "png-bash-worker",
                "request_memory": "1G",
                "request_cpu": 1,
            },
        )

        # Define dependencies between operators here, for example:

        t0 >> t1
        t0 >> t2
        t1 >> t10
        t2 >> t10
        t10 >> t11
        t11 >> t20

        return dag  # Do not change this
3

1 Answer 1

3

I recommend to take a look at this example, it shows/explains everything regarding xcoms and PythonOperators:
example_xcom.py

The official airflow page also explains xcoms in a lot of detail:
official documentation

Sign up to request clarification or add additional context in comments.

1 Comment

That example is great, but let's not pretend the official page explains it in much detail at all, at least at the time of this comment. If you don't already know how xcoms work or have an example to go off of, the existing documentation for this is next to useless, and lacking in necessary context.

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.