Skip to content
Prev Previous commit
Next Next commit
add tests for updating topic with schema
  • Loading branch information
hongalex committed Jan 26, 2023
commit 230a0ea017df7d866be40c4b7d5b760fa66205fd
49 changes: 49 additions & 0 deletions pubsub/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"cloud.google.com/go/kms/apiv1/kmspb"
pb "cloud.google.com/go/pubsub/apiv1/pubsubpb"
testutil2 "cloud.google.com/go/pubsub/internal/testutil"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
gax "github.com/googleapis/gax-go/v2"
"golang.org/x/oauth2/google"
Expand Down Expand Up @@ -1956,6 +1957,8 @@ func TestIntegration_TopicRetention(t *testing.T) {
if err != nil {
t.Fatal(err)
}
defer topic.Delete(ctx)
defer topic.Stop()

cfg, err := topic.Config(ctx)
if err != nil {
Expand Down Expand Up @@ -2011,3 +2014,49 @@ func TestExactlyOnceDelivery_PublishReceive(t *testing.T) {
// Tests for large messages (larger than the 4MB gRPC limit).
testPublishAndReceive(t, client, 0, false, true, 1, 5*1024*1024)
}

func TestIntegration_TopicUpdateSchema(t *testing.T) {
ctx := context.Background()
// TODO(hongalex): update these staging endpoints after schema evolution is GA.
c := integrationTestClient(ctx, t, option.WithEndpoint("staging-pubsub.sandbox.googleapis.com:443"))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's just not even merge until we can remove this. That should hopefully be next week.

defer c.Close()

sc := integrationTestSchemaClient(ctx, t, option.WithEndpoint("staging-pubsub.sandbox.googleapis.com:443"))
defer sc.Close()

schemaContent, err := ioutil.ReadFile("testdata/schema/us-states.avsc")
if err != nil {
t.Fatal(err)
}

schemaID := schemaIDs.New()
schemaCfg, err := sc.CreateSchema(ctx, schemaID, SchemaConfig{
Type: SchemaAvro,
Definition: string(schemaContent),
})
if err != nil {
t.Fatal(err)
}
defer sc.DeleteSchema(ctx, schemaID)

topic, err := c.CreateTopic(ctx, topicIDs.New())
if err != nil {
t.Fatal(err)
}
defer topic.Delete(ctx)
defer topic.Stop()

schema := &SchemaSettings{
Schema: schemaCfg.Name,
Encoding: EncodingJSON,
}
cfg, err := topic.Update(ctx, TopicConfigToUpdate{
SchemaSettings: schema,
})
if err != nil {
t.Fatal(err)
}
if diff := cmp.Diff(cfg.SchemaSettings, schema); diff != "" {
t.Fatalf("schema settings for update -want, +got: %v", diff)
}
}
2 changes: 2 additions & 0 deletions pubsub/pstest/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,8 @@ func (s *GServer) UpdateTopic(_ context.Context, req *pb.UpdateTopicRequest) (*p
return nil, err
}
t.proto.MessageRetentionDuration = req.Topic.MessageRetentionDuration
case "schema_settings":
t.proto.SchemaSettings = req.Topic.SchemaSettings
default:
return nil, status.Errorf(codes.InvalidArgument, "unknown field name %q", path)
}
Expand Down
32 changes: 32 additions & 0 deletions pubsub/topic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,38 @@ func TestUpdateTopic_MessageStoragePolicy(t *testing.T) {
}
}

func TestUpdateTopic_SchemaSettings(t *testing.T) {
ctx := context.Background()
client, srv := newFake(t)
defer client.Close()
defer srv.Close()

topic := mustCreateTopic(t, client, "T")
config, err := topic.Config(ctx)
if err != nil {
t.Fatal(err)
}
want := TopicConfig{}
opt := cmpopts.IgnoreUnexported(TopicConfig{})
if !testutil.Equal(config, want, opt) {
t.Errorf("\ngot %+v\nwant %+v", config, want)
}

// Update schema settings.
settings := &SchemaSettings{
Schema: "some-schema",
Encoding: EncodingJSON,
FirstRevisionID: "1234",
}
config2, err := topic.Update(ctx, TopicConfigToUpdate{SchemaSettings: settings})
if err != nil {
t.Fatal(err)
}
if !testutil.Equal(config2.SchemaSettings, settings) {
t.Errorf("\ngot %+v\nwant %+v", config2, settings)
}
}

type alwaysFailPublish struct {
pubsubpb.PublisherServer
}
Expand Down