blob: 019312da33ac83ac466fc284eeae8b8bd34f6854 [file] [log] [blame]
// Copyright 2016 Google LLC.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package bundler
import (
"context"
"fmt"
"reflect"
"sort"
"sync"
"testing"
"time"
)
func TestBundlerCount1(t *testing.T) {
// Unbundled case: one item per bundle.
handler := &testHandler{}
b := NewBundler(int(0), handler.handleImmediate)
b.BundleCountThreshold = 1
b.DelayThreshold = time.Second
for i := 0; i < 3; i++ {
if err := b.Add(i, 1); err != nil {
t.Fatal(err)
}
}
b.Flush()
got := handler.bundles()
want := [][]int{{0}, {1}, {2}}
if !reflect.DeepEqual(got, want) {
t.Errorf("bundles: got %v, want %v", got, want)
}
// All bundles should have been handled "immediately": much less
// than the delay threshold of 1s.
tgot := quantizeTimes(handler.times(), 100*time.Millisecond)
twant := []int{0, 0, 0}
if !reflect.DeepEqual(tgot, twant) {
t.Errorf("times: got %v, want %v", tgot, twant)
}
}
func TestBundlerCount3(t *testing.T) {
handler := &testHandler{}
b := NewBundler(int(0), handler.handleImmediate)
b.BundleCountThreshold = 3
b.DelayThreshold = 100 * time.Millisecond
// Add 8 items.
// The first two bundles of 3 should both be handled quickly.
// The third bundle of 2 should not be handled for about DelayThreshold ms.
for i := 0; i < 8; i++ {
if err := b.Add(i, 1); err != nil {
t.Fatal(err)
}
}
time.Sleep(5 * b.DelayThreshold)
// We should not need to close the bundler.
bgot := handler.bundles()
bwant := [][]int{{0, 1, 2}, {3, 4, 5}, {6, 7}}
if !reflect.DeepEqual(bgot, bwant) {
t.Errorf("bundles: got %v, want %v", bgot, bwant)
}
tgot := quantizeTimes(handler.times(), b.DelayThreshold)
if len(tgot) != 3 || tgot[0] != 0 || tgot[1] != 0 || tgot[2] == 0 {
t.Errorf("times: got %v, want [0, 0, non-zero]", tgot)
}
}
// Test that items are handled correctly at roughly the right time with a "slow"
// handler (takes 300 milliseconds) and that the last bundle is automatically
// flushed.
func TestBundlerCountSlowHandler(t *testing.T) {
handler := &testHandler{}
b := NewBundler(int(0), handler.handleSlow)
b.BundleCountThreshold = 3
b.DelayThreshold = 500 * time.Millisecond
// Add 10 items.
for i := 0; i < 10; i++ {
if err := b.Add(i, 1); err != nil {
t.Fatal(err)
}
}
time.Sleep(4 * 300 * time.Millisecond)
// We should not need to close the bundler.
bgot := handler.bundles()
bwant := [][]int{{0, 1, 2}, {3, 4, 5}, {6, 7, 8}, {9}}
if !reflect.DeepEqual(bgot, bwant) {
t.Errorf("bundles: got %v, want %v", bgot, bwant)
}
tgot := quantizeTimes(handler.times(), 100*time.Millisecond)
// Should handle new bundle every 300 milliseconds, and last incomplete
// bundle should get automatically flushed.
twant := []int{0, 3, 6, 9}
if !reflect.DeepEqual(tgot, twant) {
t.Errorf("times: got %v, want [0, 0, non-zero]", tgot)
}
}
func TestBundlerByteThreshold(t *testing.T) {
handler := &testHandler{}
b := NewBundler(int(0), handler.handleImmediate)
b.BundleCountThreshold = 10
b.BundleByteThreshold = 3
// Increase the limit beyond the number of bundles we expect (3)
// so that bundles get handled immediately after they cross the
// threshold. Otherwise, the test is non-deterministic. With the default
// HandlerLimit of 1, the 2nd and 3rd bundles may or may not be
// combined based on how long it takes to handle the 1st bundle.
b.HandlerLimit = 10
add := func(i interface{}, s int) {
if err := b.Add(i, s); err != nil {
t.Fatal(err)
}
}
add(1, 1)
add(2, 2)
// Hit byte threshold AND under HandlerLimit:
// bundle = 1, 2
add(3, 1)
add(4, 1)
add(5, 2)
// Passed byte threshold AND under byte limit AND under HandlerLimit:
// bundle = 3, 4, 5
add(6, 1)
b.Flush()
bgot := handler.bundles()
// We don't care about the order they were handled in. We just want
// to test that crossing the threshold triggered handling.
sort.Slice(bgot, func(i, j int) bool {
return bgot[i][0] < bgot[j][0]
})
bwant := [][]int{{1, 2}, {3, 4, 5}, {6}}
if !reflect.DeepEqual(bgot, bwant) {
t.Errorf("bundles: got %v, want %v", bgot, bwant)
}
tgot := quantizeTimes(handler.times(), b.DelayThreshold)
twant := []int{0, 0, 0}
if !reflect.DeepEqual(tgot, twant) {
t.Errorf("times: got %v, want %v", tgot, twant)
}
}
func TestBundlerLimit(t *testing.T) {
handler := &testHandler{}
b := NewBundler(int(0), handler.handleImmediate)
b.BundleCountThreshold = 10
b.BundleByteLimit = 3
add := func(i interface{}, s int) {
if err := b.Add(i, s); err != nil {
t.Fatal(err)
}
}
add(1, 1)
add(2, 2)
// Hit byte limit: bundle = 1, 2
add(3, 1)
add(4, 1)
add(5, 2)
// Exceeded byte limit: bundle = 3, 4
add(6, 2)
// Exceeded byte limit: bundle = 5
b.Flush()
bgot := handler.bundles()
bwant := [][]int{{1, 2}, {3, 4}, {5}, {6}}
if !reflect.DeepEqual(bgot, bwant) {
t.Errorf("bundles: got %v, want %v", bgot, bwant)
}
tgot := quantizeTimes(handler.times(), b.DelayThreshold)
twant := []int{0, 0, 0, 0}
if !reflect.DeepEqual(tgot, twant) {
t.Errorf("times: got %v, want %v", tgot, twant)
}
}
func TestAddWait(t *testing.T) {
var (
mu sync.Mutex
events []string
)
event := func(s string) {
mu.Lock()
events = append(events, s)
mu.Unlock()
}
handlec := make(chan int)
done := make(chan struct{})
b := NewBundler(int(0), func(interface{}) {
<-handlec
event("handle")
})
b.BufferedByteLimit = 3
addw := func(sz int) {
if err := b.AddWait(context.Background(), 0, sz); err != nil {
t.Fatal(err)
}
event(fmt.Sprintf("addw(%d)", sz))
}
addw(2)
go func() {
addw(3) // blocks until first bundle is handled
close(done)
}()
// Give addw(3) a chance to finish
time.Sleep(100 * time.Millisecond)
handlec <- 1 // handle the first bundle
select {
case <-time.After(time.Second):
t.Fatal("timed out")
case <-done:
}
want := []string{"addw(2)", "handle", "addw(3)"}
if !reflect.DeepEqual(events, want) {
t.Errorf("got %v\nwant%v", events, want)
}
}
func TestAddWaitCancel(t *testing.T) {
b := NewBundler(int(0), func(interface{}) {})
b.BufferedByteLimit = 3
ctx, cancel := context.WithCancel(context.Background())
go func() {
time.Sleep(100 * time.Millisecond)
cancel()
}()
err := b.AddWait(ctx, 0, 4)
if want := context.Canceled; err != want {
t.Fatalf("got %v, want %v", err, want)
}
}
func TestBundlerErrors(t *testing.T) {
// Use a handler that blocks forever, to force the bundler to run out of
// memory.
b := NewBundler(int(0), func(interface{}) { select {} })
b.BundleByteLimit = 3
b.BufferedByteLimit = 10
if got, want := b.Add(1, 4), ErrOversizedItem; got != want {
t.Fatalf("got %v, want %v", got, want)
}
for i := 0; i < 5; i++ {
if err := b.Add(i, 2); err != nil {
t.Fatal(err)
}
}
if got, want := b.Add(5, 1), ErrOverflow; got != want {
t.Fatalf("got %v, want %v", got, want)
}
}
// Check that no more than HandlerLimit handlers are active at once.
func TestConcurrentHandlersMax(t *testing.T) {
const handlerLimit = 10
var (
mu sync.Mutex
active int
maxHandlers int
)
b := NewBundler(int(0), func(s interface{}) {
mu.Lock()
active++
if active > maxHandlers {
maxHandlers = active
}
if maxHandlers > handlerLimit {
t.Errorf("too many handlers running (got %d; want %d)", maxHandlers, handlerLimit)
}
mu.Unlock()
time.Sleep(1 * time.Millisecond) // let the scheduler work
mu.Lock()
active--
mu.Unlock()
})
b.BundleCountThreshold = 5
b.HandlerLimit = 10
defer b.Flush()
more := 0 // extra iterations past saturation
for i := 0; more == 0 || i < more; i++ {
mu.Lock()
m := maxHandlers
mu.Unlock()
if m >= handlerLimit && more == 0 {
// Run past saturation to check that we don't exceed the max.
more = 2 * i
}
b.Add(i, 1)
}
}
// Check that Flush doesn't return until all prior items have been handled.
func TestConcurrentFlush(t *testing.T) {
var (
mu sync.Mutex
items = make(map[int]bool)
)
b := NewBundler(int(0), func(s interface{}) {
mu.Lock()
for _, i := range s.([]int) {
items[i] = true
}
mu.Unlock()
time.Sleep(10 * time.Millisecond)
})
b.BundleCountThreshold = 5
b.HandlerLimit = 10
defer b.Flush()
var wg sync.WaitGroup
defer wg.Wait()
for i := 0; i < 50; i++ {
b.Add(i, 1)
if i%100 == 0 {
i := i
wg.Add(1)
go func() {
defer wg.Done()
b.Flush()
mu.Lock()
defer mu.Unlock()
for j := 0; j <= i; j++ {
if !items[j] {
// Cannot use Fatal, since we're in a non-test goroutine.
t.Errorf("flush(%d): item %d not handled", i, j)
break
}
}
}()
}
}
}
type testHandler struct {
mu sync.Mutex
b [][]int
t []time.Time
}
func (t *testHandler) bundles() [][]int {
t.mu.Lock()
defer t.mu.Unlock()
return t.b
}
func (t *testHandler) times() []time.Time {
t.mu.Lock()
defer t.mu.Unlock()
return t.t
}
// Handler takes no time beyond adding to a list
func (t *testHandler) handleImmediate(b interface{}) {
t.mu.Lock()
defer t.mu.Unlock()
t.b = append(t.b, b.([]int))
t.t = append(t.t, time.Now())
}
// Handler takes 300 milliseconds
func (t *testHandler) handleSlow(b interface{}) {
t.mu.Lock()
defer t.mu.Unlock()
t.b = append(t.b, b.([]int))
t.t = append(t.t, time.Now())
time.Sleep(300 * time.Millisecond)
}
// Handler takes one millisecond
func (t *testHandler) handleQuick(b interface{}) {
t.mu.Lock()
defer t.mu.Unlock()
t.b = append(t.b, b.([]int))
t.t = append(t.t, time.Now())
time.Sleep(time.Millisecond)
}
// Round times to the nearest q and express them as the number of q
// since the first time.
// E.g. if q is 100ms, then a time within 50ms of the first time
// will be represented as 0, a time 150 to 250ms of the first time
// we be represented as 1, etc.
func quantizeTimes(times []time.Time, q time.Duration) []int {
var rs []int
for _, t := range times {
d := t.Sub(times[0])
r := int((d + q/2) / q)
rs = append(rs, r)
}
return rs
}
func TestQuantizeTimes(t *testing.T) {
quantum := 100 * time.Millisecond
for _, test := range []struct {
millis []int // times in milliseconds
want []int
}{
{[]int{10, 20, 30}, []int{0, 0, 0}},
{[]int{0, 49, 50, 90}, []int{0, 0, 1, 1}},
{[]int{0, 95, 170, 315}, []int{0, 1, 2, 3}},
} {
var times []time.Time
for _, ms := range test.millis {
times = append(times, time.Unix(0, int64(ms*1e6)))
}
got := quantizeTimes(times, quantum)
if !reflect.DeepEqual(got, test.want) {
t.Errorf("%v: got %v, want %v", test.millis, got, test.want)
}
}
}
// Measure the cost of adding a bunch of items only, though some handling may be
// happening in the background
func BenchmarkBundlerAdd(bench *testing.B) {
// Unbundled case: one item per bundle.
handler := &testHandler{}
b := NewBundler(int(0), handler.handleImmediate)
b.BundleCountThreshold = 1
b.DelayThreshold = time.Second
for i := 0; i < bench.N; i++ {
if err := b.Add(i, 1); err != nil {
bench.Fatal(err)
}
}
}
// Measure the cost of adding a bunch of items, and then waiting for them all to
// be handled, when handling is immediate (no delay)
func BenchmarkBundlerAddAndFlush(bench *testing.B) {
// Unbundled case: one item per bundle.
handler := &testHandler{}
b := NewBundler(int(0), handler.handleImmediate)
b.BundleCountThreshold = 1
b.DelayThreshold = time.Second
for i := 0; i < bench.N; i++ {
if err := b.Add(i, 1); err != nil {
bench.Fatal(err)
}
}
b.Flush()
}
// Measure the cost of adding a bunch of items, and then waiting for them all to
// be handled, when handling a bundle (1 item only) takes one millisecond
func BenchmarkBundlerAddAndFlushSlow1(bench *testing.B) {
// Unbundled case: one item per bundle.
handler := &testHandler{}
b := NewBundler(int(0), handler.handleQuick)
b.BundleCountThreshold = 1
b.DelayThreshold = time.Second
for i := 0; i < bench.N; i++ {
if err := b.Add(i, 1); err != nil {
bench.Fatal(err)
}
}
b.Flush()
}
// Measure the cost of adding a bunch of items, and then waiting for them all to
// be handled, when handling a bundle (25 items) takes one millisecond
func BenchmarkBundlerAddAndFlushSlow25(bench *testing.B) {
// More realistic: 25 items per bundle
handler := &testHandler{}
b := NewBundler(int(0), handler.handleQuick)
b.BundleCountThreshold = 25
b.DelayThreshold = time.Second
for i := 0; i < bench.N; i++ {
if err := b.Add(i, 1); err != nil {
bench.Fatal(err)
}
}
b.Flush()
}