fix(pubsub): respect subscription message ordering field in scheduler (#3886)

* fix(pubsub): respect subscription message ordering field in scheduler

* clarify comments and add testing to publishSync

* fix doc comments, remove unnecessary assignment

* revert pullstream test change

* fix error with RPC error test after adding config check in subscription
diff --git a/pubsub/integration_test.go b/pubsub/integration_test.go
index 3427655..c3e172e 100644
--- a/pubsub/integration_test.go
+++ b/pubsub/integration_test.go
@@ -22,6 +22,7 @@
 	"os"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -683,6 +684,16 @@
 	}
 }
 
+// publishSync is a utility function for publishing a message and
+// blocking until the message has been confirmed.
+func publishSync(ctx context.Context, t *testing.T, topic *Topic, msg *Message) {
+	res := topic.Publish(ctx, msg)
+	_, err := res.Get(ctx)
+	if err != nil {
+		t.Fatalf("publishSync err: %v", err)
+	}
+}
+
 func TestIntegration_UpdateSubscription_ExpirationPolicy(t *testing.T) {
 	t.Parallel()
 	ctx := context.Background()
@@ -1344,6 +1355,76 @@
 	}
 }
 
+// TestIntegration_OrderedKeys_SubscriptionOrdering tests that messages
+// with ordering keys are not processed as such if the subscription
+// does not have message ordering enabled.
+func TestIntegration_OrderedKeys_SubscriptionOrdering(t *testing.T) {
+	ctx := context.Background()
+	client := integrationTestClient(ctx, t, option.WithEndpoint("us-west1-pubsub.googleapis.com:443"))
+	defer client.Close()
+
+	topic, err := client.CreateTopic(ctx, topicIDs.New())
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer topic.Delete(ctx)
+	defer topic.Stop()
+	exists, err := topic.Exists(ctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if !exists {
+		t.Fatalf("topic %v should exist, but it doesn't", topic)
+	}
+	topic.EnableMessageOrdering = true
+
+	// Explicitly disable message ordering on the subscription.
+	enableMessageOrdering := false
+	subCfg := SubscriptionConfig{
+		Topic:                 topic,
+		EnableMessageOrdering: enableMessageOrdering,
+	}
+	sub, err := client.CreateSubscription(ctx, subIDs.New(), subCfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer sub.Delete(ctx)
+
+	publishSync(ctx, t, topic, &Message{
+		Data:        []byte("message-1"),
+		OrderingKey: "ordering-key-1",
+	})
+
+	publishSync(ctx, t, topic, &Message{
+		Data:        []byte("message-2"),
+		OrderingKey: "ordering-key-1",
+	})
+
+	sub.ReceiveSettings.Synchronous = true
+	ctx2, cancel := context.WithTimeout(ctx, 12*time.Second)
+	defer cancel()
+
+	var numAcked int32
+	sub.Receive(ctx2, func(_ context.Context, msg *Message) {
+		// Create artificial constraints on message processing time.
+		if string(msg.Data) == "message-1" {
+			time.Sleep(10 * time.Second)
+		} else {
+			time.Sleep(5 * time.Second)
+		}
+		msg.Ack()
+		atomic.AddInt32(&numAcked, 1)
+	})
+	if sub.enableOrdering != enableMessageOrdering {
+		t.Fatalf("enableOrdering mismatch: got: %v, want: %v", sub.enableOrdering, enableMessageOrdering)
+	}
+	// If the messages were received on a subscription with the EnableMessageOrdering=true,
+	// total processing would exceed the timeout and only one message would be processed.
+	if numAcked < 2 {
+		t.Fatalf("did not process all messages in time, numAcked: %d", numAcked)
+	}
+}
+
 func TestIntegration_CreateSubscription_DeadLetterPolicy(t *testing.T) {
 	t.Parallel()
 	ctx := context.Background()
diff --git a/pubsub/pullstream_test.go b/pubsub/pullstream_test.go
index a91f98e..5554a29 100644
--- a/pubsub/pullstream_test.go
+++ b/pubsub/pullstream_test.go
@@ -101,10 +101,20 @@
 		t.Fatal(err)
 	}
 	defer client.Close()
+	topic, err := client.CreateTopic(ctx, "foo")
+	if err != nil {
+		t.Fatal(err)
+	}
+	sub, err := client.CreateSubscription(ctx, "foo", SubscriptionConfig{
+		Topic: topic,
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
 
 	errc := make(chan error)
 	go func() {
-		errc <- client.Subscription("foo").Receive(ctx, func(context.Context, *Message) {
+		errc <- sub.Receive(ctx, func(context.Context, *Message) {
 			t.Error("should not have received any data")
 		})
 	}()
@@ -118,7 +128,7 @@
 				t.Fatal("expected to receive a grpc ResourceExhausted error")
 			}
 		} else {
-			t.Fatal("expected to receive a grpc ResourceExhausted error")
+			t.Fatalf("expected to receive a grpc ResourceExhausted error: %v", err)
 		}
 	}
 }
diff --git a/pubsub/subscription.go b/pubsub/subscription.go
index 85f8e20..982743b 100644
--- a/pubsub/subscription.go
+++ b/pubsub/subscription.go
@@ -47,6 +47,8 @@
 
 	mu            sync.Mutex
 	receiveActive bool
+
+	enableOrdering bool
 }
 
 // Subscription creates a reference to a subscription.
@@ -772,6 +774,14 @@
 	s.mu.Unlock()
 	defer func() { s.mu.Lock(); s.receiveActive = false; s.mu.Unlock() }()
 
+	// Check config to check EnableMessageOrdering field.
+	// See: https://github.com/googleapis/google-cloud-go/issues/3884
+	cfg, err := s.Config(ctx)
+	if err != nil {
+		return fmt.Errorf("sub.Config err: %v", err)
+	}
+	s.enableOrdering = cfg.EnableMessageOrdering
+
 	maxCount := s.ReceiveSettings.MaxOutstandingMessages
 	if maxCount == 0 {
 		maxCount = DefaultReceiveSettings.MaxOutstandingMessages
@@ -901,9 +911,14 @@
 						old(ackID, ack, receiveTime)
 					}
 					wg.Add(1)
+					// Make sure the subscription has ordering enabled before adding to scheduler.
+					var key string
+					if s.enableOrdering {
+						key = msg.OrderingKey
+					}
 					// TODO(deklerk): Can we have a generic handler at the
 					// constructor level?
-					if err := sched.Add(msg.OrderingKey, msg, func(msg interface{}) {
+					if err := sched.Add(key, msg, func(msg interface{}) {
 						defer wg.Done()
 						f(ctx2, msg.(*Message))
 					}); err != nil {