support/bundler: fix deadlock when using time based flush
bundler.Bundler can run into a deadlock under the following
configuration:
- handler limit of 1
- high size and count thresholds, so that the only reason a bundle is
flushed is that the delay timer expires
Under these conditions, with high concurrency, a deadlock can happen
because the background handler goroutine can exit while there is a
pending queue, which will never get flushed.
Specifically, this sequence of events:
- handler goroutine calls postHandle and receives nil for the next
bundle, signaling that it has no more work to do.
- some other goroutine calls AddWait() -> enqueueCurBundle, which sees
that b.handlerCount has reached b.handlerLimit, so it cannot spawn a
new handler and proceeds to queue the item
- handler goroutine decrements b.handlerCount and exits
The fix is to move the decrement of b.handlerCount into postHandle, so
that the decision to exit and the check that allows a new handler
goroutine to be spawned happen under the same lock and cannot be
re-ordered.
Fixes: googleapis/google-api-go-client#475
Change-Id: I84421e53a4dc24eec36ceb147933aab57a40aec8
Reviewed-on: https://code-review.googlesource.com/c/google-api-go-client/+/55950
Reviewed-by: kokoro <noreply+kokoro@google.com>
Reviewed-by: Cody Oss <codyoss@google.com>
diff --git a/support/bundler/bundler.go b/support/bundler/bundler.go
index 5f603bd..9f8237a 100644
--- a/support/bundler/bundler.go
+++ b/support/bundler/bundler.go
@@ -324,17 +324,20 @@
b.handler(bu.items.Interface())
bu = b.postHandle(bu)
}
- b.mu.Lock()
- b.handlerCount--
- b.mu.Unlock()
}
func (b *Bundler) postHandle(bu *bundle) *bundle {
b.mu.Lock()
defer b.mu.Unlock()
+
b.sem.Release(int64(bu.size))
bu.flush.Done()
- return b.next()
+
+ bu = b.next()
+ if bu == nil {
+ b.handlerCount--
+ }
+ return bu
}
// AddWait adds item to the current bundle. It marks the bundle for handling and
diff --git a/support/bundler/bundler_test.go b/support/bundler/bundler_test.go
index e60a22f..7d16481 100644
--- a/support/bundler/bundler_test.go
+++ b/support/bundler/bundler_test.go
@@ -7,7 +7,9 @@
import (
"context"
"fmt"
+ "math"
"reflect"
+ "runtime"
"sort"
"sync"
"testing"
@@ -363,6 +365,54 @@
}
}
+// Test that time based flushes do not deadlock
+func TestBundlerTimeBasedFlushDeadlock(t *testing.T) {
+ const (
+ goroutines = 1e3
+ iterations = 1e3
+
+ N = goroutines * iterations
+ )
+
+ var wg sync.WaitGroup
+ wg.Add(N)
+
+ flush := func(i interface{}) {
+ time.Sleep(10 * time.Millisecond)
+ buf := i.([]int)
+ for i := 0; i < len(buf); i++ {
+ wg.Done()
+ }
+ }
+
+ b := NewBundler(int(0), flush)
+ b.DelayThreshold = 10 * time.Millisecond
+ b.HandlerLimit = 1
+
+ // high thresholds to ensure that we only hit time based flushes
+ b.BundleCountThreshold = math.MaxInt32
+ b.BundleByteThreshold = math.MaxInt32
+
+ ctx, cancel := context.WithCancel(context.Background())
+ time.AfterFunc(1*time.Second, cancel)
+
+ add := func(i int) {
+ for j := 0; j < iterations; j++ {
+ if err := b.AddWait(ctx, i, 1); err != nil {
+ t.Errorf("timed out: %v", err)
+ }
+ runtime.Gosched()
+ }
+ }
+
+ for i := 0; i < goroutines; i++ {
+ go add(i)
+ }
+
+ // verify that we don't block forever
+ wg.Wait()
+}
+
type testHandler struct {
mu sync.Mutex
b [][]int