Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
feat(pubsub): allow trace extraction from protobuf message
Export the function NewMessageCarrierFromPB which, given a protobuf
PubsubMessage, returns a TextMapCarrier which can be used to extract
trace contexts.

This allows it to be used in e.g. HTTP push endpoints.
  • Loading branch information
radhus committed Sep 5, 2024
commit c5735ac6e8f739d9824e40f05ebc64cae122e1a0
24 changes: 17 additions & 7 deletions pubsub/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"log"
"sync"

pb "cloud.google.com/go/pubsub/apiv1/pubsubpb"
"cloud.google.com/go/pubsub/internal"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
Expand Down Expand Up @@ -273,33 +274,42 @@ func tracer() trace.Tracer {

var _ propagation.TextMapCarrier = (*messageCarrier)(nil)

// messageCarrier injects and extracts traces from a pubsub.Message.
// messageCarrier injects and extracts traces from pubsub.Message attributes.
type messageCarrier struct {
msg *Message
attributes map[string]string
}

const googclientPrefix string = "googclient_"

// newMessageCarrier creates a new PubsubMessageCarrier.
func newMessageCarrier(msg *Message) messageCarrier {
return messageCarrier{msg: msg}
return messageCarrier{attributes: msg.Attributes}
}

// NewMessageCarrierFromPB creates a propagation.TextMapCarrier that can be used to extract the trace
// context from a protobuf PubsubMessage.
//
// Example:
// ctx = propagation.TraceContext{}.Extract(ctx, pubsub.NewMessageCarrierFromPB(msg))
func NewMessageCarrierFromPB(msg *pb.PubsubMessage) propagation.TextMapCarrier {
return messageCarrier{attributes: msg.Attributes}
}

// Get retrieves a single value for a given key.
func (c messageCarrier) Get(key string) string {
return c.msg.Attributes[googclientPrefix+key]
return c.attributes[googclientPrefix+key]
}

// Set sets an attribute.
func (c messageCarrier) Set(key, val string) {
c.msg.Attributes[googclientPrefix+key] = val
c.attributes[googclientPrefix+key] = val
}

// Keys returns a slice of all keys in the carrier.
func (c messageCarrier) Keys() []string {
i := 0
out := make([]string, len(c.msg.Attributes))
for k := range c.msg.Attributes {
out := make([]string, len(c.attributes))
for k := range c.attributes {
out[i] = k
i++
}
Expand Down