bundler: stop timer from waiting for flushed requests to be handled

The timer used to flush an un-full bundler happens to call b.Flush(), which
includes a wait until all the outstanding bundles included the just-flushed one
have been handled. But there doesn't seem to be any reason for the timer to do the
wait, since it doesn't do anything after the wait. So, change the timer call to
use a new flushWithoutWait() method. For certain workloads, this change should
reduce the number of timer threads waiting around and being repeatedly woken up by
broadcasts on the b.cond condition variable.

Testing:  bundler tests, pubsub tests (in gcloud repo, but using this bundler change),
and pubsub loadtest.  Let me know if there is anything else I should run.

Change-Id: I656479ebfa914d10ac0bac5f65c3e68f7f40187c
Reviewed-on: https://code-review.googlesource.com/c/google-api-go-client/+/47790
Reviewed-by: Dan Scales <danscales@google.com>
diff --git a/support/bundler/bundler.go b/support/bundler/bundler.go
index fc515e4..8c9693b 100644
--- a/support/bundler/bundler.go
+++ b/support/bundler/bundler.go
@@ -182,7 +182,7 @@
 	// (We could try to call Reset on the timer instead, but that would add a lot
 	// of complexity to the code just to save one small allocation.)
 	if b.flushTimer == nil {
-		b.flushTimer = time.AfterFunc(b.DelayThreshold, b.Flush)
+		b.flushTimer = time.AfterFunc(b.DelayThreshold, b.flushWithoutWait)
 	}
 
 	// If the current bundle equals the count threshold, close it.
@@ -226,6 +226,14 @@
 	return nil
 }
 
+// flushWithoutWait forces the current bundle to be be flushed if non-empty (but
+// doesn't wait for any bundles to actually be handled).
+func (b *Bundler) flushWithoutWait() {
+	b.mu.Lock()
+	b.startFlushLocked()
+	b.mu.Unlock()
+}
+
 // Flush invokes the handler for all remaining items in the Bundler and waits
 // for it to return.
 func (b *Bundler) Flush() {