blob: ad3ae010c826bdf64ea9853e6d81f3e01d820b04 [file] [log] [blame]
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
package wire
import (
"context"
"errors"
"sort"
"testing"
"time"
"cloud.google.com/go/internal/testutil"
"cloud.google.com/go/pubsublite/internal/test"
"github.com/google/uuid"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
)
func TestPartitionSet(t *testing.T) {
partitions := newPartitionSet(&pb.PartitionAssignment{
Partitions: []int64{8, 5, 8, 1},
})
wantPartitions := []int{1, 5, 8}
for _, partition := range wantPartitions {
if !partitions.Contains(partition) {
t.Errorf("Contains(%d) got false, want true", partition)
}
}
for _, partition := range []int{2, 3, 4, 6, 7} {
if partitions.Contains(partition) {
t.Errorf("Contains(%d) got true, want false", partition)
}
}
gotPartitions := partitions.Ints()
sort.Ints(gotPartitions)
if !testutil.Equal(gotPartitions, wantPartitions) {
t.Errorf("Ints() got %v, want %v", gotPartitions, wantPartitions)
}
}
var fakeUUID = [16]byte{'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', '1', '2', '3', '4', '5'}
func fakeGenerateUUID() (uuid.UUID, error) {
return fakeUUID, nil
}
// testAssigner wraps an assigner for ease of testing.
type testAssigner struct {
// Fake error to simulate receiver unable to handle assignment.
recvError error
t *testing.T
asn *assigner
partitions chan []int
serviceTestProxy
}
func newTestAssigner(t *testing.T, subscription string, recvErr error) *testAssigner {
ctx := context.Background()
assignmentClient, err := newPartitionAssignmentClient(ctx, "ignored", testServer.ClientConn())
if err != nil {
t.Fatal(err)
}
ta := &testAssigner{
recvError: recvErr,
t: t,
partitions: make(chan []int, 1),
}
asn, err := newAssigner(ctx, assignmentClient, fakeGenerateUUID, testReceiveSettings(), subscription, ta.receiveAssignment)
if err != nil {
t.Fatal(err)
}
ta.asn = asn
ta.initAndStart(t, ta.asn, "Assigner", assignmentClient)
return ta
}
func (ta *testAssigner) receiveAssignment(partitions partitionSet) error {
p := partitions.Ints()
sort.Ints(p)
ta.partitions <- p
if ta.recvError != nil {
return ta.recvError
}
return nil
}
func (ta *testAssigner) NextPartitions() []int {
select {
case <-time.After(serviceTestWaitTimeout):
ta.t.Errorf("%s partitions not received within %v", ta.name, serviceTestWaitTimeout)
return nil
case p := <-ta.partitions:
return p
}
}
func TestAssignerNoInitialResponse(t *testing.T) {
subscription := "projects/123456/locations/us-central1-b/subscriptions/my-subs"
verifiers := test.NewVerifiers(t)
stream := test.NewRPCVerifier(t)
barrier := stream.PushWithBarrier(initAssignmentReq(subscription, fakeUUID[:]), nil, nil)
verifiers.AddAssignmentStream(subscription, stream)
mockServer.OnTestStart(verifiers)
defer mockServer.OnTestEnd()
asn := newTestAssigner(t, subscription, nil)
// Assigner starts even though no initial response was received from the
// server.
if gotErr := asn.StartError(); gotErr != nil {
t.Errorf("Start() got err: (%v)", gotErr)
}
// To ensure test is deterministic, i.e. server must receive initial request
// before stopping the client.
barrier.Release()
asn.StopVerifyNoError()
}
func TestAssignerReconnect(t *testing.T) {
subscription := "projects/123456/locations/us-central1-b/subscriptions/my-subs"
permanentErr := status.Error(codes.FailedPrecondition, "failed")
verifiers := test.NewVerifiers(t)
// Simulate a transient error that results in a reconnect.
stream1 := test.NewRPCVerifier(t)
stream1.Push(initAssignmentReq(subscription, fakeUUID[:]), nil, status.Error(codes.Unavailable, "server unavailable"))
verifiers.AddAssignmentStream(subscription, stream1)
// Send 2 partition assignments before terminating with permanent error.
stream2 := test.NewRPCVerifier(t)
stream2.Push(initAssignmentReq(subscription, fakeUUID[:]), assignmentResp([]int64{3, 2, 4}), nil)
stream2.Push(assignmentAckReq(), assignmentResp([]int64{0, 3, 3}), nil)
stream2.Push(assignmentAckReq(), nil, permanentErr)
verifiers.AddAssignmentStream(subscription, stream2)
mockServer.OnTestStart(verifiers)
defer mockServer.OnTestEnd()
asn := newTestAssigner(t, subscription, nil)
if gotErr := asn.StartError(); gotErr != nil {
t.Errorf("Start() got err: (%v)", gotErr)
}
if got, want := asn.NextPartitions(), []int{2, 3, 4}; !testutil.Equal(got, want) {
t.Errorf("Partition assignment #1: got %v, want %v", got, want)
}
if got, want := asn.NextPartitions(), []int{0, 3}; !testutil.Equal(got, want) {
t.Errorf("Partition assignment #2: got %v, want %v", got, want)
}
if gotErr, wantErr := asn.FinalError(), permanentErr; !test.ErrorEqual(gotErr, wantErr) {
t.Errorf("Final err: (%v), want: (%v)", gotErr, wantErr)
}
}
func TestAssignerHandlePartitionFailure(t *testing.T) {
subscription := "projects/123456/locations/us-central1-b/subscriptions/my-subs"
verifiers := test.NewVerifiers(t)
stream := test.NewRPCVerifier(t)
stream.Push(initAssignmentReq(subscription, fakeUUID[:]), assignmentResp([]int64{1, 2}), nil)
verifiers.AddAssignmentStream(subscription, stream)
mockServer.OnTestStart(verifiers)
defer mockServer.OnTestEnd()
// Simulates the assigningSubscriber discarding assignments.
wantErr := errors.New("subscriber shutting down")
asn := newTestAssigner(t, subscription, wantErr)
if gotErr := asn.FinalError(); !test.ErrorEqual(gotErr, wantErr) {
t.Errorf("Final err: (%v), want: (%v)", gotErr, wantErr)
}
if got, want := asn.NextPartitions(), []int{1, 2}; !testutil.Equal(got, want) {
t.Errorf("Partition assignments: got %v, want %v", got, want)
}
}