bundler: support concurrent handler invocations
Add a HandlerLimit field, which controls the max number of concurrent
handler invocations.
The default of 1 preserves the previous sequential behavior.

To simplify the logic, introduce a ticket-based semaphore variant that
bounds the number of concurrently running handlers at HandlerLimit and
starts handler invocations in FIFO (ticket) order.
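
For example, a caller could bound handler concurrency like this
(hypothetical usage; process stands in for real per-item work):

    b := bundler.NewBundler(int(0), func(items interface{}) {
        for _, item := range items.([]int) {
            process(item) // hypothetical per-item work
        }
    })
    b.HandlerLimit = 4 // allow up to 4 concurrent handler invocations
    for i := 0; i < 100; i++ {
        b.Add(i, 1)
    }
    b.Flush() // returns once every added item has been handled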
Change-Id: I09680a0597c8eef7fe0f07611238e8acad89d012
Reviewed-on: https://code-review.googlesource.com/22610
Reviewed-by: Ross Light <light@google.com>
diff --git a/support/bundler/bundler.go b/support/bundler/bundler.go
index c4e4c9a..8d8fb7f 100644
--- a/support/bundler/bundler.go
+++ b/support/bundler/bundler.go
@@ -23,6 +23,7 @@
import (
"errors"
+ "math"
"reflect"
"sync"
"time"
@@ -71,6 +72,10 @@
// returning ErrOverflow. The default is DefaultBufferedByteLimit.
BufferedByteLimit int
+ // The maximum number of handler invocations that can be running at once.
+ // The default is 1.
+ HandlerLimit int
+
handler func(interface{}) // called to handle a bundle
itemSliceZero reflect.Value // nil (zero value) for slice of items
flushTimer *time.Timer // implements DelayThreshold
@@ -78,8 +83,22 @@
mu sync.Mutex
sem *semaphore.Weighted // enforces BufferedByteLimit
semOnce sync.Once
- curBundle bundle // incoming items added to this bundle
- handlingc <-chan struct{} // set to non-nil while a handler is running; closed when it returns
+ curBundle bundle // incoming items added to this bundle
+
+ // Each bundle is assigned a unique ticket that determines the order in which the
+ // handler is called. The ticket is assigned with mu locked, but waiting for tickets
+ // to be handled is done via mu2 and cond, below.
+ nextTicket uint64 // next ticket to be assigned
+
+ mu2 sync.Mutex
+ cond *sync.Cond
+ nextHandled uint64 // next ticket to be handled
+
+ // In this implementation, active uses space proportional to HandlerLimit, and
+ // waitUntilAllHandled takes time proportional to HandlerLimit each time an acquire
+ // or release occurs, so large values of HandlerLimit may cause performance
+ // issues.
+ active map[uint64]bool // tickets of bundles actively being handled
}
type bundle struct {
@@ -104,21 +123,23 @@
BundleCountThreshold: DefaultBundleCountThreshold,
BundleByteThreshold: DefaultBundleByteThreshold,
BufferedByteLimit: DefaultBufferedByteLimit,
+ HandlerLimit: 1,
handler: handler,
itemSliceZero: reflect.Zero(reflect.SliceOf(reflect.TypeOf(itemExample))),
+ active: map[uint64]bool{},
}
b.curBundle.items = b.itemSliceZero
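+ // cond pairs with mu2 to coordinate acquire, release and waitUntilAllHandled.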
+ b.cond = sync.NewCond(&b.mu2)
return b
}
-func (b *Bundler) sema() *semaphore.Weighted {
- // Create the semaphore lazily, because the user may set BufferedByteLimit
+func (b *Bundler) initSemaphores() {
+ // Create the semaphores lazily, because the user may set limits
// after NewBundler.
b.semOnce.Do(func() {
b.sem = semaphore.NewWeighted(int64(b.BufferedByteLimit))
})
- return b.sem
}
// Add adds item to the current bundle. It marks the bundle for handling and
@@ -142,7 +163,8 @@
// footprint, we can't accept it.
// (TryAcquire also returns false if anything is waiting on the semaphore,
// so calls to Add and AddWait shouldn't be mixed.)
- if !b.sema().TryAcquire(int64(size)) {
+ b.initSemaphores()
+ if !b.sem.TryAcquire(int64(size)) {
return ErrOverflow
}
b.add(item, size)
@@ -202,7 +224,8 @@
// If adding this item would exceed our allotted memory footprint, block
// until space is available. The semaphore is FIFO, so there will be no
// starvation.
- if err := b.sema().Acquire(ctx, int64(size)); err != nil {
+ b.initSemaphores()
+ if err := b.sem.Acquire(ctx, int64(size)); err != nil {
return err
}
// Here, we've reserved space for item. Other goroutines can call AddWait
@@ -218,12 +241,13 @@
func (b *Bundler) Flush() {
b.mu.Lock()
b.startFlushLocked()
- done := b.handlingc
+ // Here, all bundles with tickets < b.nextTicket are
+ // either finished or active. Those are the ones
+ // we want to wait for.
+ t := b.nextTicket
b.mu.Unlock()
-
- if done != nil {
- <-done
- }
+ b.initSemaphores()
+ b.waitUntilAllHandled(t)
}
func (b *Bundler) startFlushLocked() {
@@ -231,28 +255,95 @@
b.flushTimer.Stop()
b.flushTimer = nil
}
-
if b.curBundle.items.Len() == 0 {
return
}
+ // Here, b.sem must have been initialized: curBundle is nonempty, so add ran,
+ // and Add/AddWait call initSemaphores before add.
bun := b.curBundle
b.curBundle = bundle{items: b.itemSliceZero}
-
- done := make(chan struct{})
- var running <-chan struct{}
- running, b.handlingc = b.handlingc, done
-
+ ticket := b.nextTicket
+ b.nextTicket++
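+ // Handle the bundle on its own goroutine. acquire blocks until this
+ // bundle's turn arrives and fewer than HandlerLimit handlers are running,
+ // so handler invocations start in ticket order.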
go func() {
defer func() {
b.sem.Release(int64(bun.size))
- close(done)
+ b.release(ticket)
}()
-
- if running != nil {
- // Wait for our turn to call the handler.
- <-running
- }
-
+ b.acquire(ticket)
b.handler(bun.items.Interface())
}()
}
+
+// acquire blocks until ticket is the next to be served, then returns. In order for N
+// acquire calls to return, the tickets must be in the range [0, N). A ticket must
+// not be presented to acquire more than once.
+func (b *Bundler) acquire(ticket uint64) {
+ b.mu2.Lock()
+ defer b.mu2.Unlock()
+ if ticket < b.nextHandled {
+ panic("bundler: acquire: arg too small")
+ }
+ for !(ticket == b.nextHandled && len(b.active) < b.HandlerLimit) {
+ b.cond.Wait()
+ }
+ // Here,
+ // ticket == b.nextHandled: the caller is the next one to be handled;
+ // and len(b.active) < b.HandlerLimit: there is space available.
+ b.active[ticket] = true
+ b.nextHandled++
+ // Broadcast, not Signal: although at most one acquire waiter can make progress,
+ // there might be waiters in waitUntilAllHandled.
+ b.cond.Broadcast()
+}
+
+// If a ticket is used for a call to acquire, it must later be passed to release. A
+// ticket must not be presented to release more than once.
+func (b *Bundler) release(ticket uint64) {
+ b.mu2.Lock()
+ defer b.mu2.Unlock()
+ if !b.active[ticket] {
+ panic("bundler: release: not an active ticket")
+ }
+ delete(b.active, ticket)
+ b.cond.Broadcast()
+}
+
+// waitUntilAllHandled blocks until all tickets < n have called release, meaning
+// all bundles with tickets < n have been handled.
+func (b *Bundler) waitUntilAllHandled(n uint64) {
+ // Proof of correctness of this function.
+ // "N is acquired" means acquire(N) has returned.
+ // "N is released" means release(N) has returned.
+ // 1. If N is acquired, N-1 is acquired.
+ // Follows from the loop test in acquire, and the fact
+ // that nextHandled is incremented by 1.
+ // 2. If nextHandled >= N, then N-1 is acquired.
+ // Because we only increment nextHandled to N after N-1 is acquired.
+ // 3. If nextHandled >= N, then all n < N are acquired.
+ // Follows from #1 and #2.
+ // 4. If N is acquired and N is not in active, then N is released.
+ // Because we put N in active before acquire returns, and only
+ // remove it when it is released.
+ // Let min(active) be the smallest member of active, or infinity if active is empty.
+ // 5. If nextHandled >= N and N <= min(active), then all n < N are released.
+ // From nextHandled >= N and #3, all n < N are acquired.
+ // N <= min(active) implies n < min(active) for all n < N. So no n < N is in active.
+ // So from #4, all n < N are released.
+ // The loop test below is the antecedent of #5.
+ b.mu2.Lock()
+ defer b.mu2.Unlock()
+ for !(b.nextHandled >= n && n <= min(b.active)) {
+ b.cond.Wait()
+ }
+}
+
+// min returns the minimum value of the set s, or the largest uint64 if
+// s is empty.
+func min(s map[uint64]bool) uint64 {
+ var m uint64 = math.MaxUint64
+ for n := range s {
+ if n < m {
+ m = n
+ }
+ }
+ return m
+}
diff --git a/support/bundler/bundler_test.go b/support/bundler/bundler_test.go
index 45efd52..bce2223 100644
--- a/support/bundler/bundler_test.go
+++ b/support/bundler/bundler_test.go
@@ -223,6 +223,88 @@
}
}
+// Check that no more than HandlerLimit handlers are active at once.
+func TestConcurrentHandlersMax(t *testing.T) {
+ const handlerLimit = 10
+ var (
+ mu sync.Mutex
+ active int
+ maxHandlers int
+ )
+ b := NewBundler(int(0), func(s interface{}) {
+ mu.Lock()
+ active++
+ if active > maxHandlers {
+ maxHandlers = active
+ }
+ if maxHandlers > handlerLimit {
+ t.Errorf("too many handlers running (got %d; want %d)", maxHandlers, handlerLimit)
+ }
+ mu.Unlock()
+ time.Sleep(1 * time.Millisecond) // let the scheduler work
+ mu.Lock()
+ active--
+ mu.Unlock()
+ })
+ b.BundleCountThreshold = 5
+ b.HandlerLimit = handlerLimit
+ defer b.Flush()
+
+ more := 0 // extra iterations past saturation
+ for i := 0; more == 0 || i < more; i++ {
+ mu.Lock()
+ m := maxHandlers
+ mu.Unlock()
+ if m >= handlerLimit && more == 0 {
+ // Run past saturation to check that we don't exceed the max.
+ more = 2 * i
+ }
+ b.Add(i, 1)
+ }
+}
+
+// Check that Flush doesn't return until all prior items have been handled.
+func TestConcurrentFlush(t *testing.T) {
+ var (
+ mu sync.Mutex
+ items = make(map[int]bool)
+ )
+ b := NewBundler(int(0), func(s interface{}) {
+ mu.Lock()
+ for _, i := range s.([]int) {
+ items[i] = true
+ }
+ mu.Unlock()
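+ // Sleep so handlers stay busy and Flush must actually wait for them.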
+ time.Sleep(10 * time.Millisecond)
+ })
+ b.BundleCountThreshold = 5
+ b.HandlerLimit = 10
+ defer b.Flush()
+
+ var wg sync.WaitGroup
+ defer wg.Wait()
+ for i := 0; i < 5000; i++ {
+ b.Add(i, 1)
+ if i%100 == 0 {
+ i := i
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ b.Flush()
+ mu.Lock()
+ defer mu.Unlock()
+ for j := 0; j <= i; j++ {
+ if !items[j] {
+ // Cannot use Fatal, since we're in a non-test goroutine.
+ t.Errorf("flush(%d): item %d not handled", i, j)
+ break
+ }
+ }
+ }()
+ }
+ }
+}
+
type testHandler struct {
mu sync.Mutex
b [][]int