
In Admin -> Connections I set up a connection with Conn Type S3.

Basically, I have this code in my Python script:

if __name__ == '__main__':
    AWS_ACCESS_KEY_ID = "..."
    AWS_SECRET_ACCESS_KEY = "..."
    AWS_DEFAULT_REGION = "..."
    Start_Work()

What I want to do is call my script from Airflow and pass it the connection's values as arguments (instead of hard-coding them in the script).

How do I do that?

Let's assume that this is the connection (Conn Id: M_1, Conn Type: S3): [screenshot of the connection form]

How do I access each field's data?

2 Answers


One thing you can do is import the provide_session util and use it to retrieve the connection by its conn_id. You can then use that connection inside the python operator's callable.

So it would look something like this:

from airflow.models import Connection
from airflow.utils.db import provide_session

@provide_session
def get_conn(conn_id, session=None):
    # Look up the connection row in Airflow's metadata database.
    conn = (session.query(Connection)
                   .filter(Connection.conn_id == conn_id)
                   .first())
    return conn

from airflow.operators.python_operator import PythonOperator

def my_python_function():
    conn = get_conn('connection_id')

    # The keys must match what was stored in the connection's Extra JSON.
    key_id = conn.extra_dejson.get('AWS_ACCESS_KEY_ID')
    secret_key = conn.extra_dejson.get('AWS_SECRET_ACCESS_KEY')
    default_region = conn.extra_dejson.get('DEFAULT_REGION')

task1 = PythonOperator(task_id='my_task', python_callable=my_python_function, dag=dag)
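If the goal is to forward those values into the script from the question, one option is to call its entry point from the callable. A minimal sketch, assuming the script exposes its work as an importable function taking the three values (the module name, function name, and signature here are assumptions, not something the question shows):

from my_script import start_work  # hypothetical module/function names

def my_python_function():
    conn = get_conn('connection_id')
    # Hand the connection values to the script instead of hard-coding them.
    start_work(
        key_id=conn.extra_dejson.get('AWS_ACCESS_KEY_ID'),
        secret_key=conn.extra_dejson.get('AWS_SECRET_ACCESS_KEY'),
        region=conn.extra_dejson.get('DEFAULT_REGION'),
    )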

EDIT: Removed quotes from python callable


4 Comments

It shows: "Can't decrypt extra params for login=None, FERNET_KEY configuration is missing"
@jack Yeah, you probably want to work with higher-level access from Airflow code.
@dlamblin Some of my scripts connect to S3. I don't want to hard-code the connection details in each one of them. I want to set them in one place.
This implementation didn't work in Airflow 2.0.

I see your connection ID is M_1 and your connection type is S3, so you can load this inside a PythonOperator (or a Python script called from a BashOperator) with:

from airflow.hooks.s3_hook import S3Hook

def py_op_callable():
    hook = S3Hook('M_1')
    # get_credentials() returns a botocore Credentials object.
    botocore_credentials = hook.get_credentials()
    return (botocore_credentials.access_key,
            botocore_credentials.secret_key,
            botocore_credentials.token)
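Those credentials can then be handed to whatever AWS client the script uses. A sketch with boto3 (assuming boto3 is the client; any library that accepts explicit credentials works the same way):

import boto3
from airflow.hooks.s3_hook import S3Hook

def py_op_callable():
    hook = S3Hook('M_1')
    creds = hook.get_credentials()
    # Build a boto3 client from the credentials Airflow resolved, including
    # the session token in case temporary credentials are in use.
    s3 = boto3.client(
        's3',
        aws_access_key_id=creds.access_key,
        aws_secret_access_key=creds.secret_key,
        aws_session_token=creds.token,
    )
    return s3.list_buckets()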

On v1.9.0, it seems get_credentials is not there yet; there is only the private _get_credentials() on AwsHook, which S3Hook inherits. If you're sure you put the values into the connection's extra parameters, the direct approach is:

from airflow.hooks.base_hook import BaseHook

def py_op_callable():
    # get_connection is a classmethod that looks up a connection by conn_id.
    conn = BaseHook.get_connection('M_1')
    extra = conn.extra_dejson
    key_id = extra.get('aws_access_key_id')
    secret_key = extra.get('aws_secret_access_key')
    default_region = extra.get('region_name')
    return key_id, secret_key, default_region
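To actually pass those values on to an external script, one option is a BashOperator whose env parameter sets them as environment variables for the script. A sketch, where the script path is a placeholder; note that calling py_op_callable() at module level triggers a metadata DB lookup every time the DAG file is parsed:

from airflow.operators.bash_operator import BashOperator

key_id, secret_key, default_region = py_op_callable()

# Values passed via env do not appear in the rendered bash command,
# unlike interpolating them into the command string.
run_script = BashOperator(
    task_id='run_my_script',
    bash_command='python /path/to/my_script.py',  # placeholder path
    env={'AWS_ACCESS_KEY_ID': key_id,
         'AWS_SECRET_ACCESS_KEY': secret_key,
         'AWS_DEFAULT_REGION': default_region},
    dag=dag,  # assumes a `dag` object is defined in this file
)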

2 Comments

'S3Hook' object has no attribute 'get_credentials'
@jack It does in a v1.10.0 test. In v1.9.0 there is a private _get_credentials on AwsHook, which S3Hook inherits.
