bundler: add a blocking Add method

AddWait blocks if there is not enough space, instead of returning
immediately with ErrOverflow.

Motivation: We want the new async pubsub.Topic.Publish to
wait instead of returning an error.

Change-Id: I72ed4b814816466dece9fdcd81e8c5c964a3027a
Reviewed-on: https://code-review.googlesource.com/11111
Reviewed-by: Michael McGreevy <mcgreevy@golang.org>
Reviewed-by: Michael Darakananda <pongad@google.com>
Reviewed-by: Jonathan Amsterdam <jba@google.com>
diff --git a/support/bundler/bundler.go b/support/bundler/bundler.go
index 62cfd21..3cbfa3f 100644
--- a/support/bundler/bundler.go
+++ b/support/bundler/bundler.go
@@ -26,6 +26,8 @@
 	"reflect"
 	"sync"
 	"time"
+
+	"golang.org/x/net/context"
 )
 
 const (
@@ -74,11 +76,12 @@
 	handlec       chan int          // sent to when a bundle is ready for handling
 	timer         *time.Timer       // implements DelayThreshold
 
-	mu            sync.Mutex
-	bufferedSize  int           // total bytes buffered
-	closedBundles []bundle      // bundles waiting to be handled
-	curBundle     bundle        // incoming items added to this bundle
-	calledc       chan struct{} // closed and re-created after handler is called
+	mu             sync.Mutex
+	spaceAvailable chan struct{} // closed and replaced when space is available
+	bufferedSize   int           // total bytes buffered
+	closedBundles  []bundle      // bundles waiting to be handled
+	curBundle      bundle        // incoming items added to this bundle
+	calledc        chan struct{} // closed and re-created after handler is called
 }
 
 type bundle struct {
@@ -102,12 +105,13 @@
 		BundleByteThreshold:  DefaultBundleByteThreshold,
 		BufferedByteLimit:    DefaultBufferedByteLimit,
 
-		handler:       handler,
-		itemSliceZero: reflect.Zero(reflect.SliceOf(reflect.TypeOf(itemExample))),
-		donec:         make(chan struct{}),
-		handlec:       make(chan int, 1),
-		calledc:       make(chan struct{}),
-		timer:         time.NewTimer(1000 * time.Hour), // harmless initial timeout
+		handler:        handler,
+		itemSliceZero:  reflect.Zero(reflect.SliceOf(reflect.TypeOf(itemExample))),
+		donec:          make(chan struct{}),
+		handlec:        make(chan int, 1),
+		calledc:        make(chan struct{}),
+		timer:          time.NewTimer(1000 * time.Hour), // harmless initial timeout
+		spaceAvailable: make(chan struct{}),
 	}
 	b.curBundle.items = b.itemSliceZero
 	go b.background()
@@ -137,6 +141,15 @@
 	if b.bufferedSize+size > b.BufferedByteLimit {
 		return ErrOverflow
 	}
+	b.addLocked(item, size)
+	return nil
+}
+
+// addLocked adds item to the current bundle. It marks the bundle for handling and
+// starts a new one if any of the thresholds or limits are exceeded.
+//
+// addLocked is called with the lock held.
+func (b *Bundler) addLocked(item interface{}, size int) {
 	// If adding this item to the current bundle would cause it to exceed the
 	// maximum bundle size, close the current bundle and start a new one.
 	if b.BundleByteLimit > 0 && b.curBundle.size+size > b.BundleByteLimit {
@@ -158,6 +171,38 @@
 	if b.curBundle.size >= b.BundleByteThreshold {
 		b.closeAndHandleBundle()
 	}
+}
+
+// AddWait adds item to the current bundle. It marks the bundle for handling and
+// starts a new one if any of the thresholds or limits are exceeded.
+//
+// If the item's size exceeds the maximum bundle size (Bundler.BundleByteLimit), then
+// the item can never be handled. AddWait returns ErrOversizedItem in this case.
+//
+// If adding the item would exceed the maximum memory allowed (Bundler.BufferedByteLimit),
+// AddWait blocks until space is available or ctx is done.
+func (b *Bundler) AddWait(ctx context.Context, item interface{}, size int) error {
+	// If this item exceeds the maximum size of a bundle,
+	// we can never send it.
+	if b.BundleByteLimit > 0 && size > b.BundleByteLimit {
+		return ErrOversizedItem
+	}
+	b.mu.Lock()
+	// If adding this item would exceed our allotted memory
+	// footprint, block until space is available.
+	// TODO(jba): avoid starvation of large items.
+	for b.bufferedSize+size > b.BufferedByteLimit {
+		avail := b.spaceAvailable
+		b.mu.Unlock()
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-avail:
+			b.mu.Lock()
+		}
+	}
+	b.addLocked(item, size)
+	b.mu.Unlock()
 	return nil
 }
 
@@ -250,7 +295,10 @@
 			// during this loop will have a chance of succeeding.
 			b.mu.Lock()
 			b.bufferedSize -= bun.size
+			avail := b.spaceAvailable
+			b.spaceAvailable = make(chan struct{})
 			b.mu.Unlock()
+			close(avail)
 		}
 		// Signal that we've sent all outstanding bundles.
 		close(calledc)
diff --git a/support/bundler/bundler_test.go b/support/bundler/bundler_test.go
index 8d56519..2667cb9 100644
--- a/support/bundler/bundler_test.go
+++ b/support/bundler/bundler_test.go
@@ -15,6 +15,8 @@
 package bundler
 
 import (
+	"context"
+	"fmt"
 	"reflect"
 	"sync"
 	"testing"
@@ -141,6 +143,64 @@
 	}
 }
 
+func TestAddWait(t *testing.T) {
+	var (
+		mu     sync.Mutex
+		events []string
+	)
+	event := func(s string) {
+		mu.Lock()
+		events = append(events, s)
+		mu.Unlock()
+	}
+
+	handlec := make(chan int)
+	done := make(chan struct{})
+	b := NewBundler(int(0), func(interface{}) {
+		<-handlec
+		event("handle")
+	})
+	b.BufferedByteLimit = 3
+	addw := func(sz int) {
+		if err := b.AddWait(context.Background(), 0, sz); err != nil {
+			t.Fatal(err)
+		}
+		event(fmt.Sprintf("addw(%d)", sz))
+	}
+
+	addw(2)
+	go func() {
+		addw(3) // blocks until first bundle is handled
+		close(done)
+	}()
+	// Give addw(3) a chance to finish
+	time.Sleep(100 * time.Millisecond)
+	handlec <- 1 // handle the first bundle
+	select {
+	case <-time.After(time.Second):
+		t.Fatal("timed out")
+	case <-done:
+	}
+	want := []string{"addw(2)", "handle", "addw(3)"}
+	if !reflect.DeepEqual(events, want) {
+		t.Errorf("got  %v\nwant%v", events, want)
+	}
+}
+
+func TestAddWaitCancel(t *testing.T) {
+	b := NewBundler(int(0), func(interface{}) {})
+	b.BufferedByteLimit = 3
+	ctx, cancel := context.WithCancel(context.Background())
+	go func() {
+		time.Sleep(100 * time.Millisecond)
+		cancel()
+	}()
+	err := b.AddWait(ctx, 0, 4)
+	if want := context.Canceled; err != want {
+		t.Fatalf("got %v, want %v", err, want)
+	}
+}
+
 func TestBundlerErrors(t *testing.T) {
 	// Use a handler that blocks forever, to force the bundler to run out of
 	// memory.