bundler: support concurrent handler invocations

Add a HandlerLimit field, which controls the max number of concurrent
handler invocations.

Preserve the previous sequential behavior when HandlerLimit = 1.

To simplify the logic, introduce a semaphore variant that provides the
properties we need.

Change-Id: I09680a0597c8eef7fe0f07611238e8acad89d012
Reviewed-on: https://code-review.googlesource.com/22610
Reviewed-by: Ross Light <light@google.com>
diff --git a/support/bundler/bundler.go b/support/bundler/bundler.go
index c4e4c9a..8d8fb7f 100644
--- a/support/bundler/bundler.go
+++ b/support/bundler/bundler.go
@@ -23,6 +23,7 @@
 
 import (
 	"errors"
+	"math"
 	"reflect"
 	"sync"
 	"time"
@@ -71,6 +72,10 @@
 	// returning ErrOverflow. The default is DefaultBufferedByteLimit.
 	BufferedByteLimit int
 
+	// The maximum number of handler invocations that can be running at once.
+	// The default is 1.
+	HandlerLimit int
+
 	handler       func(interface{}) // called to handle a bundle
 	itemSliceZero reflect.Value     // nil (zero value) for slice of items
 	flushTimer    *time.Timer       // implements DelayThreshold
@@ -78,8 +83,22 @@
 	mu        sync.Mutex
 	sem       *semaphore.Weighted // enforces BufferedByteLimit
 	semOnce   sync.Once
-	curBundle bundle          // incoming items added to this bundle
-	handlingc <-chan struct{} // set to non-nil while a handler is running; closed when it returns
+	curBundle bundle // incoming items added to this bundle
+
+	// Each bundle is assigned a unique ticket that determines the order in which the
+	// handler is called. The ticket is assigned with mu locked, but waiting for tickets
+	// to be handled is done via mu2 and cond, below.
+	nextTicket uint64 // next ticket to be assigned
+
+	mu2         sync.Mutex
+	cond        *sync.Cond
+	nextHandled uint64 // next ticket to be handled
+
+	// In this implementation, active uses space proportional to HandlerLimit, and
+	// waitUntilAllHandled takes time proportional to HandlerLimit each time an acquire
+	// or release occurs, so large values of HandlerLimit max may cause performance
+	// issues.
+	active map[uint64]bool // tickets of bundles actively being handled
 }
 
 type bundle struct {
@@ -104,21 +123,23 @@
 		BundleCountThreshold: DefaultBundleCountThreshold,
 		BundleByteThreshold:  DefaultBundleByteThreshold,
 		BufferedByteLimit:    DefaultBufferedByteLimit,
+		HandlerLimit:         1,
 
 		handler:       handler,
 		itemSliceZero: reflect.Zero(reflect.SliceOf(reflect.TypeOf(itemExample))),
+		active:        map[uint64]bool{},
 	}
 	b.curBundle.items = b.itemSliceZero
+	b.cond = sync.NewCond(&b.mu2)
 	return b
 }
 
-func (b *Bundler) sema() *semaphore.Weighted {
-	// Create the semaphore lazily, because the user may set BufferedByteLimit
+func (b *Bundler) initSemaphores() {
+	// Create the semaphores lazily, because the user may set limits
 	// after NewBundler.
 	b.semOnce.Do(func() {
 		b.sem = semaphore.NewWeighted(int64(b.BufferedByteLimit))
 	})
-	return b.sem
 }
 
 // Add adds item to the current bundle. It marks the bundle for handling and
@@ -142,7 +163,8 @@
 	// footprint, we can't accept it.
 	// (TryAcquire also returns false if anything is waiting on the semaphore,
 	// so calls to Add and AddWait shouldn't be mixed.)
-	if !b.sema().TryAcquire(int64(size)) {
+	b.initSemaphores()
+	if !b.sem.TryAcquire(int64(size)) {
 		return ErrOverflow
 	}
 	b.add(item, size)
@@ -202,7 +224,8 @@
 	// If adding this item would exceed our allotted memory footprint, block
 	// until space is available. The semaphore is FIFO, so there will be no
 	// starvation.
-	if err := b.sema().Acquire(ctx, int64(size)); err != nil {
+	b.initSemaphores()
+	if err := b.sem.Acquire(ctx, int64(size)); err != nil {
 		return err
 	}
 	// Here, we've reserved space for item. Other goroutines can call AddWait
@@ -218,12 +241,13 @@
 func (b *Bundler) Flush() {
 	b.mu.Lock()
 	b.startFlushLocked()
-	done := b.handlingc
+	// Here, all bundles with tickets < b.nextTicket are
+	// either finished or active. Those are the ones
+	// we want to wait for.
+	t := b.nextTicket
 	b.mu.Unlock()
-
-	if done != nil {
-		<-done
-	}
+	b.initSemaphores()
+	b.waitUntilAllHandled(t)
 }
 
 func (b *Bundler) startFlushLocked() {
@@ -231,28 +255,95 @@
 		b.flushTimer.Stop()
 		b.flushTimer = nil
 	}
-
 	if b.curBundle.items.Len() == 0 {
 		return
 	}
+	// Here, both semaphores must have been initialized.
 	bun := b.curBundle
 	b.curBundle = bundle{items: b.itemSliceZero}
-
-	done := make(chan struct{})
-	var running <-chan struct{}
-	running, b.handlingc = b.handlingc, done
-
+	ticket := b.nextTicket
+	b.nextTicket++
 	go func() {
 		defer func() {
 			b.sem.Release(int64(bun.size))
-			close(done)
+			b.release(ticket)
 		}()
-
-		if running != nil {
-			// Wait for our turn to call the handler.
-			<-running
-		}
-
+		b.acquire(ticket)
 		b.handler(bun.items.Interface())
 	}()
 }
+
+// acquire blocks until ticket is the next to be served, then returns. In order for N
+// acquire calls to return, the tickets must be in the range [0, N). A ticket must
+// not be presented to acquire more than once.
+func (b *Bundler) acquire(ticket uint64) {
+	b.mu2.Lock()
+	defer b.mu2.Unlock()
+	if ticket < b.nextHandled {
+		panic("bundler: acquire: arg too small")
+	}
+	for !(ticket == b.nextHandled && len(b.active) < b.HandlerLimit) {
+		b.cond.Wait()
+	}
+	// Here,
+	// ticket == b.nextHandled: the caller is the next one to be handled;
+	// and len(b.active) < b.HandlerLimit: there is space available.
+	b.active[ticket] = true
+	b.nextHandled++
+	// Broadcast, not Signal: although at most one acquire waiter can make progress,
+	// there might be waiters in waitUntilAllHandled.
+	b.cond.Broadcast()
+}
+
+// If a ticket is used for a call to acquire, it must later be passed to release. A
+// ticket must not be presented to release more than once.
+func (b *Bundler) release(ticket uint64) {
+	b.mu2.Lock()
+	defer b.mu2.Unlock()
+	if !b.active[ticket] {
+		panic("bundler: release: not an active ticket")
+	}
+	delete(b.active, ticket)
+	b.cond.Broadcast()
+}
+
+// waitUntilAllHandled blocks until all tickets < n have called release, meaning
+// all bundles with tickets < n have been handled.
+func (b *Bundler) waitUntilAllHandled(n uint64) {
+	// Proof of correctness of this function.
+	// "N is acquired" means acquire(N) has returned.
+	// "N is released" means release(N) has returned.
+	// 1. If N is acquired, N-1 is acquired.
+	//    Follows from the loop test in acquire, and the fact
+	//    that nextHandled is incremented by 1.
+	// 2. If nextHandled >= N, then N-1 is acquired.
+	//    Because we only increment nextHandled to N after N-1 is acquired.
+	// 3. If nextHandled >= N, then all n < N is acquired.
+	//    Follows from #1 and #2.
+	// 4. If N is acquired and N is not in active, then N is released.
+	//    Because we put N in active before acquire returns, and only
+	//    remove it when it is released.
+	// Let min(active) be the smallest member of active, or infinity if active is empty.
+	// 5. If nextHandled >= N and N <= min(active), then all n < N is released.
+	//    From nextHandled >= N and #3, all n < N is acquired.
+	//    N <= min(active) implies n < min(active) for all n < N. So all n < N is not in active.
+	//    So from #4, all n < N is released.
+	// The loop test below is the antecedent of #5.
+	b.mu2.Lock()
+	defer b.mu2.Unlock()
+	for !(b.nextHandled >= n && n <= min(b.active)) {
+		b.cond.Wait()
+	}
+}
+
+// min returns the minimum value of the set s, or the largest uint64 if
+// s is empty.
+func min(s map[uint64]bool) uint64 {
+	var m uint64 = math.MaxUint64
+	for n := range s {
+		if n < m {
+			m = n
+		}
+	}
+	return m
+}
diff --git a/support/bundler/bundler_test.go b/support/bundler/bundler_test.go
index 45efd52..bce2223 100644
--- a/support/bundler/bundler_test.go
+++ b/support/bundler/bundler_test.go
@@ -223,6 +223,88 @@
 	}
 }
 
+// Check that no more than HandlerLimit handlers are active at once.
+func TestConcurrentHandlersMax(t *testing.T) {
+	const handlerLimit = 10
+	var (
+		mu          sync.Mutex
+		active      int
+		maxHandlers int
+	)
+	b := NewBundler(int(0), func(s interface{}) {
+		mu.Lock()
+		active++
+		if active > maxHandlers {
+			maxHandlers = active
+		}
+		if maxHandlers > handlerLimit {
+			t.Errorf("too many handlers running (got %d; want %d)", maxHandlers, handlerLimit)
+		}
+		mu.Unlock()
+		time.Sleep(1 * time.Millisecond) // let the scheduler work
+		mu.Lock()
+		active--
+		mu.Unlock()
+	})
+	b.BundleCountThreshold = 5
+	b.HandlerLimit = 10
+	defer b.Flush()
+
+	more := 0 // extra iterations past saturation
+	for i := 0; more == 0 || i < more; i++ {
+		mu.Lock()
+		m := maxHandlers
+		mu.Unlock()
+		if m >= handlerLimit && more == 0 {
+			// Run past saturation to check that we don't exceed the max.
+			more = 2 * i
+		}
+		b.Add(i, 1)
+	}
+}
+
+// Check that Flush doesn't return until all prior items have been handled.
+func TestConcurrentFlush(t *testing.T) {
+	var (
+		mu    sync.Mutex
+		items = make(map[int]bool)
+	)
+	b := NewBundler(int(0), func(s interface{}) {
+		mu.Lock()
+		for _, i := range s.([]int) {
+			items[i] = true
+		}
+		mu.Unlock()
+		time.Sleep(10 * time.Millisecond)
+	})
+	b.BundleCountThreshold = 5
+	b.HandlerLimit = 10
+	defer b.Flush()
+
+	var wg sync.WaitGroup
+	defer wg.Wait()
+	for i := 0; i < 5000; i++ {
+		b.Add(i, 1)
+		if i%100 == 0 {
+			i := i
+			wg.Add(1)
+			go func() {
+				defer wg.Done()
+				b.Flush()
+				mu.Lock()
+				defer mu.Unlock()
+				for j := 0; j <= i; j++ {
+					if !items[j] {
+						// Cannot use Fatal, since we're in a non-test goroutine.
+						t.Errorf("flush(%d): item %d not handled", i, j)
+						break
+					}
+				}
+			}()
+		}
+	}
+}
+
 type testHandler struct {
 	mu sync.Mutex
 	b  [][]int