
I am creating some BashOperators inside a PythonOperator, but it is not working. Task t1 reads a conf parameter from the context and returns a value; the second task t2, a PythonOperator, reads that value via XCom. I am able to read the value, but the tasks in the for loop are never created.

Is it possible to create a BashOperator inside a PythonOperator?

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime


def load_properties(comment_char='#', sep='=', **kwargs):
    #some processing
    return kwargs['dag_run'].conf.get('propname')


def createtask(**kwargs):
    ti = kwargs['ti']
    property_from_task1 = ti.xcom_pull(key=None, task_ids='t1')
    for value in range(property_from_task1):
        task = BashOperator(task_id='task_%s' % value,
                            bash_command='echo creating a command',
                            dag=dag)



default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2017, 12, 21),
    'provide_context': True,
}




dag = DAG(
    'param', default_args=default_args, schedule_interval=None)

t1 = PythonOperator(task_id='t1',
                    python_callable=load_properties,
                    provide_context=True,
                    dag=dag)


t2 = PythonOperator(task_id='t2',
                    python_callable=createtask,
                    provide_context=True,
                    dag=dag)


t2.set_upstream(t1)

2 Answers


So you are calling a PythonOperator to build BashOperators, but Airflow won't see those operators because they are only created when the task executes — the DAG structure has to exist when the DAG file is parsed. I am not aware of a way to do that at execution time.

You can try to take your for loop out of the callable and build your BashOperators at the top level of the DAG file instead; you would need something like:

property_from_task1 = load_properties()

for value in range(property_from_task1):
    task = BashOperator(task_id='task_%s' % value,
                        bash_command='echo creating a command',
                        dag=dag)

I find that having a config file (YAML, JSON) and parsing it into property_from_task1 might be easier, but it depends on your use case.
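As a minimal sketch of that config-file approach: parse a JSON file at DAG parse time (at the top level of the DAG file, not inside a PythonOperator) and build one task per entry. The file name and its contents here are hypothetical placeholders.

```python
import json

# Hypothetical config contents; in a real DAG this would be read from a
# JSON file sitting next to the DAG file, e.g. json.load(open(path)).
config_text = '{"extract": "echo extracting", "load": "echo loading"}'

commands = json.loads(config_text)

# At parse time, each entry would become a BashOperator:
#   BashOperator(task_id=step, bash_command=command, dag=dag)
task_defs = [(step, command) for step, command in commands.items()]
```

Because this runs every time the scheduler parses the DAG file, changing the config file changes the DAG without touching the Python code.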


Airflow DAGs are already written in Python, so there is no need to generate operators within an operator. Further to Chengzhi's answer, here is a working snippet I use to chain commands sequentially:

operators = [BashOperator(task_id=step,
                          bash_command=command,
                          dag=exec_ssh) for step, command in commands.items()]

for upstream, downstream in zip(operators[:-1], operators[1:]):
    upstream >> downstream

In this example, "commands" is a dictionary of your steps that you can load from a JSON or some kind of config. You can extend it to build out DAGs with parallelism, branching, etc. Self-generating DAGs are the best!
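To make the chaining concrete without an Airflow installation, here is a sketch using a hypothetical `FakeTask` stand-in for an operator; in Airflow itself, `>>` is already defined on operators and records the dependency.

```python
class FakeTask:
    """Hypothetical stand-in for an Airflow operator, to show the chaining."""
    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []   # task_ids this task points at

    def __rshift__(self, other):
        # Mimics Airflow's `upstream >> downstream` dependency syntax.
        self.downstream.append(other.task_id)
        return other

commands = {"step1": "echo 1", "step2": "echo 2", "step3": "echo 3"}
operators = [FakeTask(step) for step in commands]

# Same pairwise zip as in the answer: chain each task to the next one.
for upstream, downstream in zip(operators[:-1], operators[1:]):
    upstream >> downstream
```

After the loop, step1 points at step2 and step2 at step3, i.e. a strictly sequential chain built from the dict's order.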

