blob: 018886f355f050f7518a96888787371a4f37a8be [file] [log] [blame]
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
package wire
import (
"context"
"testing"
"cloud.google.com/go/pubsublite/internal/test"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// testCommitter wraps a committer for ease of testing.
type testCommitter struct {
cmt *committer
serviceTestProxy
}
func newTestCommitter(t *testing.T, subscription subscriptionPartition, acks *ackTracker) *testCommitter {
ctx := context.Background()
cursorClient, err := newCursorClient(ctx, "ignored", testServer.ClientConn())
if err != nil {
t.Fatal(err)
}
tc := &testCommitter{
cmt: newCommitter(ctx, cursorClient, testReceiveSettings(), subscription, acks, true),
}
tc.initAndStart(t, tc.cmt, "Committer", cursorClient)
return tc
}
// SendBatchCommit invokes the periodic background batch commit. Note that the
// periodic task is disabled in tests.
func (tc *testCommitter) SendBatchCommit() {
tc.cmt.commitOffsetToStream()
}
func (tc *testCommitter) Terminate() {
tc.cmt.Terminate()
}
func (tc *testCommitter) BlockingReset() error {
return tc.cmt.BlockingReset()
}
func TestCommitterStreamReconnect(t *testing.T) {
subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-subs", 0}
ack1 := newAckConsumer(33, 0, nil)
ack2 := newAckConsumer(55, 0, nil)
acks := newAckTracker()
acks.Push(ack1)
acks.Push(ack2)
verifiers := test.NewVerifiers(t)
// Simulate a transient error that results in a reconnect.
stream1 := test.NewRPCVerifier(t)
stream1.Push(initCommitReq(subscription), initCommitResp(), nil)
barrier := stream1.PushWithBarrier(commitReq(34), nil, status.Error(codes.Unavailable, "server unavailable"))
verifiers.AddCommitStream(subscription.Path, subscription.Partition, stream1)
// When the stream reconnects, the latest commit offset should be sent to the
// server.
stream2 := test.NewRPCVerifier(t)
stream2.Push(initCommitReq(subscription), initCommitResp(), nil)
stream2.Push(commitReq(56), commitResp(1), nil)
verifiers.AddCommitStream(subscription.Path, subscription.Partition, stream2)
mockServer.OnTestStart(verifiers)
defer mockServer.OnTestEnd()
cmt := newTestCommitter(t, subscription, acks)
if gotErr := cmt.StartError(); gotErr != nil {
t.Errorf("Start() got err: (%v)", gotErr)
}
// Send 2 commits.
ack1.Ack()
cmt.SendBatchCommit()
ack2.Ack()
cmt.SendBatchCommit()
// Then send the retryable error, which results in reconnect.
barrier.Release()
cmt.StopVerifyNoError()
}
func TestCommitterStopFlushesCommits(t *testing.T) {
subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-subs", 0}
ack1 := newAckConsumer(33, 0, nil)
ack2 := newAckConsumer(55, 0, nil)
acks := newAckTracker()
acks.Push(ack1)
acks.Push(ack2)
verifiers := test.NewVerifiers(t)
stream := test.NewRPCVerifier(t)
stream.Push(initCommitReq(subscription), initCommitResp(), nil)
stream.Push(commitReq(34), commitResp(1), nil)
stream.Push(commitReq(56), commitResp(1), nil)
verifiers.AddCommitStream(subscription.Path, subscription.Partition, stream)
mockServer.OnTestStart(verifiers)
defer mockServer.OnTestEnd()
cmt := newTestCommitter(t, subscription, acks)
if gotErr := cmt.StartError(); gotErr != nil {
t.Errorf("Start() got err: (%v)", gotErr)
}
ack1.Ack()
cmt.Stop() // Stop should flush the first offset
ack2.Ack() // Acks after Stop() are processed
cmt.SendBatchCommit()
// Committer terminates when all acks are processed.
if gotErr := cmt.FinalError(); gotErr != nil {
t.Errorf("Final err: (%v), want: <nil>", gotErr)
}
}
func TestCommitterTerminateDiscardsOutstandingAcks(t *testing.T) {
subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-subs", 0}
ack1 := newAckConsumer(33, 0, nil)
ack2 := newAckConsumer(55, 0, nil)
acks := newAckTracker()
acks.Push(ack1)
acks.Push(ack2)
verifiers := test.NewVerifiers(t)
stream := test.NewRPCVerifier(t)
stream.Push(initCommitReq(subscription), initCommitResp(), nil)
stream.Push(commitReq(34), commitResp(1), nil)
verifiers.AddCommitStream(subscription.Path, subscription.Partition, stream)
mockServer.OnTestStart(verifiers)
defer mockServer.OnTestEnd()
cmt := newTestCommitter(t, subscription, acks)
if gotErr := cmt.StartError(); gotErr != nil {
t.Errorf("Start() got err: (%v)", gotErr)
}
ack1.Ack()
cmt.Terminate() // Terminate should flush the first offset
ack2.Ack() // Acks after Terminate() are discarded
cmt.SendBatchCommit() // Should do nothing (server does not expect second commit)
if gotErr := cmt.FinalError(); gotErr != nil {
t.Errorf("Final err: (%v), want: <nil>", gotErr)
}
}
func TestCommitterStopThenTerminateDiscardsOutstandingAcks(t *testing.T) {
subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-subs", 0}
ack := newAckConsumer(33, 0, nil)
acks := newAckTracker()
acks.Push(ack)
verifiers := test.NewVerifiers(t)
stream := test.NewRPCVerifier(t)
stream.Push(initCommitReq(subscription), initCommitResp(), nil)
// No commits expected.
verifiers.AddCommitStream(subscription.Path, subscription.Partition, stream)
mockServer.OnTestStart(verifiers)
defer mockServer.OnTestEnd()
cmt := newTestCommitter(t, subscription, acks)
if gotErr := cmt.StartError(); gotErr != nil {
t.Errorf("Start() got err: (%v)", gotErr)
}
cmt.Stop() // Stop waits for outstanding acks
cmt.Terminate() // Terminate should discard all outstanding acks
if gotErr := cmt.FinalError(); gotErr != nil {
t.Errorf("Final err: (%v), want: <nil>", gotErr)
}
}
func TestCommitterPermanentStreamError(t *testing.T) {
subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-subs", 0}
acks := newAckTracker()
wantErr := status.Error(codes.FailedPrecondition, "failed")
verifiers := test.NewVerifiers(t)
stream := test.NewRPCVerifier(t)
stream.Push(initCommitReq(subscription), nil, wantErr)
verifiers.AddCommitStream(subscription.Path, subscription.Partition, stream)
mockServer.OnTestStart(verifiers)
defer mockServer.OnTestEnd()
cmt := newTestCommitter(t, subscription, acks)
if gotErr := cmt.StartError(); !test.ErrorEqual(gotErr, wantErr) {
t.Errorf("Start() got err: (%v), want: (%v)", gotErr, wantErr)
}
}
func TestCommitterInvalidInitialResponse(t *testing.T) {
subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-subs", 0}
acks := newAckTracker()
verifiers := test.NewVerifiers(t)
stream := test.NewRPCVerifier(t)
stream.Push(initCommitReq(subscription), commitResp(1234), nil) // Invalid initial response
verifiers.AddCommitStream(subscription.Path, subscription.Partition, stream)
mockServer.OnTestStart(verifiers)
defer mockServer.OnTestEnd()
cmt := newTestCommitter(t, subscription, acks)
wantErr := errInvalidInitialCommitResponse
if gotErr := cmt.StartError(); !test.ErrorEqual(gotErr, wantErr) {
t.Errorf("Start() got err: (%v), want: (%v)", gotErr, wantErr)
}
if gotErr := cmt.FinalError(); !test.ErrorEqual(gotErr, wantErr) {
t.Errorf("Final err: (%v), want: (%v)", gotErr, wantErr)
}
}
func TestCommitterInvalidCommitResponse(t *testing.T) {
subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-subs", 0}
ack := newAckConsumer(33, 0, nil)
acks := newAckTracker()
acks.Push(ack)
verifiers := test.NewVerifiers(t)
stream := test.NewRPCVerifier(t)
stream.Push(initCommitReq(subscription), initCommitResp(), nil)
stream.Push(commitReq(34), initCommitResp(), nil) // Invalid commit response
verifiers.AddCommitStream(subscription.Path, subscription.Partition, stream)
mockServer.OnTestStart(verifiers)
defer mockServer.OnTestEnd()
cmt := newTestCommitter(t, subscription, acks)
if gotErr := cmt.StartError(); gotErr != nil {
t.Errorf("Start() got err: (%v)", gotErr)
}
ack.Ack()
cmt.SendBatchCommit()
if gotErr, wantErr := cmt.FinalError(), errInvalidCommitResponse; !test.ErrorEqual(gotErr, wantErr) {
t.Errorf("Final err: (%v), want: (%v)", gotErr, wantErr)
}
}
func TestCommitterExcessConfirmedOffsets(t *testing.T) {
subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-subs", 0}
ack := newAckConsumer(33, 0, nil)
acks := newAckTracker()
acks.Push(ack)
verifiers := test.NewVerifiers(t)
stream := test.NewRPCVerifier(t)
stream.Push(initCommitReq(subscription), initCommitResp(), nil)
stream.Push(commitReq(34), commitResp(2), nil) // More confirmed offsets than committed
verifiers.AddCommitStream(subscription.Path, subscription.Partition, stream)
mockServer.OnTestStart(verifiers)
defer mockServer.OnTestEnd()
cmt := newTestCommitter(t, subscription, acks)
if gotErr := cmt.StartError(); gotErr != nil {
t.Errorf("Start() got err: (%v)", gotErr)
}
ack.Ack()
cmt.SendBatchCommit()
wantMsg := "server acknowledged 2 cursor commits"
if gotErr := cmt.FinalError(); !test.ErrorHasMsg(gotErr, wantMsg) {
t.Errorf("Final err: (%v), want msg: (%v)", gotErr, wantMsg)
}
}
func TestCommitterZeroConfirmedOffsets(t *testing.T) {
subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-subs", 0}
ack := newAckConsumer(33, 0, nil)
acks := newAckTracker()
acks.Push(ack)
verifiers := test.NewVerifiers(t)
stream := test.NewRPCVerifier(t)
stream.Push(initCommitReq(subscription), initCommitResp(), nil)
stream.Push(commitReq(34), commitResp(0), nil) // Zero confirmed offsets (invalid)
verifiers.AddCommitStream(subscription.Path, subscription.Partition, stream)
mockServer.OnTestStart(verifiers)
defer mockServer.OnTestEnd()
cmt := newTestCommitter(t, subscription, acks)
if gotErr := cmt.StartError(); gotErr != nil {
t.Errorf("Start() got err: (%v)", gotErr)
}
ack.Ack()
cmt.SendBatchCommit()
wantMsg := "server acknowledged an invalid commit count"
if gotErr := cmt.FinalError(); !test.ErrorHasMsg(gotErr, wantMsg) {
t.Errorf("Final err: (%v), want msg: (%v)", gotErr, wantMsg)
}
}
func TestCommitterBlockingResetNormalCompletion(t *testing.T) {
subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-subs", 0}
ack1 := newAckConsumer(33, 0, nil)
ack2 := newAckConsumer(55, 0, nil)
acks := newAckTracker()
acks.Push(ack1)
acks.Push(ack2)
verifiers := test.NewVerifiers(t)
stream := test.NewRPCVerifier(t)
stream.Push(initCommitReq(subscription), initCommitResp(), nil)
barrier := stream.PushWithBarrier(commitReq(34), commitResp(1), nil)
verifiers.AddCommitStream(subscription.Path, subscription.Partition, stream)
mockServer.OnTestStart(verifiers)
defer mockServer.OnTestEnd()
cmt := newTestCommitter(t, subscription, acks)
if gotErr := cmt.StartError(); gotErr != nil {
t.Errorf("Start() got err: (%v)", gotErr)
}
ack1.Ack()
complete := test.NewCondition("blocking reset complete")
go func() {
if err := cmt.BlockingReset(); err != nil {
t.Errorf("BlockingReset() got err: (%v), want: <nil>", err)
}
cmt.BlockingReset()
complete.SetDone()
}()
complete.VerifyNotDone(t)
// Until the commit response is received, committer.BlockingReset should not
// return.
barrier.ReleaseAfter(func() {
complete.VerifyNotDone(t)
})
complete.WaitUntilDone(t, serviceTestWaitTimeout)
// Ack tracker should be reset.
if got, want := acks.CommitOffset(), nilCursorOffset; got != want {
t.Errorf("ackTracker.CommitOffset() got %v, want %v", got, want)
}
if got, want := acks.Empty(), true; got != want {
t.Errorf("ackTracker.Empty() got %v, want %v", got, want)
}
// This ack should have been discarded.
ack2.Ack()
// Calling committer.BlockingReset again should immediately return.
if err := cmt.BlockingReset(); err != nil {
t.Errorf("BlockingReset() got err: (%v), want: <nil>", err)
}
cmt.StopVerifyNoError()
}
func TestCommitterBlockingResetCommitterStopped(t *testing.T) {
subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-subs", 0}
ack1 := newAckConsumer(33, 0, nil)
ack2 := newAckConsumer(55, 0, nil)
acks := newAckTracker()
acks.Push(ack1)
acks.Push(ack2)
verifiers := test.NewVerifiers(t)
stream := test.NewRPCVerifier(t)
stream.Push(initCommitReq(subscription), initCommitResp(), nil)
barrier := stream.PushWithBarrier(commitReq(34), commitResp(1), nil)
verifiers.AddCommitStream(subscription.Path, subscription.Partition, stream)
mockServer.OnTestStart(verifiers)
defer mockServer.OnTestEnd()
cmt := newTestCommitter(t, subscription, acks)
if gotErr := cmt.StartError(); gotErr != nil {
t.Errorf("Start() got err: (%v)", gotErr)
}
ack1.Ack()
complete := test.NewCondition("blocking reset complete")
go func() {
if got, want := cmt.BlockingReset(), ErrServiceStopped; !test.ErrorEqual(got, want) {
t.Errorf("BlockingReset() got: (%v), want: (%v)", got, want)
}
complete.SetDone()
}()
complete.VerifyNotDone(t)
// committer.BlockingReset should return when the committer is stopped.
barrier.ReleaseAfter(func() {
cmt.Stop()
complete.WaitUntilDone(t, serviceTestWaitTimeout)
})
cmt.Terminate()
if gotErr := cmt.FinalError(); gotErr != nil {
t.Errorf("Final err: (%v), want: <nil>", gotErr)
}
}
func TestCommitterBlockingResetFatalError(t *testing.T) {
subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-subs", 0}
ack1 := newAckConsumer(33, 0, nil)
ack2 := newAckConsumer(55, 0, nil)
acks := newAckTracker()
acks.Push(ack1)
acks.Push(ack2)
serverErr := status.Error(codes.FailedPrecondition, "failed")
verifiers := test.NewVerifiers(t)
stream := test.NewRPCVerifier(t)
stream.Push(initCommitReq(subscription), initCommitResp(), nil)
stream.Push(commitReq(34), nil, serverErr)
verifiers.AddCommitStream(subscription.Path, subscription.Partition, stream)
mockServer.OnTestStart(verifiers)
defer mockServer.OnTestEnd()
cmt := newTestCommitter(t, subscription, acks)
if gotErr := cmt.StartError(); gotErr != nil {
t.Errorf("Start() got err: (%v)", gotErr)
}
ack1.Ack()
complete := test.NewCondition("blocking reset complete")
go func() {
if got, want := cmt.BlockingReset(), ErrServiceStopped; !test.ErrorEqual(got, want) {
t.Errorf("BlockingReset() got: (%v), want: (%v)", got, want)
}
complete.SetDone()
}()
// committer.BlockingReset should return when the committer terminates due to
// fatal server error.
complete.WaitUntilDone(t, serviceTestWaitTimeout)
if gotErr := cmt.FinalError(); !test.ErrorEqual(gotErr, serverErr) {
t.Errorf("Final err: (%v), want: (%v)", gotErr, serverErr)
}
}