fix(pubsub): respect subscription message ordering field in scheduler (#3886)
* fix(pubsub): respect subscription message ordering field in scheduler
* clarify comments and add testing to publishSync
* fix doc comments, remove unnecessary assignment
* revert pullstream test change
* fix error with RPC error test after adding config check in subscription
diff --git a/pubsub/integration_test.go b/pubsub/integration_test.go
index 3427655..c3e172e 100644
--- a/pubsub/integration_test.go
+++ b/pubsub/integration_test.go
@@ -22,6 +22,7 @@
"os"
"strings"
"sync"
+ "sync/atomic"
"testing"
"time"
@@ -683,6 +684,16 @@
}
}
+// publishSync is a utility function for publishing a message and
+// blocking until the message has been confirmed.
+func publishSync(ctx context.Context, t *testing.T, topic *Topic, msg *Message) {
+ res := topic.Publish(ctx, msg)
+ _, err := res.Get(ctx)
+ if err != nil {
+ t.Fatalf("publishSync err: %v", err)
+ }
+}
+
func TestIntegration_UpdateSubscription_ExpirationPolicy(t *testing.T) {
t.Parallel()
ctx := context.Background()
@@ -1344,6 +1355,76 @@
}
}
+// TestIntegration_OrderedKeys_SubscriptionOrdering tests that messages
+// published with ordering keys are not delivered in order when the
+// subscription does not have message ordering enabled.
+func TestIntegration_OrderedKeys_SubscriptionOrdering(t *testing.T) {
+ ctx := context.Background()
+ client := integrationTestClient(ctx, t, option.WithEndpoint("us-west1-pubsub.googleapis.com:443"))
+ defer client.Close()
+
+ topic, err := client.CreateTopic(ctx, topicIDs.New())
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer topic.Delete(ctx)
+ defer topic.Stop()
+ exists, err := topic.Exists(ctx)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if !exists {
+ t.Fatalf("topic %v should exist, but it doesn't", topic)
+ }
+ topic.EnableMessageOrdering = true
+
+ // Explicitly disable message ordering on the subscription.
+ enableMessageOrdering := false
+ subCfg := SubscriptionConfig{
+ Topic: topic,
+ EnableMessageOrdering: enableMessageOrdering,
+ }
+ sub, err := client.CreateSubscription(ctx, subIDs.New(), subCfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer sub.Delete(ctx)
+
+ publishSync(ctx, t, topic, &Message{
+ Data: []byte("message-1"),
+ OrderingKey: "ordering-key-1",
+ })
+
+ publishSync(ctx, t, topic, &Message{
+ Data: []byte("message-2"),
+ OrderingKey: "ordering-key-1",
+ })
+
+ sub.ReceiveSettings.Synchronous = true
+ ctx2, cancel := context.WithTimeout(ctx, 12*time.Second)
+ defer cancel()
+
+ var numAcked int32
+ sub.Receive(ctx2, func(_ context.Context, msg *Message) {
+ // Create artificial constraints on message processing time.
+ if string(msg.Data) == "message-1" {
+ time.Sleep(10 * time.Second)
+ } else {
+ time.Sleep(5 * time.Second)
+ }
+ msg.Ack()
+ atomic.AddInt32(&numAcked, 1)
+ })
+ if sub.enableOrdering != enableMessageOrdering {
+ t.Fatalf("enableOrdering mismatch: got: %v, want: %v", sub.enableOrdering, enableMessageOrdering)
+ }
+ // If the messages were received on a subscription with EnableMessageOrdering=true,
+ // total processing would exceed the timeout and only one message would be processed.
+ if numAcked < 2 {
+ t.Fatalf("did not process all messages in time, numAcked: %d", numAcked)
+ }
+}
+
func TestIntegration_CreateSubscription_DeadLetterPolicy(t *testing.T) {
t.Parallel()
ctx := context.Background()
diff --git a/pubsub/pullstream_test.go b/pubsub/pullstream_test.go
index a91f98e..5554a29 100644
--- a/pubsub/pullstream_test.go
+++ b/pubsub/pullstream_test.go
@@ -101,10 +101,20 @@
t.Fatal(err)
}
defer client.Close()
+ topic, err := client.CreateTopic(ctx, "foo")
+ if err != nil {
+ t.Fatal(err)
+ }
+ sub, err := client.CreateSubscription(ctx, "foo", SubscriptionConfig{
+ Topic: topic,
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
errc := make(chan error)
go func() {
- errc <- client.Subscription("foo").Receive(ctx, func(context.Context, *Message) {
+ errc <- sub.Receive(ctx, func(context.Context, *Message) {
t.Error("should not have received any data")
})
}()
@@ -118,7 +128,7 @@
t.Fatal("expected to receive a grpc ResourceExhausted error")
}
} else {
- t.Fatal("expected to receive a grpc ResourceExhausted error")
+ t.Fatalf("expected to receive a grpc ResourceExhausted error: %v", err)
}
}
}
diff --git a/pubsub/subscription.go b/pubsub/subscription.go
index 85f8e20..982743b 100644
--- a/pubsub/subscription.go
+++ b/pubsub/subscription.go
@@ -47,6 +47,8 @@
mu sync.Mutex
receiveActive bool
+
+ enableOrdering bool
}
// Subscription creates a reference to a subscription.
@@ -772,6 +774,14 @@
s.mu.Unlock()
defer func() { s.mu.Lock(); s.receiveActive = false; s.mu.Unlock() }()
+ // Fetch the subscription config to read the EnableMessageOrdering field.
+ // See: https://github.com/googleapis/google-cloud-go/issues/3884
+ cfg, err := s.Config(ctx)
+ if err != nil {
+ return fmt.Errorf("sub.Config err: %v", err)
+ }
+ s.enableOrdering = cfg.EnableMessageOrdering
+
maxCount := s.ReceiveSettings.MaxOutstandingMessages
if maxCount == 0 {
maxCount = DefaultReceiveSettings.MaxOutstandingMessages
@@ -901,9 +911,14 @@
old(ackID, ack, receiveTime)
}
wg.Add(1)
+ // Only pass the ordering key to the scheduler if the subscription has ordering enabled.
+ var key string
+ if s.enableOrdering {
+ key = msg.OrderingKey
+ }
// TODO(deklerk): Can we have a generic handler at the
// constructor level?
- if err := sched.Add(msg.OrderingKey, msg, func(msg interface{}) {
+ if err := sched.Add(key, msg, func(msg interface{}) {
defer wg.Done()
f(ctx2, msg.(*Message))
}); err != nil {