bundler: limit the total number of goroutines to HandlerLimit

The bundler creates a goroutine per bundle. Only HandlerLimit
goroutines can execute handlers in parallel, so when write rates
are high and handling is slow, goroutines pile up, each competing
to determine whether it is its turn to execute. This behavior is
exhibited by TestConcurrentHandlersMax, which occasionally fails
to terminate and generates 250,000+ goroutines within 2 seconds
on a 6-core 3.7GHz Linux desktop.

In this change, we limit the total number of goroutines to
HandlerLimit. The bundle type becomes a node in a linked-list
queue: items are added at the tail and bundles are handled from
the head. When the user-specified handler returns within its
goroutine, we check whether another bundle is ready for handling.
If one is ready, the existing goroutine handles it; otherwise, the
goroutine ends. This allows the goroutine count to grow and shrink
as necessary.
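
To illustrate the scheme, here is a minimal, generic sketch of the
pattern (the names pool and task are illustrative, not the bundler's
own; the actual implementation is in the diff below):

    import "sync"

    // task is one unit of work; tasks form a singly linked queue.
    type task struct {
        run  func()
        next *task
    }

    // pool runs tasks with at most limit goroutines at a time.
    type pool struct {
        mu         sync.Mutex
        head, tail *task
        running    int
        limit      int
    }

    func (p *pool) enqueue(t *task) {
        p.mu.Lock()
        defer p.mu.Unlock()
        // Below the limit the queue is necessarily empty, so hand
        // the task straight to a new goroutine.
        if p.running < p.limit {
            p.running++
            go p.work(t)
            return
        }
        // At the limit: append to the tail. A running goroutine
        // will pick it up when its current task finishes.
        if p.tail != nil {
            p.tail.next = t
        } else {
            p.head = t
        }
        p.tail = t
    }

    func (p *pool) work(t *task) {
        for t != nil {
            t.run()
            p.mu.Lock()
            t = p.head // pop the next task, if any
            if t != nil {
                p.head = t.next
                if p.head == nil {
                    p.tail = nil
                }
            } else {
                p.running-- // queue drained; this goroutine exits
            }
            p.mu.Unlock()
        }
    }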

Because we no longer hand bundles off to goroutines prematurely,
we increase our opportunity to fill bundles to their maximum item
count.
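
For example, using the package's public API (the sizes, counts, and
handler delay below are arbitrary and purely illustrative; assumes
the usual imports of fmt, time, and
google.golang.org/api/support/bundler):

    b := bundler.NewBundler(int(0), func(items interface{}) {
        // A deliberately slow handler. While it runs, new items
        // accumulate in the tail bundle rather than each closed
        // bundle spawning its own waiting goroutine, so bundles
        // tend to reach BundleCountThreshold before being handled.
        time.Sleep(50 * time.Millisecond)
        fmt.Println(items.([]int))
    })
    b.BundleCountThreshold = 10
    b.HandlerLimit = 1
    for i := 0; i < 100; i++ {
        _ = b.Add(i, 1) // each item declared as 1 byte
    }
    b.Flush()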

This change allows the bundler tests to execute and pass with
GOMAXPROCS=1. Previously, they would fail to terminate.

Fixes #367
Fixes #417

Change-Id: I73a236ffe49087aedebbbbb6bbc94e0a041e93c1
Reviewed-on: https://code-review.googlesource.com/c/google-api-go-client/+/47991
Reviewed-by: kokoro <noreply+kokoro@google.com>
Reviewed-by: Jean de Klerk <deklerk@google.com>
diff --git a/support/bundler/bundler.go b/support/bundler/bundler.go
index 8c9693b..5fc06b5 100644
--- a/support/bundler/bundler.go
+++ b/support/bundler/bundler.go
@@ -14,7 +14,6 @@
 import (
 	"context"
 	"errors"
-	"math"
 	"reflect"
 	"sync"
 	"time"
@@ -38,7 +37,11 @@
 )
 
 // A Bundler collects items added to it into a bundle until the bundle
-// exceeds a given size, then calls a user-provided function to handle the bundle.
+// exceeds a given size, then calls a user-provided function to handle the
+// bundle.
+//
+// The exported fields are only safe to modify prior to the first call to Add
+// or AddWait.
 type Bundler struct {
 	// Starting from the time that the first message is added to a bundle, once
 	// this delay has passed, handle the bundle. The default is DefaultDelayThreshold.
@@ -68,32 +71,51 @@
 
 	handler       func(interface{}) // called to handle a bundle
 	itemSliceZero reflect.Value     // nil (zero value) for slice of items
-	flushTimer    *time.Timer       // implements DelayThreshold
 
-	mu        sync.Mutex
-	sem       *semaphore.Weighted // enforces BufferedByteLimit
-	semOnce   sync.Once
-	curBundle bundle // incoming items added to this bundle
+	mu           sync.Mutex          // guards access to fields below
+	flushTimer   *time.Timer         // implements DelayThreshold
+	handlerCount int                 // # of bundles currently being handled (i.e. handler is invoked on them)
+	sem          *semaphore.Weighted // enforces BufferedByteLimit
+	semOnce      sync.Once           // guards semaphore initialization
 
-	// Each bundle is assigned a unique ticket that determines the order in which the
-	// handler is called. The ticket is assigned with mu locked, but waiting for tickets
-	// to be handled is done via mu2 and cond, below.
-	nextTicket uint64 // next ticket to be assigned
+	// The current bundle we're adding items to. Not yet in the queue.
+	// Appended to the queue once the flushTimer fires or the bundle
+	// thresholds/limits are reached. If curBundle is nil and tail is
+	// not, we first try to add items to tail. Once tail is full or handled,
+	// we create a new curBundle for the incoming item.
+	curBundle *bundle
 
-	mu2         sync.Mutex
-	cond        *sync.Cond
-	nextHandled uint64 // next ticket to be handled
+	// The next bundle in the queue to be handled. Nil if the queue is
+	// empty.
+	head *bundle
 
-	// In this implementation, active uses space proportional to HandlerLimit, and
-	// waitUntilAllHandled takes time proportional to HandlerLimit each time an acquire
-	// or release occurs, so large values of HandlerLimit max may cause performance
-	// issues.
-	active map[uint64]bool // tickets of bundles actively being handled
+	// The last bundle in the queue to be handled. Nil if the queue is
+	// empty. If curBundle is nil and tail isn't, we attempt to add new
+	// items to the tail until it becomes full or has been passed to the
+	// handler.
+	tail *bundle
+
+	curFlush  *sync.WaitGroup // counts outstanding bundles since last flush
+	prevFlush chan bool       // signal used to wait for prior flush
+
+	// TODO: consider alternative queue implementation for head/tail bundle. see:
+	// https://code-review.googlesource.com/c/google-api-go-client/+/47991/4/support/bundler/bundler.go#74
 }
 
+// A bundle is a group of items that were added individually and will be passed
+// to a handler as a slice.
 type bundle struct {
-	items reflect.Value // slice of item type
-	size  int           // size in bytes of all items
+	items reflect.Value   // slice of T
+	size  int             // size in bytes of all items
+	next  *bundle         // bundles are handled in order as a linked list queue
+	flush *sync.WaitGroup // the counter that tracks flush completion
+}
+
+// add appends item to this bundle and increments the total size. It requires
+// that b.mu is locked.
+func (bu *bundle) add(item interface{}, size int) {
+	bu.items = reflect.Append(bu.items, reflect.ValueOf(item))
+	bu.size += size
 }
 
 // NewBundler creates a new Bundler.
@@ -117,10 +139,8 @@
 
 		handler:       handler,
 		itemSliceZero: reflect.Zero(reflect.SliceOf(reflect.TypeOf(itemExample))),
-		active:        map[uint64]bool{},
+		curFlush:      &sync.WaitGroup{},
 	}
-	b.curBundle.items = b.itemSliceZero
-	b.cond = sync.NewCond(&b.mu2)
 	return b
 }
 
@@ -132,6 +152,43 @@
 	})
 }
 
+// enqueueCurBundle moves curBundle to the end of the queue. The bundle may be
+// handled immediately if we are below HandlerLimit. It requires that b.mu is
+// locked.
+func (b *Bundler) enqueueCurBundle() {
+	// We don't require callers to check if there is a pending bundle. It
+	// may have already been appended to the queue. If so, return early.
+	if b.curBundle == nil {
+		return
+	}
+	// If we are below the HandlerLimit, the queue must be empty. Handle
+	// immediately with a new goroutine.
+	if b.handlerCount < b.HandlerLimit {
+		b.handlerCount++
+		go b.handle(b.curBundle)
+	} else if b.tail != nil {
+		// There are bundles on the queue, so append to the end
+		b.tail.next = b.curBundle
+		b.tail = b.curBundle
+	} else {
+		// The queue is empty, so initialize the queue
+		b.head = b.curBundle
+		b.tail = b.curBundle
+	}
+	b.curBundle = nil
+	if b.flushTimer != nil {
+		b.flushTimer.Stop()
+		b.flushTimer = nil
+	}
+}
+
+// canFit returns true if bu can fit an additional item of size bytes based
+// on the limits of Bundler b.
+func (b *Bundler) canFit(bu *bundle, size int) bool {
+	return (b.BundleByteLimit <= 0 || bu.size+size <= b.BundleByteLimit) &&
+		(b.BundleCountThreshold <= 0 || bu.items.Len() < b.BundleCountThreshold)
+}
+
 // Add adds item to the current bundle. It marks the bundle for handling and
 // starts a new one if any of the thresholds or limits are exceeded.
 //
@@ -149,6 +206,7 @@
 	if b.BundleByteLimit > 0 && size > b.BundleByteLimit {
 		return ErrOversizedItem
 	}
+
 	// If adding this item would exceed our allotted memory
 	// footprint, we can't accept it.
 	// (TryAcquire also returns false if anything is waiting on the semaphore,
@@ -157,42 +215,98 @@
 	if !b.sem.TryAcquire(int64(size)) {
 		return ErrOverflow
 	}
-	b.add(item, size)
+
+	b.mu.Lock()
+	defer b.mu.Unlock()
+	return b.add(item, size)
+}
+
+// add adds item to the tail of the bundle queue or curBundle depending on space
+// and nil-ness (see inline comments). It marks curBundle for handling (by
+// appending it to the queue) if any of the thresholds or limits are exceeded.
+// curBundle is lazily initialized. It requires that b.mu is locked.
+func (b *Bundler) add(item interface{}, size int) error {
+	// If we don't have a curBundle, see if we can add to the queue tail.
+	if b.tail != nil && b.curBundle == nil && b.canFit(b.tail, size) {
+		b.tail.add(item, size)
+		return nil
+	}
+
+	// If we can't fit in the existing curBundle, move it onto the queue.
+	if b.curBundle != nil && !b.canFit(b.curBundle, size) {
+		b.enqueueCurBundle()
+	}
+
+	// Create a curBundle if we don't have one.
+	if b.curBundle == nil {
+		b.curFlush.Add(1)
+		b.curBundle = &bundle{
+			items: b.itemSliceZero,
+			flush: b.curFlush,
+		}
+	}
+
+	// Add the item.
+	b.curBundle.add(item, size)
+
+	// If curBundle is ready for handling, move it to the queue.
+	if b.curBundle.size >= b.BundleByteThreshold ||
+		b.curBundle.items.Len() == b.BundleCountThreshold {
+		b.enqueueCurBundle()
+	}
+
+	// If we created a new bundle and it wasn't immediately handled, set a timer
+	if b.curBundle != nil && b.flushTimer == nil {
+		b.flushTimer = time.AfterFunc(b.DelayThreshold, b.tryHandleBundles)
+	}
+
 	return nil
 }
 
-// add adds item to the current bundle. It marks the bundle for handling and
-// starts a new one if any of the thresholds or limits are exceeded.
-func (b *Bundler) add(item interface{}, size int) {
+// tryHandleBundles is the timer callback that handles or queues any current
+// bundle after DelayThreshold time, even if the bundle isn't completely full.
+func (b *Bundler) tryHandleBundles() {
+	b.mu.Lock()
+	b.enqueueCurBundle()
+	b.mu.Unlock()
+}
+
+// next returns the next bundle that is ready for handling and removes it from
+// the internal queue. It requires that b.mu is locked.
+func (b *Bundler) next() *bundle {
+	if b.head == nil {
+		return nil
+	}
+	out := b.head
+	b.head = b.head.next
+	if b.head == nil {
+		b.tail = nil
+	}
+	out.next = nil
+	return out
+}
+
+// handle calls the user-specified handler on the given bundle. handle is
+// intended to be run as a goroutine. After the handler returns, we update the
+// byte total. handle continues processing additional bundles that are ready.
+// If no more bundles are ready, the handler count is decremented and the
+// goroutine ends.
+func (b *Bundler) handle(bu *bundle) {
+	for bu != nil {
+		b.handler(bu.items.Interface())
+		bu = b.postHandle(bu)
+	}
+	b.mu.Lock()
+	b.handlerCount--
+	b.mu.Unlock()
+}
+
+func (b *Bundler) postHandle(bu *bundle) *bundle {
 	b.mu.Lock()
 	defer b.mu.Unlock()
-
-	// If adding this item to the current bundle would cause it to exceed the
-	// maximum bundle size, close the current bundle and start a new one.
-	if b.BundleByteLimit > 0 && b.curBundle.size+size > b.BundleByteLimit {
-		b.startFlushLocked()
-	}
-	// Add the item.
-	b.curBundle.items = reflect.Append(b.curBundle.items, reflect.ValueOf(item))
-	b.curBundle.size += size
-
-	// Start a timer to flush the item if one isn't already running.
-	// startFlushLocked clears the timer and closes the bundle at the same time,
-	// so we only allocate a new timer for the first item in each bundle.
-	// (We could try to call Reset on the timer instead, but that would add a lot
-	// of complexity to the code just to save one small allocation.)
-	if b.flushTimer == nil {
-		b.flushTimer = time.AfterFunc(b.DelayThreshold, b.flushWithoutWait)
-	}
-
-	// If the current bundle equals the count threshold, close it.
-	if b.curBundle.items.Len() == b.BundleCountThreshold {
-		b.startFlushLocked()
-	}
-	// If the current bundle equals or exceeds the byte threshold, close it.
-	if b.curBundle.size >= b.BundleByteThreshold {
-		b.startFlushLocked()
-	}
+	b.sem.Release(int64(bu.size))
+	bu.flush.Done()
+	return b.next()
 }
 
 // AddWait adds item to the current bundle. It marks the bundle for handling and
@@ -218,130 +332,42 @@
 	if err := b.sem.Acquire(ctx, int64(size)); err != nil {
 		return err
 	}
-	// Here, we've reserved space for item. Other goroutines can call AddWait
-	// and even acquire space, but no one can take away our reservation
-	// (assuming sem.Release is used correctly). So there is no race condition
-	// resulting from locking the mutex after sem.Acquire returns.
-	b.add(item, size)
-	return nil
-}
 
-// flushWithoutWait forces the current bundle to be be flushed if non-empty (but
-// doesn't wait for any bundles to actually be handled).
-func (b *Bundler) flushWithoutWait() {
 	b.mu.Lock()
-	b.startFlushLocked()
-	b.mu.Unlock()
+	defer b.mu.Unlock()
+	return b.add(item, size)
 }
 
 // Flush invokes the handler for all remaining items in the Bundler and waits
 // for it to return.
 func (b *Bundler) Flush() {
 	b.mu.Lock()
-	b.startFlushLocked()
-	// Here, all bundles with tickets < b.nextTicket are
-	// either finished or active. Those are the ones
-	// we want to wait for.
-	t := b.nextTicket
+
+	// If a curBundle is pending, move it to the queue.
+	b.enqueueCurBundle()
+
+	// Store a pointer to the WaitGroup that counts outstanding bundles
+	// in the current flush and create a new one to track the next flush.
+	wg := b.curFlush
+	b.curFlush = &sync.WaitGroup{}
+
+	// Flush must wait for all prior, outstanding flushes to complete.
+	// We use a channel to communicate completion between each flush in
+	// the sequence.
+	prev := b.prevFlush
+	next := make(chan bool)
+	b.prevFlush = next
+
 	b.mu.Unlock()
-	b.initSemaphores()
-	b.waitUntilAllHandled(t)
-}
 
-func (b *Bundler) startFlushLocked() {
-	if b.flushTimer != nil {
-		b.flushTimer.Stop()
-		b.flushTimer = nil
+	// Wait until the previous flush is finished.
+	if prev != nil {
+		<-prev
 	}
-	if b.curBundle.items.Len() == 0 {
-		return
-	}
-	// Here, both semaphores must have been initialized.
-	bun := b.curBundle
-	b.curBundle = bundle{items: b.itemSliceZero}
-	ticket := b.nextTicket
-	b.nextTicket++
-	go func() {
-		defer func() {
-			b.sem.Release(int64(bun.size))
-			b.release(ticket)
-		}()
-		b.acquire(ticket)
-		b.handler(bun.items.Interface())
-	}()
-}
 
-// acquire blocks until ticket is the next to be served, then returns. In order for N
-// acquire calls to return, the tickets must be in the range [0, N). A ticket must
-// not be presented to acquire more than once.
-func (b *Bundler) acquire(ticket uint64) {
-	b.mu2.Lock()
-	defer b.mu2.Unlock()
-	if ticket < b.nextHandled {
-		panic("bundler: acquire: arg too small")
-	}
-	for !(ticket == b.nextHandled && len(b.active) < b.HandlerLimit) {
-		b.cond.Wait()
-	}
-	// Here,
-	// ticket == b.nextHandled: the caller is the next one to be handled;
-	// and len(b.active) < b.HandlerLimit: there is space available.
-	b.active[ticket] = true
-	b.nextHandled++
-	// Broadcast, not Signal: although at most one acquire waiter can make progress,
-	// there might be waiters in waitUntilAllHandled.
-	b.cond.Broadcast()
-}
+	// Wait until this flush is finished.
+	wg.Wait()
 
-// If a ticket is used for a call to acquire, it must later be passed to release. A
-// ticket must not be presented to release more than once.
-func (b *Bundler) release(ticket uint64) {
-	b.mu2.Lock()
-	defer b.mu2.Unlock()
-	if !b.active[ticket] {
-		panic("bundler: release: not an active ticket")
-	}
-	delete(b.active, ticket)
-	b.cond.Broadcast()
-}
-
-// waitUntilAllHandled blocks until all tickets < n have called release, meaning
-// all bundles with tickets < n have been handled.
-func (b *Bundler) waitUntilAllHandled(n uint64) {
-	// Proof of correctness of this function.
-	// "N is acquired" means acquire(N) has returned.
-	// "N is released" means release(N) has returned.
-	// 1. If N is acquired, N-1 is acquired.
-	//    Follows from the loop test in acquire, and the fact
-	//    that nextHandled is incremented by 1.
-	// 2. If nextHandled >= N, then N-1 is acquired.
-	//    Because we only increment nextHandled to N after N-1 is acquired.
-	// 3. If nextHandled >= N, then all n < N is acquired.
-	//    Follows from #1 and #2.
-	// 4. If N is acquired and N is not in active, then N is released.
-	//    Because we put N in active before acquire returns, and only
-	//    remove it when it is released.
-	// Let min(active) be the smallest member of active, or infinity if active is empty.
-	// 5. If nextHandled >= N and N <= min(active), then all n < N is released.
-	//    From nextHandled >= N and #3, all n < N is acquired.
-	//    N <= min(active) implies n < min(active) for all n < N. So all n < N is not in active.
-	//    So from #4, all n < N is released.
-	// The loop test below is the antecedent of #5.
-	b.mu2.Lock()
-	defer b.mu2.Unlock()
-	for !(b.nextHandled >= n && n <= min(b.active)) {
-		b.cond.Wait()
-	}
-}
-
-// min returns the minimum value of the set s, or the largest uint64 if
-// s is empty.
-func min(s map[uint64]bool) uint64 {
-	var m uint64 = math.MaxUint64
-	for n := range s {
-		if n < m {
-			m = n
-		}
-	}
-	return m
+	// Allow the next flush to finish.
+	close(next)
 }
diff --git a/support/bundler/bundler_test.go b/support/bundler/bundler_test.go
index 5fa34f8..019312d 100644
--- a/support/bundler/bundler_test.go
+++ b/support/bundler/bundler_test.go
@@ -8,6 +8,7 @@
 	"context"
 	"fmt"
 	"reflect"
+	"sort"
 	"sync"
 	"testing"
 	"time"
@@ -105,6 +106,12 @@
 	b := NewBundler(int(0), handler.handleImmediate)
 	b.BundleCountThreshold = 10
 	b.BundleByteThreshold = 3
+	// Increase the limit beyond the number of bundles we expect (3)
+	// so that bundles get handled immediately after they cross the
+	// threshold. Otherwise, the test is non-deterministic. With the default
+	// HandlerLimit of 1, the 2nd and 3rd bundles may or may not be
+	// combined based on how long it takes to handle the 1st bundle.
+	b.HandlerLimit = 10
 	add := func(i interface{}, s int) {
 		if err := b.Add(i, s); err != nil {
 			t.Fatal(err)
@@ -113,14 +120,21 @@
 
 	add(1, 1)
 	add(2, 2)
-	// Hit byte threshold: bundle = 1, 2
+	// Hit byte threshold AND under HandlerLimit:
+	// bundle = 1, 2
 	add(3, 1)
 	add(4, 1)
 	add(5, 2)
-	// Passed byte threshold, but not limit: bundle = 3, 4, 5
+	// Passed byte threshold AND under byte limit AND under HandlerLimit:
+	// bundle = 3, 4, 5
 	add(6, 1)
 	b.Flush()
 	bgot := handler.bundles()
+	// We don't care about the order they were handled in. We just want
+	// to test that crossing the threshold triggered handling.
+	sort.Slice(bgot, func(i, j int) bool {
+		return bgot[i][0] < bgot[j][0]
+	})
 	bwant := [][]int{{1, 2}, {3, 4, 5}, {6}}
 	if !reflect.DeepEqual(bgot, bwant) {
 		t.Errorf("bundles: got %v, want %v", bgot, bwant)