blob: 6ed4fb64546a663801e9d785923724e2ef656908 [file] [log] [blame]
// Copyright 2017 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pubsub
import (
"context"
"errors"
"fmt"
"reflect"
"sync"
"sync/atomic"
"testing"
"time"
"cloud.google.com/go/internal/testutil"
"cloud.google.com/go/pubsub/pstest"
"google.golang.org/api/option"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
var (
projName = "some-project"
topicName = "some-topic"
fullyQualifiedTopicName = fmt.Sprintf("projects/%s/topics/%s", projName, topicName)
)
func TestSplitRequestIDs(t *testing.T) {
t.Parallel()
ids := []string{"aaaa", "bbbb", "cccc", "dddd", "eeee"}
for _, test := range []struct {
ids []string
splitIndex int
}{
{[]string{}, 0},
{ids, 2},
{ids[:2], 2},
} {
got1, got2 := splitRequestIDs(test.ids, reqFixedOverhead+20)
want1, want2 := test.ids[:test.splitIndex], test.ids[test.splitIndex:]
if !testutil.Equal(got1, want1) {
t.Errorf("%v, 1: got %v, want %v", test, got1, want1)
}
if !testutil.Equal(got2, want2) {
t.Errorf("%v, 2: got %v, want %v", test, got2, want2)
}
}
}
func TestAckDistribution(t *testing.T) {
if testing.Short() {
t.SkipNow()
}
t.Skip("broken")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
minAckDeadline = 1 * time.Second
pstest.SetMinAckDeadline(minAckDeadline)
srv := pstest.NewServer()
defer srv.Close()
defer pstest.ResetMinAckDeadline()
// Create the topic via a Publish. It's convenient to do it here as opposed to client.CreateTopic because the client
// has not been established yet, and also because we want to create the topic once whereas the client is established
// below twice.
srv.Publish(fullyQualifiedTopicName, []byte("creating a topic"), nil)
queuedMsgs := make(chan int32, 1024)
go continuouslySend(ctx, srv, queuedMsgs)
for _, testcase := range []struct {
initialProcessSecs int32
finalProcessSecs int32
}{
{initialProcessSecs: 3, finalProcessSecs: 5}, // Process time goes up
{initialProcessSecs: 5, finalProcessSecs: 3}, // Process time goes down
} {
t.Logf("Testing %d -> %d", testcase.initialProcessSecs, testcase.finalProcessSecs)
// processTimeSecs is used by the sender to coordinate with the receiver how long the receiver should
// pretend to process for. e.g. if we test 3s -> 5s, processTimeSecs will start at 3, causing receiver
// to process messages received for 3s while sender sends the first batch. Then, as sender begins to
// send the next batch, sender will swap processTimeSeconds to 5s and begin sending, and receiver will
// process each message for 5s. In this way we simulate a client whose time-to-ack (process time) changes.
processTimeSecs := testcase.initialProcessSecs
s, client, err := initConn(ctx, srv.Addr)
if err != nil {
t.Fatal(err)
}
// recvdWg increments for each message sent, and decrements for each message received.
recvdWg := &sync.WaitGroup{}
go startReceiving(ctx, t, s, recvdWg, &processTimeSecs)
startSending(t, queuedMsgs, &processTimeSecs, testcase.initialProcessSecs, testcase.finalProcessSecs, recvdWg)
recvdWg.Wait()
time.Sleep(100 * time.Millisecond) // Wait a bit more for resources to clean up
err = client.Close()
if err != nil {
t.Fatal(err)
}
modacks := modacksByTime(srv.Messages())
u := modackDeadlines(modacks)
initialDL := int32(minAckDeadline / time.Second)
if !setsAreEqual(u, []int32{initialDL, testcase.initialProcessSecs, testcase.finalProcessSecs}) {
t.Fatalf("Expected modack deadlines to contain (exactly, and only) %ds, %ds, %ds. Instead, got %v",
initialDL, testcase.initialProcessSecs, testcase.finalProcessSecs, toSet(u))
}
}
}
// modacksByTime buckets modacks by time.
func modacksByTime(msgs []*pstest.Message) map[time.Time][]pstest.Modack {
modacks := map[time.Time][]pstest.Modack{}
for _, msg := range msgs {
for _, m := range msg.Modacks {
modacks[m.ReceivedAt] = append(modacks[m.ReceivedAt], m)
}
}
return modacks
}
// setsAreEqual reports whether a and b contain the same values, ignoring duplicates.
func setsAreEqual(haystack, needles []int32) bool {
hMap := map[int32]bool{}
nMap := map[int32]bool{}
for _, n := range needles {
nMap[n] = true
}
for _, n := range haystack {
hMap[n] = true
}
return reflect.DeepEqual(nMap, hMap)
}
// startReceiving pretends to be a client. It calls s.Receive and acks messages after some random delay. It also
// looks out for dupes - any message that arrives twice will cause a failure.
func startReceiving(ctx context.Context, t *testing.T, s *Subscription, recvdWg *sync.WaitGroup, processTimeSecs *int32) {
t.Log("Receiving..")
var recvdMu sync.Mutex
recvd := map[string]bool{}
err := s.Receive(ctx, func(ctx context.Context, msg *Message) {
msgData := string(msg.Data)
recvdMu.Lock()
_, ok := recvd[msgData]
if ok {
recvdMu.Unlock()
t.Fatalf("already saw \"%s\"\n", msgData)
return
}
recvd[msgData] = true
recvdMu.Unlock()
select {
case <-ctx.Done():
msg.Nack()
recvdWg.Done()
case <-time.After(time.Duration(atomic.LoadInt32(processTimeSecs)) * time.Second):
msg.Ack()
recvdWg.Done()
}
})
if err != nil {
if status.Code(err) != codes.Canceled {
t.Error(err)
}
}
}
// startSending sends four batches of messages broken up by minDeadline, initialProcessSecs, and finalProcessSecs.
func startSending(t *testing.T, queuedMsgs chan int32, processTimeSecs *int32, initialProcessSecs int32, finalProcessSecs int32, recvdWg *sync.WaitGroup) {
var msg int32
// We must send this block to force the receiver to send its initially-configured modack time. The time that
// gets sent should be ignorant of the distribution, since there haven't been enough (any, actually) messages
// to create a distribution yet.
t.Log("minAckDeadlineSecsSending an initial message")
recvdWg.Add(1)
msg++
queuedMsgs <- msg
<-time.After(minAckDeadline)
t.Logf("Sending some messages to update distribution to %d. This new distribution will be used "+
"when the next batch of messages go out.", initialProcessSecs)
for i := 0; i < 10; i++ {
recvdWg.Add(1)
msg++
queuedMsgs <- msg
}
atomic.SwapInt32(processTimeSecs, finalProcessSecs)
<-time.After(time.Duration(initialProcessSecs) * time.Second)
t.Logf("Sending many messages to update distribution to %d. This new distribution will be used "+
"when the next batch of messages go out.", finalProcessSecs)
for i := 0; i < 100; i++ {
recvdWg.Add(1)
msg++
queuedMsgs <- msg // Send many messages to drastically change distribution
}
<-time.After(time.Duration(finalProcessSecs) * time.Second)
t.Logf("Last message going out, whose deadline should be %d.", finalProcessSecs)
recvdWg.Add(1)
msg++
queuedMsgs <- msg
}
// continuouslySend continuously sends messages that exist on the queuedMsgs chan.
func continuouslySend(ctx context.Context, srv *pstest.Server, queuedMsgs chan int32) {
for {
select {
case <-ctx.Done():
return
case m := <-queuedMsgs:
srv.Publish(fullyQualifiedTopicName, []byte(fmt.Sprintf("message %d", m)), nil)
}
}
}
func toSet(arr []int32) []int32 {
var s []int32
m := map[int32]bool{}
for _, v := range arr {
_, ok := m[v]
if !ok {
s = append(s, v)
m[v] = true
}
}
return s
}
func initConn(ctx context.Context, addr string) (*Subscription, *Client, error) {
conn, err := grpc.Dial(addr, grpc.WithInsecure())
if err != nil {
return nil, nil, err
}
client, err := NewClient(ctx, projName, option.WithGRPCConn(conn))
if err != nil {
return nil, nil, err
}
topic := client.Topic(topicName)
s, err := client.CreateSubscription(ctx, fmt.Sprintf("sub-%d", time.Now().UnixNano()), SubscriptionConfig{Topic: topic})
if err != nil {
return nil, nil, err
}
exists, err := s.Exists(ctx)
if !exists {
return nil, nil, errors.New("Subscription does not exist")
}
if err != nil {
return nil, nil, err
}
return s, client, nil
}
// modackDeadlines takes a map of time => Modack, gathers all the Modack.AckDeadlines,
// and returns them as a slice
func modackDeadlines(m map[time.Time][]pstest.Modack) []int32 {
var u []int32
for _, vv := range m {
for _, v := range vv {
u = append(u, v.AckDeadline)
}
}
return u
}