pubsub: add support for filtering

Change-Id: I00ae70d06fdc39789e591eaaabc99b400164e096
Reviewed-on: https://code-review.googlesource.com/c/gocloud/+/54750
Reviewed-by: Kamal Aboul-Hosn <aboulhosn@google.com>
Reviewed-by: Seth Hollyman <shollyman@google.com>
Reviewed-by: kokoro <noreply+kokoro@google.com>
diff --git a/pubsub/integration_test.go b/pubsub/integration_test.go
index 105cea1..a015b30 100644
--- a/pubsub/integration_test.go
+++ b/pubsub/integration_test.go
@@ -1529,3 +1529,77 @@
 		t.Fatalf("CreateTopic should fail with fake endpoint, got nil err")
 	}
 }
+
+func TestIntegration_Filter_CreateSubscription(t *testing.T) {
+	t.Parallel()
+	ctx := context.Background()
+	client := integrationTestClient(ctx, t)
+	defer client.Close()
+
+	topic, err := client.CreateTopic(ctx, topicIDs.New())
+	if err != nil {
+		t.Fatalf("CreateTopic error: %v", err)
+	}
+	defer topic.Delete(ctx)
+	defer topic.Stop()
+
+	cfg := SubscriptionConfig{
+		Topic:  topic,
+		Filter: "attributes.event_type = \"1\"",
+	}
+
+	var sub *Subscription
+	if sub, err = client.CreateSubscription(ctx, subIDs.New(), cfg); err != nil {
+		t.Fatalf("CreateSub error: %v", err)
+	}
+	defer sub.Delete(ctx)
+
+	got, err := sub.Config(ctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+	want := SubscriptionConfig{
+		Topic:               topic,
+		AckDeadline:         10 * time.Second,
+		RetainAckedMessages: false,
+		RetentionDuration:   defaultRetentionDuration,
+		ExpirationPolicy:    defaultExpirationPolicy,
+		Filter:              "attributes.event_type = \"1\"",
+	}
+	if diff := testutil.Diff(got, want); diff != "" {
+		t.Fatalf("SubsciptionConfig; got: - want: +\n%s", diff)
+	}
+
+	attrs := make(map[string]string)
+	attrs["event_type"] = "1"
+	res := topic.Publish(ctx, &Message{
+		Data:       []byte("hello world"),
+		Attributes: attrs,
+	})
+	if _, err := res.Get(ctx); err != nil {
+		t.Fatalf("Publish message error: %v", err)
+	}
+
+	// Publish the same message with a different event_type
+	// and check it is filtered out.
+	attrs["event_type"] = "2"
+	res = topic.Publish(ctx, &Message{
+		Data:       []byte("hello world"),
+		Attributes: attrs,
+	})
+	if _, err := res.Get(ctx); err != nil {
+		t.Fatalf("Publish message error: %v", err)
+	}
+
+	ctx2, cancel := context.WithTimeout(ctx, 5*time.Second)
+	defer cancel()
+	err = sub.Receive(ctx2, func(_ context.Context, m *Message) {
+		defer m.Ack()
+		if m.Attributes["event_type"] != "1" {
+			t.Fatalf("Got message with attributes that should be filtered out: %v", m.Attributes)
+		}
+	})
+	if err != nil {
+		t.Fatalf("Streaming pull error: %v\n", err)
+	}
+}
diff --git a/pubsub/subscription.go b/pubsub/subscription.go
index cc09209..5cba018 100644
--- a/pubsub/subscription.go
+++ b/pubsub/subscription.go
@@ -233,6 +233,16 @@
 	// DeadLetterPolicy specifies the conditions for dead lettering messages in
 	// a subscription. If not set, dead lettering is disabled.
 	DeadLetterPolicy *DeadLetterPolicy
+
+	// Filter is an expression written in the Cloud Pub/Sub filter language. If
+	// non-empty, then only `PubsubMessage`s whose `attributes` field matches the
+	// filter are delivered on this subscription. If empty, then no messages are
+	// filtered out. Cannot be changed after the subscription is created.
+	//
+	// It is EXPERIMENTAL and a part of a closed alpha that may not be
+	// accessible to all users. This field is subject to change or removal
+	// without notice.
+	Filter string
 }
 
 func (cfg *SubscriptionConfig) toProto(name string) *pb.Subscription {
@@ -259,6 +269,7 @@
 		ExpirationPolicy:         expirationPolicyToProto(cfg.ExpirationPolicy),
 		EnableMessageOrdering:    cfg.EnableMessageOrdering,
 		DeadLetterPolicy:         pbDeadLetter,
+		Filter:                   cfg.Filter,
 	}
 }
 
@@ -287,6 +298,7 @@
 		Labels:              pbSub.Labels,
 		ExpirationPolicy:    expirationPolicy,
 		DeadLetterPolicy:    dlp,
+		Filter:              pbSub.Filter,
 	}
 	pc := protoToPushConfig(pbSub.PushConfig)
 	if pc != nil {