bundler: add a blocking Add method
AddWait blocks if there is not enough space, instead of returning
immediately with ErrOverflow.
Motivation: We want the new async pubsub.Topic.Publish to
wait instead of returning an error.
Change-Id: I72ed4b814816466dece9fdcd81e8c5c964a3027a
Reviewed-on: https://code-review.googlesource.com/11111
Reviewed-by: Michael McGreevy <mcgreevy@golang.org>
Reviewed-by: Michael Darakananda <pongad@google.com>
Reviewed-by: Jonathan Amsterdam <jba@google.com>
diff --git a/support/bundler/bundler.go b/support/bundler/bundler.go
index 62cfd21..3cbfa3f 100644
--- a/support/bundler/bundler.go
+++ b/support/bundler/bundler.go
@@ -26,6 +26,8 @@
"reflect"
"sync"
"time"
+
+ "golang.org/x/net/context"
)
const (
@@ -74,11 +76,12 @@
handlec chan int // sent to when a bundle is ready for handling
timer *time.Timer // implements DelayThreshold
- mu sync.Mutex
- bufferedSize int // total bytes buffered
- closedBundles []bundle // bundles waiting to be handled
- curBundle bundle // incoming items added to this bundle
- calledc chan struct{} // closed and re-created after handler is called
+ mu sync.Mutex
+ spaceAvailable chan struct{} // closed and replaced when space is available
+ bufferedSize int // total bytes buffered
+ closedBundles []bundle // bundles waiting to be handled
+ curBundle bundle // incoming items added to this bundle
+ calledc chan struct{} // closed and re-created after handler is called
}
type bundle struct {
@@ -102,12 +105,13 @@
BundleByteThreshold: DefaultBundleByteThreshold,
BufferedByteLimit: DefaultBufferedByteLimit,
- handler: handler,
- itemSliceZero: reflect.Zero(reflect.SliceOf(reflect.TypeOf(itemExample))),
- donec: make(chan struct{}),
- handlec: make(chan int, 1),
- calledc: make(chan struct{}),
- timer: time.NewTimer(1000 * time.Hour), // harmless initial timeout
+ handler: handler,
+ itemSliceZero: reflect.Zero(reflect.SliceOf(reflect.TypeOf(itemExample))),
+ donec: make(chan struct{}),
+ handlec: make(chan int, 1),
+ calledc: make(chan struct{}),
+ timer: time.NewTimer(1000 * time.Hour), // harmless initial timeout
+ spaceAvailable: make(chan struct{}),
}
b.curBundle.items = b.itemSliceZero
go b.background()
@@ -137,6 +141,15 @@
if b.bufferedSize+size > b.BufferedByteLimit {
return ErrOverflow
}
+ b.addLocked(item, size)
+ return nil
+}
+
+// addLocked adds item to the current bundle. It marks the bundle for handling and
+// starts a new one if any of the thresholds or limits are exceeded.
+//
+// addLocked is called with the lock held.
+func (b *Bundler) addLocked(item interface{}, size int) {
// If adding this item to the current bundle would cause it to exceed the
// maximum bundle size, close the current bundle and start a new one.
if b.BundleByteLimit > 0 && b.curBundle.size+size > b.BundleByteLimit {
@@ -158,6 +171,38 @@
if b.curBundle.size >= b.BundleByteThreshold {
b.closeAndHandleBundle()
}
+}
+
+// AddWait adds item to the current bundle. It marks the bundle for handling and
+// starts a new one if any of the thresholds or limits are exceeded.
+//
+// If the item's size exceeds the maximum bundle size (Bundler.BundleByteLimit), then
+// the item can never be handled. AddWait returns ErrOversizedItem in this case.
+//
+// If adding the item would exceed the maximum memory allowed (Bundler.BufferedByteLimit),
+// AddWait blocks until space is available or ctx is done.
+func (b *Bundler) AddWait(ctx context.Context, item interface{}, size int) error {
+ // If this item exceeds the maximum size of a bundle,
+ // we can never send it.
+ if b.BundleByteLimit > 0 && size > b.BundleByteLimit {
+ return ErrOversizedItem
+ }
+ b.mu.Lock()
+ // If adding this item would exceed our allotted memory
+ // footprint, block until space is available.
+ // TODO(jba): avoid starvation of large items.
+ for b.bufferedSize+size > b.BufferedByteLimit {
+ avail := b.spaceAvailable
+ b.mu.Unlock()
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-avail:
+ b.mu.Lock()
+ }
+ }
+ b.addLocked(item, size)
+ b.mu.Unlock()
return nil
}
@@ -250,7 +295,10 @@
// during this loop will have a chance of succeeding.
b.mu.Lock()
b.bufferedSize -= bun.size
+ avail := b.spaceAvailable
+ b.spaceAvailable = make(chan struct{})
b.mu.Unlock()
+ close(avail)
}
// Signal that we've sent all outstanding bundles.
close(calledc)
diff --git a/support/bundler/bundler_test.go b/support/bundler/bundler_test.go
index 8d56519..2667cb9 100644
--- a/support/bundler/bundler_test.go
+++ b/support/bundler/bundler_test.go
@@ -15,6 +15,8 @@
package bundler
import (
+ "context"
+ "fmt"
"reflect"
"sync"
"testing"
@@ -141,6 +143,64 @@
}
}
+func TestAddWait(t *testing.T) {
+ var (
+ mu sync.Mutex
+ events []string
+ )
+ event := func(s string) {
+ mu.Lock()
+ events = append(events, s)
+ mu.Unlock()
+ }
+
+ handlec := make(chan int)
+ done := make(chan struct{})
+ b := NewBundler(int(0), func(interface{}) {
+ <-handlec
+ event("handle")
+ })
+ b.BufferedByteLimit = 3
+ addw := func(sz int) {
+ if err := b.AddWait(context.Background(), 0, sz); err != nil {
+ t.Fatal(err)
+ }
+ event(fmt.Sprintf("addw(%d)", sz))
+ }
+
+ addw(2)
+ go func() {
+ addw(3) // blocks until first bundle is handled
+ close(done)
+ }()
+ // Give addw(3) a chance to finish
+ time.Sleep(100 * time.Millisecond)
+ handlec <- 1 // handle the first bundle
+ select {
+ case <-time.After(time.Second):
+ t.Fatal("timed out")
+ case <-done:
+ }
+ want := []string{"addw(2)", "handle", "addw(3)"}
+ if !reflect.DeepEqual(events, want) {
+ t.Errorf("got %v\nwant%v", events, want)
+ }
+}
+
+func TestAddWaitCancel(t *testing.T) {
+ b := NewBundler(int(0), func(interface{}) {})
+ b.BufferedByteLimit = 3
+ ctx, cancel := context.WithCancel(context.Background())
+ go func() {
+ time.Sleep(100 * time.Millisecond)
+ cancel()
+ }()
+ err := b.AddWait(ctx, 0, 4)
+ if want := context.Canceled; err != want {
+ t.Fatalf("got %v, want %v", err, want)
+ }
+}
+
func TestBundlerErrors(t *testing.T) {
// Use a handler that blocks forever, to force the bundler to run out of
// memory.