1

On on_failure_callback I am calling sns method to send notification which is working fine.
I want to declare sns function as a separate function using airflow plugins.

import boto3
from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta

# t1, t2, t3 and t4 are examples of tasks created using operators

def sns(state):
  client = boto3.client('sns',aws_access_key_id='abcd',aws_secret_access_key='abcd',region_name='us-east-1')
  response = client.publish(
    TargetArn='Topicarn',
    Message='Test')

t3 = BashOperator(
    task_id='task_3',
    bash_command='fail',
    on_failure_callback=sns,
    dag=dag)

Thanks,
Rajeev

8
  • 1
    Have you read the official documentation on Plugins? airflow.apache.org/plugins.html - there is a very nice example there which you can try for yourself. If you have a specific problem while doing that, then you can ask a more specific question. Commented Mar 11, 2019 at 12:47
  • I have read the official documentation.Looking answer for below questions: Commented Mar 11, 2019 at 13:10
  • 1.Which base class I should use? Commented Mar 11, 2019 at 13:11
  • 2.How I should configure plugin folder ? Commented Mar 11, 2019 at 13:11
  • 1
    You should update your original question with those points. It will get answered then. The way it is now is too broad Commented Mar 11, 2019 at 13:13

2 Answers 2

1

(replying to actual questions posted to comments, please move those to the original question)

  1. Since this will be a basically a notification operator, you should inherit from BaseOperator.
  2. Plugin folder is usually located in the project's root, next to the dag folder.
  3. As for the plugin itself, once you inherit from AirflowPlugin, it will be accessible for import as airflow.operators.my_operator. You can use it as any other operator.
Sign up to request clarification or add additional context in comments.

Comments

0

Below code worked for me :

plugin code :

import boto3
from airflow.models import BaseOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults

class snsoperator(BaseOperator):
  @apply_defaults
  def __init__(self, *args, **kwargs):

   super(snsoperator, self).__init__(*args, **kwargs)

  def __call__(self, *args, **kwargs):
    print("calling execute")
    client = boto3.client('sns', aws_access_key_id='abc',
                              aws_secret_access_key='abc', region_name='us-east-1')
    response = client.publish(
            TargetArn='abc',
            Message='Test')

class snsplugin(AirflowPlugin):
  name = "snsplugin"
  operators = [snsoperator]

DAG code :

import boto3
from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta
from airflow.operators.snsplugin import snsoperator

t3 = BashOperator(
    task_id='task_3',
    bash_command='fail',
    on_failure_callback=snsoperator(task_id='sns'),
    dag=dag)

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.