pubsub: fix tests for ordering keys and DLQ
Fix the failed merge in the DLQ test and increase the timeout for
ordering keys verification from 15s to 30s. Also make
NewReceiveScheduler treat a negative worker count as effectively
unbounded, and size the receive scheduler by maxCount instead of
numGoroutines so handler concurrency follows the flow control settings.
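
Illustrative sketch of the resulting behavior (assumes the documented
ReceiveSettings semantics; not part of this change):

    sub.ReceiveSettings.NumGoroutines = 2           // concurrent pull streams
    sub.ReceiveSettings.MaxOutstandingMessages = -1 // negative: no limit
    err := sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
        m.Ack() // handler concurrency now tracks MaxOutstandingMessages
    })
    // error handling elided
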
Change-Id: Icac3bb9597c10f0e3d5d6c4f09e895ca3d2db085
Reviewed-on: https://code-review.googlesource.com/c/gocloud/+/53514
Reviewed-by: Tyler Bui-Palsulich <tbp@google.com>
diff --git a/pubsub/integration_test.go b/pubsub/integration_test.go
index 87773d9..4720205 100644
--- a/pubsub/integration_test.go
+++ b/pubsub/integration_test.go
@@ -1168,10 +1168,7 @@
}
func TestIntegration_OrderedKeys_JSON(t *testing.T) {
- if testing.Short() {
- t.Skip("Integration tests skipped in short mode")
- }
-
+ t.Parallel()
ctx := context.Background()
client := integrationTestClient(ctx, t)
defer client.Close()
@@ -1260,8 +1257,8 @@
select {
case <-done:
- case <-time.After(15 * time.Second):
- t.Fatal("timed out after 15s waiting for all messages to be received")
+ case <-time.After(30 * time.Second):
+ t.Fatal("timed out after 30s waiting for all messages to be received")
}
mu.Lock()
@@ -1295,7 +1292,7 @@
topic.PublishSettings.DelayThreshold = time.Second
topic.EnableMessageOrdering = true
- orderingKey := "some-ordering-key"
+ orderingKey := "some-ordering-key2"
// Publish a message that is too large so we'll get an error that
// pauses publishing for this ordering key.
r := topic.Publish(ctx, &Message{
@@ -1340,74 +1337,75 @@
topic, err := client.CreateTopic(ctx, topicIDs.New())
if err != nil {
+ t.Fatalf("CreateTopic error: %v", err)
+ }
- defer topic.Delete(ctx)
- defer topic.Stop()
+ defer topic.Delete(ctx)
+ defer topic.Stop()
- deadLetterTopic, err := client.CreateTopic(ctx, topicIDs.New())
- if err != nil {
- t.Fatalf("CreateTopic error: %v", err)
- }
- defer deadLetterTopic.Delete(ctx)
- defer deadLetterTopic.Stop()
+ deadLetterTopic, err := client.CreateTopic(ctx, topicIDs.New())
+ if err != nil {
+ t.Fatalf("CreateTopic error: %v", err)
+ }
+ defer deadLetterTopic.Delete(ctx)
+ defer deadLetterTopic.Stop()
- // We don't set MaxDeliveryAttempts in DeadLetterPolicy so that we can test
- // that MaxDeliveryAttempts defaults properly to 5 if not set.
- cfg := SubscriptionConfig{
- Topic: topic,
- DeadLetterPolicy: &DeadLetterPolicy{
- DeadLetterTopic: deadLetterTopic.String(),
- },
- }
- var sub *Subscription
- if sub, err = client.CreateSubscription(ctx, subIDs.New(), cfg); err != nil {
- t.Fatalf("CreateSub error: %v", err)
- }
- defer sub.Delete(ctx)
+ // We don't set MaxDeliveryAttempts in DeadLetterPolicy so that we can test
+ // that MaxDeliveryAttempts defaults properly to 5 if not set.
+ cfg := SubscriptionConfig{
+ Topic: topic,
+ DeadLetterPolicy: &DeadLetterPolicy{
+ DeadLetterTopic: deadLetterTopic.String(),
+ },
+ }
+ var sub *Subscription
+ if sub, err = client.CreateSubscription(ctx, subIDs.New(), cfg); err != nil {
+ t.Fatalf("CreateSub error: %v", err)
+ }
+ defer sub.Delete(ctx)
- got, err := sub.Config(ctx)
- if err != nil {
- t.Fatal(err)
- }
- want := SubscriptionConfig{
- Topic: topic,
- AckDeadline: 10 * time.Second,
- RetainAckedMessages: false,
- RetentionDuration: defaultRetentionDuration,
- ExpirationPolicy: defaultExpirationPolicy,
- DeadLetterPolicy: &DeadLetterPolicy{
- DeadLetterTopic: deadLetterTopic.String(),
- MaxDeliveryAttempts: 5,
- },
- }
- if diff := testutil.Diff(got, want); diff != "" {
- t.Fatalf("\ngot: - want: +\n%s", diff)
- }
+ got, err := sub.Config(ctx)
+ if err != nil {
+ t.Fatal(err)
+ }
+ want := SubscriptionConfig{
+ Topic: topic,
+ AckDeadline: 10 * time.Second,
+ RetainAckedMessages: false,
+ RetentionDuration: defaultRetentionDuration,
+ ExpirationPolicy: defaultExpirationPolicy,
+ DeadLetterPolicy: &DeadLetterPolicy{
+ DeadLetterTopic: deadLetterTopic.String(),
+ MaxDeliveryAttempts: 5,
+ },
+ }
+ if diff := testutil.Diff(got, want); diff != "" {
+ t.Fatalf("\ngot: - want: +\n%s", diff)
+ }
- res := topic.Publish(ctx, &Message{
- Data: []byte("failed message"),
- })
- if _, err := res.Get(ctx); err != nil {
- t.Fatalf("Publish message error: %v", err)
- }
+ res := topic.Publish(ctx, &Message{
+ Data: []byte("failed message"),
+ })
+ if _, err := res.Get(ctx); err != nil {
+ t.Fatalf("Publish message error: %v", err)
+ }
- ctx2, cancel := context.WithCancel(ctx)
- numAttempts := 1
- err = sub.Receive(ctx2, func(_ context.Context, m *Message) {
- if numAttempts >= 5 {
- cancel()
- m.Ack()
- return
- }
- if *m.DeliveryAttempt != numAttempts {
- t.Fatalf("Message delivery attempt: %d does not match numAttempts: %d\n", m.DeliveryAttempt, numAttempts)
- }
- numAttempts++
- m.Nack()
- })
- if err != nil {
- t.Fatalf("Streaming pull error: %v\n", err)
+ ctx2, cancel := context.WithCancel(ctx)
+ numAttempts := 1
+ err = sub.Receive(ctx2, func(_ context.Context, m *Message) {
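+ // Nack the first four deliveries, then ack on the fifth: the default
+ // MaxDeliveryAttempts is 5, per the config check above. cancel() stops
+ // Receive once the final delivery has been observed.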
+ if numAttempts >= 5 {
+ cancel()
+ m.Ack()
+ return
}
+ if *m.DeliveryAttempt != numAttempts {
+ t.Fatalf("Message delivery attempt: %d does not match numAttempts: %d\n", m.DeliveryAttempt, numAttempts)
+ }
+ numAttempts++
+ m.Nack()
+ })
+ if err != nil {
+ t.Fatalf("Streaming pull error: %v\n", err)
}
}
diff --git a/pubsub/internal/scheduler/receive_scheduler.go b/pubsub/internal/scheduler/receive_scheduler.go
index 58eb313..c33f0a7 100644
--- a/pubsub/internal/scheduler/receive_scheduler.go
+++ b/pubsub/internal/scheduler/receive_scheduler.go
@@ -45,10 +45,13 @@
// NewReceiveScheduler creates a new ReceiveScheduler.
//
// The workers arg is the number of concurrent calls to handle. If the workers
-// arg is 0, then a healthy default of 10 workers is used.
+// arg is 0, then a healthy default of 10 workers is used. If workers is less
+// than 0, it is set to a large number, effectively unbounded, similar to
+// PublishScheduler's handler limit.
func NewReceiveScheduler(workers int) *ReceiveScheduler {
if workers == 0 {
workers = 10
+ } else if workers < 0 {
+ workers = 1e9
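+ // As with PublishScheduler, 1e9 stands in for "no limit" on
+ // concurrent handlers.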
}
return &ReceiveScheduler{
diff --git a/pubsub/subscription.go b/pubsub/subscription.go
index 17c9283..95666a7 100644
--- a/pubsub/subscription.go
+++ b/pubsub/subscription.go
@@ -709,7 +709,7 @@
}
fc := newFlowController(maxCount, maxBytes)
- sched := scheduler.NewReceiveScheduler(numGoroutines)
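+ // maxCount is derived from MaxOutstandingMessages; a negative value
+ // means no limit, which NewReceiveScheduler now handles explicitly.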
+ sched := scheduler.NewReceiveScheduler(maxCount)
// Wait for all goroutines started by Receive to return, so instead of an
// obscure goroutine leak we have an obvious blocked call to Receive.