blob: 0128afe64954ebc4e36eea33b295545fbd9417e2 [file] [log] [blame]
// Copyright 2016 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pubsub
import (
"errors"
"reflect"
"sort"
"testing"
"time"
"golang.org/x/net/context"
)
func TestKeepAliveExtendsDeadline(t *testing.T) {
ticker := make(chan time.Time)
deadline := time.Nanosecond * 15
s := &testService{modDeadlineCalled: make(chan modDeadlineCall)}
checkModDeadlineCall := func(ackIDs []string) {
got := <-s.modDeadlineCalled
sort.Strings(got.ackIDs)
want := modDeadlineCall{
subName: "subname",
deadline: deadline,
ackIDs: ackIDs,
}
if !reflect.DeepEqual(got, want) {
t.Errorf("keepalive: got:\n%v\nwant:\n%v", got, want)
}
}
ka := &keepAlive{
s: s,
Ctx: context.Background(),
Sub: "subname",
ExtensionTick: ticker,
Deadline: deadline,
MaxExtension: time.Hour,
}
ka.Start()
ka.Add("a")
ka.Add("b")
ticker <- time.Time{}
checkModDeadlineCall([]string{"a", "b"})
ka.Add("c")
ka.Remove("b")
ticker <- time.Time{}
checkModDeadlineCall([]string{"a", "c"})
ka.Remove("a")
ka.Remove("c")
ka.Add("d")
ticker <- time.Time{}
checkModDeadlineCall([]string{"d"})
ka.Remove("d")
ka.Stop()
}
func TestKeepAliveStopsWhenNoItem(t *testing.T) {
ticker := make(chan time.Time)
stopped := make(chan bool)
s := &testService{modDeadlineCalled: make(chan modDeadlineCall, 3)}
ka := &keepAlive{
s: s,
Ctx: context.Background(),
ExtensionTick: ticker,
}
ka.Start()
// There should be no call to modifyAckDeadline since there is no item.
ticker <- time.Time{}
go func() {
ka.Stop() // No items; should not block
if len(s.modDeadlineCalled) > 0 {
t.Errorf("unexpected extension to non-existent items: %v", <-s.modDeadlineCalled)
}
close(stopped)
}()
select {
case <-stopped:
case <-time.After(time.Second):
t.Errorf("keepAlive timed out waiting for stop")
}
}
func TestKeepAliveStopsWhenItemsExpired(t *testing.T) {
ticker := make(chan time.Time)
stopped := make(chan bool)
s := &testService{modDeadlineCalled: make(chan modDeadlineCall, 2)}
ka := &keepAlive{
s: s,
Ctx: context.Background(),
ExtensionTick: ticker,
MaxExtension: time.Duration(0), // Should expire items at the first tick.
}
ka.Start()
ka.Add("a")
ka.Add("b")
// Wait until the clock advances. Without this loop, this test fails on
// Windows because the clock doesn't advance at all between ka.Add and the
// expiration check after the tick is received.
begin := time.Now()
for time.Now().Equal(begin) {
time.Sleep(time.Millisecond)
}
// There should be no call to modifyAckDeadline since both items are expired.
ticker <- time.Time{}
go func() {
ka.Stop() // No live items; should not block.
if len(s.modDeadlineCalled) > 0 {
t.Errorf("unexpected extension to expired items")
}
close(stopped)
}()
select {
case <-stopped:
case <-time.After(time.Second):
t.Errorf("timed out waiting for stop")
}
}
func TestKeepAliveBlocksUntilAllItemsRemoved(t *testing.T) {
ticker := make(chan time.Time)
eventc := make(chan string, 3)
s := &testService{modDeadlineCalled: make(chan modDeadlineCall)}
ka := &keepAlive{
s: s,
Ctx: context.Background(),
ExtensionTick: ticker,
MaxExtension: time.Hour, // Should not expire.
}
ka.Start()
ka.Add("a")
ka.Add("b")
go func() {
ticker <- time.Time{}
// We expect a call since both items should be extended.
select {
case args := <-s.modDeadlineCalled:
sort.Strings(args.ackIDs)
got := args.ackIDs
want := []string{"a", "b"}
if !reflect.DeepEqual(got, want) {
t.Errorf("mismatching IDs:\ngot %v\nwant %v", got, want)
}
case <-time.After(time.Second):
t.Errorf("timed out waiting for deadline extend call")
}
time.Sleep(10 * time.Millisecond)
eventc <- "pre-remove-b"
// Remove one item, Stop should still be waiting.
ka.Remove("b")
ticker <- time.Time{}
// We expect a call since the item is still alive.
select {
case args := <-s.modDeadlineCalled:
got := args.ackIDs
want := []string{"a"}
if !reflect.DeepEqual(got, want) {
t.Errorf("mismatching IDs:\ngot %v\nwant %v", got, want)
}
case <-time.After(time.Second):
t.Errorf("timed out waiting for deadline extend call")
}
time.Sleep(10 * time.Millisecond)
eventc <- "pre-remove-a"
// Remove the last item so that Stop can proceed.
ka.Remove("a")
}()
go func() {
ka.Stop() // Should block all item are removed.
eventc <- "post-stop"
}()
for i, want := range []string{"pre-remove-b", "pre-remove-a", "post-stop"} {
select {
case got := <-eventc:
if got != want {
t.Errorf("event #%d:\ngot %v\nwant %v", i, got, want)
}
case <-time.After(time.Second):
t.Errorf("time out waiting for #%d event: want %v", i, want)
}
}
}
// extendCallResult contains a list of ackIDs which are expected in an ackID
// extension request, along with the result that should be returned.
type extendCallResult struct {
ackIDs []string
err error
}
// extendService implements modifyAckDeadline using a hard-coded list of extendCallResults.
type extendService struct {
service
calls []extendCallResult
t *testing.T // used for error logging.
}
func (es *extendService) modifyAckDeadline(ctx context.Context, subName string, deadline time.Duration, ackIDs []string) error {
if len(es.calls) == 0 {
es.t.Fatalf("unexpected call to modifyAckDeadline: ackIDs: %v", ackIDs)
}
call := es.calls[0]
es.calls = es.calls[1:]
if got, want := ackIDs, call.ackIDs; !reflect.DeepEqual(got, want) {
es.t.Errorf("unexpected arguments to modifyAckDeadline: got: %v ; want: %v", got, want)
}
return call.err
}
// Test implementation returns the first 2 elements as head, and the rest as tail.
func (es *extendService) splitAckIDs(ids []string) ([]string, []string) {
if len(ids) < 2 {
return ids, nil
}
return ids[:2], ids[2:]
}
func TestKeepAliveSplitsBatches(t *testing.T) {
type testCase struct {
calls []extendCallResult
}
for _, tc := range []testCase{
{
calls: []extendCallResult{
{
ackIDs: []string{"a", "b"},
},
{
ackIDs: []string{"c", "d"},
},
{
ackIDs: []string{"e", "f"},
},
},
},
{
calls: []extendCallResult{
{
ackIDs: []string{"a", "b"},
err: errors.New("bang"),
},
// On error we retry once.
{
ackIDs: []string{"a", "b"},
err: errors.New("bang"),
},
// We give up after failing twice, so we move on to the next set, "c" and "d".
{
ackIDs: []string{"c", "d"},
err: errors.New("bang"),
},
// Again, we retry once.
{
ackIDs: []string{"c", "d"},
},
{
ackIDs: []string{"e", "f"},
},
},
},
} {
s := &extendService{
t: t,
calls: tc.calls,
}
ka := &keepAlive{
s: s,
Ctx: context.Background(),
Sub: "subname",
}
ka.extendDeadlines([]string{"a", "b", "c", "d", "e", "f"})
if len(s.calls) != 0 {
t.Errorf("expected extend calls did not occur: %v", s.calls)
}
}
}