bundler: support concurrent handler invocations

Add a HandlerLimit field, which controls the maximum number of concurrent
handler invocations. Preserve the previous sequential behavior when
HandlerLimit = 1.

To simplify the logic, introduce a semaphore variant that provides the
properties we need.

Change-Id: I09680a0597c8eef7fe0f07611238e8acad89d012
Reviewed-on: https://code-review.googlesource.com/22610
Reviewed-by: Ross Light <light@google.com>
diff --git a/support/bundler/bundler.go b/support/bundler/bundler.go index c4e4c9a..8d8fb7f 100644 --- a/support/bundler/bundler.go +++ b/support/bundler/bundler.go
@@ -23,6 +23,7 @@ import ( "errors" + "math" "reflect" "sync" "time" @@ -71,6 +72,10 @@ // returning ErrOverflow. The default is DefaultBufferedByteLimit. BufferedByteLimit int + // The maximum number of handler invocations that can be running at once. + // The default is 1. + HandlerLimit int + handler func(interface{}) // called to handle a bundle itemSliceZero reflect.Value // nil (zero value) for slice of items flushTimer *time.Timer // implements DelayThreshold @@ -78,8 +83,22 @@ mu sync.Mutex sem *semaphore.Weighted // enforces BufferedByteLimit semOnce sync.Once - curBundle bundle // incoming items added to this bundle - handlingc <-chan struct{} // set to non-nil while a handler is running; closed when it returns + curBundle bundle // incoming items added to this bundle + + // Each bundle is assigned a unique ticket that determines the order in which the + // handler is called. The ticket is assigned with mu locked, but waiting for tickets + // to be handled is done via mu2 and cond, below. + nextTicket uint64 // next ticket to be assigned + + mu2 sync.Mutex + cond *sync.Cond + nextHandled uint64 // next ticket to be handled + + // In this implementation, active uses space proportional to HandlerLimit, and + // waitUntilAllHandled takes time proportional to HandlerLimit each time an acquire + // or release occurs, so large values of HandlerLimit max may cause performance + // issues. 
+ active map[uint64]bool // tickets of bundles actively being handled } type bundle struct { @@ -104,21 +123,23 @@ BundleCountThreshold: DefaultBundleCountThreshold, BundleByteThreshold: DefaultBundleByteThreshold, BufferedByteLimit: DefaultBufferedByteLimit, + HandlerLimit: 1, handler: handler, itemSliceZero: reflect.Zero(reflect.SliceOf(reflect.TypeOf(itemExample))), + active: map[uint64]bool{}, } b.curBundle.items = b.itemSliceZero + b.cond = sync.NewCond(&b.mu2) return b } -func (b *Bundler) sema() *semaphore.Weighted { - // Create the semaphore lazily, because the user may set BufferedByteLimit +func (b *Bundler) initSemaphores() { + // Create the semaphores lazily, because the user may set limits // after NewBundler. b.semOnce.Do(func() { b.sem = semaphore.NewWeighted(int64(b.BufferedByteLimit)) }) - return b.sem } // Add adds item to the current bundle. It marks the bundle for handling and @@ -142,7 +163,8 @@ // footprint, we can't accept it. // (TryAcquire also returns false if anything is waiting on the semaphore, // so calls to Add and AddWait shouldn't be mixed.) - if !b.sema().TryAcquire(int64(size)) { + b.initSemaphores() + if !b.sem.TryAcquire(int64(size)) { return ErrOverflow } b.add(item, size) @@ -202,7 +224,8 @@ // If adding this item would exceed our allotted memory footprint, block // until space is available. The semaphore is FIFO, so there will be no // starvation. - if err := b.sema().Acquire(ctx, int64(size)); err != nil { + b.initSemaphores() + if err := b.sem.Acquire(ctx, int64(size)); err != nil { return err } // Here, we've reserved space for item. Other goroutines can call AddWait @@ -218,12 +241,13 @@ func (b *Bundler) Flush() { b.mu.Lock() b.startFlushLocked() - done := b.handlingc + // Here, all bundles with tickets < b.nextTicket are + // either finished or active. Those are the ones + // we want to wait for. 
+ t := b.nextTicket b.mu.Unlock() - - if done != nil { - <-done - } + b.initSemaphores() + b.waitUntilAllHandled(t) } func (b *Bundler) startFlushLocked() { @@ -231,28 +255,95 @@ b.flushTimer.Stop() b.flushTimer = nil } - if b.curBundle.items.Len() == 0 { return } + // Here, both semaphores must have been initialized. bun := b.curBundle b.curBundle = bundle{items: b.itemSliceZero} - - done := make(chan struct{}) - var running <-chan struct{} - running, b.handlingc = b.handlingc, done - + ticket := b.nextTicket + b.nextTicket++ go func() { defer func() { b.sem.Release(int64(bun.size)) - close(done) + b.release(ticket) }() - - if running != nil { - // Wait for our turn to call the handler. - <-running - } - + b.acquire(ticket) b.handler(bun.items.Interface()) }() } + +// acquire blocks until ticket is the next to be served, then returns. In order for N +// acquire calls to return, the tickets must be in the range [0, N). A ticket must +// not be presented to acquire more than once. +func (b *Bundler) acquire(ticket uint64) { + b.mu2.Lock() + defer b.mu2.Unlock() + if ticket < b.nextHandled { + panic("bundler: acquire: arg too small") + } + for !(ticket == b.nextHandled && len(b.active) < b.HandlerLimit) { + b.cond.Wait() + } + // Here, + // ticket == b.nextHandled: the caller is the next one to be handled; + // and len(b.active) < b.HandlerLimit: there is space available. + b.active[ticket] = true + b.nextHandled++ + // Broadcast, not Signal: although at most one acquire waiter can make progress, + // there might be waiters in waitUntilAllHandled. + b.cond.Broadcast() +} + +// If a ticket is used for a call to acquire, it must later be passed to release. A +// ticket must not be presented to release more than once. 
+func (b *Bundler) release(ticket uint64) { + b.mu2.Lock() + defer b.mu2.Unlock() + if !b.active[ticket] { + panic("bundler: release: not an active ticket") + } + delete(b.active, ticket) + b.cond.Broadcast() +} + +// waitUntilAllHandled blocks until all tickets < n have called release, meaning +// all bundles with tickets < n have been handled. +func (b *Bundler) waitUntilAllHandled(n uint64) { + // Proof of correctness of this function. + // "N is acquired" means acquire(N) has returned. + // "N is released" means release(N) has returned. + // 1. If N is acquired, N-1 is acquired. + // Follows from the loop test in acquire, and the fact + // that nextHandled is incremented by 1. + // 2. If nextHandled >= N, then N-1 is acquired. + // Because we only increment nextHandled to N after N-1 is acquired. + // 3. If nextHandled >= N, then all n < N is acquired. + // Follows from #1 and #2. + // 4. If N is acquired and N is not in active, then N is released. + // Because we put N in active before acquire returns, and only + // remove it when it is released. + // Let min(active) be the smallest member of active, or infinity if active is empty. + // 5. If nextHandled >= N and N <= min(active), then all n < N is released. + // From nextHandled >= N and #3, all n < N is acquired. + // N <= min(active) implies n < min(active) for all n < N. So all n < N is not in active. + // So from #4, all n < N is released. + // The loop test below is the antecedent of #5. + b.mu2.Lock() + defer b.mu2.Unlock() + for !(b.nextHandled >= n && n <= min(b.active)) { + b.cond.Wait() + } +} + +// min returns the minimum value of the set s, or the largest uint64 if +// s is empty. +func min(s map[uint64]bool) uint64 { + var m uint64 = math.MaxUint64 + for n := range s { + if n < m { + m = n + } + } + return m +}
diff --git a/support/bundler/bundler_test.go b/support/bundler/bundler_test.go index 45efd52..bce2223 100644 --- a/support/bundler/bundler_test.go +++ b/support/bundler/bundler_test.go
@@ -223,6 +223,88 @@ } } +// Check that no more than HandlerLimit handlers are active at once. +func TestConcurrentHandlersMax(t *testing.T) { + const handlerLimit = 10 + var ( + mu sync.Mutex + active int + maxHandlers int + ) + b := NewBundler(int(0), func(s interface{}) { + mu.Lock() + active++ + if active > maxHandlers { + maxHandlers = active + } + if maxHandlers > handlerLimit { + t.Errorf("too many handlers running (got %d; want %d)", maxHandlers, handlerLimit) + } + mu.Unlock() + time.Sleep(1 * time.Millisecond) // let the scheduler work + mu.Lock() + active-- + mu.Unlock() + }) + b.BundleCountThreshold = 5 + b.HandlerLimit = 10 + defer b.Flush() + + more := 0 // extra iterations past saturation + for i := 0; more == 0 || i < more; i++ { + mu.Lock() + m := maxHandlers + mu.Unlock() + if m >= handlerLimit && more == 0 { + // Run past saturation to check that we don't exceed the max. + more = 2 * i + } + b.Add(i, 1) + } +} + +// Check that Flush doesn't return until all prior items have been handled. +func TestConcurrentFlush(t *testing.T) { + var ( + mu sync.Mutex + items = make(map[int]bool) + ) + b := NewBundler(int(0), func(s interface{}) { + mu.Lock() + for _, i := range s.([]int) { + items[i] = true + } + mu.Unlock() + time.Sleep(10 * time.Millisecond) + }) + b.BundleCountThreshold = 5 + b.HandlerLimit = 10 + defer b.Flush() + + var wg sync.WaitGroup + defer wg.Wait() + for i := 0; i < 5000; i++ { + b.Add(i, 1) + if i%100 == 0 { + i := i + wg.Add(1) + go func() { + defer wg.Done() + b.Flush() + mu.Lock() + defer mu.Unlock() + for j := 0; j <= i; j++ { + if !items[j] { + // Cannot use Fatal, since we're in a non-test goroutine. + t.Errorf("flush(%d): item %d not handled", i, j) + break + } + } + }() + } + } +} + type testHandler struct { mu sync.Mutex b [][]int