blob: ef1494223f528438fb4bad8d7da2c1d2529abbf6 [file] [log] [blame]
// Copyright 2014 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pubsub
import (
"fmt"
"math/rand"
"reflect"
"sync"
"testing"
"time"
"golang.org/x/net/context"
"cloud.google.com/go/internal/testutil"
"google.golang.org/api/option"
)
const timeout = time.Minute * 10
const ackDeadline = time.Second * 10
const batchSize = 100
const batches = 100
// messageCounter keeps track of how many times a given message has been received.
type messageCounter struct {
mu sync.Mutex
counts map[string]int
// A value is sent to recv each time Inc is called.
recv chan struct{}
}
func (mc *messageCounter) Inc(msgID string) {
mc.mu.Lock()
mc.counts[msgID] += 1
mc.mu.Unlock()
mc.recv <- struct{}{}
}
// process pulls messages from an iterator and records them in mc.
func process(t *testing.T, it *Iterator, mc *messageCounter) {
for {
m, err := it.Next()
if err == Done {
return
}
if err != nil {
t.Errorf("unexpected err from iterator: %v", err)
return
}
mc.Inc(m.ID)
// Simulate time taken to process m, while continuing to process more messages.
go func() {
// Some messages will need to have their ack deadline extended due to this delay.
delay := rand.Intn(int(ackDeadline * 3))
time.After(time.Duration(delay))
m.Done(true)
}()
}
}
// newIter constructs a new Iterator.
func newIter(t *testing.T, ctx context.Context, sub *Subscription) *Iterator {
it, err := sub.Pull(ctx)
if err != nil {
t.Fatalf("error constructing iterator: %v", err)
}
return it
}
// launchIter launches a number of goroutines to pull from the supplied Iterator.
func launchIter(t *testing.T, ctx context.Context, it *Iterator, mc *messageCounter, n int, wg *sync.WaitGroup) {
for j := 0; j < n; j++ {
wg.Add(1)
go func() {
defer wg.Done()
process(t, it, mc)
}()
}
}
// iteratorLifetime controls how long iterators live for before they are stopped.
type iteratorLifetimes interface {
// lifetimeChan should be called when an iterator is started. The
// returned channel will send when the iterator should be stopped.
lifetimeChan() <-chan time.Time
}
var immortal = &explicitLifetimes{}
// explicitLifetimes implements iteratorLifetime with hard-coded lifetimes, falling back
// to indefinite lifetimes when no explicit lifetimes remain.
type explicitLifetimes struct {
mu sync.Mutex
lifetimes []time.Duration
}
func (el *explicitLifetimes) lifetimeChan() <-chan time.Time {
el.mu.Lock()
defer el.mu.Unlock()
if len(el.lifetimes) == 0 {
return nil
}
lifetime := el.lifetimes[0]
el.lifetimes = el.lifetimes[1:]
return time.After(lifetime)
}
// consumer consumes messages according to its configuration.
type consumer struct {
// How many goroutines should pull from the subscription.
iteratorsInFlight int
// How many goroutines should pull from each iterator.
concurrencyPerIterator int
lifetimes iteratorLifetimes
}
// consume reads messages from a subscription, and keeps track of what it receives in mc.
// After consume returns, the caller should wait on wg to ensure that no more updates to mc will be made.
func (c *consumer) consume(t *testing.T, ctx context.Context, sub *Subscription, mc *messageCounter, wg *sync.WaitGroup, stop <-chan struct{}) {
for i := 0; i < c.iteratorsInFlight; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
it := newIter(t, ctx, sub)
launchIter(t, ctx, it, mc, c.concurrencyPerIterator, wg)
select {
case <-c.lifetimes.lifetimeChan():
it.Stop()
case <-stop:
it.Stop()
return
}
}
}()
}
}
// publish publishes many messages to topic, and returns the published message ids.
func publish(t *testing.T, ctx context.Context, topic *Topic) []string {
var published []string
msgs := make([]*Message, batchSize)
for i := 0; i < batches; i++ {
for j := 0; j < batchSize; j++ {
text := fmt.Sprintf("msg %02d-%02d", i, j)
msgs[j] = &Message{Data: []byte(text)}
}
ids, err := topic.Publish(ctx, msgs...)
if err != nil {
t.Errorf("Publish error: %v", err)
}
published = append(published, ids...)
}
return published
}
// diff returns counts of the differences between got and want.
func diff(got, want map[string]int) map[string]int {
ids := make(map[string]struct{})
for k, _ := range got {
ids[k] = struct{}{}
}
for k, _ := range want {
ids[k] = struct{}{}
}
gotWantCount := make(map[string]int)
for k, _ := range ids {
if got[k] == want[k] {
continue
}
desc := fmt.Sprintf("<got: %v ; want: %v>", got[k], want[k])
gotWantCount[desc] += 1
}
return gotWantCount
}
// TestEndToEnd pumps many messages into a topic and tests that they are all delivered to each subscription for the topic.
// It also tests that messages are not unexpectedly redelivered.
func TestEndToEnd(t *testing.T) {
if testing.Short() {
t.Skip("Integration tests skipped in short mode")
}
ctx := context.Background()
ts := testutil.TokenSource(ctx, ScopePubSub, ScopeCloudPlatform)
if ts == nil {
t.Skip("Integration tests skipped. See CONTRIBUTING.md for details")
}
now := time.Now()
topicName := fmt.Sprintf("endtoend-%d", now.Unix())
subPrefix := fmt.Sprintf("endtoend-%d", now.Unix())
client, err := NewClient(ctx, testutil.ProjID(), option.WithTokenSource(ts))
if err != nil {
t.Fatalf("Creating client error: %v", err)
}
var topic *Topic
if topic, err = client.NewTopic(ctx, topicName); err != nil {
t.Fatalf("CreateTopic error: %v", err)
}
defer topic.Delete(ctx)
// Three subscriptions to the same topic.
var subA, subB, subC *Subscription
if subA, err = client.NewSubscription(ctx, subPrefix+"-a", topic, ackDeadline, nil); err != nil {
t.Fatalf("CreateSub error: %v", err)
}
defer subA.Delete(ctx)
if subB, err = client.NewSubscription(ctx, subPrefix+"-b", topic, ackDeadline, nil); err != nil {
t.Fatalf("CreateSub error: %v", err)
}
defer subB.Delete(ctx)
if subC, err = client.NewSubscription(ctx, subPrefix+"-c", topic, ackDeadline, nil); err != nil {
t.Fatalf("CreateSub error: %v", err)
}
defer subC.Delete(ctx)
expectedCounts := make(map[string]int)
for _, id := range publish(t, ctx, topic) {
expectedCounts[id] = 1
}
// recv provides an indication that messages are still arriving.
recv := make(chan struct{})
// Keep track of the number of times each message (by message id) was
// seen from each subscription.
mcA := &messageCounter{counts: make(map[string]int), recv: recv}
mcB := &messageCounter{counts: make(map[string]int), recv: recv}
mcC := &messageCounter{counts: make(map[string]int), recv: recv}
stopC := make(chan struct{})
// We have three subscriptions to our topic.
// Each subscription will get a copy of each pulished message.
//
// subA has just one iterator, while subB has two. The subB iterators
// will each process roughly half of the messages for subB. All of
// these iterators live until all messages have been consumed. subC is
// processed by a series of short-lived iterators.
var wg sync.WaitGroup
con := &consumer{
concurrencyPerIterator: 1,
iteratorsInFlight: 2,
lifetimes: immortal,
}
con.consume(t, ctx, subA, mcA, &wg, stopC)
con = &consumer{
concurrencyPerIterator: 1,
iteratorsInFlight: 2,
lifetimes: immortal,
}
con.consume(t, ctx, subB, mcB, &wg, stopC)
con = &consumer{
concurrencyPerIterator: 1,
iteratorsInFlight: 2,
lifetimes: &explicitLifetimes{
lifetimes: []time.Duration{ackDeadline, ackDeadline, ackDeadline / 2, ackDeadline / 2},
},
}
con.consume(t, ctx, subC, mcC, &wg, stopC)
go func() {
timeoutC := time.After(timeout)
// Every time this ticker ticks, we will check if we have received any
// messages since the last time it ticked. We check less frequently
// than the ack deadline, so that we can detect if messages are
// redelivered after having their ack deadline extended.
checkQuiescence := time.NewTicker(ackDeadline * 3)
defer checkQuiescence.Stop()
var received bool
for {
select {
case <-recv:
received = true
case <-checkQuiescence.C:
if received {
received = false
} else {
close(stopC)
return
}
case <-timeoutC:
t.Errorf("timed out")
close(stopC)
return
}
}
}()
wg.Wait()
for _, mc := range []*messageCounter{mcA, mcB, mcC} {
if got, want := mc.counts, expectedCounts; !reflect.DeepEqual(got, want) {
t.Errorf("message counts: %v\n", diff(got, want))
}
}
}