// Copyright 2016 Google LLC.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package bundler

import (
	"context"
	"fmt"
	"reflect"
	"sync"
	"testing"
	"time"
)

func TestBundlerCount1(t *testing.T) {
	// Unbundled case: one item per bundle.
	handler := &testHandler{}
	b := NewBundler(int(0), handler.handle)
	b.BundleCountThreshold = 1
	b.DelayThreshold = time.Second

	for i := 0; i < 3; i++ {
		if err := b.Add(i, 1); err != nil {
			t.Fatal(err)
		}
	}
	b.Flush()
	got := handler.bundles()
	want := [][]int{{0}, {1}, {2}}
	if !reflect.DeepEqual(got, want) {
		t.Errorf("bundles: got %v, want %v", got, want)
	}
	// All bundles should have been handled "immediately": much less
	// than the delay threshold of 1s.
	tgot := quantizeTimes(handler.times(), 100*time.Millisecond)
	twant := []int{0, 0, 0}
	if !reflect.DeepEqual(tgot, twant) {
		t.Errorf("times: got %v, want %v", tgot, twant)
	}
}

func TestBundlerCount3(t *testing.T) {
	handler := &testHandler{}
	b := NewBundler(int(0), handler.handle)
	b.BundleCountThreshold = 3
	b.DelayThreshold = 100 * time.Millisecond
	// Add 8 items.
	// The first two bundles of 3 should both be handled quickly.
	// The third bundle of 2 should not be handled for about DelayThreshold ms.
	for i := 0; i < 8; i++ {
		if err := b.Add(i, 1); err != nil {
			t.Fatal(err)
		}
	}
	time.Sleep(5 * b.DelayThreshold)
	// We should not need to close the bundler.

	bgot := handler.bundles()
	bwant := [][]int{{0, 1, 2}, {3, 4, 5}, {6, 7}}
	if !reflect.DeepEqual(bgot, bwant) {
		t.Errorf("bundles: got %v, want %v", bgot, bwant)
	}

	tgot := quantizeTimes(handler.times(), b.DelayThreshold)
	if len(tgot) != 3 || tgot[0] != 0 || tgot[1] != 0 || tgot[2] == 0 {
		t.Errorf("times: got %v, want [0, 0, non-zero]", tgot)
	}
}

func TestBundlerByteThreshold(t *testing.T) {
	handler := &testHandler{}
	b := NewBundler(int(0), handler.handle)
	b.BundleCountThreshold = 10
	b.BundleByteThreshold = 3
	add := func(i interface{}, s int) {
		if err := b.Add(i, s); err != nil {
			t.Fatal(err)
		}
	}

	add(1, 1)
	add(2, 2)
	// Hit byte threshold: bundle = 1, 2
	add(3, 1)
	add(4, 1)
	add(5, 2)
	// Passed byte threshold, but not limit: bundle = 3, 4, 5
	add(6, 1)
	b.Flush()
	bgot := handler.bundles()
	bwant := [][]int{{1, 2}, {3, 4, 5}, {6}}
	if !reflect.DeepEqual(bgot, bwant) {
		t.Errorf("bundles: got %v, want %v", bgot, bwant)
	}
	tgot := quantizeTimes(handler.times(), b.DelayThreshold)
	twant := []int{0, 0, 0}
	if !reflect.DeepEqual(tgot, twant) {
		t.Errorf("times: got %v, want %v", tgot, twant)
	}
}

func TestBundlerLimit(t *testing.T) {
	handler := &testHandler{}
	b := NewBundler(int(0), handler.handle)
	b.BundleCountThreshold = 10
	b.BundleByteLimit = 3
	add := func(i interface{}, s int) {
		if err := b.Add(i, s); err != nil {
			t.Fatal(err)
		}
	}

	add(1, 1)
	add(2, 2)
	// Hit byte limit: bundle = 1, 2
	add(3, 1)
	add(4, 1)
	add(5, 2)
	// Exceeded byte limit: bundle = 3, 4
	add(6, 2)
	// Exceeded byte limit: bundle = 5
	b.Flush()
	bgot := handler.bundles()
	bwant := [][]int{{1, 2}, {3, 4}, {5}, {6}}
	if !reflect.DeepEqual(bgot, bwant) {
		t.Errorf("bundles: got %v, want %v", bgot, bwant)
	}
	tgot := quantizeTimes(handler.times(), b.DelayThreshold)
	twant := []int{0, 0, 0, 0}
	if !reflect.DeepEqual(tgot, twant) {
		t.Errorf("times: got %v, want %v", tgot, twant)
	}
}

func TestAddWait(t *testing.T) {
	var (
		mu     sync.Mutex
		events []string
	)
	event := func(s string) {
		mu.Lock()
		events = append(events, s)
		mu.Unlock()
	}

	handlec := make(chan int)
	done := make(chan struct{})
	b := NewBundler(int(0), func(interface{}) {
		<-handlec
		event("handle")
	})
	b.BufferedByteLimit = 3
	addw := func(sz int) {
		if err := b.AddWait(context.Background(), 0, sz); err != nil {
			t.Fatal(err)
		}
		event(fmt.Sprintf("addw(%d)", sz))
	}

	addw(2)
	go func() {
		addw(3) // blocks until first bundle is handled
		close(done)
	}()
	// Give addw(3) a chance to finish
	time.Sleep(100 * time.Millisecond)
	handlec <- 1 // handle the first bundle
	select {
	case <-time.After(time.Second):
		t.Fatal("timed out")
	case <-done:
	}
	want := []string{"addw(2)", "handle", "addw(3)"}
	if !reflect.DeepEqual(events, want) {
		t.Errorf("got  %v\nwant%v", events, want)
	}
}

func TestAddWaitCancel(t *testing.T) {
	b := NewBundler(int(0), func(interface{}) {})
	b.BufferedByteLimit = 3
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		time.Sleep(100 * time.Millisecond)
		cancel()
	}()
	err := b.AddWait(ctx, 0, 4)
	if want := context.Canceled; err != want {
		t.Fatalf("got %v, want %v", err, want)
	}
}

func TestBundlerErrors(t *testing.T) {
	// Use a handler that blocks forever, to force the bundler to run out of
	// memory.
	b := NewBundler(int(0), func(interface{}) { select {} })
	b.BundleByteLimit = 3
	b.BufferedByteLimit = 10

	if got, want := b.Add(1, 4), ErrOversizedItem; got != want {
		t.Fatalf("got %v, want %v", got, want)
	}

	for i := 0; i < 5; i++ {
		if err := b.Add(i, 2); err != nil {
			t.Fatal(err)
		}
	}
	if got, want := b.Add(5, 1), ErrOverflow; got != want {
		t.Fatalf("got %v, want %v", got, want)
	}
}

// Check that no more than HandlerLimit handlers are active at once.
func TestConcurrentHandlersMax(t *testing.T) {
	const handlerLimit = 10
	var (
		mu          sync.Mutex
		active      int
		maxHandlers int
	)
	b := NewBundler(int(0), func(s interface{}) {
		mu.Lock()
		active++
		if active > maxHandlers {
			maxHandlers = active
		}
		if maxHandlers > handlerLimit {
			t.Errorf("too many handlers running (got %d; want %d)", maxHandlers, handlerLimit)
		}
		mu.Unlock()
		time.Sleep(1 * time.Millisecond) // let the scheduler work
		mu.Lock()
		active--
		mu.Unlock()
	})
	b.BundleCountThreshold = 5
	b.HandlerLimit = 10
	defer b.Flush()

	more := 0 // extra iterations past saturation
	for i := 0; more == 0 || i < more; i++ {
		mu.Lock()
		m := maxHandlers
		mu.Unlock()
		if m >= handlerLimit && more == 0 {
			// Run past saturation to check that we don't exceed the max.
			more = 2 * i
		}
		b.Add(i, 1)
	}
}

// Check that Flush doesn't return until all prior items have been handled.
func TestConcurrentFlush(t *testing.T) {
	var (
		mu    sync.Mutex
		items = make(map[int]bool)
	)
	b := NewBundler(int(0), func(s interface{}) {
		mu.Lock()
		for _, i := range s.([]int) {
			items[i] = true
		}
		mu.Unlock()
		time.Sleep(10 * time.Millisecond)
	})
	b.BundleCountThreshold = 5
	b.HandlerLimit = 10
	defer b.Flush()

	var wg sync.WaitGroup
	defer wg.Wait()
	for i := 0; i < 50; i++ {
		b.Add(i, 1)
		if i%100 == 0 {
			i := i
			wg.Add(1)
			go func() {
				defer wg.Done()
				b.Flush()
				mu.Lock()
				defer mu.Unlock()
				for j := 0; j <= i; j++ {
					if !items[j] {
						// Cannot use Fatal, since we're in a non-test goroutine.
						t.Errorf("flush(%d): item %d not handled", i, j)
						break
					}
				}
			}()
		}
	}
}

type testHandler struct {
	mu sync.Mutex
	b  [][]int
	t  []time.Time
}

func (t *testHandler) bundles() [][]int {
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.b
}

func (t *testHandler) times() []time.Time {
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.t
}

func (t *testHandler) handle(b interface{}) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.b = append(t.b, b.([]int))
	t.t = append(t.t, time.Now())
}

// Round times to the nearest q and express them as the number of q
// since the first time.
// E.g. if q is 100ms, then a time within 50ms of the first time
// will be represented as 0, a time 150 to 250ms of the first time
// we be represented as 1, etc.
func quantizeTimes(times []time.Time, q time.Duration) []int {
	var rs []int
	for _, t := range times {
		d := t.Sub(times[0])
		r := int((d + q/2) / q)
		rs = append(rs, r)
	}
	return rs
}

func TestQuantizeTimes(t *testing.T) {
	quantum := 100 * time.Millisecond
	for _, test := range []struct {
		millis []int // times in milliseconds
		want   []int
	}{
		{[]int{10, 20, 30}, []int{0, 0, 0}},
		{[]int{0, 49, 50, 90}, []int{0, 0, 1, 1}},
		{[]int{0, 95, 170, 315}, []int{0, 1, 2, 3}},
	} {
		var times []time.Time
		for _, ms := range test.millis {
			times = append(times, time.Unix(0, int64(ms*1e6)))
		}
		got := quantizeTimes(times, quantum)
		if !reflect.DeepEqual(got, test.want) {
			t.Errorf("%v: got %v, want %v", test.millis, got, test.want)
		}
	}
}
