pubsub: add support for filtering
Change-Id: I00ae70d06fdc39789e591eaaabc99b400164e096
Reviewed-on: https://code-review.googlesource.com/c/gocloud/+/54750
Reviewed-by: Kamal Aboul-Hosn <aboulhosn@google.com>
Reviewed-by: Seth Hollyman <shollyman@google.com>
Reviewed-by: kokoro <noreply+kokoro@google.com>
diff --git a/pubsub/integration_test.go b/pubsub/integration_test.go
index 105cea1..a015b30 100644
--- a/pubsub/integration_test.go
+++ b/pubsub/integration_test.go
@@ -1529,3 +1529,77 @@
t.Fatalf("CreateTopic should fail with fake endpoint, got nil err")
}
}
+
+func TestIntegration_Filter_CreateSubscription(t *testing.T) {
+ t.Parallel()
+ ctx := context.Background()
+ client := integrationTestClient(ctx, t)
+ defer client.Close()
+
+ topic, err := client.CreateTopic(ctx, topicIDs.New())
+ if err != nil {
+ t.Fatalf("CreateTopic error: %v", err)
+ }
+ defer topic.Delete(ctx)
+ defer topic.Stop()
+
+ cfg := SubscriptionConfig{
+ Topic: topic,
+ Filter: "attributes.event_type = \"1\"",
+ }
+
+ var sub *Subscription
+ if sub, err = client.CreateSubscription(ctx, subIDs.New(), cfg); err != nil {
+ t.Fatalf("CreateSub error: %v", err)
+ }
+ defer sub.Delete(ctx)
+
+ got, err := sub.Config(ctx)
+ if err != nil {
+ t.Fatal(err)
+ }
+ want := SubscriptionConfig{
+ Topic: topic,
+ AckDeadline: 10 * time.Second,
+ RetainAckedMessages: false,
+ RetentionDuration: defaultRetentionDuration,
+ ExpirationPolicy: defaultExpirationPolicy,
+ Filter: "attributes.event_type = \"1\"",
+ }
+ if diff := testutil.Diff(got, want); diff != "" {
+ t.Fatalf("SubsciptionConfig; got: - want: +\n%s", diff)
+ }
+
+ attrs := make(map[string]string)
+ attrs["event_type"] = "1"
+ res := topic.Publish(ctx, &Message{
+ Data: []byte("hello world"),
+ Attributes: attrs,
+ })
+ if _, err := res.Get(ctx); err != nil {
+ t.Fatalf("Publish message error: %v", err)
+ }
+
+ // Publish the same message with a different event_type
+ // and check it is filtered out.
+ attrs["event_type"] = "2"
+ res = topic.Publish(ctx, &Message{
+ Data: []byte("hello world"),
+ Attributes: attrs,
+ })
+ if _, err := res.Get(ctx); err != nil {
+ t.Fatalf("Publish message error: %v", err)
+ }
+
+ ctx2, cancel := context.WithTimeout(ctx, 5*time.Second)
+ defer cancel()
+ err = sub.Receive(ctx2, func(_ context.Context, m *Message) {
+ defer m.Ack()
+ if m.Attributes["event_type"] != "1" {
+ t.Fatalf("Got message with attributes that should be filtered out: %v", m.Attributes)
+ }
+ })
+ if err != nil {
+ t.Fatalf("Streaming pull error: %v\n", err)
+ }
+}
diff --git a/pubsub/subscription.go b/pubsub/subscription.go
index cc09209..5cba018 100644
--- a/pubsub/subscription.go
+++ b/pubsub/subscription.go
@@ -233,6 +233,16 @@
// DeadLetterPolicy specifies the conditions for dead lettering messages in
// a subscription. If not set, dead lettering is disabled.
DeadLetterPolicy *DeadLetterPolicy
+
+ // Filter is an expression written in the Cloud Pub/Sub filter language. If
+ // non-empty, then only `PubsubMessage`s whose `attributes` field matches the
+ // filter are delivered on this subscription. If empty, then no messages are
+ // filtered out. Cannot be changed after the subscription is created.
+ //
+ // It is EXPERIMENTAL and a part of a closed alpha that may not be
+ // accessible to all users. This field is subject to change or removal
+ // without notice.
+ Filter string
}
func (cfg *SubscriptionConfig) toProto(name string) *pb.Subscription {
@@ -259,6 +269,7 @@
ExpirationPolicy: expirationPolicyToProto(cfg.ExpirationPolicy),
EnableMessageOrdering: cfg.EnableMessageOrdering,
DeadLetterPolicy: pbDeadLetter,
+ Filter: cfg.Filter,
}
}
@@ -287,6 +298,7 @@
Labels: pbSub.Labels,
ExpirationPolicy: expirationPolicy,
DeadLetterPolicy: dlp,
+ Filter: pbSub.Filter,
}
pc := protoToPushConfig(pbSub.PushConfig)
if pc != nil {