pubsub: send initial modack in synchronous=true mode

Messages that do not have an initial modack sent often become invisibly
expired. How this happens:

(given some ack distribution spooled up to 5m)
(given subscription config AckDeadline=15s)

- kaTick happens, send modacks. Next kaTick is 5m later
- 10s after this kaTick occurs, a message arrives. We do not send a modack, so
the expiration is 15s.
- 15s goes by. If the user didn't ack by now this message is dead-in-the-water,
because now the server has decided that this message has expired.
- 4m35s goes by and we send a modack. But, of course, it has already expired.

Whenever the user gets around to sending the ack, if they do so after the 15s
this ack will be recorded as status=expired in stackdriver.

This CL makes all messages - including synchronous - send a modack on receipt
to fix this issue. Users relying on Synchronous=true as a way to force the
subscription's AckDeadline to be used should instead use MaxExtension:

```
	cfg, err := sub.Config(ctx)
	if err != nil {
		// TODO handle err
	}

	sub.ReceiveSettings.MaxExtension = cfg.AckDeadline
```

This CL also removes the customized logger in favour of t.Log and a panic.

Fixes #1247

Change-Id: I6aa65e083ab12a79e61e8a4d968910dc794c964d
Reviewed-on: https://code-review.googlesource.com/c/36430
Reviewed-by: kokoro <noreply+kokoro@google.com>
Reviewed-by: Eno Compton <enocom@google.com>
diff --git a/pubsub/internal/longtest/endtoend_test.go b/pubsub/internal/longtest/endtoend_test.go
index fb4be40..dd1d362 100644
--- a/pubsub/internal/longtest/endtoend_test.go
+++ b/pubsub/internal/longtest/endtoend_test.go
@@ -15,12 +15,10 @@
 package longtest
 
 import (
-	"bytes"
 	"context"
 	"fmt"
 	"log"
 	"math/rand"
-	"os"
 	"sync"
 	"testing"
 	"time"
@@ -28,6 +26,8 @@
 	"cloud.google.com/go/internal/testutil"
 	"cloud.google.com/go/pubsub"
 	"google.golang.org/api/option"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 )
 
 const (
@@ -38,39 +38,18 @@
 	numAcceptableDups       = int(nMessages * acceptableDupPercentage / 100)
 )
 
-// Buffer log messages to debug failures.
-var logBuf bytes.Buffer
-
 // The end-to-end pumps many messages into a topic and tests that they are all
 // delivered to each subscription for the topic. It also tests that messages
 // are not unexpectedly redelivered.
-func TestIntegration_EndToEnd(t *testing.T) {
-	if testing.Short() {
-		t.Skip("Integration tests skipped in short mode")
-	}
-	log.SetOutput(&logBuf)
-	ctx := context.Background()
-	ts := testutil.TokenSource(ctx, pubsub.ScopePubSub, pubsub.ScopeCloudPlatform)
-	if ts == nil {
-		t.Skip("Integration tests skipped. See CONTRIBUTING.md for details")
-	}
-
-	now := time.Now()
-	topicName := fmt.Sprintf("endtoend-%d", now.UnixNano())
-	subPrefix := fmt.Sprintf("endtoend-%d", now.UnixNano())
-
-	client, err := pubsub.NewClient(ctx, testutil.ProjID(), option.WithTokenSource(ts))
-	if err != nil {
-		t.Fatalf("Creating client error: %v", err)
-	}
-
-	var topic *pubsub.Topic
-	if topic, err = client.CreateTopic(ctx, topicName); err != nil {
-		t.Fatalf("CreateTopic error: %v", err)
-	}
-	defer topic.Delete(ctx)
+func TestEndToEnd_Dupes(t *testing.T) {
+	ctx, cancel := context.WithTimeout(context.Background(), timeout)
+	defer cancel()
+	client, topic, cleanup := prepareEndToEndTest(ctx, t)
+	defer cleanup()
+	subPrefix := fmt.Sprintf("endtoend-%d", time.Now().UnixNano())
 
 	// Two subscriptions to the same topic.
+	var err error
 	var subs [2]*pubsub.Subscription
 	for i := 0; i < len(subs); i++ {
 		subs[i], err = client.CreateSubscription(ctx, fmt.Sprintf("%s-%d", subPrefix, i), pubsub.SubscriptionConfig{
@@ -108,7 +87,7 @@
 		wg.Add(1)
 		go func() {
 			defer wg.Done()
-			con.consume(cctx, t, sub)
+			con.consume(ctx, t, sub)
 		}()
 	}
 	// Wait for a while after the last message before declaring quiescence.
@@ -142,7 +121,6 @@
 		}
 	}
 	wg.Wait()
-	ok := true
 	for i, con := range consumers {
 		var numDups int
 		var zeroes int
@@ -155,14 +133,92 @@
 
 		if zeroes > 0 {
 			t.Errorf("Consumer %d: %d messages never arrived", i, zeroes)
-			ok = false
 		} else if numDups > numAcceptableDups {
 			t.Errorf("Consumer %d: Willing to accept %d dups (%v duplicated of %d messages), but got %d", i, numAcceptableDups, acceptableDupPercentage, int(nMessages), numDups)
-			ok = false
 		}
 	}
-	if !ok {
-		logBuf.WriteTo(os.Stdout)
+}
+
+func TestEndToEnd_LongProcessingTime(t *testing.T) {
+	ctx, cancel := context.WithTimeout(context.Background(), timeout)
+	defer cancel()
+	client, topic, cleanup := prepareEndToEndTest(ctx, t)
+	defer cleanup()
+	subPrefix := fmt.Sprintf("endtoend-%d", time.Now().UnixNano())
+
+	// Two subscriptions to the same topic.
+	sub, err := client.CreateSubscription(ctx, subPrefix+"-00", pubsub.SubscriptionConfig{
+		Topic:       topic,
+		AckDeadline: ackDeadline,
+	})
+	if err != nil {
+		t.Fatalf("CreateSub error: %v", err)
+	}
+	defer sub.Delete(ctx)
+
+	// Tests the issue found in https://github.com/googleapis/google-cloud-go/issues/1247.
+	sub.ReceiveSettings.Synchronous = true
+	sub.ReceiveSettings.MaxOutstandingMessages = 500
+
+	err = publish(ctx, topic, 1000)
+	topic.Stop()
+	if err != nil {
+		t.Fatalf("publish: %v", err)
+	}
+
+	// recv provides an indication that messages are still arriving.
+	recv := make(chan struct{})
+	consumer := consumer{
+		counts:    make(map[string]int),
+		recv:      recv,
+		durations: []time.Duration{time.Hour},
+		processingDelay: func() time.Duration {
+			return time.Duration(1+rand.Int63n(120)) * time.Second
+		},
+	}
+	go consumer.consume(ctx, t, sub)
+	// Wait for a while after the last message before declaring quiescence.
+	// We wait a multiple of the ack deadline, for two reasons:
+	// 1. To detect if messages are redelivered after having their ack
+	//    deadline extended.
+	// 2. To wait for redelivery of messages that were en route when a Receive
+	//    is canceled. This can take considerably longer than the ack deadline.
+	quiescenceDur := ackDeadline * 6
+	quiescenceTimer := time.NewTimer(quiescenceDur)
+loop:
+	for {
+		select {
+		case <-recv:
+			// Reset timer so we wait quiescenceDur after the last message.
+			// See https://godoc.org/time#Timer.Reset for why the Stop
+			// and channel drain are necessary.
+			if !quiescenceTimer.Stop() {
+				<-quiescenceTimer.C
+			}
+			quiescenceTimer.Reset(quiescenceDur)
+
+		case <-quiescenceTimer.C:
+			cancel()
+			log.Println("quiesced")
+			break loop
+
+		case <-ctx.Done():
+			t.Fatal("timed out")
+		}
+	}
+	var numDups int
+	var zeroes int
+	for _, v := range consumer.counts {
+		if v == 0 {
+			zeroes++
+		}
+		numDups += v - 1
+	}
+
+	if zeroes > 0 {
+		t.Errorf("%d messages never arrived", zeroes)
+	} else if numDups > numAcceptableDups {
+		t.Errorf("Willing to accept %d dups (%v duplicated of %d messages), but got %d", numAcceptableDups, acceptableDupPercentage, int(nMessages), numDups)
 	}
 }
 
@@ -184,14 +240,20 @@
 
 // consumer consumes messages according to its configuration.
 type consumer struct {
+	// A consumer will spin out a Receive for each duration, which will be
+	// canceled after each duration and the next one spun up. For example, if
+	// there are 5 3 second durations, then there will be 5 3 second Receives.
 	durations []time.Duration
 
 	// A value is sent to recv each time Inc is called.
 	recv chan struct{}
 
-	mu     sync.Mutex
-	counts map[string]int
-	total  int
+	// How long to wait for before acking.
+	processingDelay func() time.Duration
+
+	mu         sync.Mutex
+	counts     map[string]int // msgID: recvdAmt
+	totalRecvd int
 }
 
 // consume reads messages from a subscription, and keeps track of what it receives in mc.
@@ -201,13 +263,12 @@
 		ctx2, cancel := context.WithTimeout(ctx, dur)
 		defer cancel()
 		id := sub.String()[len(sub.String())-1:]
-		log.Printf("%s: start receive", id)
-		prev := c.total
+		t.Logf("%s: start receive", id)
+		prev := c.totalRecvd
 		err := sub.Receive(ctx2, c.process)
-		log.Printf("%s: end receive; read %d", id, c.total-prev)
-		if err != nil {
-			t.Errorf("error from Receive: %v", err)
-			return
+		t.Logf("%s: end receive; read %d", id, c.totalRecvd-prev)
+		if serr, _ := status.FromError(err); serr.Code() != codes.Canceled {
+			panic(err)
 		}
 		select {
 		case <-ctx.Done():
@@ -215,17 +276,56 @@
 		default:
 		}
 	}
+	return
 }
 
 // process handles a message and records it in mc.
 func (c *consumer) process(_ context.Context, m *pubsub.Message) {
 	c.mu.Lock()
 	c.counts[m.ID]++
-	c.total++
+	c.totalRecvd++
 	c.mu.Unlock()
 	c.recv <- struct{}{}
+
+	var delay time.Duration
+	if c.processingDelay == nil {
+		delay = time.Duration(rand.Intn(int(ackDeadline * 3)))
+	} else {
+		delay = c.processingDelay()
+	}
+
 	// Simulate time taken to process m, while continuing to process more messages.
 	// Some messages will need to have their ack deadline extended due to this delay.
-	delay := rand.Intn(int(ackDeadline * 3))
-	time.AfterFunc(time.Duration(delay), m.Ack)
+	time.AfterFunc(delay, func() {
+		m.Ack()
+	})
+}
+
+// Remember to call cleanup!
+func prepareEndToEndTest(ctx context.Context, t *testing.T) (*pubsub.Client, *pubsub.Topic, func()) {
+	if testing.Short() {
+		t.Skip("Integration tests skipped in short mode")
+	}
+	ts := testutil.TokenSource(ctx, pubsub.ScopePubSub, pubsub.ScopeCloudPlatform)
+	if ts == nil {
+		t.Skip("Integration tests skipped. See CONTRIBUTING.md for details")
+	}
+
+	now := time.Now()
+	topicName := fmt.Sprintf("endtoend-%d", now.UnixNano())
+
+	client, err := pubsub.NewClient(ctx, testutil.ProjID(), option.WithTokenSource(ts))
+	if err != nil {
+		t.Fatalf("Creating client error: %v", err)
+	}
+
+	var topic *pubsub.Topic
+	if topic, err = client.CreateTopic(ctx, topicName); err != nil {
+		t.Fatalf("CreateTopic error: %v", err)
+	}
+
+	return client, topic, func() {
+		topic.Delete(ctx)
+		client.Close()
+	}
 }
diff --git a/pubsub/iterator.go b/pubsub/iterator.go
index 5b3415e..0d2a715 100644
--- a/pubsub/iterator.go
+++ b/pubsub/iterator.go
@@ -214,7 +214,7 @@
 		it.keepAliveDeadlines[m.ackID] = maxExt
 		// Don't change the mod-ack if the message is going to be nacked. This is
 		// possible if there are retries.
-		if !it.pendingNacks[m.ackID] && !it.po.synchronous {
+		if !it.pendingNacks[m.ackID] {
 			ackIDs[m.ackID] = true
 		}
 	}