I have a stored XCom value that I want to pass to another Python function that is not called through a PythonOperator.


def sql_file_template():
    <some code which uses xcom variable>

def call_stored_proc(**kwargs):
    #project = kwargs['row_id']
    print("INSIDE CALL STORE PROC ------------")
    query = """CALL `{0}.dataset_name.store_proc`(
                          '{1}' # source table
                        , ['{2}'] # row_ids
                        , '{3}' # pivot_col_name
                        , '{4}' # pivot_col_value
                        ,  100 # max_columns
                        , 'MAX' # aggregation
                );"""
    query = query.format(kwargs['project'], kwargs['source_tbl'], kwargs['row_id'], kwargs['pivot_col'], kwargs['pivot_val'])
    job = client.query(query, location="US")
    for result in job.result():
        # Push the first returned row to XCom, then return it
        task_instance = kwargs['task_instance']
        task_instance.xcom_push(key='query_string', value=result)
        print(result)
        return result



bq_cmd = PythonOperator (
    task_id=                    'task1',
    provide_context=            True,
    python_callable=            call_stored_proc,
    op_kwargs=                  {'project'        : project,
                                 'source_tbl'     : source_tbl,
                                 'row_id'         : row_id,
                                 'pivot_col'      : pivot_col,
                                 'pivot_val'      : pivot_val
                                },
    dag=                        dag
)

dummy_operator >> bq_cmd
sql_file_template()

The output of the stored procedure is a string, which is captured through XCom.

Now I would like to pass this value to a Python function, sql_file_template, without using a PythonOperator.

As per the Airflow documentation, XComs can be accessed only between tasks.

Can anyone help with this?

2 Answers

If you have access to the Airflow installation you'd like to query (configuration, database access, and code), you can use the XCom.get_one class method from airflow.models:

from datetime import datetime

from airflow.models import XCom


execution_date = datetime(2020, 8, 28)
xcom_value = XCom.get_one(execution_date=execution_date,
                          task_id="the_task_id",
                          dag_id="the_dag_id")            

So you want to access XCom outside Airflow (probably from a different project or module, without creating any Airflow DAGs or tasks)?


Airflow uses SQLAlchemy to map all its models (including XCom) to the corresponding tables in its backend metadata database (meta-db).

Therefore, this can be done in two ways:

  1. Leverage Airflow's SQLAlchemy model (without having to create a task or DAG). Here's an untested code snippet for reference:

from datetime import datetime
from typing import List, Optional

from airflow.models import XCom
# In Airflow 2.x, provide_session lives in airflow.utils.session
from airflow.utils.db import provide_session
from sqlalchemy.orm import Session


@provide_session
def read_xcom_values(dag_id: str,
                     task_id: str,
                     execution_date: datetime,
                     session: Optional[Session] = None) -> List[str]:
    """
    Read and return the 'value' fields of the XComs matching the given filters
    :param dag_id: ID of the DAG that produced the XComs
    :param task_id: ID of the task that pushed the XComs
    :param execution_date: datetime object
    :param session: Airflow's SQLAlchemy Session (this param must not be passed, it will be automatically supplied by
                    '@provide_session' decorator)
    :return: list of XCom values
    """
    # read XComs
    xcoms: List[XCom] = session.query(XCom).filter(
        XCom.dag_id == dag_id, XCom.task_id == task_id,
        XCom.execution_date == execution_date).all()
    # retrieve 'value' fields from XComs
    xcom_values: List[str] = [xcom.value for xcom in xcoms]
    return xcom_values

Do note that because it imports Airflow packages, it still requires a working Airflow installation on the Python path (as well as a connection to the backend DB). However, we are not creating any tasks or DAGs here, so this snippet can be run in a standalone Python file.

For this snippet, I referred to views.py, which is my favorite place to peek into Airflow's SQLAlchemy magic.


  2. Directly query Airflow's SQLAlchemy backend meta-db

    Connect to the meta-db and run this query:

    SELECT value FROM xcom WHERE dag_id='' AND task_id='' AND ..
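
For illustration only, here is a minimal sketch of that direct query done from Python with bound parameters. It does not touch a real Airflow database: an in-memory SQLite table stands in for the meta-db, the column list is a simplified assumption, and the helper name read_xcom_value is hypothetical. It also assumes the value was stored as JSON-encoded bytes; installations with XCom pickling enabled store pickled bytes instead.

```python
import json
import sqlite3

# Stand-in for the meta-db: an in-memory SQLite table with a simplified
# subset of columns (an assumption for this sketch). A real deployment
# would connect to its own backend (e.g. Postgres or MySQL) instead.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE xcom (dag_id TEXT, task_id TEXT, key TEXT, "
    "execution_date TEXT, value BLOB)"
)
conn.execute(
    "INSERT INTO xcom VALUES (?, ?, ?, ?, ?)",
    ("the_dag_id", "task1", "query_string", "2020-08-28 00:00:00",
     json.dumps("SELECT 1").encode("utf-8")),
)


def read_xcom_value(conn, dag_id, task_id, key, execution_date):
    """Hypothetical helper: fetch one XCom value, or None if no row matches."""
    row = conn.execute(
        "SELECT value FROM xcom "
        "WHERE dag_id = ? AND task_id = ? AND key = ? AND execution_date = ?",
        (dag_id, task_id, key, execution_date),
    ).fetchone()
    if row is None:
        return None
    # Assumes JSON-encoded bytes; pickled values would need pickle.loads.
    return json.loads(row[0].decode("utf-8"))


value = read_xcom_value(conn, "the_dag_id", "task1", "query_string",
                        "2020-08-28 00:00:00")
print(value)
```

Bound parameters (the ? placeholders) keep the DAG and task IDs out of the SQL string itself, which avoids quoting problems when the IDs come from user input.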

3 Comments

I would like to access the XCom in the same code, not outside it. The code above is Airflow DAG code, and I would like to access the XCom variable in that same code.
@Neha0908 both proposed solutions work whether you use them in Airflow code or independently of it. When using XCom in a task, you can call the xcom_pull method of the TaskInstance class, which is just a shortcut (under the hood it uses the same thing as above). see this and this
And while you can also do it in a DAG-definition file, it can unnecessarily load your backend DB; read point 2 here.
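
To make the xcom_pull shortcut from the comments concrete, here is a rough sketch of reading the pushed value inside a task callable rather than at DAG-parse time. The body of sql_file_template is elided in the question, so the one below is a hypothetical placeholder that takes the value as an argument. render_sql_file and FakeTaskInstance are also made-up names; the fake class only exists so the snippet runs without Airflow.

```python
def sql_file_template(query_string):
    # Hypothetical body: the question does not show what this function does.
    return "-- generated\n" + query_string


def render_sql_file(**kwargs):
    # Inside a running task, Airflow supplies the TaskInstance in the context.
    ti = kwargs["task_instance"]
    # 'task1' and 'query_string' match the task_id and key used in the question.
    query_string = ti.xcom_pull(task_ids="task1", key="query_string")
    return sql_file_template(query_string)


class FakeTaskInstance:
    """Stand-in for Airflow's TaskInstance, only for running this sketch."""

    def __init__(self, store):
        self._store = store

    def xcom_pull(self, task_ids=None, key="return_value"):
        return self._store.get((task_ids, key))


fake_ti = FakeTaskInstance({("task1", "query_string"): "SELECT 1"})
result = render_sql_file(task_instance=fake_ti)
print(result)
```

In a real DAG, render_sql_file would be wired up as the python_callable of a task downstream of task1, so the value is read only when the task runs and not every time the scheduler parses the file.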
