support/bundler: fix deadlock when using time-based flush

bundler.Bundler can run into a deadlock under the following
configuration:

- handler limit of 1
- size and count thresholds high enough that the only thing that
flushes a bundle is the delay timer expiring (a reproducing
configuration is sketched below)
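
For reference, a minimal sketch of such a configuration, mirroring the
regression test added below (the helper name is illustrative, not from
the source):

    import (
        "math"
        "time"

        "google.golang.org/api/support/bundler"
    )

    // newTimerOnlyBundler builds a Bundler whose bundles can only be
    // flushed by the delay timer: the count and byte thresholds are
    // unreachably high, and at most one handler goroutine may run.
    func newTimerOnlyBundler(handler func(interface{})) *bundler.Bundler {
        b := bundler.NewBundler(int(0), handler) // items are ints
        b.HandlerLimit = 1
        b.DelayThreshold = 10 * time.Millisecond
        b.BundleCountThreshold = math.MaxInt32
        b.BundleByteThreshold = math.MaxInt32
        return b
    }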

Under these conditions, with high concurrency, a deadlock can happen
because the background handler goroutine can exit while bundles are
still queued, and those queued bundles then never get flushed.

Specifically, the following sequence of events leads to the deadlock:

- the handler goroutine calls postHandle, receives nil for the next
bundle (signaling it has no more work to do), and releases b.mu

- some other goroutine calls AddWait() -> enqueueCurBundle, which sees
that b.handlerCount has already reached b.HandlerLimit and therefore
queues the bundle instead of spawning a new handler goroutine

- the handler goroutine then decrements b.handlerCount and exits

The fix is to move the decrement of b.handlerCount into postHandle, so
that deciding to exit and allowing a new handler goroutine to be
spawned happen under the same critical section and cannot be
re-ordered.
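
To illustrate, a simplified sketch of the racy pattern. The
enqueueCurBundle branch is paraphrased (not quoted from the source);
the decrement is exactly the code removed below:

    // enqueueCurBundle, under b.mu: a handler is still counted as
    // running, so the bundle is only queued; no goroutine is spawned.
    if b.handlerCount < b.HandlerLimit {
        b.handlerCount++
        go b.handle(bu)
    }

    // handle, before this fix: the decrement re-acquires b.mu after
    // postHandle has already returned nil and released the lock. In
    // that window, the enqueue above still observes handlerCount == 1.
    b.mu.Lock()
    b.handlerCount--
    b.mu.Unlock()

With the decrement inside postHandle, "next() returned nil" and
"handlerCount--" become a single atomic step under b.mu, so
enqueueCurBundle either sees a live handler that will pick up the
queued bundle, or a decremented count that lets it spawn a fresh one.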

Fixes: googleapis/google-api-go-client#475

Change-Id: I84421e53a4dc24eec36ceb147933aab57a40aec8
Reviewed-on: https://code-review.googlesource.com/c/google-api-go-client/+/55950
Reviewed-by: kokoro <noreply+kokoro@google.com>
Reviewed-by: Cody Oss <codyoss@google.com>
diff --git a/support/bundler/bundler.go b/support/bundler/bundler.go
index 5f603bd..9f8237a 100644
--- a/support/bundler/bundler.go
+++ b/support/bundler/bundler.go
@@ -324,17 +324,20 @@
 		b.handler(bu.items.Interface())
 		bu = b.postHandle(bu)
 	}
-	b.mu.Lock()
-	b.handlerCount--
-	b.mu.Unlock()
 }
 
 func (b *Bundler) postHandle(bu *bundle) *bundle {
 	b.mu.Lock()
 	defer b.mu.Unlock()
+
 	b.sem.Release(int64(bu.size))
 	bu.flush.Done()
-	return b.next()
+
+	bu = b.next()
+	if bu == nil {
+		b.handlerCount--
+	}
+	return bu
 }
 
 // AddWait adds item to the current bundle. It marks the bundle for handling and
diff --git a/support/bundler/bundler_test.go b/support/bundler/bundler_test.go
index e60a22f..7d16481 100644
--- a/support/bundler/bundler_test.go
+++ b/support/bundler/bundler_test.go
@@ -7,7 +7,9 @@
 import (
 	"context"
 	"fmt"
+	"math"
 	"reflect"
+	"runtime"
 	"sort"
 	"sync"
 	"testing"
@@ -363,6 +365,54 @@
 	}
 }
 
+// Test that time-based flushes do not deadlock.
+func TestBundlerTimeBasedFlushDeadlock(t *testing.T) {
+	const (
+		goroutines = 1e3
+		iterations = 1e3
+
+		N = goroutines * iterations
+	)
+
+	var wg sync.WaitGroup
+	wg.Add(N)
+
+	flush := func(i interface{}) {
+		time.Sleep(10 * time.Millisecond)
+		buf := i.([]int)
+		for i := 0; i < len(buf); i++ {
+			wg.Done()
+		}
+	}
+
+	b := NewBundler(int(0), flush)
+	b.DelayThreshold = 10 * time.Millisecond
+	b.HandlerLimit = 1
+
+	// High thresholds ensure that we only ever hit time-based flushes.
+	b.BundleCountThreshold = math.MaxInt32
+	b.BundleByteThreshold = math.MaxInt32
+
+	ctx, cancel := context.WithCancel(context.Background())
+	time.AfterFunc(1*time.Second, cancel)
+
+	add := func(i int) {
+		for j := 0; j < iterations; j++ {
+			if err := b.AddWait(ctx, i, 1); err != nil {
+				t.Errorf("timed out: %v", err)
+			}
+			runtime.Gosched()
+		}
+	}
+
+	for i := 0; i < goroutines; i++ {
+		go add(i)
+	}
+
+	// Verify that we don't block forever; a deadlock here hangs the test.
+	wg.Wait()
+}
+
 type testHandler struct {
 	mu sync.Mutex
 	b  [][]int