| // Copyright 2021 Google LLC |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // https://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package managedwriter |
| |
| import ( |
| "sync" |
| "testing" |
| |
| "cloud.google.com/go/bigquery/storage/apiv1/storagepb" |
| "github.com/google/go-cmp/cmp" |
| "github.com/google/go-cmp/cmp/cmpopts" |
| "github.com/googleapis/gax-go/v2" |
| "google.golang.org/api/option" |
| "google.golang.org/grpc" |
| "google.golang.org/protobuf/proto" |
| "google.golang.org/protobuf/testing/protocmp" |
| "google.golang.org/protobuf/types/descriptorpb" |
| ) |
| |
| func TestCustomClientOptions(t *testing.T) { |
| testCases := []struct { |
| desc string |
| options []option.ClientOption |
| want *writerClientConfig |
| }{ |
| { |
| desc: "no options", |
| want: &writerClientConfig{}, |
| }, |
| { |
| desc: "multiplex enable", |
| options: []option.ClientOption{ |
| WithMultiplexing(), |
| }, |
| want: &writerClientConfig{ |
| useMultiplex: true, |
| maxMultiplexPoolSize: 1, |
| }, |
| }, |
| { |
| desc: "multiplex max", |
| options: []option.ClientOption{ |
| WithMultiplexPoolLimit(99), |
| }, |
| want: &writerClientConfig{ |
| maxMultiplexPoolSize: 99, |
| }, |
| }, |
| { |
| desc: "default requests", |
| options: []option.ClientOption{ |
| WithDefaultInflightRequests(42), |
| }, |
| want: &writerClientConfig{ |
| defaultInflightRequests: 42, |
| }, |
| }, |
| { |
| desc: "default bytes", |
| options: []option.ClientOption{ |
| WithDefaultInflightBytes(123), |
| }, |
| want: &writerClientConfig{ |
| defaultInflightBytes: 123, |
| }, |
| }, |
| { |
| desc: "default call options", |
| options: []option.ClientOption{ |
| WithDefaultAppendRowsCallOption(gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(1))), |
| }, |
| want: &writerClientConfig{ |
| defaultAppendRowsCallOptions: []gax.CallOption{ |
| gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(1)), |
| }, |
| }, |
| }, |
| { |
| desc: "unusual values", |
| options: []option.ClientOption{ |
| WithMultiplexing(), |
| WithMultiplexPoolLimit(-8), |
| WithDefaultInflightBytes(-1), |
| WithDefaultInflightRequests(-99), |
| }, |
| want: &writerClientConfig{ |
| useMultiplex: true, |
| maxMultiplexPoolSize: 1, |
| defaultInflightRequests: 0, |
| defaultInflightBytes: 0, |
| }, |
| }, |
| { |
| desc: "multiple options", |
| options: []option.ClientOption{ |
| WithMultiplexing(), |
| WithMultiplexPoolLimit(10), |
| WithDefaultInflightRequests(99), |
| WithDefaultInflightBytes(12345), |
| WithDefaultAppendRowsCallOption(gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(1))), |
| }, |
| want: &writerClientConfig{ |
| useMultiplex: true, |
| maxMultiplexPoolSize: 10, |
| defaultInflightRequests: 99, |
| defaultInflightBytes: 12345, |
| defaultAppendRowsCallOptions: []gax.CallOption{ |
| gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(1)), |
| }, |
| }, |
| }, |
| } |
| for _, tc := range testCases { |
| gotCfg := newWriterClientConfig(tc.options...) |
| |
| if diff := cmp.Diff(gotCfg, tc.want, cmp.AllowUnexported(writerClientConfig{})); diff != "" { |
| t.Errorf("diff in case (%s):\n%v", tc.desc, diff) |
| } |
| } |
| } |
| |
| func TestWriterOptions(t *testing.T) { |
| |
| testCases := []struct { |
| desc string |
| options []WriterOption |
| want *ManagedStream |
| }{ |
| { |
| desc: "WithType", |
| options: []WriterOption{WithType(BufferedStream)}, |
| want: func() *ManagedStream { |
| ms := &ManagedStream{ |
| streamSettings: defaultStreamSettings(), |
| curTemplate: newVersionedTemplate(), |
| } |
| ms.streamSettings.streamType = BufferedStream |
| return ms |
| }(), |
| }, |
| { |
| desc: "WithMaxInflightRequests", |
| options: []WriterOption{WithMaxInflightRequests(2)}, |
| want: func() *ManagedStream { |
| ms := &ManagedStream{ |
| streamSettings: defaultStreamSettings(), |
| curTemplate: newVersionedTemplate(), |
| } |
| ms.streamSettings.MaxInflightRequests = 2 |
| return ms |
| }(), |
| }, |
| { |
| desc: "WithMaxInflightBytes", |
| options: []WriterOption{WithMaxInflightBytes(5)}, |
| want: func() *ManagedStream { |
| ms := &ManagedStream{ |
| streamSettings: defaultStreamSettings(), |
| curTemplate: newVersionedTemplate(), |
| } |
| ms.streamSettings.MaxInflightBytes = 5 |
| return ms |
| }(), |
| }, |
| { |
| desc: "WithTraceID", |
| options: []WriterOption{WithTraceID("foo")}, |
| want: func() *ManagedStream { |
| ms := &ManagedStream{ |
| streamSettings: defaultStreamSettings(), |
| curTemplate: newVersionedTemplate(), |
| } |
| ms.streamSettings.TraceID = "foo" |
| return ms |
| }(), |
| }, |
| { |
| desc: "WithDestinationTable", |
| options: []WriterOption{WithDestinationTable("foo")}, |
| want: func() *ManagedStream { |
| ms := &ManagedStream{ |
| streamSettings: defaultStreamSettings(), |
| curTemplate: newVersionedTemplate(), |
| } |
| ms.streamSettings.destinationTable = "foo" |
| return ms |
| }(), |
| }, |
| { |
| desc: "WithDataOrigin", |
| options: []WriterOption{WithDataOrigin("origin")}, |
| want: func() *ManagedStream { |
| ms := &ManagedStream{ |
| streamSettings: defaultStreamSettings(), |
| curTemplate: newVersionedTemplate(), |
| } |
| ms.streamSettings.dataOrigin = "origin" |
| return ms |
| }(), |
| }, |
| { |
| desc: "WithCallOption", |
| options: []WriterOption{WithAppendRowsCallOption(gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(1)))}, |
| want: func() *ManagedStream { |
| ms := &ManagedStream{ |
| streamSettings: defaultStreamSettings(), |
| curTemplate: newVersionedTemplate(), |
| } |
| ms.streamSettings.appendCallOptions = append(ms.streamSettings.appendCallOptions, |
| gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(1))) |
| return ms |
| }(), |
| }, |
| { |
| desc: "EnableRetries", |
| options: []WriterOption{EnableWriteRetries(true)}, |
| want: func() *ManagedStream { |
| ms := &ManagedStream{ |
| streamSettings: defaultStreamSettings(), |
| curTemplate: newVersionedTemplate(), |
| } |
| ms.retry = newStatelessRetryer() |
| return ms |
| }(), |
| }, |
| { |
| desc: "WithSchemaDescriptor", |
| options: []WriterOption{WithSchemaDescriptor(&descriptorpb.DescriptorProto{Name: proto.String("name")})}, |
| want: func() *ManagedStream { |
| ms := &ManagedStream{ |
| streamSettings: defaultStreamSettings(), |
| curTemplate: newVersionedTemplate(), |
| } |
| ms.curTemplate.tmpl = &storagepb.AppendRowsRequest{ |
| Rows: &storagepb.AppendRowsRequest_ProtoRows{ |
| ProtoRows: &storagepb.AppendRowsRequest_ProtoData{ |
| WriterSchema: &storagepb.ProtoSchema{ |
| ProtoDescriptor: &descriptorpb.DescriptorProto{Name: proto.String("name")}, |
| }, |
| }, |
| }, |
| } |
| return ms |
| }(), |
| }, |
| { |
| desc: "WithDefaultMissingValueInterpretation", |
| options: []WriterOption{WithDefaultMissingValueInterpretation(storagepb.AppendRowsRequest_DEFAULT_VALUE)}, |
| want: func() *ManagedStream { |
| ms := &ManagedStream{ |
| streamSettings: defaultStreamSettings(), |
| curTemplate: newVersionedTemplate(), |
| } |
| ms.curTemplate.tmpl = &storagepb.AppendRowsRequest{ |
| DefaultMissingValueInterpretation: storagepb.AppendRowsRequest_DEFAULT_VALUE, |
| } |
| return ms |
| }(), |
| }, |
| { |
| desc: "WithtMissingValueInterpretations", |
| options: []WriterOption{WithMissingValueInterpretations(map[string]storagepb.AppendRowsRequest_MissingValueInterpretation{ |
| "foo": storagepb.AppendRowsRequest_DEFAULT_VALUE, |
| "bar": storagepb.AppendRowsRequest_NULL_VALUE, |
| })}, |
| want: func() *ManagedStream { |
| ms := &ManagedStream{ |
| streamSettings: defaultStreamSettings(), |
| curTemplate: newVersionedTemplate(), |
| } |
| ms.curTemplate.tmpl = &storagepb.AppendRowsRequest{ |
| MissingValueInterpretations: map[string]storagepb.AppendRowsRequest_MissingValueInterpretation{ |
| "foo": storagepb.AppendRowsRequest_DEFAULT_VALUE, |
| "bar": storagepb.AppendRowsRequest_NULL_VALUE, |
| }, |
| } |
| return ms |
| }(), |
| }, |
| { |
| desc: "multiple", |
| options: []WriterOption{ |
| WithType(PendingStream), |
| WithMaxInflightBytes(5), |
| WithTraceID("traceid"), |
| EnableWriteRetries(true), |
| WithSchemaDescriptor(&descriptorpb.DescriptorProto{Name: proto.String("name")}), |
| WithDefaultMissingValueInterpretation(storagepb.AppendRowsRequest_DEFAULT_VALUE), |
| WithMissingValueInterpretations(map[string]storagepb.AppendRowsRequest_MissingValueInterpretation{ |
| "foo": storagepb.AppendRowsRequest_DEFAULT_VALUE, |
| "bar": storagepb.AppendRowsRequest_NULL_VALUE, |
| }), |
| }, |
| want: func() *ManagedStream { |
| ms := &ManagedStream{ |
| streamSettings: defaultStreamSettings(), |
| curTemplate: newVersionedTemplate(), |
| } |
| ms.curTemplate.tmpl = &storagepb.AppendRowsRequest{ |
| Rows: &storagepb.AppendRowsRequest_ProtoRows{ |
| ProtoRows: &storagepb.AppendRowsRequest_ProtoData{ |
| WriterSchema: &storagepb.ProtoSchema{ |
| ProtoDescriptor: &descriptorpb.DescriptorProto{Name: proto.String("name")}, |
| }, |
| }, |
| }, |
| MissingValueInterpretations: map[string]storagepb.AppendRowsRequest_MissingValueInterpretation{ |
| "foo": storagepb.AppendRowsRequest_DEFAULT_VALUE, |
| "bar": storagepb.AppendRowsRequest_NULL_VALUE, |
| }, |
| DefaultMissingValueInterpretation: storagepb.AppendRowsRequest_DEFAULT_VALUE, |
| } |
| ms.streamSettings.MaxInflightBytes = 5 |
| ms.streamSettings.streamType = PendingStream |
| ms.streamSettings.TraceID = "traceid" |
| ms.retry = newStatelessRetryer() |
| return ms |
| }(), |
| }, |
| } |
| |
| for _, tc := range testCases { |
| got := &ManagedStream{ |
| streamSettings: defaultStreamSettings(), |
| curTemplate: newVersionedTemplate(), |
| } |
| for _, o := range tc.options { |
| o(got) |
| } |
| |
| if diff := cmp.Diff(got, tc.want, |
| cmp.AllowUnexported(ManagedStream{}, streamSettings{}), |
| cmp.AllowUnexported(sync.Mutex{}), |
| cmp.AllowUnexported(versionedTemplate{}), |
| cmpopts.IgnoreFields(versionedTemplate{}, "versionTime", "hashVal"), |
| protocmp.Transform(), // versionedTemplate embeds proto messages. |
| cmpopts.IgnoreUnexported(statelessRetryer{})); diff != "" { |
| t.Errorf("diff in case (%s):\n%v", tc.desc, diff) |
| } |
| |
| } |
| } |