pubsub: add support for Message Storage Policy on topic level
- `MessageStoragePolicy` can be set on individual topics.
Previously, this could only be set on an organization level
through a Resource Location Restriction policy.
This is done either through `topic.CreateTopicWithConfig` or `topic.Update`.
- Add `*MessageStoragePolicy` to `TopicConfigToUpdate`.
- BREAKING CHANGE: Call to `topic.Update` with empty `TopicConfigToUpdate`
will no longer update the topic to the org level policy.
Instead, use a non-nil but unset
`MessageStoragePolicy` pointer.
Change-Id: I9820621de429cb8d35e0fdf2a8f80c3b92232743
Reviewed-on: https://code-review.googlesource.com/c/gocloud/+/43550
Reviewed-by: kokoro <noreply+kokoro@google.com>
Reviewed-by: Jean de Klerk <deklerk@google.com>
Reviewed-by: Emmanuel Odeke <emm.odeke@gmail.com>
diff --git a/pubsub/example_test.go b/pubsub/example_test.go
index 1a656dd..514ae58 100644
--- a/pubsub/example_test.go
+++ b/pubsub/example_test.go
@@ -49,6 +49,27 @@
_ = topic // TODO: use the topic.
}
+func ExampleClient_CreateTopicWithConfig() {
+ ctx := context.Background()
+ client, err := pubsub.NewClient(ctx, "project-id")
+ if err != nil {
+ // TODO: Handle error.
+ }
+
+ // Create a new topic with the given name and config.
+ topicConfig := &pubsub.TopicConfig{
+ KMSKeyName: "projects/project-id/locations/global/keyRings/my-key-ring/cryptoKeys/my-key",
+ MessageStoragePolicy: pubsub.MessageStoragePolicy{
+ AllowedPersistenceRegions: []string{"us-east1"},
+ },
+ }
+ topic, err := client.CreateTopicWithConfig(ctx, "topicName", topicConfig)
+ if err != nil {
+ // TODO: Handle error.
+ }
+ _ = topic // TODO: use the topic.
+}
+
// Use TopicInProject to refer to a topic that is not in the client's project, such
// as a public topic.
func ExampleClient_TopicInProject() {
@@ -188,6 +209,44 @@
}
}
+func ExampleTopic_Update() {
+ ctx := context.Background()
+ client, err := pubsub.NewClient(ctx, "project-id")
+ if err != nil {
+ // TODO: Handle error.V
+ }
+ topic := client.Topic("topic-name")
+ topicConfig, err := topic.Update(ctx, pubsub.TopicConfigToUpdate{
+ MessageStoragePolicy: &pubsub.MessageStoragePolicy{
+ AllowedPersistenceRegions: []string{
+ "asia-east1", "asia-northeast1", "asia-southeast1", "australia-southeast1",
+ "europe-north1", "europe-west1", "europe-west2", "europe-west3", "europe-west4",
+ "us-central1", "us-central2", "us-east1", "us-east4", "us-west1", "us-west2"},
+ },
+ })
+ if err != nil {
+ // TODO: Handle error.
+ }
+ _ = topicConfig // TODO: Use TopicConfig
+}
+
+func ExampleTopic_Update_resetMessageStoragePolicy() {
+ ctx := context.Background()
+ client, err := pubsub.NewClient(ctx, "project-id")
+ if err != nil {
+ // TODO: Handle error.V
+ }
+ topic := client.Topic("topic-name")
+ topicConfig, err := topic.Update(ctx, pubsub.TopicConfigToUpdate{
+ // Just use a non-nil MessageStoragePolicy without any fields.
+ MessageStoragePolicy: &pubsub.MessageStoragePolicy{},
+ })
+ if err != nil {
+ // TODO: Handle error.
+ }
+ _ = topicConfig // TODO: Use TopicConfig
+}
+
func ExampleSubscription_Delete() {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, "project-id")
diff --git a/pubsub/integration_test.go b/pubsub/integration_test.go
index 6896183..2722f17 100644
--- a/pubsub/integration_test.go
+++ b/pubsub/integration_test.go
@@ -360,7 +360,7 @@
}
}
-func TestIntegration_CreateSubscription_neverExpire(t *testing.T) {
+func TestIntegration_CreateSubscription_NeverExpire(t *testing.T) {
t.Parallel()
ctx := context.Background()
client := integrationTestClient(ctx, t)
@@ -550,7 +550,7 @@
}
}
-func TestIntegration_UpdateSubscription_expirationPolicy(t *testing.T) {
+func TestIntegration_UpdateSubscription_ExpirationPolicy(t *testing.T) {
t.Parallel()
ctx := context.Background()
client := integrationTestClient(ctx, t)
@@ -629,7 +629,7 @@
// NOTE: This test should be skipped by open source contributors. It requires
// whitelisting, a (gsuite) organization project, and specific permissions.
-func TestIntegration_UpdateTopic(t *testing.T) {
+func TestIntegration_UpdateTopicLabels(t *testing.T) {
t.Parallel()
ctx := context.Background()
client := integrationTestClient(ctx, t)
@@ -767,21 +767,85 @@
}
}
+func TestIntegration_MessageStoragePolicy_TopicLevel(t *testing.T) {
+ t.Parallel()
+ ctx := context.Background()
+ client := integrationTestClient(ctx, t)
+ defer client.Close()
+
+ topic, err := client.CreateTopic(ctx, topicIDs.New())
+ if err != nil {
+ t.Fatalf("CreateTopic error: %v", err)
+ }
+ defer topic.Delete(ctx)
+ defer topic.Stop()
+
+ // Initially the message storage policy should just be non-empty
+ got, err := topic.Config(ctx)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if len(got.MessageStoragePolicy.AllowedPersistenceRegions) == 0 {
+ t.Fatalf("Empty AllowedPersistenceRegions in :\n%+v", got)
+ }
+
+ // Specify some regions to set.
+ regions := []string{"asia-east1", "us-east1"}
+ got, err = topic.Update(ctx, TopicConfigToUpdate{
+ MessageStoragePolicy: &MessageStoragePolicy{
+ AllowedPersistenceRegions: regions,
+ },
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+ want := TopicConfig{
+ MessageStoragePolicy: MessageStoragePolicy{
+ AllowedPersistenceRegions: regions,
+ },
+ }
+ if !testutil.Equal(got, want) {
+ t.Fatalf("\ngot %+v\nwant regions%+v", got, want)
+ }
+
+ // Reset all allowed regions to project default.
+ got, err = topic.Update(ctx, TopicConfigToUpdate{
+ MessageStoragePolicy: &MessageStoragePolicy{},
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+ if len(got.MessageStoragePolicy.AllowedPersistenceRegions) == 0 {
+ t.Fatalf("Unexpectedly got empty MessageStoragePolicy.AllowedPersistenceRegions in:\n%+v", got)
+ }
+
+ // Removing all regions should fail
+ updateCfg := TopicConfigToUpdate{
+ MessageStoragePolicy: &MessageStoragePolicy{
+ AllowedPersistenceRegions: []string{},
+ },
+ }
+ if _, err = topic.Update(ctx, updateCfg); err == nil {
+ t.Fatalf("Unexpected succeeded in removing all regions\n%+v\n", got)
+ }
+}
+
// NOTE: This test should be skipped by open source contributors. It requires
-// whitelisting, a (gsuite) organization project, and specific permissions.
+// a (gsuite) organization project, and specific permissions. The test for MessageStoragePolicy
+// on a topic level can be run on any topic and is covered by the previous test.
//
// Googlers, see internal bug 77920644. Furthermore, be sure to add your
// service account as an owner of ps-geofencing-test.
-func TestIntegration_MessageStoragePolicy(t *testing.T) {
+func TestIntegration_MessageStoragePolicy_ProjectLevel(t *testing.T) {
// Verify that the message storage policy is populated.
if testing.Short() {
t.Skip("Integration tests skipped in short mode")
}
ctx := context.Background()
- // The message storage policy depends on the Resource Location Restriction org policy.
- // The usual testing project is in the google.com org, which has no resource location restrictions,
- // so we will always see an empty MessageStoragePolicy. Use a project in another org that does
- // have a restriction set ("us-east1").
+ // If a message storage policy is not set on a topic, the policy depends on the Resource Location
+ // Restriction which is specified on an organization level. The usual testing project is in the
+ // google.com org, which has no resource location restrictions. Use a project in another org that
+ // does have a restriction set ("us-east1").
projID := "ps-geofencing-test"
// We can use the same creds as always because the service account of the default testing project
// has permission to use the above project. This test will fail if a different service account
@@ -812,7 +876,7 @@
}
}
-func TestIntegration_CreateTopicWithKMS(t *testing.T) {
+func TestIntegration_CreateTopic_KMS(t *testing.T) {
t.Parallel()
ctx := context.Background()
client := integrationTestClient(ctx, t)
@@ -868,7 +932,6 @@
tc := TopicConfig{
KMSKeyName: key.GetName(),
}
-
topic, err := client.CreateTopicWithConfig(ctx, topicIDs.New(), &tc)
if err != nil {
t.Fatalf("CreateTopicWithConfig error: %v", err)
@@ -886,3 +949,31 @@
t.Errorf("got %v, want %v", got, key.GetName())
}
}
+
+func TestIntegration_CreateTopic_MessageStoragePolicy(t *testing.T) {
+ t.Parallel()
+ ctx := context.Background()
+ client := integrationTestClient(ctx, t)
+ defer client.Close()
+
+ tc := TopicConfig{
+ MessageStoragePolicy: MessageStoragePolicy{
+ AllowedPersistenceRegions: []string{"us-east1"},
+ },
+ }
+ topic, err := client.CreateTopicWithConfig(ctx, topicIDs.New(), &tc)
+ if err != nil {
+ t.Fatalf("CreateTopicWithConfig error: %v", err)
+ }
+ defer topic.Delete(ctx)
+ defer topic.Stop()
+
+ got, err := topic.Config(ctx)
+ if err != nil {
+ t.Fatal(err)
+ }
+ want := tc
+ if diff := testutil.Diff(got, want); diff != "" {
+ t.Fatalf("\ngot: - want: +\n%s", diff)
+ }
+}
diff --git a/pubsub/pstest/fake.go b/pubsub/pstest/fake.go
index 8b40b26..1355678 100644
--- a/pubsub/pstest/fake.go
+++ b/pubsub/pstest/fake.go
@@ -238,8 +238,8 @@
switch path {
case "labels":
t.proto.Labels = req.Topic.Labels
- case "message_storage_policy": // "fetch" the policy
- t.proto.MessageStoragePolicy = &pb.MessageStoragePolicy{AllowedPersistenceRegions: []string{"US"}}
+ case "message_storage_policy":
+ t.proto.MessageStoragePolicy = req.Topic.MessageStoragePolicy
default:
return nil, status.Errorf(codes.InvalidArgument, "unknown field name %q", path)
}
diff --git a/pubsub/topic.go b/pubsub/topic.go
index ac49390..ea7c277 100644
--- a/pubsub/topic.go
+++ b/pubsub/topic.go
@@ -185,7 +185,8 @@
MessageStoragePolicy MessageStoragePolicy
// The name of the the Cloud KMS key to be used to protect access to messages
- // published to this topic, in the format "projects/P/locations/L/keyRings/R/cryptoKeys/K".
+ // published to this topic, in the format
+ // "projects/P/locations/L/keyRings/R/cryptoKeys/K".
KMSKeyName string
}
@@ -193,9 +194,19 @@
type TopicConfigToUpdate struct {
// If non-nil, the current set of labels is completely
// replaced by the new set.
+ Labels map[string]string
+
+ // If non-nil, the existing policy (containing the list of regions)
+ // is completely replaced by the new policy.
+ //
+ // Use the zero value &MessageStoragePolicy{} to reset the topic back to
+ // using the organization's Resource Location Restriction policy.
+ //
+ // If nil, the policy remains unchanged.
+ //
// This field has beta status. It is not subject to the stability guarantee
// and may change.
- Labels map[string]string
+ MessageStoragePolicy *MessageStoragePolicy
}
func protoToTopicConfig(pbt *pb.Topic) TopicConfig {
@@ -210,12 +221,19 @@
// is determined when the topic is created based on the policy configured at
// the project level.
type MessageStoragePolicy struct {
- // The list of GCP regions where messages that are published to the topic may
- // be persisted in storage. Messages published by publishers running in
+ // AllowedPersistenceRegions is the list of GCP regions where messages that are published
+ // to the topic may be persisted in storage. Messages published by publishers running in
// non-allowed GCP regions (or running outside of GCP altogether) will be
- // routed for storage in one of the allowed regions. An empty list indicates a
- // misconfiguration at the project or organization level, which will result in
- // all Publish operations failing.
+ // routed for storage in one of the allowed regions.
+ //
+ // If empty, it indicates a misconfiguration at the project or organization level, which
+ // will result in all Publish operations failing. This field cannot be empty in updates.
+ //
+ // If nil, then the policy is not defined on a topic level. When used in updates, it resets
+ // the regions back to the organization level Resource Location Restriction policy.
+ //
+ // For more information, see
+ // https://cloud.google.com/pubsub/docs/resource-location-restriction#pubsub-storage-locations.
AllowedPersistenceRegions []string
}
@@ -227,7 +245,7 @@
}
func messageStoragePolicyToProto(msp *MessageStoragePolicy) *pb.MessageStoragePolicy {
- if msp == nil || len(msp.AllowedPersistenceRegions) <= 0 {
+ if msp == nil || msp.AllowedPersistenceRegions == nil {
return nil
}
return &pb.MessageStoragePolicy{AllowedPersistenceRegions: msp.AllowedPersistenceRegions}
@@ -244,9 +262,6 @@
// Update changes an existing topic according to the fields set in cfg. It returns
// the new TopicConfig.
-//
-// Any call to Update (even with an empty TopicConfigToUpdate) will update the
-// MessageStoragePolicy for the topic from the organization's settings.
func (t *Topic) Update(ctx context.Context, cfg TopicConfigToUpdate) (TopicConfig, error) {
req := t.updateRequest(cfg)
if len(req.UpdateMask.Paths) == 0 {
@@ -261,11 +276,15 @@
func (t *Topic) updateRequest(cfg TopicConfigToUpdate) *pb.UpdateTopicRequest {
pt := &pb.Topic{Name: t.name}
- paths := []string{"message_storage_policy"} // always fetch
+ var paths []string
if cfg.Labels != nil {
pt.Labels = cfg.Labels
paths = append(paths, "labels")
}
+ if cfg.MessageStoragePolicy != nil {
+ pt.MessageStoragePolicy = messageStoragePolicyToProto(cfg.MessageStoragePolicy)
+ paths = append(paths, "message_storage_policy")
+ }
return &pb.UpdateTopicRequest{
Topic: pt,
UpdateMask: &fmpb.FieldMask{Paths: paths},
diff --git a/pubsub/topic_test.go b/pubsub/topic_test.go
index 15cf524..0777a5a 100644
--- a/pubsub/topic_test.go
+++ b/pubsub/topic_test.go
@@ -79,9 +79,11 @@
id := "test-topic"
want := TopicConfig{
- Labels: map[string]string{"label": "value"},
- MessageStoragePolicy: MessageStoragePolicy{},
- KMSKeyName: "projects/P/locations/L/keyRings/R/cryptoKeys/K",
+ Labels: map[string]string{"label": "value"},
+ MessageStoragePolicy: MessageStoragePolicy{
+ AllowedPersistenceRegions: []string{"us-east1"},
+ },
+ KMSKeyName: "projects/P/locations/L/keyRings/R/cryptoKeys/K",
}
topic := mustCreateTopicWithConfig(t, c, id, &want)
@@ -185,7 +187,7 @@
}
}
-func TestUpdateTopic(t *testing.T) {
+func TestUpdateTopic_Label(t *testing.T) {
ctx := context.Background()
client, srv := newFake(t)
defer client.Close()
@@ -203,13 +205,14 @@
// replace labels
labels := map[string]string{"label": "value"}
- config2, err := topic.Update(ctx, TopicConfigToUpdate{Labels: labels})
+ config2, err := topic.Update(ctx, TopicConfigToUpdate{
+ Labels: labels,
+ })
if err != nil {
t.Fatal(err)
}
want = TopicConfig{
- Labels: labels,
- MessageStoragePolicy: MessageStoragePolicy{[]string{"US"}},
+ Labels: labels,
}
if !testutil.Equal(config2, want) {
t.Errorf("got %+v, want %+v", config2, want)
@@ -227,6 +230,38 @@
}
}
+func TestUpdateTopic_MessageStoragePolicy(t *testing.T) {
+ ctx := context.Background()
+ client, srv := newFake(t)
+ defer client.Close()
+ defer srv.Close()
+
+ topic := mustCreateTopic(t, client, "T")
+ config, err := topic.Config(ctx)
+ if err != nil {
+ t.Fatal(err)
+ }
+ want := TopicConfig{}
+ if !testutil.Equal(config, want) {
+ t.Errorf("\ngot %+v\nwant %+v", config, want)
+ }
+
+ // Update message storage policy.
+ msp := &MessageStoragePolicy{
+ AllowedPersistenceRegions: []string{"us-east1"},
+ }
+ config2, err := topic.Update(ctx, TopicConfigToUpdate{MessageStoragePolicy: msp})
+ if err != nil {
+ t.Fatal(err)
+ }
+ want.MessageStoragePolicy = MessageStoragePolicy{
+ AllowedPersistenceRegions: []string{"us-east1"},
+ }
+ if !testutil.Equal(config2, want) {
+ t.Errorf("\ngot %+v\nwant %+v", config2, want)
+ }
+}
+
type alwaysFailPublish struct {
pubsubpb.PublisherServer
}