
I have set up a Google Cloud Storage bucket to send notifications to a Pub/Sub topic:

gsutil notification create -t my-topic -f json gs://test-bucket

I have created a subscription to this topic to push messages to a cloud function endpoint:

gcloud pubsub subscriptions create my-sub --topic my-topic

And the cloud function is deployed with:

gcloud functions deploy promo_received --region europe-west1 --runtime python37 --trigger-topic my-topic

The purpose of the function (right now) is to check whether a file created in test-bucket matches a specific file name, and to fire a message off to Slack when it does. Currently the function looks like this:

import json
from datetime import datetime

def promo_received(data):
    date_str = datetime.today().strftime('%Y%m%d')
    filename = json.loads(data)["name"]
    bucket = json.loads(data)["bucket"]

    if filename == 'PROM_DTLS_{}.txt.gz'.format(date_str):
        msg = ":heavy_check_mark: *{}* has been uploaded to *{}*. Awaiting instructions.".format(filename, bucket)
        post_to_slack(url, msg)  # post_to_slack and url not shown

When I test this by dropping a file named PROM_DTLS_20190913.txt.gz into the bucket, I can see that the function fires; however, it crashes with two errors:

TypeError: promo_received() takes 1 positional argument but 2 were given

TypeError: the JSON object must be str, bytes or bytearray, not LocalProxy

This is my first time attempting to do this, and I'm not sure where to start with troubleshooting. Any help would be greatly appreciated!


2 Answers


You need to add the context as an argument to your function; that will solve the first error:

def promo_received(data, context):
    [...]

Also, you don't need json.loads to retrieve the name of the file or the bucket:

data['name']
data['bucket']

This should get rid of the second error.

Check the example on the Google Cloud Storage Triggers documentation page.
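
For reference, here is a minimal sketch of this approach, assuming the function receives the Cloud Storage object metadata directly as a dict, which is the scenario shown on the linked Cloud Storage Triggers page (i.e. a function deployed with a storage trigger rather than a Pub/Sub topic trigger). The post_to_slack helper and url are the ones from the question, not shown here:

from datetime import datetime

def promo_received(data, context):
    # With a Cloud Storage trigger, 'data' is the object metadata dict,
    # so the name and bucket can be read directly.
    date_str = datetime.today().strftime('%Y%m%d')
    filename = data['name']
    bucket = data['bucket']

    if filename == 'PROM_DTLS_{}.txt.gz'.format(date_str):
        msg = ":heavy_check_mark: *{}* has been uploaded to *{}*. Awaiting instructions.".format(filename, bucket)
        post_to_slack(url, msg)  # helper and webhook URL from the question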


2 Comments

Thanks for your help. I have implemented the change as you suggested, but now I am getting KeyError: 'name'.
@Cam I'm talking from memory here as I can't run a test right now, but can you try data.get('name') instead?

To write a Python Cloud Function, look at this example. Note that Cloud Storage serializes the object metadata into a UTF-8 JSON string, which Cloud Functions then base64-encodes. So you need to first base64-decode the payload, then UTF-8-decode it, and finally parse the JSON.

import base64
import json

def promo_received(event, context):
    # Pub/Sub delivers the notification as a base64-encoded, UTF-8 JSON string
    obj = json.loads(base64.b64decode(event['data']).decode('utf-8'))
    filename = obj["name"]
    bucket = obj["bucket"]

    # the rest of your code goes here
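
Putting this together with the original filename check, the full handler might look roughly like the sketch below (post_to_slack and url are the helper and webhook URL from the question, assumed to be defined elsewhere in the module):

import base64
import json
from datetime import datetime

def promo_received(event, context):
    # Pub/Sub wraps the storage notification: base64-decode the message
    # data, then parse the UTF-8 JSON object metadata.
    obj = json.loads(base64.b64decode(event['data']).decode('utf-8'))
    filename = obj['name']
    bucket = obj['bucket']

    date_str = datetime.today().strftime('%Y%m%d')
    if filename == 'PROM_DTLS_{}.txt.gz'.format(date_str):
        msg = ":heavy_check_mark: *{}* has been uploaded to *{}*. Awaiting instructions.".format(filename, bucket)
        post_to_slack(url, msg)  # from the question; assumed defined elsewhere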

