pubsub: fix tests for ordering keys and DLQ

Fix failed merge for DLQ test and increase timeout for ordering keys verfication

Change-Id: Icac3bb9597c10f0e3d5d6c4f09e895ca3d2db085
Reviewed-on: https://code-review.googlesource.com/c/gocloud/+/53514
Reviewed-by: Tyler Bui-Palsulich <tbp@google.com>
diff --git a/pubsub/integration_test.go b/pubsub/integration_test.go
index 87773d9..4720205 100644
--- a/pubsub/integration_test.go
+++ b/pubsub/integration_test.go
@@ -1168,10 +1168,7 @@
 }
 
 func TestIntegration_OrderedKeys_JSON(t *testing.T) {
-	if testing.Short() {
-		t.Skip("Integration tests skipped in short mode")
-	}
-
+	t.Parallel()
 	ctx := context.Background()
 	client := integrationTestClient(ctx, t)
 	defer client.Close()
@@ -1260,8 +1257,8 @@
 
 	select {
 	case <-done:
-	case <-time.After(15 * time.Second):
-		t.Fatal("timed out after 15s waiting for all messages to be received")
+	case <-time.After(30 * time.Second):
+		t.Fatal("timed out after 30s waiting for all messages to be received")
 	}
 
 	mu.Lock()
@@ -1295,7 +1292,7 @@
 	topic.PublishSettings.DelayThreshold = time.Second
 	topic.EnableMessageOrdering = true
 
-	orderingKey := "some-ordering-key"
+	orderingKey := "some-ordering-key2"
 	// Publish a message that is too large so we'll get an error that
 	// pauses publishing for this ordering key.
 	r := topic.Publish(ctx, &Message{
@@ -1340,74 +1337,75 @@
 
 	topic, err := client.CreateTopic(ctx, topicIDs.New())
 	if err != nil {
+		t.Fatalf("CreateTopic error: %v", err)
+	}
 
-		defer topic.Delete(ctx)
-		defer topic.Stop()
+	defer topic.Delete(ctx)
+	defer topic.Stop()
 
-		deadLetterTopic, err := client.CreateTopic(ctx, topicIDs.New())
-		if err != nil {
-			t.Fatalf("CreateTopic error: %v", err)
-		}
-		defer deadLetterTopic.Delete(ctx)
-		defer deadLetterTopic.Stop()
+	deadLetterTopic, err := client.CreateTopic(ctx, topicIDs.New())
+	if err != nil {
+		t.Fatalf("CreateTopic error: %v", err)
+	}
+	defer deadLetterTopic.Delete(ctx)
+	defer deadLetterTopic.Stop()
 
-		// We don't set MaxDeliveryAttempts in DeadLetterPolicy so that we can test
-		// that MaxDeliveryAttempts defaults properly to 5 if not set.
-		cfg := SubscriptionConfig{
-			Topic: topic,
-			DeadLetterPolicy: &DeadLetterPolicy{
-				DeadLetterTopic: deadLetterTopic.String(),
-			},
-		}
-		var sub *Subscription
-		if sub, err = client.CreateSubscription(ctx, subIDs.New(), cfg); err != nil {
-			t.Fatalf("CreateSub error: %v", err)
-		}
-		defer sub.Delete(ctx)
+	// We don't set MaxDeliveryAttempts in DeadLetterPolicy so that we can test
+	// that MaxDeliveryAttempts defaults properly to 5 if not set.
+	cfg := SubscriptionConfig{
+		Topic: topic,
+		DeadLetterPolicy: &DeadLetterPolicy{
+			DeadLetterTopic: deadLetterTopic.String(),
+		},
+	}
+	var sub *Subscription
+	if sub, err = client.CreateSubscription(ctx, subIDs.New(), cfg); err != nil {
+		t.Fatalf("CreateSub error: %v", err)
+	}
+	defer sub.Delete(ctx)
 
-		got, err := sub.Config(ctx)
-		if err != nil {
-			t.Fatal(err)
-		}
-		want := SubscriptionConfig{
-			Topic:               topic,
-			AckDeadline:         10 * time.Second,
-			RetainAckedMessages: false,
-			RetentionDuration:   defaultRetentionDuration,
-			ExpirationPolicy:    defaultExpirationPolicy,
-			DeadLetterPolicy: &DeadLetterPolicy{
-				DeadLetterTopic:     deadLetterTopic.String(),
-				MaxDeliveryAttempts: 5,
-			},
-		}
-		if diff := testutil.Diff(got, want); diff != "" {
-			t.Fatalf("\ngot: - want: +\n%s", diff)
-		}
+	got, err := sub.Config(ctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+	want := SubscriptionConfig{
+		Topic:               topic,
+		AckDeadline:         10 * time.Second,
+		RetainAckedMessages: false,
+		RetentionDuration:   defaultRetentionDuration,
+		ExpirationPolicy:    defaultExpirationPolicy,
+		DeadLetterPolicy: &DeadLetterPolicy{
+			DeadLetterTopic:     deadLetterTopic.String(),
+			MaxDeliveryAttempts: 5,
+		},
+	}
+	if diff := testutil.Diff(got, want); diff != "" {
+		t.Fatalf("\ngot: - want: +\n%s", diff)
+	}
 
-		res := topic.Publish(ctx, &Message{
-			Data: []byte("failed message"),
-		})
-		if _, err := res.Get(ctx); err != nil {
-			t.Fatalf("Publish message error: %v", err)
-		}
+	res := topic.Publish(ctx, &Message{
+		Data: []byte("failed message"),
+	})
+	if _, err := res.Get(ctx); err != nil {
+		t.Fatalf("Publish message error: %v", err)
+	}
 
-		ctx2, cancel := context.WithCancel(ctx)
-		numAttempts := 1
-		err = sub.Receive(ctx2, func(_ context.Context, m *Message) {
-			if numAttempts >= 5 {
-				cancel()
-				m.Ack()
-				return
-			}
-			if *m.DeliveryAttempt != numAttempts {
-				t.Fatalf("Message delivery attempt: %d does not match numAttempts: %d\n", m.DeliveryAttempt, numAttempts)
-			}
-			numAttempts++
-			m.Nack()
-		})
-		if err != nil {
-			t.Fatalf("Streaming pull error: %v\n", err)
+	ctx2, cancel := context.WithCancel(ctx)
+	numAttempts := 1
+	err = sub.Receive(ctx2, func(_ context.Context, m *Message) {
+		if numAttempts >= 5 {
+			cancel()
+			m.Ack()
+			return
 		}
+		if *m.DeliveryAttempt != numAttempts {
+			t.Fatalf("Message delivery attempt: %d does not match numAttempts: %d\n", m.DeliveryAttempt, numAttempts)
+		}
+		numAttempts++
+		m.Nack()
+	})
+	if err != nil {
+		t.Fatalf("Streaming pull error: %v\n", err)
 	}
 }
 
diff --git a/pubsub/internal/scheduler/receive_scheduler.go b/pubsub/internal/scheduler/receive_scheduler.go
index 58eb313..c33f0a7 100644
--- a/pubsub/internal/scheduler/receive_scheduler.go
+++ b/pubsub/internal/scheduler/receive_scheduler.go
@@ -45,10 +45,13 @@
 // NewReceiveScheduler creates a new ReceiveScheduler.
 //
 // The workers arg is the number of concurrent calls to handle. If the workers
-// arg is 0, then a healthy default of 10 workers is used.
+// arg is 0, then a healthy default of 10 workers is used. If less than 0, this
+// will be set to an large number, similar to PublishScheduler's handler limit.
 func NewReceiveScheduler(workers int) *ReceiveScheduler {
 	if workers == 0 {
 		workers = 10
+	} else if workers < 0 {
+		workers = 1e9
 	}
 
 	return &ReceiveScheduler{
diff --git a/pubsub/subscription.go b/pubsub/subscription.go
index 17c9283..95666a7 100644
--- a/pubsub/subscription.go
+++ b/pubsub/subscription.go
@@ -709,7 +709,7 @@
 	}
 	fc := newFlowController(maxCount, maxBytes)
 
-	sched := scheduler.NewReceiveScheduler(numGoroutines)
+	sched := scheduler.NewReceiveScheduler(maxCount)
 
 	// Wait for all goroutines started by Receive to return, so instead of an
 	// obscure goroutine leak we have an obvious blocked call to Receive.