bundler: prevent mixing calls to Add and AddWait
Set a state variable based on the first call to Add or AddWait.
Then return an error if the other method is subsequently called.
This issue: https://github.com/googleapis/google-api-go-client/issues/423
Fixes #423
Change-Id: I1031dc89574e793edeb69ecd1d0c6e8aefe5f064
Reviewed-on: https://code-review.googlesource.com/c/google-api-go-client/+/49370
Reviewed-by: kokoro <noreply+kokoro@google.com>
Reviewed-by: Eddie Scholtz <escholtz@google.com>
Reviewed-by: Jean de Klerk <deklerk@google.com>
diff --git a/support/bundler/bundler.go b/support/bundler/bundler.go
index f8e0574..418143d 100644
--- a/support/bundler/bundler.go
+++ b/support/bundler/bundler.go
@@ -21,6 +21,8 @@
"golang.org/x/sync/semaphore"
)
+type mode int
+
const (
DefaultDelayThreshold = time.Second
DefaultBundleCountThreshold = 10
@@ -28,12 +30,22 @@
DefaultBufferedByteLimit = 1e9 // 1G
)
+const (
+ none mode = iota
+ add
+ addWait
+)
+
var (
// ErrOverflow indicates that Bundler's stored bytes exceeds its BufferedByteLimit.
ErrOverflow = errors.New("bundler reached buffered byte limit")
// ErrOversizedItem indicates that an item's size exceeds the maximum bundle size.
ErrOversizedItem = errors.New("item size exceeds bundle byte limit")
+
+ // errMixedMethods indicates that mutually exclusive methods has been
+ // called subsequently.
+ errMixedMethods = errors.New("calls to Add and AddWait cannot be mixed")
)
// A Bundler collects items added to it into a bundle until the bundle
@@ -94,6 +106,9 @@
curFlush *sync.WaitGroup // counts outstanding bundles since last flush
prevFlush chan bool // signal used to wait for prior flush
+ // The first call to Add or AddWait, mode will be add or addWait respectively.
+ // If there wasn't call yet then mode is none.
+ mode mode
// TODO: consider alternative queue implementation for head/tail bundle. see:
// https://code-review.googlesource.com/c/google-api-go-client/+/47991/4/support/bundler/bundler.go#74
}
@@ -149,7 +164,7 @@
}
// enqueueCurBundle moves curBundle to the end of the queue. The bundle may be
-// handled immediately if we are below handlerLimit. It requires that b.mu is
+// handled immediately if we are below HandlerLimit. It requires that b.mu is
// locked.
func (b *Bundler) enqueueCurBundle() {
// We don't require callers to check if there is a pending bundle. It
@@ -178,6 +193,18 @@
}
}
+// setMode sets the state of Bundler's mode. If mode was defined before
+// and passed state is different from it then return an error.
+func (b *Bundler) setMode(m mode) error {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+ if b.mode == m || b.mode == none {
+ b.mode = m
+ return nil
+ }
+ return errMixedMethods
+}
+
// canFit returns true if bu can fit an additional item of size bytes based
// on the limits of Bundler b.
func (b *Bundler) canFit(bu *bundle, size int) bool {
@@ -197,6 +224,9 @@
//
// Add never blocks.
func (b *Bundler) Add(item interface{}, size int) error {
+ if err := b.setMode(add); err != nil {
+ return err
+ }
// If this item exceeds the maximum size of a bundle,
// we can never send it.
if b.BundleByteLimit > 0 && size > b.BundleByteLimit {
@@ -316,6 +346,9 @@
//
// Calls to Add and AddWait should not be mixed on the same Bundler.
func (b *Bundler) AddWait(ctx context.Context, item interface{}, size int) error {
+ if err := b.setMode(addWait); err != nil {
+ return err
+ }
// If this item exceeds the maximum size of a bundle,
// we can never send it.
if b.BundleByteLimit > 0 && size > b.BundleByteLimit {
diff --git a/support/bundler/bundler_test.go b/support/bundler/bundler_test.go
index 019312d..e60a22f 100644
--- a/support/bundler/bundler_test.go
+++ b/support/bundler/bundler_test.go
@@ -258,6 +258,29 @@
}
}
+func TestModeError(t *testing.T) {
+ // Call Add then AddWait.
+ b := NewBundler(int(0), func(interface{}) {})
+ b.BundleByteLimit = 4
+ b.BufferedByteLimit = 4
+ if err := b.Add(0, 2); err != nil {
+ t.Fatal(err)
+ }
+ if got, want := b.AddWait(context.Background(), 0, 2), errMixedMethods; got != want {
+ t.Fatalf("got %v, want %v", got, want)
+ }
+ // Call AddWait then Add on new Bundler.
+ b1 := NewBundler(int(0), func(interface{}) {})
+ b1.BundleByteLimit = 4
+ b1.BufferedByteLimit = 4
+ if err := b1.AddWait(context.Background(), 0, 2); err != nil {
+ t.Fatal(err)
+ }
+ if got, want := b1.Add(0, 2), errMixedMethods; got != want {
+ t.Fatalf("got %v, want %v", got, want)
+ }
+}
+
// Check that no more than HandlerLimit handlers are active at once.
func TestConcurrentHandlersMax(t *testing.T) {
const handlerLimit = 10