bundler: prevent mixing calls to Add and AddWait

Set a state variable based on the first call to Add or AddWait.
Then return an error if the other method is subsequently called.
This issue: https://github.com/googleapis/google-api-go-client/issues/423

Fixes #423

Change-Id: I1031dc89574e793edeb69ecd1d0c6e8aefe5f064
Reviewed-on: https://code-review.googlesource.com/c/google-api-go-client/+/49370
Reviewed-by: kokoro <noreply+kokoro@google.com>
Reviewed-by: Eddie Scholtz <escholtz@google.com>
Reviewed-by: Jean de Klerk <deklerk@google.com>
diff --git a/support/bundler/bundler.go b/support/bundler/bundler.go
index f8e0574..418143d 100644
--- a/support/bundler/bundler.go
+++ b/support/bundler/bundler.go
@@ -21,6 +21,8 @@
 	"golang.org/x/sync/semaphore"
 )
 
+type mode int
+
 const (
 	DefaultDelayThreshold       = time.Second
 	DefaultBundleCountThreshold = 10
@@ -28,12 +30,22 @@
 	DefaultBufferedByteLimit    = 1e9 // 1G
 )
 
+const (
+	none mode = iota
+	add
+	addWait
+)
+
 var (
 	// ErrOverflow indicates that Bundler's stored bytes exceeds its BufferedByteLimit.
 	ErrOverflow = errors.New("bundler reached buffered byte limit")
 
 	// ErrOversizedItem indicates that an item's size exceeds the maximum bundle size.
 	ErrOversizedItem = errors.New("item size exceeds bundle byte limit")
+
+	// errMixedMethods indicates that mutually exclusive methods has been
+	// called subsequently.
+	errMixedMethods = errors.New("calls to Add and AddWait cannot be mixed")
 )
 
 // A Bundler collects items added to it into a bundle until the bundle
@@ -94,6 +106,9 @@
 	curFlush  *sync.WaitGroup // counts outstanding bundles since last flush
 	prevFlush chan bool       // signal used to wait for prior flush
 
+	// The first call to Add or AddWait, mode will be add or addWait respectively.
+	// If there wasn't call yet then mode is none.
+	mode mode
 	// TODO: consider alternative queue implementation for head/tail bundle. see:
 	// https://code-review.googlesource.com/c/google-api-go-client/+/47991/4/support/bundler/bundler.go#74
 }
@@ -149,7 +164,7 @@
 }
 
 // enqueueCurBundle moves curBundle to the end of the queue. The bundle may be
-// handled immediately if we are below handlerLimit. It requires that b.mu is
+// handled immediately if we are below HandlerLimit. It requires that b.mu is
 // locked.
 func (b *Bundler) enqueueCurBundle() {
 	// We don't require callers to check if there is a pending bundle. It
@@ -178,6 +193,18 @@
 	}
 }
 
+// setMode sets the state of Bundler's mode. If mode was defined before
+// and passed state is different from it then return an error.
+func (b *Bundler) setMode(m mode) error {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+	if b.mode == m || b.mode == none {
+		b.mode = m
+		return nil
+	}
+	return errMixedMethods
+}
+
 // canFit returns true if bu can fit an additional item of size bytes based
 // on the limits of Bundler b.
 func (b *Bundler) canFit(bu *bundle, size int) bool {
@@ -197,6 +224,9 @@
 //
 // Add never blocks.
 func (b *Bundler) Add(item interface{}, size int) error {
+	if err := b.setMode(add); err != nil {
+		return err
+	}
 	// If this item exceeds the maximum size of a bundle,
 	// we can never send it.
 	if b.BundleByteLimit > 0 && size > b.BundleByteLimit {
@@ -316,6 +346,9 @@
 //
 // Calls to Add and AddWait should not be mixed on the same Bundler.
 func (b *Bundler) AddWait(ctx context.Context, item interface{}, size int) error {
+	if err := b.setMode(addWait); err != nil {
+		return err
+	}
 	// If this item exceeds the maximum size of a bundle,
 	// we can never send it.
 	if b.BundleByteLimit > 0 && size > b.BundleByteLimit {
diff --git a/support/bundler/bundler_test.go b/support/bundler/bundler_test.go
index 019312d..e60a22f 100644
--- a/support/bundler/bundler_test.go
+++ b/support/bundler/bundler_test.go
@@ -258,6 +258,29 @@
 	}
 }
 
+func TestModeError(t *testing.T) {
+	// Call Add then AddWait.
+	b := NewBundler(int(0), func(interface{}) {})
+	b.BundleByteLimit = 4
+	b.BufferedByteLimit = 4
+	if err := b.Add(0, 2); err != nil {
+		t.Fatal(err)
+	}
+	if got, want := b.AddWait(context.Background(), 0, 2), errMixedMethods; got != want {
+		t.Fatalf("got %v, want %v", got, want)
+	}
+	// Call AddWait then Add on new Bundler.
+	b1 := NewBundler(int(0), func(interface{}) {})
+	b1.BundleByteLimit = 4
+	b1.BufferedByteLimit = 4
+	if err := b1.AddWait(context.Background(), 0, 2); err != nil {
+		t.Fatal(err)
+	}
+	if got, want := b1.Add(0, 2), errMixedMethods; got != want {
+		t.Fatalf("got %v, want %v", got, want)
+	}
+}
+
 // Check that no more than HandlerLimit handlers are active at once.
 func TestConcurrentHandlersMax(t *testing.T) {
 	const handlerLimit = 10