| // Copyright 2022 Google LLC |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // https://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package managedwriter |
| |
| import ( |
| "context" |
| "errors" |
| "fmt" |
| "io" |
| "testing" |
| "time" |
| |
| "cloud.google.com/go/bigquery/storage/apiv1/storagepb" |
| "github.com/googleapis/gax-go/v2" |
| "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1" |
| statuspb "google.golang.org/genproto/googleapis/rpc/status" |
| "google.golang.org/grpc" |
| "google.golang.org/grpc/codes" |
| "google.golang.org/grpc/status" |
| ) |
| |
| func TestConnection_OpenWithRetry(t *testing.T) { |
| |
| testCases := []struct { |
| desc string |
| errors []error |
| wantFail bool |
| }{ |
| { |
| desc: "no error", |
| errors: []error{nil}, |
| wantFail: false, |
| }, |
| { |
| desc: "transient failures", |
| errors: []error{ |
| status.Errorf(codes.Unavailable, "try 1"), |
| status.Errorf(codes.Unavailable, "try 2"), |
| nil}, |
| wantFail: false, |
| }, |
| { |
| desc: "terminal error", |
| errors: []error{status.Errorf(codes.InvalidArgument, "bad args")}, |
| wantFail: true, |
| }, |
| } |
| |
| for _, tc := range testCases { |
| pool := &connectionPool{ |
| ctx: context.Background(), |
| open: func(ctx context.Context, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) { |
| if len(tc.errors) == 0 { |
| panic("out of errors") |
| } |
| err := tc.errors[0] |
| tc.errors = tc.errors[1:] |
| if err == nil { |
| return &testAppendRowsClient{}, nil |
| } |
| return nil, err |
| }, |
| } |
| if err := pool.activateRouter(newSimpleRouter("")); err != nil { |
| t.Errorf("activateRouter: %v", err) |
| } |
| writer := &ManagedStream{id: "foo"} |
| if err := pool.addWriter(writer); err != nil { |
| t.Errorf("addWriter: %v", err) |
| } |
| |
| conn, err := pool.router.pickConnection(nil) |
| if err != nil { |
| t.Errorf("case %s, failed to add connection: %v", tc.desc, err) |
| } |
| arc, ch, err := pool.openWithRetry(conn) |
| if tc.wantFail && err == nil { |
| t.Errorf("case %s: wanted failure, got success", tc.desc) |
| } |
| if !tc.wantFail && err != nil { |
| t.Errorf("case %s: wanted success, got %v", tc.desc, err) |
| } |
| if err == nil { |
| if arc == nil { |
| t.Errorf("case %s: expected append client, got nil", tc.desc) |
| } |
| if ch == nil { |
| t.Errorf("case %s: expected channel, got nil", tc.desc) |
| } |
| } |
| } |
| } |
| |
| // Ensure we properly refund the flow control during send failures. |
| // https://github.com/googleapis/google-cloud-go/issues/9540 |
| func TestConnection_LockingAppendFlowRelease(t *testing.T) { |
| ctx := context.Background() |
| |
| pool := &connectionPool{ |
| ctx: ctx, |
| baseFlowController: newFlowController(10, 0), |
| open: openTestArc(&testAppendRowsClient{}, |
| func(req *storagepb.AppendRowsRequest) error { |
| // Append always reports EOF on send. |
| return io.EOF |
| }, nil), |
| } |
| router := newSimpleRouter("") |
| if err := pool.activateRouter(router); err != nil { |
| t.Errorf("activateRouter: %v", err) |
| } |
| |
| writer := &ManagedStream{id: "foo", ctx: ctx} |
| if err := pool.addWriter(writer); err != nil { |
| t.Errorf("addWriter: %v", err) |
| } |
| |
| pw := newPendingWrite(ctx, writer, &storagepb.AppendRowsRequest{WriteStream: "somestream"}, newVersionedTemplate(), "", "") |
| for i := 0; i < 5; i++ { |
| conn, err := router.pool.selectConn(pw) |
| if err != nil { |
| t.Errorf("selectConn: %v", err) |
| } |
| |
| // Ensure FC is empty before lockingAppend |
| if got := conn.fc.count(); got != 0 { |
| t.Errorf("attempt %d expected empty flow count, got %d", i, got) |
| } |
| if got := conn.fc.bytes(); got != 0 { |
| t.Errorf("attempt %d expected empty flow bytes, got %d", i, got) |
| } |
| // invoke lockingAppend, which fails |
| if err := conn.lockingAppend(pw); err != io.EOF { |
| t.Errorf("lockingAppend attempt %d: expected io.EOF, got %v", i, err) |
| } |
| // Ensure we're refunded due to failure |
| if got := conn.fc.count(); got != 0 { |
| t.Errorf("attempt %d expected empty flow count, got %d", i, got) |
| } |
| if got := conn.fc.bytes(); got != 0 { |
| t.Errorf("attempt %d expected empty flow bytes, got %d", i, got) |
| } |
| } |
| } |
| |
| // Ensures we don't lose track of channels/connections during reconnects. |
| // https://github.com/googleapis/google-cloud-go/issues/6766 |
| func TestConnection_LeakingReconnect(t *testing.T) { |
| |
| ctx := context.Background() |
| |
| pool := &connectionPool{ |
| ctx: ctx, |
| baseFlowController: newFlowController(10, 0), |
| open: openTestArc(&testAppendRowsClient{}, |
| func(req *storagepb.AppendRowsRequest) error { |
| // Append always reports EOF on send. |
| return io.EOF |
| }, nil), |
| } |
| router := newSimpleRouter("") |
| if err := pool.activateRouter(router); err != nil { |
| t.Errorf("activateRouter: %v", err) |
| } |
| writer := &ManagedStream{id: "foo"} |
| if err := pool.addWriter(writer); err != nil { |
| t.Errorf("addWriter: %v", err) |
| } |
| |
| var chans []chan *pendingWrite |
| |
| for i := 0; i < 10; i++ { |
| _, ch, err := router.conn.getStream(nil, true) |
| if err != nil { |
| t.Fatalf("failed getStream(%d): %v", i, err) |
| } |
| chans = append(chans, ch) |
| } |
| var closedCount int |
| for _, ch := range chans { |
| select { |
| case _, ok := <-ch: |
| if !ok { |
| closedCount = closedCount + 1 |
| } |
| case <-time.After(time.Second): |
| // we blocked, likely indicative that the channel is open. |
| continue |
| } |
| } |
| if wantClosed := len(chans) - 1; wantClosed != closedCount { |
| t.Errorf("closed count mismatch, got %d want %d", closedCount, wantClosed) |
| } |
| } |
| |
| // Ensures we're propagating call options as expected. |
| // Background: https://github.com/googleapis/google-cloud-go/issues/6487 |
| func TestConnectionPool_OpenCallOptionPropagation(t *testing.T) { |
| ctx, cancel := context.WithCancel(context.Background()) |
| cancel() |
| |
| pool := &connectionPool{ |
| ctx: ctx, |
| cancel: cancel, |
| open: createOpenF(func(ctx context.Context, opts ...gax.CallOption) (storage.BigQueryWrite_AppendRowsClient, error) { |
| if len(opts) == 0 { |
| t.Fatalf("no options were propagated") |
| } |
| return nil, fmt.Errorf("no real client") |
| }, ""), |
| callOptions: []gax.CallOption{ |
| gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(99)), |
| }, |
| } |
| conn := newConnection(pool, "", nil) |
| pool.openWithRetry(conn) |
| } |
| |
| // This test evaluates how the receiver deals with a pending write. |
| func TestConnection_Receiver(t *testing.T) { |
| |
| var customErr = fmt.Errorf("foo") |
| |
| testCases := []struct { |
| description string |
| recvResp []*testRecvResponse |
| wantFinalErr error |
| wantTotalAttempts int |
| }{ |
| { |
| description: "no errors", |
| recvResp: []*testRecvResponse{ |
| { |
| resp: &storagepb.AppendRowsResponse{}, |
| err: nil, |
| }, |
| }, |
| wantTotalAttempts: 1, |
| }, |
| { |
| description: "recv err w/io.EOF", |
| recvResp: []*testRecvResponse{ |
| { |
| resp: nil, |
| err: io.EOF, |
| }, |
| { |
| resp: &storagepb.AppendRowsResponse{}, |
| err: nil, |
| }, |
| }, |
| wantTotalAttempts: 2, |
| }, |
| { |
| description: "recv err retried and then failed", |
| recvResp: []*testRecvResponse{ |
| { |
| resp: nil, |
| err: io.EOF, |
| }, |
| { |
| resp: nil, |
| err: customErr, |
| }, |
| }, |
| wantTotalAttempts: 2, |
| wantFinalErr: customErr, |
| }, |
| { |
| description: "recv err w/ custom error", |
| recvResp: []*testRecvResponse{ |
| { |
| resp: nil, |
| err: customErr, |
| }, |
| { |
| resp: &storagepb.AppendRowsResponse{}, |
| err: nil, |
| }, |
| }, |
| wantTotalAttempts: 1, |
| wantFinalErr: customErr, |
| }, |
| |
| { |
| description: "resp embeds Unavailable", |
| recvResp: []*testRecvResponse{ |
| { |
| resp: &storagepb.AppendRowsResponse{ |
| Response: &storagepb.AppendRowsResponse_Error{ |
| Error: &statuspb.Status{ |
| Code: int32(codes.Unavailable), |
| Message: "foo", |
| }, |
| }, |
| }, |
| err: nil, |
| }, |
| { |
| resp: &storagepb.AppendRowsResponse{}, |
| err: nil, |
| }, |
| }, |
| wantTotalAttempts: 2, |
| }, |
| { |
| description: "resp embeds generic ResourceExhausted", |
| recvResp: []*testRecvResponse{ |
| { |
| resp: &storagepb.AppendRowsResponse{ |
| Response: &storagepb.AppendRowsResponse_Error{ |
| Error: &statuspb.Status{ |
| Code: int32(codes.ResourceExhausted), |
| Message: "foo", |
| }, |
| }, |
| }, |
| err: nil, |
| }, |
| }, |
| wantTotalAttempts: 1, |
| wantFinalErr: func() error { |
| return status.ErrorProto(&statuspb.Status{ |
| Code: int32(codes.ResourceExhausted), |
| Message: "foo", |
| }) |
| }(), |
| }, |
| { |
| description: "resp embeds throughput ResourceExhausted", |
| recvResp: []*testRecvResponse{ |
| { |
| resp: &storagepb.AppendRowsResponse{ |
| Response: &storagepb.AppendRowsResponse_Error{ |
| Error: &statuspb.Status{ |
| Code: int32(codes.ResourceExhausted), |
| Message: "Exceeds 'AppendRows throughput' quota for stream blah", |
| }, |
| }, |
| }, |
| err: nil, |
| }, |
| { |
| resp: &storagepb.AppendRowsResponse{}, |
| err: nil, |
| }, |
| }, |
| wantTotalAttempts: 2, |
| }, |
| { |
| description: "retriable failures until max attempts", |
| recvResp: []*testRecvResponse{ |
| { |
| err: io.EOF, |
| }, |
| { |
| err: io.EOF, |
| }, |
| { |
| err: io.EOF, |
| }, |
| { |
| err: io.EOF, |
| }, |
| }, |
| wantTotalAttempts: 4, |
| wantFinalErr: io.EOF, |
| }, |
| } |
| |
| for _, tc := range testCases { |
| ctx, cancel := context.WithCancel(context.Background()) |
| |
| testArc := &testAppendRowsClient{ |
| responses: tc.recvResp, |
| } |
| |
| pool := &connectionPool{ |
| ctx: ctx, |
| open: openTestArc(testArc, nil, |
| func() (*storagepb.AppendRowsResponse, error) { |
| if len(testArc.responses) == 0 { |
| panic("out of responses") |
| } |
| curResp := testArc.responses[0] |
| testArc.responses = testArc.responses[1:] |
| return curResp.resp, curResp.err |
| }, |
| ), |
| baseFlowController: newFlowController(0, 0), |
| } |
| router := newSimpleRouter("") |
| if err := pool.activateRouter(router); err != nil { |
| t.Errorf("activateRouter: %v", err) |
| } |
| |
| ms := &ManagedStream{ |
| id: "foo", |
| ctx: ctx, |
| retry: newStatelessRetryer(), |
| } |
| if err := pool.addWriter(ms); err != nil { |
| t.Errorf("addWriter: %v", err) |
| } |
| conn := router.conn |
| // use openWithRetry to get the reference to the channel and add our test pending write. |
| _, ch, _ := pool.openWithRetry(conn) |
| pw := newPendingWrite(ctx, ms, &storagepb.AppendRowsRequest{}, nil, "", "") |
| pw.writer = ms |
| pw.attemptCount = 1 // we're injecting directly, but attribute this as a single attempt. |
| ch <- pw |
| |
| // Wait until the write is marked done. |
| <-pw.result.Ready() |
| |
| // Check retry count is as expected. |
| gotTotalAttempts, err := pw.result.TotalAttempts(ctx) |
| if err != nil { |
| t.Errorf("%s: failed to get total attempts: %v", tc.description, err) |
| } |
| if gotTotalAttempts != tc.wantTotalAttempts { |
| t.Errorf("%s: got %d total attempts, want %d attempts", tc.description, gotTotalAttempts, tc.wantTotalAttempts) |
| } |
| |
| // Check that the write got the expected final result. |
| if gotFinalErr := pw.result.err; !errors.Is(gotFinalErr, tc.wantFinalErr) { |
| t.Errorf("%s: got final error %v, wanted final error %v", tc.description, gotFinalErr, tc.wantFinalErr) |
| } |
| cancel() |
| } |
| } |