pubsub: check early if streaming iterator is already drained

Fix the fake streaming server to behave more realistically, by waiting
for a CloseSend from the client before ending the stream.

This revealed that the streaming iterator has an unnecessary delay,
due to the stop method blocking on the channel when there were no more
messages. So we now check that case before reading from the channel.

Also, s/Skip/SkipNow/ in tests.

Change-Id: Iacd826c07f6d327475c69e5002cab2381e47d9a5
Reviewed-on: https://code-review.googlesource.com/11374
Reviewed-by: kokoro <noreply+kokoro@google.com>
Reviewed-by: Michael Darakananda <pongad@google.com>
Reviewed-by: Kamal Aboul-Hosn <aboulhosn@google.com>
diff --git a/pubsub/fake_test.go b/pubsub/fake_test.go
index 552dd1e..9f7f0ef 100644
--- a/pubsub/fake_test.go
+++ b/pubsub/fake_test.go
@@ -104,13 +104,15 @@
 	// Send responses.
 	for {
 		if len(s.pullResponses) == 0 {
-			return nil
+			// Nothing to send, so wait for the client to shut down the stream.
+			err := <-errc // a real error, or at least EOF
+			if err == io.EOF {
+				return nil
+			}
+			return err
 		}
 		pr := s.pullResponses[0]
-		// Repeat last response.
-		if len(s.pullResponses) > 1 {
-			s.pullResponses = s.pullResponses[1:]
-		}
+		s.pullResponses = s.pullResponses[1:]
 		if pr.err == io.EOF {
 			return nil
 		}
diff --git a/pubsub/iterator.go b/pubsub/iterator.go
index 7c644cc..4da7ca3 100644
--- a/pubsub/iterator.go
+++ b/pubsub/iterator.go
@@ -323,6 +323,8 @@
 	if it.err == nil {
 		it.err = iterator.Done
 	}
+	// Before reading from the channel, see if we're already drained.
+	it.checkDrained()
 	it.mu.Unlock()
 	// Nack all the pending messages.
 	// Grab the lock separately for each message to allow the receiver
diff --git a/pubsub/streaming_pull_test.go b/pubsub/streaming_pull_test.go
index bff1c53..c47adc1 100644
--- a/pubsub/streaming_pull_test.go
+++ b/pubsub/streaming_pull_test.go
@@ -58,7 +58,7 @@
 
 func testStreamingPullIteration(t *testing.T, client *Client, server *fakeServer, msgs []*pb.ReceivedMessage) {
 	if !useStreamingPull {
-		t.Skip()
+		t.SkipNow()
 	}
 	sub := client.Subscription("s")
 	iter, err := sub.Pull(context.Background())
@@ -101,7 +101,7 @@
 
 func TestStreamingPullStop(t *testing.T) {
 	if !useStreamingPull {
-		t.Skip()
+		t.SkipNow()
 	}
 	// After Stop is called, Next returns iterator.Done.
 	client, server := newFake(t)
@@ -128,7 +128,7 @@
 
 func TestStreamingPullError(t *testing.T) {
 	if !useStreamingPull {
-		t.Skip()
+		t.SkipNow()
 	}
 	client, server := newFake(t)
 	server.addStreamingPullError(grpc.Errorf(codes.Internal, ""))
@@ -148,7 +148,7 @@
 
 func TestStreamingPullCancel(t *testing.T) {
 	if !useStreamingPull {
-		t.Skip()
+		t.SkipNow()
 	}
 	// Test that canceling the iterator's context behaves correctly.
 	client, server := newFake(t)
@@ -186,7 +186,7 @@
 
 func TestStreamingPullRetry(t *testing.T) {
 	if !useStreamingPull {
-		t.Skip()
+		t.SkipNow()
 	}
 	// Check that we retry on io.EOF or Unavailable.
 	client, server := newFake(t)
@@ -202,7 +202,7 @@
 
 func TestStreamingPullConcurrent(t *testing.T) {
 	if !useStreamingPull {
-		t.Skip()
+		t.SkipNow()
 	}
 	newMsg := func(i int) *pb.ReceivedMessage {
 		return &pb.ReceivedMessage{