pubsub: add support for Message Storage Policy on topic level

- `MessageStoragePolicy` can be set on individual topics.
Previously, this could only be set on an organization level
through a Resource Location Restriction policy.
This is done either through `topic.CreateTopicWithConfig` or `topic.Update`.

- Add `*MessageStoragePolicy` to `TopicConfigToUpdate`.

- BREAKING CHANGE: Call to `topic.Update` with empty `TopicConfigToUpdate`
will no longer update the topic to the org level policy.
Instead, use a non-nil but unset
`MessageStoragePolicy` pointer.

Change-Id: I9820621de429cb8d35e0fdf2a8f80c3b92232743
Reviewed-on: https://code-review.googlesource.com/c/gocloud/+/43550
Reviewed-by: kokoro <noreply+kokoro@google.com>
Reviewed-by: Jean de Klerk <deklerk@google.com>
Reviewed-by: Emmanuel Odeke <emm.odeke@gmail.com>
diff --git a/pubsub/example_test.go b/pubsub/example_test.go
index 1a656dd..514ae58 100644
--- a/pubsub/example_test.go
+++ b/pubsub/example_test.go
@@ -49,6 +49,27 @@
 	_ = topic // TODO: use the topic.
 }
 
+func ExampleClient_CreateTopicWithConfig() {
+	ctx := context.Background()
+	client, err := pubsub.NewClient(ctx, "project-id")
+	if err != nil {
+		// TODO: Handle error.
+	}
+
+	// Create a new topic with the given name and config.
+	topicConfig := &pubsub.TopicConfig{
+		KMSKeyName: "projects/project-id/locations/global/keyRings/my-key-ring/cryptoKeys/my-key",
+		MessageStoragePolicy: pubsub.MessageStoragePolicy{
+			AllowedPersistenceRegions: []string{"us-east1"},
+		},
+	}
+	topic, err := client.CreateTopicWithConfig(ctx, "topicName", topicConfig)
+	if err != nil {
+		// TODO: Handle error.
+	}
+	_ = topic // TODO: use the topic.
+}
+
 // Use TopicInProject to refer to a topic that is not in the client's project, such
 // as a public topic.
 func ExampleClient_TopicInProject() {
@@ -188,6 +209,44 @@
 	}
 }
 
+func ExampleTopic_Update() {
+	ctx := context.Background()
+	client, err := pubsub.NewClient(ctx, "project-id")
+	if err != nil {
+		// TODO: Handle error.V
+	}
+	topic := client.Topic("topic-name")
+	topicConfig, err := topic.Update(ctx, pubsub.TopicConfigToUpdate{
+		MessageStoragePolicy: &pubsub.MessageStoragePolicy{
+			AllowedPersistenceRegions: []string{
+				"asia-east1", "asia-northeast1", "asia-southeast1", "australia-southeast1",
+				"europe-north1", "europe-west1", "europe-west2", "europe-west3", "europe-west4",
+				"us-central1", "us-central2", "us-east1", "us-east4", "us-west1", "us-west2"},
+		},
+	})
+	if err != nil {
+		// TODO: Handle error.
+	}
+	_ = topicConfig // TODO: Use TopicConfig
+}
+
+func ExampleTopic_Update_resetMessageStoragePolicy() {
+	ctx := context.Background()
+	client, err := pubsub.NewClient(ctx, "project-id")
+	if err != nil {
+		// TODO: Handle error.V
+	}
+	topic := client.Topic("topic-name")
+	topicConfig, err := topic.Update(ctx, pubsub.TopicConfigToUpdate{
+		// Just use a non-nil MessageStoragePolicy without any fields.
+		MessageStoragePolicy: &pubsub.MessageStoragePolicy{},
+	})
+	if err != nil {
+		// TODO: Handle error.
+	}
+	_ = topicConfig // TODO: Use TopicConfig
+}
+
 func ExampleSubscription_Delete() {
 	ctx := context.Background()
 	client, err := pubsub.NewClient(ctx, "project-id")
diff --git a/pubsub/integration_test.go b/pubsub/integration_test.go
index 6896183..2722f17 100644
--- a/pubsub/integration_test.go
+++ b/pubsub/integration_test.go
@@ -360,7 +360,7 @@
 	}
 }
 
-func TestIntegration_CreateSubscription_neverExpire(t *testing.T) {
+func TestIntegration_CreateSubscription_NeverExpire(t *testing.T) {
 	t.Parallel()
 	ctx := context.Background()
 	client := integrationTestClient(ctx, t)
@@ -550,7 +550,7 @@
 	}
 }
 
-func TestIntegration_UpdateSubscription_expirationPolicy(t *testing.T) {
+func TestIntegration_UpdateSubscription_ExpirationPolicy(t *testing.T) {
 	t.Parallel()
 	ctx := context.Background()
 	client := integrationTestClient(ctx, t)
@@ -629,7 +629,7 @@
 
 // NOTE: This test should be skipped by open source contributors. It requires
 // whitelisting, a (gsuite) organization project, and specific permissions.
-func TestIntegration_UpdateTopic(t *testing.T) {
+func TestIntegration_UpdateTopicLabels(t *testing.T) {
 	t.Parallel()
 	ctx := context.Background()
 	client := integrationTestClient(ctx, t)
@@ -767,21 +767,85 @@
 	}
 }
 
+func TestIntegration_MessageStoragePolicy_TopicLevel(t *testing.T) {
+	t.Parallel()
+	ctx := context.Background()
+	client := integrationTestClient(ctx, t)
+	defer client.Close()
+
+	topic, err := client.CreateTopic(ctx, topicIDs.New())
+	if err != nil {
+		t.Fatalf("CreateTopic error: %v", err)
+	}
+	defer topic.Delete(ctx)
+	defer topic.Stop()
+
+	// Initially the message storage policy should just be non-empty
+	got, err := topic.Config(ctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if len(got.MessageStoragePolicy.AllowedPersistenceRegions) == 0 {
+		t.Fatalf("Empty AllowedPersistenceRegions in :\n%+v", got)
+	}
+
+	// Specify some regions to set.
+	regions := []string{"asia-east1", "us-east1"}
+	got, err = topic.Update(ctx, TopicConfigToUpdate{
+		MessageStoragePolicy: &MessageStoragePolicy{
+			AllowedPersistenceRegions: regions,
+		},
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	want := TopicConfig{
+		MessageStoragePolicy: MessageStoragePolicy{
+			AllowedPersistenceRegions: regions,
+		},
+	}
+	if !testutil.Equal(got, want) {
+		t.Fatalf("\ngot  %+v\nwant regions%+v", got, want)
+	}
+
+	// Reset all allowed regions to project default.
+	got, err = topic.Update(ctx, TopicConfigToUpdate{
+		MessageStoragePolicy: &MessageStoragePolicy{},
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	if len(got.MessageStoragePolicy.AllowedPersistenceRegions) == 0 {
+		t.Fatalf("Unexpectedly got empty MessageStoragePolicy.AllowedPersistenceRegions in:\n%+v", got)
+	}
+
+	// Removing all regions should fail
+	updateCfg := TopicConfigToUpdate{
+		MessageStoragePolicy: &MessageStoragePolicy{
+			AllowedPersistenceRegions: []string{},
+		},
+	}
+	if _, err = topic.Update(ctx, updateCfg); err == nil {
+		t.Fatalf("Unexpected succeeded in removing all regions\n%+v\n", got)
+	}
+}
+
 // NOTE: This test should be skipped by open source contributors. It requires
-// whitelisting, a (gsuite) organization project, and specific permissions.
+// a (gsuite) organization project, and specific permissions. The test for MessageStoragePolicy
+// on a topic level can be run on any topic and is covered by the previous test.
 //
 // Googlers, see internal bug 77920644. Furthermore, be sure to add your
 // service account as an owner of ps-geofencing-test.
-func TestIntegration_MessageStoragePolicy(t *testing.T) {
+func TestIntegration_MessageStoragePolicy_ProjectLevel(t *testing.T) {
 	// Verify that the message storage policy is populated.
 	if testing.Short() {
 		t.Skip("Integration tests skipped in short mode")
 	}
 	ctx := context.Background()
-	// The message storage policy depends on the Resource Location Restriction org policy.
-	// The usual testing project is in the google.com org, which has no resource location restrictions,
-	// so we will always see an empty MessageStoragePolicy. Use a project in another org that does
-	// have a restriction set ("us-east1").
+	// If a message storage policy is not set on a topic, the policy depends on the Resource Location
+	// Restriction which is specified on an organization level. The usual testing project is in the
+	// google.com org, which has no resource location restrictions. Use a project in another org that
+	// does have a restriction set ("us-east1").
 	projID := "ps-geofencing-test"
 	// We can use the same creds as always because the service account of the default testing project
 	// has permission to use the above project. This test will fail if a different service account
@@ -812,7 +876,7 @@
 	}
 }
 
-func TestIntegration_CreateTopicWithKMS(t *testing.T) {
+func TestIntegration_CreateTopic_KMS(t *testing.T) {
 	t.Parallel()
 	ctx := context.Background()
 	client := integrationTestClient(ctx, t)
@@ -868,7 +932,6 @@
 	tc := TopicConfig{
 		KMSKeyName: key.GetName(),
 	}
-
 	topic, err := client.CreateTopicWithConfig(ctx, topicIDs.New(), &tc)
 	if err != nil {
 		t.Fatalf("CreateTopicWithConfig error: %v", err)
@@ -886,3 +949,31 @@
 		t.Errorf("got %v, want %v", got, key.GetName())
 	}
 }
+
+func TestIntegration_CreateTopic_MessageStoragePolicy(t *testing.T) {
+	t.Parallel()
+	ctx := context.Background()
+	client := integrationTestClient(ctx, t)
+	defer client.Close()
+
+	tc := TopicConfig{
+		MessageStoragePolicy: MessageStoragePolicy{
+			AllowedPersistenceRegions: []string{"us-east1"},
+		},
+	}
+	topic, err := client.CreateTopicWithConfig(ctx, topicIDs.New(), &tc)
+	if err != nil {
+		t.Fatalf("CreateTopicWithConfig error: %v", err)
+	}
+	defer topic.Delete(ctx)
+	defer topic.Stop()
+
+	got, err := topic.Config(ctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+	want := tc
+	if diff := testutil.Diff(got, want); diff != "" {
+		t.Fatalf("\ngot: - want: +\n%s", diff)
+	}
+}
diff --git a/pubsub/pstest/fake.go b/pubsub/pstest/fake.go
index 8b40b26..1355678 100644
--- a/pubsub/pstest/fake.go
+++ b/pubsub/pstest/fake.go
@@ -238,8 +238,8 @@
 		switch path {
 		case "labels":
 			t.proto.Labels = req.Topic.Labels
-		case "message_storage_policy": // "fetch" the policy
-			t.proto.MessageStoragePolicy = &pb.MessageStoragePolicy{AllowedPersistenceRegions: []string{"US"}}
+		case "message_storage_policy":
+			t.proto.MessageStoragePolicy = req.Topic.MessageStoragePolicy
 		default:
 			return nil, status.Errorf(codes.InvalidArgument, "unknown field name %q", path)
 		}
diff --git a/pubsub/topic.go b/pubsub/topic.go
index ac49390..ea7c277 100644
--- a/pubsub/topic.go
+++ b/pubsub/topic.go
@@ -185,7 +185,8 @@
 	MessageStoragePolicy MessageStoragePolicy
 
 	// The name of the the Cloud KMS key to be used to protect access to messages
-	// published to this topic, in the format "projects/P/locations/L/keyRings/R/cryptoKeys/K".
+	// published to this topic, in the format
+	// "projects/P/locations/L/keyRings/R/cryptoKeys/K".
 	KMSKeyName string
 }
 
@@ -193,9 +194,19 @@
 type TopicConfigToUpdate struct {
 	// If non-nil, the current set of labels is completely
 	// replaced by the new set.
+	Labels map[string]string
+
+	// If non-nil, the existing policy (containing the list of regions)
+	// is completely replaced by the new policy.
+	//
+	// Use the zero value &MessageStoragePolicy{} to reset the topic back to
+	// using the organization's Resource Location Restriction policy.
+	//
+	// If nil, the policy remains unchanged.
+	//
 	// This field has beta status. It is not subject to the stability guarantee
 	// and may change.
-	Labels map[string]string
+	MessageStoragePolicy *MessageStoragePolicy
 }
 
 func protoToTopicConfig(pbt *pb.Topic) TopicConfig {
@@ -210,12 +221,19 @@
 // is determined when the topic is created based on the policy configured at
 // the project level.
 type MessageStoragePolicy struct {
-	// The list of GCP regions where messages that are published to the topic may
-	// be persisted in storage. Messages published by publishers running in
+	// AllowedPersistenceRegions is the list of GCP regions where messages that are published
+	// to the topic may be persisted in storage. Messages published by publishers running in
 	// non-allowed GCP regions (or running outside of GCP altogether) will be
-	// routed for storage in one of the allowed regions. An empty list indicates a
-	// misconfiguration at the project or organization level, which will result in
-	// all Publish operations failing.
+	// routed for storage in one of the allowed regions.
+	//
+	// If empty, it indicates a misconfiguration at the project or organization level, which
+	// will result in all Publish operations failing. This field cannot be empty in updates.
+	//
+	// If nil, then the policy is not defined on a topic level. When used in updates, it resets
+	// the regions back to the organization level Resource Location Restriction policy.
+	//
+	// For more information, see
+	// https://cloud.google.com/pubsub/docs/resource-location-restriction#pubsub-storage-locations.
 	AllowedPersistenceRegions []string
 }
 
@@ -227,7 +245,7 @@
 }
 
 func messageStoragePolicyToProto(msp *MessageStoragePolicy) *pb.MessageStoragePolicy {
-	if msp == nil || len(msp.AllowedPersistenceRegions) <= 0 {
+	if msp == nil || msp.AllowedPersistenceRegions == nil {
 		return nil
 	}
 	return &pb.MessageStoragePolicy{AllowedPersistenceRegions: msp.AllowedPersistenceRegions}
@@ -244,9 +262,6 @@
 
 // Update changes an existing topic according to the fields set in cfg. It returns
 // the new TopicConfig.
-//
-// Any call to Update (even with an empty TopicConfigToUpdate) will update the
-// MessageStoragePolicy for the topic from the organization's settings.
 func (t *Topic) Update(ctx context.Context, cfg TopicConfigToUpdate) (TopicConfig, error) {
 	req := t.updateRequest(cfg)
 	if len(req.UpdateMask.Paths) == 0 {
@@ -261,11 +276,15 @@
 
 func (t *Topic) updateRequest(cfg TopicConfigToUpdate) *pb.UpdateTopicRequest {
 	pt := &pb.Topic{Name: t.name}
-	paths := []string{"message_storage_policy"} // always fetch
+	var paths []string
 	if cfg.Labels != nil {
 		pt.Labels = cfg.Labels
 		paths = append(paths, "labels")
 	}
+	if cfg.MessageStoragePolicy != nil {
+		pt.MessageStoragePolicy = messageStoragePolicyToProto(cfg.MessageStoragePolicy)
+		paths = append(paths, "message_storage_policy")
+	}
 	return &pb.UpdateTopicRequest{
 		Topic:      pt,
 		UpdateMask: &fmpb.FieldMask{Paths: paths},
diff --git a/pubsub/topic_test.go b/pubsub/topic_test.go
index 15cf524..0777a5a 100644
--- a/pubsub/topic_test.go
+++ b/pubsub/topic_test.go
@@ -79,9 +79,11 @@
 
 	id := "test-topic"
 	want := TopicConfig{
-		Labels:               map[string]string{"label": "value"},
-		MessageStoragePolicy: MessageStoragePolicy{},
-		KMSKeyName:           "projects/P/locations/L/keyRings/R/cryptoKeys/K",
+		Labels: map[string]string{"label": "value"},
+		MessageStoragePolicy: MessageStoragePolicy{
+			AllowedPersistenceRegions: []string{"us-east1"},
+		},
+		KMSKeyName: "projects/P/locations/L/keyRings/R/cryptoKeys/K",
 	}
 
 	topic := mustCreateTopicWithConfig(t, c, id, &want)
@@ -185,7 +187,7 @@
 	}
 }
 
-func TestUpdateTopic(t *testing.T) {
+func TestUpdateTopic_Label(t *testing.T) {
 	ctx := context.Background()
 	client, srv := newFake(t)
 	defer client.Close()
@@ -203,13 +205,14 @@
 
 	// replace labels
 	labels := map[string]string{"label": "value"}
-	config2, err := topic.Update(ctx, TopicConfigToUpdate{Labels: labels})
+	config2, err := topic.Update(ctx, TopicConfigToUpdate{
+		Labels: labels,
+	})
 	if err != nil {
 		t.Fatal(err)
 	}
 	want = TopicConfig{
-		Labels:               labels,
-		MessageStoragePolicy: MessageStoragePolicy{[]string{"US"}},
+		Labels: labels,
 	}
 	if !testutil.Equal(config2, want) {
 		t.Errorf("got %+v, want %+v", config2, want)
@@ -227,6 +230,38 @@
 	}
 }
 
+func TestUpdateTopic_MessageStoragePolicy(t *testing.T) {
+	ctx := context.Background()
+	client, srv := newFake(t)
+	defer client.Close()
+	defer srv.Close()
+
+	topic := mustCreateTopic(t, client, "T")
+	config, err := topic.Config(ctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+	want := TopicConfig{}
+	if !testutil.Equal(config, want) {
+		t.Errorf("\ngot  %+v\nwant %+v", config, want)
+	}
+
+	// Update message storage policy.
+	msp := &MessageStoragePolicy{
+		AllowedPersistenceRegions: []string{"us-east1"},
+	}
+	config2, err := topic.Update(ctx, TopicConfigToUpdate{MessageStoragePolicy: msp})
+	if err != nil {
+		t.Fatal(err)
+	}
+	want.MessageStoragePolicy = MessageStoragePolicy{
+		AllowedPersistenceRegions: []string{"us-east1"},
+	}
+	if !testutil.Equal(config2, want) {
+		t.Errorf("\ngot  %+v\nwant %+v", config2, want)
+	}
+}
+
 type alwaysFailPublish struct {
 	pubsubpb.PublisherServer
 }