| // Copyright 2020 Google LLC |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // https://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| |
| package wire |
| |
| import ( |
| "context" |
| "errors" |
| "reflect" |
| "testing" |
| "time" |
| |
| "cloud.google.com/go/internal/testutil" |
| "cloud.google.com/go/pubsublite/internal/test" |
| "golang.org/x/xerrors" |
| "google.golang.org/grpc" |
| "google.golang.org/grpc/codes" |
| "google.golang.org/grpc/status" |
| |
| vkit "cloud.google.com/go/pubsublite/apiv1" |
| pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1" |
| ) |
| |
| const defaultStreamTimeout = 30 * time.Second |
| |
| var errInvalidInitialResponse = errors.New("invalid initial response") |
| |
| // testStreamHandler is a simplified publisher service that owns a |
| // retryableStream. |
| type testStreamHandler struct { |
| Topic topicPartition |
| InitialReq *pb.PublishRequest |
| Stream *retryableStream |
| |
| t *testing.T |
| statuses chan streamStatus |
| responses chan interface{} |
| pubClient *vkit.PublisherClient |
| } |
| |
| func newTestStreamHandler(t *testing.T, timeout time.Duration) *testStreamHandler { |
| ctx := context.Background() |
| pubClient, err := newPublisherClient(ctx, "ignored", testServer.ClientConn()) |
| if err != nil { |
| t.Fatal(err) |
| } |
| |
| topic := topicPartition{Path: "path/to/topic", Partition: 1} |
| sh := &testStreamHandler{ |
| Topic: topic, |
| InitialReq: initPubReq(topic), |
| t: t, |
| statuses: make(chan streamStatus, 3), |
| responses: make(chan interface{}, 1), |
| pubClient: pubClient, |
| } |
| sh.Stream = newRetryableStream(ctx, sh, timeout, reflect.TypeOf(pb.PublishResponse{})) |
| return sh |
| } |
| |
| func (sh *testStreamHandler) NextStatus() streamStatus { |
| select { |
| case status := <-sh.statuses: |
| return status |
| case <-time.After(defaultStreamTimeout): |
| sh.t.Errorf("Stream did not change state within %v", defaultStreamTimeout) |
| return streamUninitialized |
| } |
| } |
| |
| func (sh *testStreamHandler) NextResponse() interface{} { |
| select { |
| case response := <-sh.responses: |
| return response |
| case <-time.After(defaultStreamTimeout): |
| sh.t.Errorf("Stream did not receive response within %v", defaultStreamTimeout) |
| return nil |
| } |
| } |
| |
| func (sh *testStreamHandler) newStream(ctx context.Context) (grpc.ClientStream, error) { |
| return sh.pubClient.Publish(ctx) |
| } |
| |
| func (sh *testStreamHandler) validateInitialResponse(response interface{}) error { |
| pubResponse, _ := response.(*pb.PublishResponse) |
| if pubResponse.GetInitialResponse() == nil { |
| return errInvalidInitialResponse |
| } |
| return nil |
| } |
| |
| func (sh *testStreamHandler) initialRequest() (interface{}, initialResponseRequired) { |
| return sh.InitialReq, initialResponseRequired(true) |
| } |
| |
| func (sh *testStreamHandler) onStreamStatusChange(status streamStatus) { |
| sh.statuses <- status |
| |
| // Close connections. |
| if status == streamTerminated { |
| sh.pubClient.Close() |
| } |
| } |
| |
| func (sh *testStreamHandler) onResponse(response interface{}) { |
| sh.responses <- response |
| } |
| |
| func TestRetryableStreamStartOnce(t *testing.T) { |
| pub := newTestStreamHandler(t, defaultStreamTimeout) |
| |
| verifiers := test.NewVerifiers(t) |
| stream := test.NewRPCVerifier(t) |
| stream.Push(pub.InitialReq, initPubResp(), nil) |
| verifiers.AddPublishStream(pub.Topic.Path, pub.Topic.Partition, stream) |
| |
| mockServer.OnTestStart(verifiers) |
| defer mockServer.OnTestEnd() |
| |
| // Ensure that new streams are not opened if the publisher is started twice |
| // (note: only 1 stream verifier was added to the mock server above). |
| pub.Stream.Start() |
| pub.Stream.Start() |
| pub.Stream.Start() |
| if got, want := pub.NextStatus(), streamReconnecting; got != want { |
| t.Errorf("Stream status change: got %d, want %d", got, want) |
| } |
| if got, want := pub.NextStatus(), streamConnected; got != want { |
| t.Errorf("Stream status change: got %d, want %d", got, want) |
| } |
| |
| pub.Stream.Stop() |
| if got, want := pub.NextStatus(), streamTerminated; got != want { |
| t.Errorf("Stream status change: got %d, want %d", got, want) |
| } |
| if gotErr := pub.Stream.Error(); gotErr != nil { |
| t.Errorf("Stream final err: got (%v), want <nil>", gotErr) |
| } |
| } |
| |
| func TestRetryableStreamStopWhileConnecting(t *testing.T) { |
| pub := newTestStreamHandler(t, defaultStreamTimeout) |
| |
| verifiers := test.NewVerifiers(t) |
| stream := test.NewRPCVerifier(t) |
| barrier := stream.PushWithBarrier(pub.InitialReq, initPubResp(), nil) |
| verifiers.AddPublishStream(pub.Topic.Path, pub.Topic.Partition, stream) |
| |
| mockServer.OnTestStart(verifiers) |
| defer mockServer.OnTestEnd() |
| |
| pub.Stream.Start() |
| if got, want := pub.NextStatus(), streamReconnecting; got != want { |
| t.Errorf("Stream status change: got %d, want %d", got, want) |
| } |
| |
| barrier.Release() |
| pub.Stream.Stop() |
| |
| // The stream should transition to terminated and the client stream should be |
| // discarded. |
| if got, want := pub.NextStatus(), streamTerminated; got != want { |
| t.Errorf("Stream status change: got %d, want %d", got, want) |
| } |
| if pub.Stream.currentStream() != nil { |
| t.Error("Client stream should be nil") |
| } |
| if gotErr := pub.Stream.Error(); gotErr != nil { |
| t.Errorf("Stream final err: got (%v), want <nil>", gotErr) |
| } |
| } |
| |
| func TestRetryableStreamStopAbortsRetries(t *testing.T) { |
| pub := newTestStreamHandler(t, defaultStreamTimeout) |
| |
| verifiers := test.NewVerifiers(t) |
| stream := test.NewRPCVerifier(t) |
| // Aborted is a retryable error, but the stream should not be retried because |
| // the publisher is stopped. |
| barrier := stream.PushWithBarrier(pub.InitialReq, nil, status.Error(codes.Aborted, "abort retry")) |
| verifiers.AddPublishStream(pub.Topic.Path, pub.Topic.Partition, stream) |
| |
| mockServer.OnTestStart(verifiers) |
| defer mockServer.OnTestEnd() |
| |
| pub.Stream.Start() |
| if got, want := pub.NextStatus(), streamReconnecting; got != want { |
| t.Errorf("Stream status change: got %d, want %d", got, want) |
| } |
| |
| barrier.Release() |
| pub.Stream.Stop() |
| |
| // The stream should transition to terminated and the client stream should be |
| // discarded. |
| if got, want := pub.NextStatus(), streamTerminated; got != want { |
| t.Errorf("Stream status change: got %d, want %d", got, want) |
| } |
| if pub.Stream.currentStream() != nil { |
| t.Error("Client stream should be nil") |
| } |
| if gotErr := pub.Stream.Error(); gotErr != nil { |
| t.Errorf("Stream final err: got (%v), want <nil>", gotErr) |
| } |
| } |
| |
| func TestRetryableStreamConnectRetries(t *testing.T) { |
| pub := newTestStreamHandler(t, defaultStreamTimeout) |
| |
| verifiers := test.NewVerifiers(t) |
| |
| // First 2 errors are retryable. |
| stream1 := test.NewRPCVerifier(t) |
| stream1.Push(pub.InitialReq, nil, status.Error(codes.Unavailable, "server unavailable")) |
| verifiers.AddPublishStream(pub.Topic.Path, pub.Topic.Partition, stream1) |
| |
| stream2 := test.NewRPCVerifier(t) |
| stream2.Push(pub.InitialReq, nil, status.Error(codes.Internal, "internal")) |
| verifiers.AddPublishStream(pub.Topic.Path, pub.Topic.Partition, stream2) |
| |
| // Third stream should succeed. |
| stream3 := test.NewRPCVerifier(t) |
| stream3.Push(pub.InitialReq, initPubResp(), nil) |
| verifiers.AddPublishStream(pub.Topic.Path, pub.Topic.Partition, stream3) |
| |
| mockServer.OnTestStart(verifiers) |
| defer mockServer.OnTestEnd() |
| |
| pub.Stream.Start() |
| if got, want := pub.NextStatus(), streamReconnecting; got != want { |
| t.Errorf("Stream status change: got %d, want %d", got, want) |
| } |
| if got, want := pub.NextStatus(), streamConnected; got != want { |
| t.Errorf("Stream status change: got %d, want %d", got, want) |
| } |
| |
| pub.Stream.Stop() |
| if got, want := pub.NextStatus(), streamTerminated; got != want { |
| t.Errorf("Stream status change: got %d, want %d", got, want) |
| } |
| } |
| |
| func TestRetryableStreamConnectPermanentFailure(t *testing.T) { |
| pub := newTestStreamHandler(t, defaultStreamTimeout) |
| permanentErr := status.Error(codes.PermissionDenied, "denied") |
| |
| verifiers := test.NewVerifiers(t) |
| // The stream connection results in a non-retryable error, so the publisher |
| // cannot start. |
| stream := test.NewRPCVerifier(t) |
| stream.Push(pub.InitialReq, nil, permanentErr) |
| verifiers.AddPublishStream(pub.Topic.Path, pub.Topic.Partition, stream) |
| |
| mockServer.OnTestStart(verifiers) |
| defer mockServer.OnTestEnd() |
| |
| pub.Stream.Start() |
| if got, want := pub.NextStatus(), streamReconnecting; got != want { |
| t.Errorf("Stream status change: got %d, want %d", got, want) |
| } |
| if got, want := pub.NextStatus(), streamTerminated; got != want { |
| t.Errorf("Stream status change: got %d, want %d", got, want) |
| } |
| if pub.Stream.currentStream() != nil { |
| t.Error("Client stream should be nil") |
| } |
| if gotErr := pub.Stream.Error(); !test.ErrorEqual(gotErr, permanentErr) { |
| t.Errorf("Stream final err: got (%v), want (%v)", gotErr, permanentErr) |
| } |
| } |
| |
| func TestRetryableStreamConnectTimeout(t *testing.T) { |
| // Set a very low timeout to ensure no retries. |
| timeout := time.Millisecond |
| pub := newTestStreamHandler(t, timeout) |
| pub.Stream.initTimeout = defaultInitTimeout |
| wantErr := status.Error(codes.DeadlineExceeded, "timeout") |
| |
| verifiers := test.NewVerifiers(t) |
| stream := test.NewRPCVerifier(t) |
| barrier := stream.PushWithBarrier(pub.InitialReq, nil, wantErr) |
| verifiers.AddPublishStream(pub.Topic.Path, pub.Topic.Partition, stream) |
| |
| mockServer.OnTestStart(verifiers) |
| defer mockServer.OnTestEnd() |
| |
| pub.Stream.Start() |
| if got, want := pub.NextStatus(), streamReconnecting; got != want { |
| t.Errorf("Stream status change: got %d, want %d", got, want) |
| } |
| |
| // Send the initial server response well after the timeout setting. |
| time.Sleep(10 * timeout) |
| barrier.Release() |
| |
| if got, want := pub.NextStatus(), streamTerminated; got != want { |
| t.Errorf("Stream status change: got %d, want %d", got, want) |
| } |
| if pub.Stream.currentStream() != nil { |
| t.Error("Client stream should be nil") |
| } |
| if gotErr := pub.Stream.Error(); !xerrors.Is(gotErr, ErrBackendUnavailable) { |
| t.Errorf("Stream final err: got (%v), want (%v)", gotErr, ErrBackendUnavailable) |
| } |
| } |
| |
| func TestRetryableStreamInitTimeout(t *testing.T) { |
| const streamInitTimeout = 50 * time.Millisecond |
| const streamResponseDelay = 75 * time.Millisecond |
| |
| pub := newTestStreamHandler(t, defaultStreamTimeout) |
| pub.Stream.initTimeout = streamInitTimeout |
| |
| verifiers := test.NewVerifiers(t) |
| |
| // First stream will have a delayed response. |
| stream1 := test.NewRPCVerifier(t) |
| barrier := stream1.PushWithBarrier(pub.InitialReq, initPubResp(), nil) |
| verifiers.AddPublishStream(pub.Topic.Path, pub.Topic.Partition, stream1) |
| |
| // Second stream should succeed. |
| stream2 := test.NewRPCVerifier(t) |
| stream2.Push(pub.InitialReq, initPubResp(), nil) |
| verifiers.AddPublishStream(pub.Topic.Path, pub.Topic.Partition, stream2) |
| |
| mockServer.OnTestStart(verifiers) |
| defer mockServer.OnTestEnd() |
| |
| pub.Stream.Start() |
| if got, want := pub.NextStatus(), streamReconnecting; got != want { |
| t.Errorf("Stream status change: got %d, want %d", got, want) |
| } |
| |
| barrier.ReleaseAfter(func() { |
| time.Sleep(streamResponseDelay) |
| }) |
| if got, want := pub.NextStatus(), streamConnected; got != want { |
| t.Errorf("Stream status change: got %d, want %d", got, want) |
| } |
| |
| pub.Stream.Stop() |
| if got, want := pub.NextStatus(), streamTerminated; got != want { |
| t.Errorf("Stream status change: got %d, want %d", got, want) |
| } |
| } |
| |
| func TestRetryableStreamSendReceive(t *testing.T) { |
| pub := newTestStreamHandler(t, defaultStreamTimeout) |
| req := msgPubReq(&pb.PubSubMessage{Data: []byte("msg")}) |
| wantResp := msgPubResp(5) |
| |
| verifiers := test.NewVerifiers(t) |
| stream := test.NewRPCVerifier(t) |
| barrier := stream.PushWithBarrier(pub.InitialReq, initPubResp(), nil) |
| stream.Push(req, wantResp, nil) |
| verifiers.AddPublishStream(pub.Topic.Path, pub.Topic.Partition, stream) |
| |
| mockServer.OnTestStart(verifiers) |
| defer mockServer.OnTestEnd() |
| |
| pub.Stream.Start() |
| if got, want := pub.NextStatus(), streamReconnecting; got != want { |
| t.Errorf("Stream status change: got %d, want %d", got, want) |
| } |
| |
| // While the stream is reconnecting, requests are discarded. |
| if got, want := pub.Stream.Send(req), false; got != want { |
| t.Errorf("Stream send: got %v, want %v", got, want) |
| } |
| |
| barrier.Release() |
| if got, want := pub.NextStatus(), streamConnected; got != want { |
| t.Errorf("Stream status change: got %d, want %d", got, want) |
| } |
| |
| if got, want := pub.Stream.Send(req), true; got != want { |
| t.Errorf("Stream send: got %v, want %v", got, want) |
| } |
| if gotResp := pub.NextResponse(); !testutil.Equal(gotResp, wantResp) { |
| t.Errorf("Stream response: got %v, want %v", gotResp, wantResp) |
| } |
| |
| pub.Stream.Stop() |
| if got, want := pub.NextStatus(), streamTerminated; got != want { |
| t.Errorf("Stream status change: got %d, want %d", got, want) |
| } |
| if gotErr := pub.Stream.Error(); gotErr != nil { |
| t.Errorf("Stream final err: got (%v), want <nil>", gotErr) |
| } |
| } |
| |
| func TestRetryableStreamConnectReceivesResetSignal(t *testing.T) { |
| pub := newTestStreamHandler(t, defaultStreamTimeout) |
| |
| verifiers := test.NewVerifiers(t) |
| |
| stream1 := test.NewRPCVerifier(t) |
| // Reset signal received during stream initialization. |
| stream1.Push(pub.InitialReq, nil, makeStreamResetSignal()) |
| verifiers.AddPublishStream(pub.Topic.Path, pub.Topic.Partition, stream1) |
| |
| stream2 := test.NewRPCVerifier(t) |
| stream2.Push(pub.InitialReq, initPubResp(), nil) |
| verifiers.AddPublishStream(pub.Topic.Path, pub.Topic.Partition, stream2) |
| |
| mockServer.OnTestStart(verifiers) |
| defer mockServer.OnTestEnd() |
| |
| pub.Stream.Start() |
| if got, want := pub.NextStatus(), streamReconnecting; got != want { |
| t.Errorf("Stream status change: got %d, want %d", got, want) |
| } |
| if got, want := pub.NextStatus(), streamResetState; got != want { |
| t.Errorf("Stream status change: got %d, want %d", got, want) |
| } |
| if got, want := pub.NextStatus(), streamConnected; got != want { |
| t.Errorf("Stream status change: got %d, want %d", got, want) |
| } |
| |
| pub.Stream.Stop() |
| if got, want := pub.NextStatus(), streamTerminated; got != want { |
| t.Errorf("Stream status change: got %d, want %d", got, want) |
| } |
| if gotErr := pub.Stream.Error(); gotErr != nil { |
| t.Errorf("Stream final err: got (%v), want <nil>", gotErr) |
| } |
| } |
| |
| func TestRetryableStreamDisconnectedWithResetSignal(t *testing.T) { |
| pub := newTestStreamHandler(t, defaultStreamTimeout) |
| |
| verifiers := test.NewVerifiers(t) |
| |
| stream1 := test.NewRPCVerifier(t) |
| stream1.Push(pub.InitialReq, initPubResp(), nil) |
| // Reset signal received after stream is connected. |
| stream1.Push(nil, nil, makeStreamResetSignal()) |
| verifiers.AddPublishStream(pub.Topic.Path, pub.Topic.Partition, stream1) |
| |
| stream2 := test.NewRPCVerifier(t) |
| stream2.Push(pub.InitialReq, initPubResp(), nil) |
| verifiers.AddPublishStream(pub.Topic.Path, pub.Topic.Partition, stream2) |
| |
| mockServer.OnTestStart(verifiers) |
| defer mockServer.OnTestEnd() |
| |
| pub.Stream.Start() |
| if got, want := pub.NextStatus(), streamReconnecting; got != want { |
| t.Errorf("Stream status change: got %d, want %d", got, want) |
| } |
| if got, want := pub.NextStatus(), streamConnected; got != want { |
| t.Errorf("Stream status change: got %d, want %d", got, want) |
| } |
| if got, want := pub.NextStatus(), streamReconnecting; got != want { |
| t.Errorf("Stream status change: got %d, want %d", got, want) |
| } |
| if got, want := pub.NextStatus(), streamResetState; got != want { |
| t.Errorf("Stream status change: got %d, want %d", got, want) |
| } |
| if got, want := pub.NextStatus(), streamConnected; got != want { |
| t.Errorf("Stream status change: got %d, want %d", got, want) |
| } |
| |
| pub.Stream.Stop() |
| if got, want := pub.NextStatus(), streamTerminated; got != want { |
| t.Errorf("Stream status change: got %d, want %d", got, want) |
| } |
| if gotErr := pub.Stream.Error(); gotErr != nil { |
| t.Errorf("Stream final err: got (%v), want <nil>", gotErr) |
| } |
| } |