Add four simple benchmarks for adding items to a bundle and flushing

The first benchmark measures the cost of Add() only. The second measures the cost
of Add() and Flush() -- waiting for all the added items to be handled. These first
two benchmarks have an immediate handler (no delay).

The third and fourth benchmarks use a handler that takes 1 millisecond (sleeps for
1 millisecond). The third benchmark has bundles of 1 item each, and the fourth
bench is more realistic and has 25 items per bundle.

Also, added one test that checks rough times are right for a handler that has a
300-millisecond delay.

Change-Id: I2d9b8e2971011d1e071f2d05a4d7e2eb5cdd6fc0
Reviewed-on: https://code-review.googlesource.com/c/google-api-go-client/+/47997
Reviewed-by: kokoro <noreply+kokoro@google.com>
Reviewed-by: Jean de Klerk <deklerk@google.com>
diff --git a/support/bundler/bundler_test.go b/support/bundler/bundler_test.go
index c716adc..5fa34f8 100644
--- a/support/bundler/bundler_test.go
+++ b/support/bundler/bundler_test.go
@@ -16,7 +16,7 @@
 func TestBundlerCount1(t *testing.T) {
 	// Unbundled case: one item per bundle.
 	handler := &testHandler{}
-	b := NewBundler(int(0), handler.handle)
+	b := NewBundler(int(0), handler.handleImmediate)
 	b.BundleCountThreshold = 1
 	b.DelayThreshold = time.Second
 
@@ -42,7 +42,7 @@
 
 func TestBundlerCount3(t *testing.T) {
 	handler := &testHandler{}
-	b := NewBundler(int(0), handler.handle)
+	b := NewBundler(int(0), handler.handleImmediate)
 	b.BundleCountThreshold = 3
 	b.DelayThreshold = 100 * time.Millisecond
 	// Add 8 items.
@@ -68,9 +68,41 @@
 	}
 }
 
+// Test that items are handled correctly at roughly the right time with a "slow"
+// handler (takes 300 milliseconds) and that the last bundle is automatically
+// flushed.
+func TestBundlerCountSlowHandler(t *testing.T) {
+	handler := &testHandler{}
+	b := NewBundler(int(0), handler.handleSlow)
+	b.BundleCountThreshold = 3
+	b.DelayThreshold = 500 * time.Millisecond
+	// Add 10 items.
+	for i := 0; i < 10; i++ {
+		if err := b.Add(i, 1); err != nil {
+			t.Fatal(err)
+		}
+	}
+	time.Sleep(4 * 300 * time.Millisecond)
+	// We should not need to close the bundler.
+
+	bgot := handler.bundles()
+	bwant := [][]int{{0, 1, 2}, {3, 4, 5}, {6, 7, 8}, {9}}
+	if !reflect.DeepEqual(bgot, bwant) {
+		t.Errorf("bundles: got %v, want %v", bgot, bwant)
+	}
+
+	tgot := quantizeTimes(handler.times(), 100*time.Millisecond)
+	// Should handle new bundle every 300 milliseconds, and last incomplete
+	// bundle should get automatically flushed.
+	twant := []int{0, 3, 6, 9}
+	if !reflect.DeepEqual(tgot, twant) {
+		t.Errorf("times: got %v, want [0, 0, non-zero]", tgot)
+	}
+}
+
 func TestBundlerByteThreshold(t *testing.T) {
 	handler := &testHandler{}
-	b := NewBundler(int(0), handler.handle)
+	b := NewBundler(int(0), handler.handleImmediate)
 	b.BundleCountThreshold = 10
 	b.BundleByteThreshold = 3
 	add := func(i interface{}, s int) {
@@ -102,7 +134,7 @@
 
 func TestBundlerLimit(t *testing.T) {
 	handler := &testHandler{}
-	b := NewBundler(int(0), handler.handle)
+	b := NewBundler(int(0), handler.handleImmediate)
 	b.BundleCountThreshold = 10
 	b.BundleByteLimit = 3
 	add := func(i interface{}, s int) {
@@ -312,13 +344,32 @@
 	return t.t
 }
 
-func (t *testHandler) handle(b interface{}) {
+// Handler takes no time beyond adding to a list
+func (t *testHandler) handleImmediate(b interface{}) {
 	t.mu.Lock()
 	defer t.mu.Unlock()
 	t.b = append(t.b, b.([]int))
 	t.t = append(t.t, time.Now())
 }
 
+// Handler takes 300 milliseconds
+func (t *testHandler) handleSlow(b interface{}) {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+	t.b = append(t.b, b.([]int))
+	t.t = append(t.t, time.Now())
+	time.Sleep(300 * time.Millisecond)
+}
+
+// Handler takes one millisecond
+func (t *testHandler) handleQuick(b interface{}) {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+	t.b = append(t.b, b.([]int))
+	t.t = append(t.t, time.Now())
+	time.Sleep(time.Millisecond)
+}
+
 // Round times to the nearest q and express them as the number of q
 // since the first time.
 // E.g. if q is 100ms, then a time within 50ms of the first time
@@ -354,3 +405,70 @@
 		}
 	}
 }
+
+// Measure the cost of adding a bunch of items only, though some handling may be
+// happening in the background
+func BenchmarkBundlerAdd(bench *testing.B) {
+	// Unbundled case: one item per bundle.
+	handler := &testHandler{}
+	b := NewBundler(int(0), handler.handleImmediate)
+	b.BundleCountThreshold = 1
+	b.DelayThreshold = time.Second
+
+	for i := 0; i < bench.N; i++ {
+		if err := b.Add(i, 1); err != nil {
+			bench.Fatal(err)
+		}
+	}
+}
+
+// Measure the cost of adding a bunch of items, and then waiting for them all to
+// be handled, when handling is immediate (no delay)
+func BenchmarkBundlerAddAndFlush(bench *testing.B) {
+	// Unbundled case: one item per bundle.
+	handler := &testHandler{}
+	b := NewBundler(int(0), handler.handleImmediate)
+	b.BundleCountThreshold = 1
+	b.DelayThreshold = time.Second
+
+	for i := 0; i < bench.N; i++ {
+		if err := b.Add(i, 1); err != nil {
+			bench.Fatal(err)
+		}
+	}
+	b.Flush()
+}
+
+// Measure the cost of adding a bunch of items, and then waiting for them all to
+// be handled, when handling a bundle (1 item only) takes one millisecond
+func BenchmarkBundlerAddAndFlushSlow1(bench *testing.B) {
+	// Unbundled case: one item per bundle.
+	handler := &testHandler{}
+	b := NewBundler(int(0), handler.handleQuick)
+	b.BundleCountThreshold = 1
+	b.DelayThreshold = time.Second
+
+	for i := 0; i < bench.N; i++ {
+		if err := b.Add(i, 1); err != nil {
+			bench.Fatal(err)
+		}
+	}
+	b.Flush()
+}
+
+// Measure the cost of adding a bunch of items, and then waiting for them all to
+// be handled, when handling a bundle (25 items) takes one millisecond
+func BenchmarkBundlerAddAndFlushSlow25(bench *testing.B) {
+	// More realistic: 25 items per bundle
+	handler := &testHandler{}
+	b := NewBundler(int(0), handler.handleQuick)
+	b.BundleCountThreshold = 25
+	b.DelayThreshold = time.Second
+
+	for i := 0; i < bench.N; i++ {
+		if err := b.Add(i, 1); err != nil {
+			bench.Fatal(err)
+		}
+	}
+	b.Flush()
+}