pubsub: fix "log after test finished" failure

This CL fixes a problem in pubsub's e2e test whereby a log can happen after
the test is finished, since the test doesn't wait for the async consumer
to finish before declaring the test completed. This CL fixes that by adding
an explicit "done" to consumer.consume which the tests wait on.

See: https://sponge.corp.google.com/target?id=c5050c57-9048-4b70-af8f-41bc4b627020&target=cloud-go-libraries/google-cloud-go/continuous/go112&searchFor=&show=FAILED&sortBy=STATUS

```
=== RUN   TestEndToEnd_LongProcessingTime
2019/07/10 18:55:35 quiesced
--- PASS: TestEndToEnd_LongProcessingTime (148.68s)
    endtoend_test.go:266: 0: start receive
PASS
panic: Log in goroutine after TestEndToEnd_LongProcessingTime has completed

goroutine 40333 [running]:
testing.(*common).logDepth(0xc0002c6300, 0xc0005b84a0, 0x19, 0x3)
	/usr/local/go/src/testing/testing.go:634 +0x51a
testing.(*common).log(...)
	/usr/local/go/src/testing/testing.go:614
testing.(*common).Logf(0xc0002c6300, 0xcf7981, 0x18, 0xc000219f88, 0x2, 0x2)
	/usr/local/go/src/testing/testing.go:649 +0x91
cloud.google.com/go/pubsub/internal/longtest.(*consumer).consume(0xc000299540, 0xde0b80, 0xc00034a120, 0xc0002c6300, 0xc000482320)
	/root/go/src/cloud.google.com/go/pubsub/internal/longtest/endtoend_test.go:269 +0x3e2
created by cloud.google.com/go/pubsub/internal/longtest.TestEndToEnd_LongProcessingTime
	/root/go/src/cloud.google.com/go/pubsub/internal/longtest/endtoend_test.go:179 +0x6b0
FAIL	cloud.google.com/go/pubsub/internal/longtest	773.275s
```

Change-Id: I8009c0baf9647ef3a8ad3ece5aff7f9fc3b313d7
Reviewed-on: https://code-review.googlesource.com/c/gocloud/+/42690
Reviewed-by: Alex Hong <hongalex@google.com>
Reviewed-by: kokoro <noreply+kokoro@google.com>
diff --git a/pubsub/internal/longtest/endtoend_test.go b/pubsub/internal/longtest/endtoend_test.go
index 3312c4d..a5c7920 100644
--- a/pubsub/internal/longtest/endtoend_test.go
+++ b/pubsub/internal/longtest/endtoend_test.go
@@ -137,6 +137,14 @@
 			t.Errorf("Consumer %d: Willing to accept %d dups (%v duplicated of %d messages), but got %d", i, numAcceptableDups, acceptableDupPercentage, int(nMessages), numDups)
 		}
 	}
+
+	for i, con := range consumers {
+		select {
+		case <-con.done:
+		case <-time.After(15 * time.Second):
+			t.Fatalf("timed out waiting for consumer %d to finish", i)
+		}
+	}
 }
 
 func TestEndToEnd_LongProcessingTime(t *testing.T) {
@@ -175,6 +183,7 @@
 		processingDelay: func() time.Duration {
 			return time.Duration(1+rand.Int63n(120)) * time.Second
 		},
+		done: make(chan struct{}),
 	}
 	go consumer.consume(ctx, t, sub)
 	// Wait for a while after the last message before declaring quiescence.
@@ -220,6 +229,12 @@
 	} else if numDups > numAcceptableDups {
 		t.Errorf("Willing to accept %d dups (%v duplicated of %d messages), but got %d", numAcceptableDups, acceptableDupPercentage, int(nMessages), numDups)
 	}
+
+	select {
+	case <-consumer.done:
+	case <-time.After(15 * time.Second):
+		t.Fatal("timed out waiting for consumer to finish")
+	}
 }
 
 // publish publishes n messages to topic.
@@ -254,11 +269,15 @@
 	mu         sync.Mutex
 	counts     map[string]int // msgID: recvdAmt
 	totalRecvd int
+
+	// Done consuming.
+	done chan struct{}
 }
 
 // consume reads messages from a subscription, and keeps track of what it receives in mc.
 // After consume returns, the caller should wait on wg to ensure that no more updates to mc will be made.
 func (c *consumer) consume(ctx context.Context, t *testing.T, sub *pubsub.Subscription) {
+	defer close(c.done)
 	for _, dur := range c.durations {
 		ctx2, cancel := context.WithTimeout(ctx, dur)
 		defer cancel()