pubsub: send initial modack in synchronous=true mode
Messages that are not sent an initial modack often expire invisibly on the
server. How this happens:
(given an ack-latency distribution that has spooled up to 5m)
(given subscription config AckDeadline=15s)
- A kaTick happens and we send modacks. The next kaTick is 5m later.
- 10s after this kaTick, a message arrives. We do not send a modack, so the
expiration is the subscription's 15s AckDeadline.
- 15s goes by. If the user hasn't acked by now, this message is dead in the
water, because the server has decided that it has expired.
- 4m35s goes by and the next kaTick sends a modack. But, of course, the
message has already expired.
Whenever the user gets around to sending the ack, if they do so after the 15s
deadline, the ack will be recorded as status=expired in Stackdriver.
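As an illustration, a minimal sketch of the failure mode from the user's side
(the 30s sleep stands in for any processing that outlasts the 15s deadline):
```
sub.ReceiveSettings.Synchronous = true
err := sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
	// Processing outlasts the 15s AckDeadline. With no initial modack,
	// the server has already expired the message, so this ack is
	// recorded as status=expired.
	time.Sleep(30 * time.Second)
	m.Ack()
})
if err != nil {
	// TODO handle err
}
```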
This CL fixes the issue by sending a modack on receipt for all messages,
including in synchronous mode. Users relying on Synchronous=true as a way to
force the subscription's AckDeadline to be used should instead use
MaxExtension:
```
cfg, err := sub.Config(ctx)
if err != nil {
	// TODO handle err
}
sub.ReceiveSettings.MaxExtension = cfg.AckDeadline
```
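Setting MaxExtension to cfg.AckDeadline caps automatic deadline extension at
the subscription's own deadline, which roughly reproduces the old synchronous
behaviour: unacked messages still expire around AckDeadline, but the initial
modack keeps the client and server in agreement about when that deadline
starts.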
This CL also removes the custom buffered logger in favour of t.Log and a panic.
Fixes #1247
Change-Id: I6aa65e083ab12a79e61e8a4d968910dc794c964d
Reviewed-on: https://code-review.googlesource.com/c/36430
Reviewed-by: kokoro <noreply+kokoro@google.com>
Reviewed-by: Eno Compton <enocom@google.com>
diff --git a/pubsub/internal/longtest/endtoend_test.go b/pubsub/internal/longtest/endtoend_test.go
index fb4be40..dd1d362 100644
--- a/pubsub/internal/longtest/endtoend_test.go
+++ b/pubsub/internal/longtest/endtoend_test.go
@@ -15,12 +15,10 @@
package longtest
import (
- "bytes"
"context"
"fmt"
"log"
"math/rand"
- "os"
"sync"
"testing"
"time"
@@ -28,6 +26,8 @@
"cloud.google.com/go/internal/testutil"
"cloud.google.com/go/pubsub"
"google.golang.org/api/option"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
)
const (
@@ -38,39 +38,18 @@
numAcceptableDups = int(nMessages * acceptableDupPercentage / 100)
)
-// Buffer log messages to debug failures.
-var logBuf bytes.Buffer
-
// The end-to-end test pumps many messages into a topic and tests that they are all
// delivered to each subscription for the topic. It also tests that messages
// are not unexpectedly redelivered.
-func TestIntegration_EndToEnd(t *testing.T) {
- if testing.Short() {
- t.Skip("Integration tests skipped in short mode")
- }
- log.SetOutput(&logBuf)
- ctx := context.Background()
- ts := testutil.TokenSource(ctx, pubsub.ScopePubSub, pubsub.ScopeCloudPlatform)
- if ts == nil {
- t.Skip("Integration tests skipped. See CONTRIBUTING.md for details")
- }
-
- now := time.Now()
- topicName := fmt.Sprintf("endtoend-%d", now.UnixNano())
- subPrefix := fmt.Sprintf("endtoend-%d", now.UnixNano())
-
- client, err := pubsub.NewClient(ctx, testutil.ProjID(), option.WithTokenSource(ts))
- if err != nil {
- t.Fatalf("Creating client error: %v", err)
- }
-
- var topic *pubsub.Topic
- if topic, err = client.CreateTopic(ctx, topicName); err != nil {
- t.Fatalf("CreateTopic error: %v", err)
- }
- defer topic.Delete(ctx)
+func TestEndToEnd_Dupes(t *testing.T) {
+ ctx, cancel := context.WithTimeout(context.Background(), timeout)
+ defer cancel()
+ client, topic, cleanup := prepareEndToEndTest(ctx, t)
+ defer cleanup()
+ subPrefix := fmt.Sprintf("endtoend-%d", time.Now().UnixNano())
// Two subscriptions to the same topic.
+ var err error
var subs [2]*pubsub.Subscription
for i := 0; i < len(subs); i++ {
subs[i], err = client.CreateSubscription(ctx, fmt.Sprintf("%s-%d", subPrefix, i), pubsub.SubscriptionConfig{
@@ -108,7 +87,7 @@
wg.Add(1)
go func() {
defer wg.Done()
- con.consume(cctx, t, sub)
+ con.consume(ctx, t, sub)
}()
}
// Wait for a while after the last message before declaring quiescence.
@@ -142,7 +121,6 @@
}
}
wg.Wait()
- ok := true
for i, con := range consumers {
var numDups int
var zeroes int
@@ -155,14 +133,92 @@
if zeroes > 0 {
t.Errorf("Consumer %d: %d messages never arrived", i, zeroes)
- ok = false
} else if numDups > numAcceptableDups {
t.Errorf("Consumer %d: Willing to accept %d dups (%v duplicated of %d messages), but got %d", i, numAcceptableDups, acceptableDupPercentage, int(nMessages), numDups)
- ok = false
}
}
- if !ok {
- logBuf.WriteTo(os.Stdout)
+}
+
+func TestEndToEnd_LongProcessingTime(t *testing.T) {
+ ctx, cancel := context.WithTimeout(context.Background(), timeout)
+ defer cancel()
+ client, topic, cleanup := prepareEndToEndTest(ctx, t)
+ defer cleanup()
+ subPrefix := fmt.Sprintf("endtoend-%d", time.Now().UnixNano())
+
+ // One subscription to the topic.
+ sub, err := client.CreateSubscription(ctx, subPrefix+"-00", pubsub.SubscriptionConfig{
+ Topic: topic,
+ AckDeadline: ackDeadline,
+ })
+ if err != nil {
+ t.Fatalf("CreateSub error: %v", err)
+ }
+ defer sub.Delete(ctx)
+
+ // Tests the issue found in https://github.com/googleapis/google-cloud-go/issues/1247.
+ sub.ReceiveSettings.Synchronous = true
+ sub.ReceiveSettings.MaxOutstandingMessages = 500
+
+ err = publish(ctx, topic, 1000)
+ topic.Stop()
+ if err != nil {
+ t.Fatalf("publish: %v", err)
+ }
+
+ // recv provides an indication that messages are still arriving.
+ recv := make(chan struct{})
+ consumer := consumer{
+ counts: make(map[string]int),
+ recv: recv,
+ durations: []time.Duration{time.Hour},
+ processingDelay: func() time.Duration {
+ return time.Duration(1+rand.Int63n(120)) * time.Second
+ },
+ }
+ go consumer.consume(ctx, t, sub)
+ // Wait for a while after the last message before declaring quiescence.
+ // We wait a multiple of the ack deadline, for two reasons:
+ // 1. To detect if messages are redelivered after having their ack
+ // deadline extended.
+ // 2. To wait for redelivery of messages that were en route when a Receive
+ // is canceled. This can take considerably longer than the ack deadline.
+ quiescenceDur := ackDeadline * 6
+ quiescenceTimer := time.NewTimer(quiescenceDur)
+loop:
+ for {
+ select {
+ case <-recv:
+ // Reset timer so we wait quiescenceDur after the last message.
+ // See https://godoc.org/time#Timer.Reset for why the Stop
+ // and channel drain are necessary.
+ if !quiescenceTimer.Stop() {
+ <-quiescenceTimer.C
+ }
+ quiescenceTimer.Reset(quiescenceDur)
+
+ case <-quiescenceTimer.C:
+ cancel()
+ log.Println("quiesced")
+ break loop
+
+ case <-ctx.Done():
+ t.Fatal("timed out")
+ }
+ }
+ var numDups int
+ var zeroes int
+ for _, v := range consumer.counts {
+ if v == 0 {
+ zeroes++
+ }
+ numDups += v - 1
+ }
+
+ if zeroes > 0 {
+ t.Errorf("%d messages never arrived", zeroes)
+ } else if numDups > numAcceptableDups {
+ t.Errorf("Willing to accept %d dups (%v duplicated of %d messages), but got %d", numAcceptableDups, acceptableDupPercentage, int(nMessages), numDups)
}
}
@@ -184,14 +240,20 @@
// consumer consumes messages according to its configuration.
type consumer struct {
+ // A consumer runs one Receive per duration: each Receive is canceled after
+ // its duration elapses and the next one is started. For example, five
+ // 3-second durations produce five consecutive 3-second Receives.
durations []time.Duration
// A value is sent to recv each time Inc is called.
recv chan struct{}
- mu sync.Mutex
- counts map[string]int
- total int
+ // How long to wait before acking.
+ processingDelay func() time.Duration
+
+ mu sync.Mutex
+ counts map[string]int // msgID: recvdAmt
+ totalRecvd int
}
// consume reads messages from a subscription, and keeps track of what it receives in c.
@@ -201,13 +263,12 @@
ctx2, cancel := context.WithTimeout(ctx, dur)
defer cancel()
id := sub.String()[len(sub.String())-1:]
- log.Printf("%s: start receive", id)
- prev := c.total
+ t.Logf("%s: start receive", id)
+ prev := c.totalRecvd
err := sub.Receive(ctx2, c.process)
- log.Printf("%s: end receive; read %d", id, c.total-prev)
- if err != nil {
- t.Errorf("error from Receive: %v", err)
- return
+ t.Logf("%s: end receive; read %d", id, c.totalRecvd-prev)
+ if serr, _ := status.FromError(err); err != nil && serr.Code() != codes.Canceled {
+ panic(err)
}
select {
case <-ctx.Done():
@@ -215,17 +276,56 @@
default:
}
}
+ return
}
// process handles a message and records it in c.
func (c *consumer) process(_ context.Context, m *pubsub.Message) {
c.mu.Lock()
c.counts[m.ID]++
- c.total++
+ c.totalRecvd++
c.mu.Unlock()
c.recv <- struct{}{}
+
+ var delay time.Duration
+ if c.processingDelay == nil {
+ delay = time.Duration(rand.Intn(int(ackDeadline * 3)))
+ } else {
+ delay = c.processingDelay()
+ }
+
// Simulate time taken to process m, while continuing to process more messages.
// Some messages will need to have their ack deadline extended due to this delay.
- delay := rand.Intn(int(ackDeadline * 3))
- time.AfterFunc(time.Duration(delay), m.Ack)
+ time.AfterFunc(delay, func() {
+ m.Ack()
+ })
+}
+
+ // prepareEndToEndTest creates a client and topic; remember to call the returned cleanup func!
+func prepareEndToEndTest(ctx context.Context, t *testing.T) (*pubsub.Client, *pubsub.Topic, func()) {
+ if testing.Short() {
+ t.Skip("Integration tests skipped in short mode")
+ }
+ ts := testutil.TokenSource(ctx, pubsub.ScopePubSub, pubsub.ScopeCloudPlatform)
+ if ts == nil {
+ t.Skip("Integration tests skipped. See CONTRIBUTING.md for details")
+ }
+
+ now := time.Now()
+ topicName := fmt.Sprintf("endtoend-%d", now.UnixNano())
+
+ client, err := pubsub.NewClient(ctx, testutil.ProjID(), option.WithTokenSource(ts))
+ if err != nil {
+ t.Fatalf("Creating client error: %v", err)
+ }
+
+ var topic *pubsub.Topic
+ if topic, err = client.CreateTopic(ctx, topicName); err != nil {
+ t.Fatalf("CreateTopic error: %v", err)
+ }
+
+ return client, topic, func() {
+ topic.Delete(ctx)
+ client.Close()
+ }
}
diff --git a/pubsub/iterator.go b/pubsub/iterator.go
index 5b3415e..0d2a715 100644
--- a/pubsub/iterator.go
+++ b/pubsub/iterator.go
@@ -214,7 +214,8 @@
it.keepAliveDeadlines[m.ackID] = maxExt
// Don't change the mod-ack if the message is going to be nacked. This is
// possible if there are retries.
- if !it.pendingNacks[m.ackID] && !it.po.synchronous {
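+ // This now applies in synchronous mode too: the initial modack prevents invisible expiration (#1247).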
+ if !it.pendingNacks[m.ackID] {
ackIDs[m.ackID] = true
}
}