| // Copyright 2021 Google LLC |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // https://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package managedwriter |
| |
| import ( |
| "context" |
| "fmt" |
| "math" |
| "sync" |
| "testing" |
| "time" |
| |
| "cloud.google.com/go/bigquery" |
| "cloud.google.com/go/bigquery/storage/apiv1/storagepb" |
| "cloud.google.com/go/bigquery/storage/managedwriter/adapt" |
| "cloud.google.com/go/bigquery/storage/managedwriter/testdata" |
| "cloud.google.com/go/internal/testutil" |
| "cloud.google.com/go/internal/uid" |
| "github.com/google/go-cmp/cmp" |
| "github.com/googleapis/gax-go/v2/apierror" |
| "go.opencensus.io/stats/view" |
| "google.golang.org/api/option" |
| "google.golang.org/grpc/codes" |
| "google.golang.org/protobuf/encoding/protojson" |
| "google.golang.org/protobuf/proto" |
| "google.golang.org/protobuf/reflect/protodesc" |
| "google.golang.org/protobuf/reflect/protoreflect" |
| "google.golang.org/protobuf/testing/protocmp" |
| "google.golang.org/protobuf/types/descriptorpb" |
| "google.golang.org/protobuf/types/dynamicpb" |
| "google.golang.org/protobuf/types/known/wrapperspb" |
| ) |
| |
var (
	// datasetIDs and tableIDs mint unique resource names (suffixed with the
	// process start time) so concurrent test runs don't collide.
	datasetIDs = uid.NewSpace("managedwriter_test_dataset", &uid.Options{Sep: '_', Time: time.Now()})
	tableIDs = uid.NewSpace("table", &uid.Options{Sep: '_', Time: time.Now()})
	// defaultTestTimeout bounds each integration scenario's context.
	defaultTestTimeout = 90 * time.Second
)
| |
// our test data has cardinality 5 for names, 3 for values; several scenarios
// below assert distinct-value counts against these cardinalities.
var testSimpleData = []*testdata.SimpleMessageProto2{
	{Name: proto.String("one"), Value: proto.Int64(1)},
	{Name: proto.String("two"), Value: proto.Int64(2)},
	{Name: proto.String("three"), Value: proto.Int64(3)},
	{Name: proto.String("four"), Value: proto.Int64(1)},
	{Name: proto.String("five"), Value: proto.Int64(2)},
}
| |
| func getTestClients(ctx context.Context, t *testing.T, opts ...option.ClientOption) (*Client, *bigquery.Client) { |
| if testing.Short() { |
| t.Skip("Integration tests skipped in short mode") |
| } |
| projID := testutil.ProjID() |
| if projID == "" { |
| t.Skip("Integration tests skipped. See CONTRIBUTING.md for details") |
| } |
| ts := testutil.TokenSource(ctx, "https://www.googleapis.com/auth/bigquery") |
| if ts == nil { |
| t.Skip("Integration tests skipped. See CONTRIBUTING.md for details") |
| } |
| opts = append(opts, option.WithTokenSource(ts)) |
| client, err := NewClient(ctx, projID, opts...) |
| if err != nil { |
| t.Fatalf("couldn't create managedwriter client: %v", err) |
| } |
| |
| bqClient, err := bigquery.NewClient(ctx, projID, opts...) |
| if err != nil { |
| t.Fatalf("couldn't create bigquery client: %v", err) |
| } |
| return client, bqClient |
| } |
| |
| // setupTestDataset generates a unique dataset for testing, and a cleanup that can be deferred. |
| func setupTestDataset(ctx context.Context, t *testing.T, bqc *bigquery.Client, location string) (ds *bigquery.Dataset, cleanup func(), err error) { |
| dataset := bqc.Dataset(datasetIDs.New()) |
| if err := dataset.Create(ctx, &bigquery.DatasetMetadata{Location: location}); err != nil { |
| return nil, nil, err |
| } |
| return dataset, func() { |
| if err := dataset.DeleteWithContents(ctx); err != nil { |
| t.Logf("could not cleanup dataset %q: %v", dataset.DatasetID, err) |
| } |
| }, nil |
| } |
| |
| // setupDynamicDescriptors aids testing when not using a supplied proto |
| func setupDynamicDescriptors(t *testing.T, schema bigquery.Schema) (protoreflect.MessageDescriptor, *descriptorpb.DescriptorProto) { |
| convertedSchema, err := adapt.BQSchemaToStorageTableSchema(schema) |
| if err != nil { |
| t.Fatalf("adapt.BQSchemaToStorageTableSchema: %v", err) |
| } |
| |
| descriptor, err := adapt.StorageSchemaToProto2Descriptor(convertedSchema, "root") |
| if err != nil { |
| t.Fatalf("adapt.StorageSchemaToDescriptor: %v", err) |
| } |
| messageDescriptor, ok := descriptor.(protoreflect.MessageDescriptor) |
| if !ok { |
| t.Fatalf("adapted descriptor is not a message descriptor") |
| } |
| dp, err := adapt.NormalizeDescriptor(messageDescriptor) |
| if err != nil { |
| t.Fatalf("NormalizeDescriptor: %v", err) |
| } |
| return messageDescriptor, dp |
| } |
| |
| func TestIntegration_ClientGetWriteStream(t *testing.T) { |
| ctx := context.Background() |
| mwClient, bqClient := getTestClients(ctx, t) |
| defer mwClient.Close() |
| defer bqClient.Close() |
| |
| wantLocation := "us-east1" |
| dataset, cleanup, err := setupTestDataset(ctx, t, bqClient, wantLocation) |
| if err != nil { |
| t.Fatalf("failed to init test dataset: %v", err) |
| } |
| defer cleanup() |
| |
| testTable := dataset.Table(tableIDs.New()) |
| if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil { |
| t.Fatalf("failed to create test table %q: %v", testTable.FullyQualifiedName(), err) |
| } |
| |
| apiSchema, _ := adapt.BQSchemaToStorageTableSchema(testdata.SimpleMessageSchema) |
| parent := TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID) |
| explicitStream, err := mwClient.CreateWriteStream(ctx, &storagepb.CreateWriteStreamRequest{ |
| Parent: parent, |
| WriteStream: &storagepb.WriteStream{ |
| Type: storagepb.WriteStream_PENDING, |
| }, |
| }) |
| if err != nil { |
| t.Fatalf("CreateWriteStream: %v", err) |
| } |
| |
| testCases := []struct { |
| description string |
| isDefault bool |
| streamID string |
| wantType storagepb.WriteStream_Type |
| }{ |
| { |
| description: "default", |
| isDefault: true, |
| streamID: fmt.Sprintf("%s/streams/_default", parent), |
| wantType: storagepb.WriteStream_COMMITTED, |
| }, |
| { |
| description: "explicit pending", |
| streamID: explicitStream.Name, |
| wantType: storagepb.WriteStream_PENDING, |
| }, |
| } |
| |
| for _, tc := range testCases { |
| for _, fullView := range []bool{false, true} { |
| info, err := mwClient.getWriteStream(ctx, tc.streamID, fullView) |
| if err != nil { |
| t.Errorf("%s (%T): getWriteStream failed: %v", tc.description, fullView, err) |
| } |
| if info.GetType() != tc.wantType { |
| t.Errorf("%s (%T): got type %d, want type %d", tc.description, fullView, info.GetType(), tc.wantType) |
| } |
| if info.GetLocation() != wantLocation { |
| t.Errorf("%s (%T) view: got location %s, want location %s", tc.description, fullView, info.GetLocation(), wantLocation) |
| } |
| if info.GetCommitTime() != nil { |
| t.Errorf("%s (%T)expected empty commit time, got %v", tc.description, fullView, info.GetCommitTime()) |
| } |
| |
| if !tc.isDefault { |
| if info.GetCreateTime() == nil { |
| t.Errorf("%s (%T): expected create time, was empty", tc.description, fullView) |
| } |
| } else { |
| if info.GetCreateTime() != nil { |
| t.Errorf("%s (%T): expected empty time, got %v", tc.description, fullView, info.GetCreateTime()) |
| } |
| } |
| |
| if !fullView { |
| if info.GetTableSchema() != nil { |
| t.Errorf("%s (%T) basic view: expected no schema, was populated", tc.description, fullView) |
| } |
| } else { |
| if diff := cmp.Diff(info.GetTableSchema(), apiSchema, protocmp.Transform()); diff != "" { |
| t.Errorf("%s (%T) schema mismatch: -got, +want:\n%s", tc.description, fullView, diff) |
| } |
| } |
| } |
| } |
| } |
| |
// TestIntegration_ManagedWriter runs the managed-stream scenarios as subtests
// sharing a single client pair and dataset.  The inner "group" test ensures
// the deferred cleanups above don't fire until all parallel subtests finish.
func TestIntegration_ManagedWriter(t *testing.T) {
	mwClient, bqClient := getTestClients(context.Background(), t)
	defer mwClient.Close()
	defer bqClient.Close()

	dataset, cleanup, err := setupTestDataset(context.Background(), t, bqClient, "asia-east1")
	if err != nil {
		t.Fatalf("failed to init test dataset: %v", err)
	}
	defer cleanup()

	// Shared timeout context for all subtests.
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()

	t.Run("group", func(t *testing.T) {
		t.Run("DefaultStream", func(t *testing.T) {
			t.Parallel()
			testDefaultStream(ctx, t, mwClient, bqClient, dataset)
		})
		t.Run("DefaultStreamDynamicJSON", func(t *testing.T) {
			t.Parallel()
			testDefaultStreamDynamicJSON(ctx, t, mwClient, bqClient, dataset)
		})
		t.Run("CommittedStream", func(t *testing.T) {
			t.Parallel()
			testCommittedStream(ctx, t, mwClient, bqClient, dataset)
		})
		t.Run("ErrorBehaviors", func(t *testing.T) {
			t.Parallel()
			testErrorBehaviors(ctx, t, mwClient, bqClient, dataset)
		})
		t.Run("BufferedStream", func(t *testing.T) {
			t.Parallel()
			testBufferedStream(ctx, t, mwClient, bqClient, dataset)
		})
		t.Run("PendingStream", func(t *testing.T) {
			t.Parallel()
			testPendingStream(ctx, t, mwClient, bqClient, dataset)
		})
		t.Run("SimpleCDC", func(t *testing.T) {
			t.Parallel()
			testSimpleCDC(ctx, t, mwClient, bqClient, dataset)
		})
		t.Run("Instrumentation", func(t *testing.T) {
			// Don't run this in parallel, we only want to collect stats from this subtest.
			testInstrumentation(ctx, t, mwClient, bqClient, dataset)
		})
		// The large-insert and default-value subtests are also serial.
		t.Run("TestLargeInsertNoRetry", func(t *testing.T) {
			testLargeInsertNoRetry(ctx, t, mwClient, bqClient, dataset)
		})
		t.Run("TestLargeInsertWithRetry", func(t *testing.T) {
			testLargeInsertWithRetry(ctx, t, mwClient, bqClient, dataset)
		})
		t.Run("DefaultValueHandling", func(t *testing.T) {
			testDefaultValueHandling(ctx, t, mwClient, bqClient, dataset)
		})
	})
}
| |
| func TestIntegration_SchemaEvolution(t *testing.T) { |
| |
| testcases := []struct { |
| desc string |
| clientOpts []option.ClientOption |
| writerOpts []WriterOption |
| }{ |
| { |
| desc: "Simplex_Committed", |
| writerOpts: []WriterOption{ |
| WithType(CommittedStream), |
| }, |
| }, |
| { |
| desc: "Simplex_Default", |
| writerOpts: []WriterOption{ |
| WithType(DefaultStream), |
| }, |
| }, |
| { |
| desc: "Multiplex_Default", |
| clientOpts: []option.ClientOption{ |
| WithMultiplexing(), |
| WithMultiplexPoolLimit(2), |
| }, |
| writerOpts: []WriterOption{ |
| WithType(DefaultStream), |
| }, |
| }, |
| } |
| |
| for _, tc := range testcases { |
| mwClient, bqClient := getTestClients(context.Background(), t, tc.clientOpts...) |
| defer mwClient.Close() |
| defer bqClient.Close() |
| |
| dataset, cleanup, err := setupTestDataset(context.Background(), t, bqClient, "asia-east1") |
| if err != nil { |
| t.Fatalf("failed to init test dataset: %v", err) |
| } |
| defer cleanup() |
| |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| t.Run(tc.desc, func(t *testing.T) { |
| testSchemaEvolution(ctx, t, mwClient, bqClient, dataset, tc.writerOpts...) |
| }) |
| } |
| } |
| |
// testDefaultStream appends rows to a table's default stream, first one row
// per append and then all rows in a single append, validating row counts and
// value cardinalities after each round.
func testDefaultStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
	testTable := dataset.Table(tableIDs.New())
	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
		t.Fatalf("failed to create test table %q: %v", testTable.FullyQualifiedName(), err)
	}
	// We'll use a precompiled test proto, but we need its corresponding descriptorproto representation
	// to send as the stream's schema.
	m := &testdata.SimpleMessageProto2{}
	descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())

	// setup a new stream.
	ms, err := mwClient.NewManagedStream(ctx,
		WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
		WithType(DefaultStream),
		WithSchemaDescriptor(descriptorProto),
	)
	if err != nil {
		t.Fatalf("NewManagedStream: %v", err)
	}
	if ms.id == "" {
		t.Errorf("managed stream is missing ID")
	}
	validateTableConstraints(ctx, t, bqClient, testTable, "before send",
		withExactRowCount(0))

	// First, send the test rows individually.
	var result *AppendResult
	for k, mesg := range testSimpleData {
		b, err := proto.Marshal(mesg)
		if err != nil {
			t.Errorf("failed to marshal message %d: %v", k, err)
		}
		data := [][]byte{b}
		result, err = ms.AppendRows(ctx, data)
		if err != nil {
			t.Errorf("single-row append %d failed: %v", k, err)
		}
	}
	// Wait for the result to indicate ready, then validate.  Appends are
	// sequenced, so waiting on the final result covers the earlier ones too.
	o, err := result.GetResult(ctx)
	if err != nil {
		t.Errorf("result error for last send: %v", err)
	}
	// Default streams don't track offsets.
	if o != NoStreamOffset {
		t.Errorf("offset mismatch, got %d want %d", o, NoStreamOffset)
	}
	validateTableConstraints(ctx, t, bqClient, testTable, "after first send round",
		withExactRowCount(int64(len(testSimpleData))),
		withDistinctValues("name", int64(len(testSimpleData))))

	// Now, send the test rows grouped into a single append.
	var data [][]byte
	for k, mesg := range testSimpleData {
		b, err := proto.Marshal(mesg)
		if err != nil {
			t.Errorf("failed to marshal message %d: %v", k, err)
		}
		data = append(data, b)
	}
	result, err = ms.AppendRows(ctx, data)
	if err != nil {
		t.Errorf("grouped-row append failed: %v", err)
	}
	// Wait for the result to indicate ready, then validate again. Our total rows have increased, but
	// cardinality should not.
	o, err = result.GetResult(ctx)
	if err != nil {
		t.Errorf("result error for last send: %v", err)
	}
	if o != NoStreamOffset {
		t.Errorf("offset mismatch, got %d want %d", o, NoStreamOffset)
	}
	validateTableConstraints(ctx, t, bqClient, testTable, "after second send round",
		withExactRowCount(int64(2*len(testSimpleData))),
		withDistinctValues("name", int64(len(testSimpleData))),
		withDistinctValues("value", int64(3)),
	)
}
| |
// testDefaultStreamDynamicJSON appends rows built at runtime: JSON text is
// unmarshaled into dynamicpb messages whose descriptor is derived from the
// table's BigQuery schema, then serialized for the write API.
func testDefaultStreamDynamicJSON(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
	testTable := dataset.Table(tableIDs.New())
	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.GithubArchiveSchema}); err != nil {
		t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
	}

	md, descriptorProto := setupDynamicDescriptors(t, testdata.GithubArchiveSchema)

	ms, err := mwClient.NewManagedStream(ctx,
		WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
		WithType(DefaultStream),
		WithSchemaDescriptor(descriptorProto),
	)
	if err != nil {
		t.Fatalf("NewManagedStream: %v", err)
	}
	validateTableConstraints(ctx, t, bqClient, testTable, "before send",
		withExactRowCount(0))

	// Five rows with distinct "type" values and two distinct "public" values;
	// the last row omits repo.id to exercise a missing optional field.
	sampleJSONData := [][]byte{
		[]byte(`{"type": "foo", "public": true, "repo": {"id": 99, "name": "repo_name_1", "url": "https://one.example.com"}}`),
		[]byte(`{"type": "bar", "public": false, "repo": {"id": 101, "name": "repo_name_2", "url": "https://two.example.com"}}`),
		[]byte(`{"type": "baz", "public": true, "repo": {"id": 456, "name": "repo_name_3", "url": "https://three.example.com"}}`),
		[]byte(`{"type": "wow", "public": false, "repo": {"id": 123, "name": "repo_name_4", "url": "https://four.example.com"}}`),
		[]byte(`{"type": "yay", "public": true, "repo": {"name": "repo_name_5", "url": "https://five.example.com"}}`),
	}

	var result *AppendResult
	for k, v := range sampleJSONData {
		message := dynamicpb.NewMessage(md)

		// First, json->proto message
		err = protojson.Unmarshal(v, message)
		if err != nil {
			t.Fatalf("failed to Unmarshal json message for row %d: %v", k, err)
		}
		// Then, proto message -> bytes.
		b, err := proto.Marshal(message)
		if err != nil {
			t.Fatalf("failed to marshal proto bytes for row %d: %v", k, err)
		}
		result, err = ms.AppendRows(ctx, [][]byte{b})
		if err != nil {
			t.Errorf("single-row append %d failed: %v", k, err)
		}
	}

	// Wait for the result to indicate ready, then validate.
	o, err := result.GetResult(ctx)
	if err != nil {
		t.Errorf("result error for last send: %v", err)
	}
	// Default streams don't track offsets.
	if o != NoStreamOffset {
		t.Errorf("offset mismatch, got %d want %d", o, NoStreamOffset)
	}
	validateTableConstraints(ctx, t, bqClient, testTable, "after send",
		withExactRowCount(int64(len(sampleJSONData))),
		withDistinctValues("type", int64(len(sampleJSONData))),
		withDistinctValues("public", int64(2)))
}
| |
// testBufferedStream appends rows to a buffered stream one at a time,
// verifying that rows become visible only after FlushRows advances the
// flush offset.
func testBufferedStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
	testTable := dataset.Table(tableIDs.New())
	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
		t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
	}

	m := &testdata.SimpleMessageProto2{}
	descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())

	ms, err := mwClient.NewManagedStream(ctx,
		WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
		WithType(BufferedStream),
		WithSchemaDescriptor(descriptorProto),
	)
	if err != nil {
		t.Fatalf("NewManagedStream: %v", err)
	}

	// Confirm the backend reports the stream type we requested.
	info, err := ms.c.getWriteStream(ctx, ms.streamSettings.streamID, false)
	if err != nil {
		t.Errorf("couldn't get stream info: %v", err)
	}
	if info.GetType().String() != string(ms.StreamType()) {
		t.Errorf("mismatch on stream type, got %s want %s", info.GetType(), ms.StreamType())
	}
	validateTableConstraints(ctx, t, bqClient, testTable, "before send",
		withExactRowCount(0))

	// expectedRows tracks how many rows have been flushed (visible) so far.
	var expectedRows int64
	for k, mesg := range testSimpleData {
		b, err := proto.Marshal(mesg)
		if err != nil {
			t.Fatalf("failed to marshal message %d: %v", k, err)
		}
		data := [][]byte{b}
		results, err := ms.AppendRows(ctx, data)
		if err != nil {
			t.Fatalf("single-row append %d failed: %v", k, err)
		}
		// Wait for acknowledgement.
		offset, err := results.GetResult(ctx)
		if err != nil {
			t.Fatalf("got error from pending result %d: %v", k, err)
		}
		// Buffered rows shouldn't be visible until flushed.
		validateTableConstraints(ctx, t, bqClient, testTable, fmt.Sprintf("before flush %d", k),
			withExactRowCount(expectedRows),
			withDistinctValues("name", expectedRows))

		// move offset and re-validate.
		flushOffset, err := ms.FlushRows(ctx, offset)
		if err != nil {
			t.Errorf("failed to flush offset to %d: %v", offset, err)
		}
		expectedRows = flushOffset + 1
		validateTableConstraints(ctx, t, bqClient, testTable, fmt.Sprintf("after flush %d", k),
			withExactRowCount(expectedRows),
			withDistinctValues("name", expectedRows))
	}
}
| |
// testCommittedStream appends rows with explicit offsets to a committed
// stream, where rows are visible immediately and the final acknowledged
// offset should equal the index of the last row.
func testCommittedStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
	testTable := dataset.Table(tableIDs.New())
	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
		t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
	}

	m := &testdata.SimpleMessageProto2{}
	descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())

	// setup a new stream.
	ms, err := mwClient.NewManagedStream(ctx,
		WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
		WithType(CommittedStream),
		WithSchemaDescriptor(descriptorProto),
	)
	if err != nil {
		t.Fatalf("NewManagedStream: %v", err)
	}
	validateTableConstraints(ctx, t, bqClient, testTable, "before send",
		withExactRowCount(0))

	var result *AppendResult
	for k, mesg := range testSimpleData {
		b, err := proto.Marshal(mesg)
		if err != nil {
			t.Errorf("failed to marshal message %d: %v", k, err)
		}
		data := [][]byte{b}
		// Explicit offsets: row k goes at offset k.
		result, err = ms.AppendRows(ctx, data, WithOffset(int64(k)))
		if err != nil {
			t.Errorf("single-row append %d failed: %v", k, err)
		}
	}
	// Wait for the result to indicate ready, then validate.
	o, err := result.GetResult(ctx)
	if err != nil {
		t.Errorf("result error for last send: %v", err)
	}
	wantOffset := int64(len(testSimpleData) - 1)
	if o != wantOffset {
		t.Errorf("offset mismatch, got %d want %d", o, wantOffset)
	}
	validateTableConstraints(ctx, t, bqClient, testTable, "after send",
		withExactRowCount(int64(len(testSimpleData))))
}
| |
// testSimpleCDC demonstrates basic Change Data Capture (CDC) functionality. We add an initial set of
// rows to a table, then use CDC to apply updates.
func testSimpleCDC(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
	testTable := dataset.Table(tableIDs.New())

	// CDC requires clustering on the primary key column(s).
	if err := testTable.Create(ctx, &bigquery.TableMetadata{
		Schema: testdata.ExampleEmployeeSchema,
		Clustering: &bigquery.Clustering{
			Fields: []string{"id"},
		},
	}); err != nil {
		t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
	}

	// Mark the primary key using an ALTER TABLE DDL.
	tableIdentifier, _ := testTable.Identifier(bigquery.StandardSQLID)
	sql := fmt.Sprintf("ALTER TABLE %s ADD PRIMARY KEY(id) NOT ENFORCED;", tableIdentifier)
	if _, err := bqClient.Query(sql).Read(ctx); err != nil {
		t.Fatalf("failed ALTER TABLE: %v", err)
	}

	// The CDC proto carries a pseudo-column (_CHANGE_TYPE) indicating the
	// operation; NormalizeDescriptor preserves it for the stream schema.
	m := &testdata.ExampleEmployeeCDC{}
	descriptorProto, err := adapt.NormalizeDescriptor(m.ProtoReflect().Descriptor())
	if err != nil {
		t.Fatalf("NormalizeDescriptor: %v", err)
	}

	// Setup an initial writer for sending initial inserts.
	writer, err := mwClient.NewManagedStream(ctx,
		WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
		WithType(CommittedStream),
		WithSchemaDescriptor(descriptorProto),
	)
	if err != nil {
		t.Fatalf("NewManagedStream: %v", err)
	}
	defer writer.Close()
	validateTableConstraints(ctx, t, bqClient, testTable, "before send",
		withExactRowCount(0))

	initialEmployees := []*testdata.ExampleEmployeeCDC{
		{
			Id: proto.Int64(1),
			Username: proto.String("alice"),
			GivenName: proto.String("Alice CEO"),
			Departments: []string{"product", "support", "internal"},
			Salary: proto.Int64(1),
			XCHANGE_TYPE: proto.String("INSERT"),
		},
		{
			Id: proto.Int64(2),
			Username: proto.String("bob"),
			GivenName: proto.String("Bob Bobberson"),
			Departments: []string{"research"},
			Salary: proto.Int64(100000),
			XCHANGE_TYPE: proto.String("INSERT"),
		},
		{
			Id: proto.Int64(3),
			Username: proto.String("clarice"),
			GivenName: proto.String("Clarice Clearwater"),
			Departments: []string{"product"},
			Salary: proto.Int64(100001),
			XCHANGE_TYPE: proto.String("INSERT"),
		},
	}

	// First append inserts all the initial employees.
	data := make([][]byte, len(initialEmployees))
	for k, mesg := range initialEmployees {
		b, err := proto.Marshal(mesg)
		if err != nil {
			t.Fatalf("failed to marshal record %d: %v", k, err)
		}
		data[k] = b
	}
	result, err := writer.AppendRows(ctx, data)
	if err != nil {
		t.Errorf("initial insert failed (%s): %v", writer.StreamName(), err)
	}
	if _, err := result.GetResult(ctx); err != nil {
		t.Errorf("result error for initial insert (%s): %v", writer.StreamName(), err)
	}
	validateTableConstraints(ctx, t, bqClient, testTable, "initial inserts",
		withExactRowCount(int64(len(initialEmployees))))

	// Create a second writer for applying modifications.
	updateWriter, err := mwClient.NewManagedStream(ctx,
		WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
		WithType(DefaultStream),
		WithSchemaDescriptor(descriptorProto),
	)
	if err != nil {
		t.Fatalf("NewManagedStream: %v", err)
	}
	defer updateWriter.Close()

	// Change bob via an UPSERT CDC
	newBob := proto.Clone(initialEmployees[1]).(*testdata.ExampleEmployeeCDC)
	newBob.Salary = proto.Int64(105000)
	newBob.Departments = []string{"research", "product"}
	newBob.XCHANGE_TYPE = proto.String("UPSERT")
	b, err := proto.Marshal(newBob)
	if err != nil {
		t.Fatalf("failed to marshal new bob: %v", err)
	}
	result, err = updateWriter.AppendRows(ctx, [][]byte{b})
	if err != nil {
		t.Fatalf("bob modification failed (%s): %v", updateWriter.StreamName(), err)
	}
	if _, err := result.GetResult(ctx); err != nil {
		t.Fatalf("result error for bob modification (%s): %v", updateWriter.StreamName(), err)
	}
	// An UPSERT replaces the row in place: same row count, same distinct ids.
	validateTableConstraints(ctx, t, bqClient, testTable, "after bob modification",
		withExactRowCount(int64(len(initialEmployees))),
		withDistinctValues("id", int64(len(initialEmployees))))

	// remove clarice via DELETE CDC
	removeClarice := &testdata.ExampleEmployeeCDC{
		Id: proto.Int64(3),
		XCHANGE_TYPE: proto.String("DELETE"),
	}
	b, err = proto.Marshal(removeClarice)
	if err != nil {
		t.Fatalf("failed to marshal clarice removal: %v", err)
	}
	result, err = updateWriter.AppendRows(ctx, [][]byte{b})
	if err != nil {
		t.Fatalf("clarice removal failed (%s): %v", updateWriter.StreamName(), err)
	}
	if _, err := result.GetResult(ctx); err != nil {
		t.Fatalf("result error for clarice removal (%s): %v", updateWriter.StreamName(), err)
	}

	validateTableConstraints(ctx, t, bqClient, testTable, "after clarice removal",
		withExactRowCount(int64(len(initialEmployees))-1))
}
| |
// testErrorBehaviors intentionally issues problematic requests to verify error behaviors.
func testErrorBehaviors(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
	testTable := dataset.Table(tableIDs.New())
	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
		t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
	}

	m := &testdata.SimpleMessageProto2{}
	descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())

	// setup a new stream.
	ms, err := mwClient.NewManagedStream(ctx,
		WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
		WithType(CommittedStream),
		WithSchemaDescriptor(descriptorProto),
	)
	if err != nil {
		t.Fatalf("NewManagedStream: %v", err)
	}
	validateTableConstraints(ctx, t, bqClient, testTable, "before send",
		withExactRowCount(0))

	data := make([][]byte, len(testSimpleData))
	for k, mesg := range testSimpleData {
		b, err := proto.Marshal(mesg)
		if err != nil {
			t.Errorf("failed to marshal message %d: %v", k, err)
		}
		data[k] = b
	}

	// Send an append at an invalid offset.
	result, err := ms.AppendRows(ctx, data, WithOffset(99))
	if err != nil {
		t.Errorf("failed to send append: %v", err)
	}
	// The enqueue succeeds; the offset error surfaces via the result.
	off, err := result.GetResult(ctx)
	if err == nil {
		t.Errorf("expected error, got offset %d", off)
	}

	// The error should be an apierror carrying a StorageError detail
	// with code OFFSET_OUT_OF_RANGE.
	apiErr, ok := apierror.FromError(err)
	if !ok {
		t.Errorf("expected apierror, got %T: %v", err, err)
	}
	se := &storagepb.StorageError{}
	e := apiErr.Details().ExtractProtoMessage(se)
	if e != nil {
		t.Errorf("expected storage error, but extraction failed: %v", e)
	}
	wantCode := storagepb.StorageError_OFFSET_OUT_OF_RANGE
	if se.GetCode() != wantCode {
		t.Errorf("wanted %s, got %s", wantCode.String(), se.GetCode().String())
	}
	// Send "real" append to advance the offset.
	result, err = ms.AppendRows(ctx, data, WithOffset(0))
	if err != nil {
		t.Errorf("failed to send append: %v", err)
	}
	off, err = result.GetResult(ctx)
	if err != nil {
		t.Errorf("expected offset, got error %v", err)
	}
	wantOffset := int64(0)
	if off != wantOffset {
		t.Errorf("offset mismatch, got %d want %d", off, wantOffset)
	}
	// Now, send at the start offset again.  Re-using an already-written
	// offset should yield OFFSET_ALREADY_EXISTS.
	result, err = ms.AppendRows(ctx, data, WithOffset(0))
	if err != nil {
		t.Errorf("failed to send append: %v", err)
	}
	off, err = result.GetResult(ctx)
	if err == nil {
		t.Errorf("expected error, got offset %d", off)
	}
	apiErr, ok = apierror.FromError(err)
	if !ok {
		t.Errorf("expected apierror, got %T: %v", err, err)
	}
	se = &storagepb.StorageError{}
	e = apiErr.Details().ExtractProtoMessage(se)
	if e != nil {
		t.Errorf("expected storage error, but extraction failed: %v", e)
	}
	wantCode = storagepb.StorageError_OFFSET_ALREADY_EXISTS
	if se.GetCode() != wantCode {
		t.Errorf("wanted %s, got %s", wantCode.String(), se.GetCode().String())
	}
	// Finalize the stream.
	if _, err := ms.Finalize(ctx); err != nil {
		t.Errorf("Finalize had error: %v", err)
	}
	// Send another append, which is disallowed for finalized streams.
	result, err = ms.AppendRows(ctx, data)
	if err != nil {
		t.Errorf("failed to send append: %v", err)
	}
	off, err = result.GetResult(ctx)
	if err == nil {
		t.Errorf("expected error, got offset %d", off)
	}
	apiErr, ok = apierror.FromError(err)
	if !ok {
		t.Errorf("expected apierror, got %T: %v", err, err)
	}
	se = &storagepb.StorageError{}
	e = apiErr.Details().ExtractProtoMessage(se)
	if e != nil {
		t.Errorf("expected storage error, but extraction failed: %v", e)
	}
	wantCode = storagepb.StorageError_STREAM_FINALIZED
	if se.GetCode() != wantCode {
		t.Errorf("wanted %s, got %s", wantCode.String(), se.GetCode().String())
	}
}
| |
// testPendingStream writes rows to a pending stream, finalizes it, then
// batch-commits it, validating that rows only become visible after commit.
func testPendingStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
	testTable := dataset.Table(tableIDs.New())
	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
		t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
	}

	m := &testdata.SimpleMessageProto2{}
	descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())

	ms, err := mwClient.NewManagedStream(ctx,
		WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
		WithType(PendingStream),
		WithSchemaDescriptor(descriptorProto),
	)
	if err != nil {
		t.Fatalf("NewManagedStream: %v", err)
	}
	validateTableConstraints(ctx, t, bqClient, testTable, "before send",
		withExactRowCount(0))

	// Send data.
	var result *AppendResult
	for k, mesg := range testSimpleData {
		b, err := proto.Marshal(mesg)
		if err != nil {
			t.Errorf("failed to marshal message %d: %v", k, err)
		}
		data := [][]byte{b}
		result, err = ms.AppendRows(ctx, data, WithOffset(int64(k)))
		if err != nil {
			t.Errorf("single-row append %d failed: %v", k, err)
		}
		// Be explicit about waiting/checking each response.
		off, err := result.GetResult(ctx)
		if err != nil {
			t.Errorf("response %d error: %v", k, err)
		}
		if off != int64(k) {
			t.Errorf("offset mismatch, got %d want %d", off, k)
		}
	}
	wantRows := int64(len(testSimpleData))

	// Mark stream complete.  Finalize reports the total row count written.
	trackedOffset, err := ms.Finalize(ctx)
	if err != nil {
		t.Errorf("Finalize: %v", err)
	}

	if trackedOffset != wantRows {
		t.Errorf("Finalize mismatched offset, got %d want %d", trackedOffset, wantRows)
	}

	// Commit stream and validate.
	req := &storagepb.BatchCommitWriteStreamsRequest{
		Parent: TableParentFromStreamName(ms.StreamName()),
		WriteStreams: []string{ms.StreamName()},
	}

	resp, err := mwClient.BatchCommitWriteStreams(ctx, req)
	if err != nil {
		t.Errorf("client.BatchCommit: %v", err)
	}
	if len(resp.StreamErrors) > 0 {
		t.Errorf("stream errors present: %v", resp.StreamErrors)
	}
	validateTableConstraints(ctx, t, bqClient, testTable, "after send",
		withExactRowCount(int64(len(testSimpleData))))
}
| |
| func testLargeInsertNoRetry(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) { |
| testTable := dataset.Table(tableIDs.New()) |
| if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil { |
| t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err) |
| } |
| |
| m := &testdata.SimpleMessageProto2{} |
| descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor()) |
| |
| ms, err := mwClient.NewManagedStream(ctx, |
| WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)), |
| WithType(CommittedStream), |
| WithSchemaDescriptor(descriptorProto), |
| ) |
| if err != nil { |
| t.Fatalf("NewManagedStream: %v", err) |
| } |
| validateTableConstraints(ctx, t, bqClient, testTable, "before send", |
| withExactRowCount(0)) |
| |
| // Construct a Very Large request. |
| var data [][]byte |
| targetSize := 11 * 1024 * 1024 // 11 MB |
| b, err := proto.Marshal(testSimpleData[0]) |
| if err != nil { |
| t.Errorf("failed to marshal message: %v", err) |
| } |
| |
| numRows := targetSize / len(b) |
| data = make([][]byte, numRows) |
| |
| for i := 0; i < numRows; i++ { |
| data[i] = b |
| } |
| |
| result, err := ms.AppendRows(ctx, data, WithOffset(0)) |
| if err != nil { |
| t.Errorf("single append failed: %v", err) |
| } |
| _, err = result.GetResult(ctx) |
| if err != nil { |
| apiErr, ok := apierror.FromError(err) |
| if !ok { |
| t.Errorf("GetResult error was not an instance of ApiError") |
| } |
| status := apiErr.GRPCStatus() |
| if status.Code() != codes.InvalidArgument { |
| t.Errorf("expected InvalidArgument status, got %v", status) |
| } |
| } |
| // our next append is small and should succeed. |
| result, err = ms.AppendRows(ctx, [][]byte{b}) |
| if err != nil { |
| t.Fatalf("second append failed: %v", err) |
| } |
| _, err = result.GetResult(ctx) |
| if err != nil { |
| t.Errorf("failure result from second append: %v", err) |
| } |
| |
| validateTableConstraints(ctx, t, bqClient, testTable, "final", |
| withExactRowCount(1)) |
| } |
| |
| func testLargeInsertWithRetry(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) { |
| testTable := dataset.Table(tableIDs.New()) |
| if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil { |
| t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err) |
| } |
| |
| m := &testdata.SimpleMessageProto2{} |
| descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor()) |
| |
| ms, err := mwClient.NewManagedStream(ctx, |
| WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)), |
| WithType(CommittedStream), |
| WithSchemaDescriptor(descriptorProto), |
| EnableWriteRetries(true), |
| ) |
| if err != nil { |
| t.Fatalf("NewManagedStream: %v", err) |
| } |
| validateTableConstraints(ctx, t, bqClient, testTable, "before send", |
| withExactRowCount(0)) |
| |
| // Construct a Very Large request. |
| var data [][]byte |
| targetSize := 11 * 1024 * 1024 // 11 MB |
| b, err := proto.Marshal(testSimpleData[0]) |
| if err != nil { |
| t.Errorf("failed to marshal message: %v", err) |
| } |
| |
| numRows := targetSize / len(b) |
| data = make([][]byte, numRows) |
| |
| for i := 0; i < numRows; i++ { |
| data[i] = b |
| } |
| |
| result, err := ms.AppendRows(ctx, data, WithOffset(0)) |
| if err != nil { |
| t.Errorf("single append failed: %v", err) |
| } |
| _, err = result.GetResult(ctx) |
| if err != nil { |
| apiErr, ok := apierror.FromError(err) |
| if !ok { |
| t.Errorf("GetResult error was not an instance of ApiError") |
| } |
| status := apiErr.GRPCStatus() |
| if status.Code() != codes.InvalidArgument { |
| t.Errorf("expected InvalidArgument status, got %v", status) |
| } |
| } |
| |
| // The second append will succeed. |
| result, err = ms.AppendRows(ctx, [][]byte{b}) |
| if err != nil { |
| t.Fatalf("second append expected to succeed, got error: %v", err) |
| } |
| _, err = result.GetResult(ctx) |
| if err != nil { |
| t.Errorf("failure result from second append: %v", err) |
| } |
| if attempts, _ := result.TotalAttempts(ctx); attempts != 1 { |
| t.Errorf("expected 1 attempts, got %d", attempts) |
| } |
| } |
| |
| func testInstrumentation(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) { |
| testedViews := []*view.View{ |
| AppendRequestsView, |
| AppendResponsesView, |
| AppendClientOpenView, |
| } |
| |
| if err := view.Register(testedViews...); err != nil { |
| t.Fatalf("couldn't register views: %v", err) |
| } |
| |
| testTable := dataset.Table(tableIDs.New()) |
| if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil { |
| t.Fatalf("failed to create test table %q: %v", testTable.FullyQualifiedName(), err) |
| } |
| |
| m := &testdata.SimpleMessageProto2{} |
| descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor()) |
| |
| // setup a new stream. |
| ms, err := mwClient.NewManagedStream(ctx, |
| WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)), |
| WithType(DefaultStream), |
| WithSchemaDescriptor(descriptorProto), |
| ) |
| if err != nil { |
| t.Fatalf("NewManagedStream: %v", err) |
| } |
| |
| var result *AppendResult |
| for k, mesg := range testSimpleData { |
| b, err := proto.Marshal(mesg) |
| if err != nil { |
| t.Errorf("failed to marshal message %d: %v", k, err) |
| } |
| data := [][]byte{b} |
| result, err = ms.AppendRows(ctx, data) |
| if err != nil { |
| t.Errorf("single-row append %d failed: %v", k, err) |
| } |
| } |
| // Wait for the result to indicate ready. |
| result.Ready() |
| // Ick. Stats reporting can't force flushing, and there's a race here. Sleep to give the recv goroutine a chance |
| // to report. |
| time.Sleep(time.Second) |
| |
| // metric to key tag names |
| wantTags := map[string][]string{ |
| "cloud.google.com/go/bigquery/storage/managedwriter/stream_open_count": {"error"}, |
| "cloud.google.com/go/bigquery/storage/managedwriter/stream_open_retry_count": nil, |
| "cloud.google.com/go/bigquery/storage/managedwriter/append_requests": {"streamID"}, |
| "cloud.google.com/go/bigquery/storage/managedwriter/append_request_bytes": {"streamID"}, |
| "cloud.google.com/go/bigquery/storage/managedwriter/append_request_errors": {"streamID"}, |
| "cloud.google.com/go/bigquery/storage/managedwriter/append_rows": {"streamID"}, |
| } |
| |
| for _, tv := range testedViews { |
| // Attempt to further improve race failures by retrying metrics retrieval. |
| metricData, err := func() ([]*view.Row, error) { |
| attempt := 0 |
| for { |
| data, err := view.RetrieveData(tv.Name) |
| attempt = attempt + 1 |
| if attempt > 5 { |
| return data, err |
| } |
| if err == nil && len(data) == 1 { |
| return data, err |
| } |
| time.Sleep(time.Duration(attempt) * 500 * time.Millisecond) |
| } |
| }() |
| if err != nil { |
| t.Errorf("view %q RetrieveData: %v", tv.Name, err) |
| } |
| if mlen := len(metricData); mlen != 1 { |
| t.Errorf("%q: expected 1 row of metrics, got %d", tv.Name, mlen) |
| continue |
| } |
| if wantKeys, ok := wantTags[tv.Name]; ok { |
| if wantKeys == nil { |
| if n := len(tv.TagKeys); n != 0 { |
| t.Errorf("expected view %q to have no keys, but %d present", tv.Name, n) |
| } |
| } else { |
| for _, wk := range wantKeys { |
| var found bool |
| for _, gk := range tv.TagKeys { |
| if gk.Name() == wk { |
| found = true |
| break |
| } |
| } |
| if !found { |
| t.Errorf("expected view %q to have key %q, but wasn't present", tv.Name, wk) |
| } |
| } |
| } |
| } |
| entry := metricData[0].Data |
| sum, ok := entry.(*view.SumData) |
| if !ok { |
| t.Errorf("unexpected metric type: %T", entry) |
| } |
| got := sum.Value |
| var want int64 |
| switch tv { |
| case AppendRequestsView: |
| want = int64(len(testSimpleData)) |
| case AppendResponsesView: |
| want = int64(len(testSimpleData)) |
| case AppendClientOpenView: |
| want = 1 |
| } |
| |
| // float comparison; diff more than error bound is error |
| if math.Abs(got-float64(want)) > 0.1 { |
| t.Errorf("%q: metric mismatch, got %f want %d", tv.Name, got, want) |
| } |
| } |
| } |
| |
// testSchemaEvolution validates writer behavior when the destination table's
// schema is widened mid-stream: rows are first written against the original
// schema, the table schema is updated, and subsequent appends pick up the
// evolved schema via UpdateSchemaDescriptor.
func testSchemaEvolution(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset, opts ...WriterOption) {
	testTable := dataset.Table(tableIDs.New())
	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
		t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
	}

	m := &testdata.SimpleMessageProto2{}
	descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())

	// setup a new stream.
	opts = append(opts, WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)))
	opts = append(opts, WithSchemaDescriptor(descriptorProto))
	ms, err := mwClient.NewManagedStream(ctx, opts...)
	if err != nil {
		t.Fatalf("NewManagedStream: %v", err)
	}
	validateTableConstraints(ctx, t, bqClient, testTable, "before send",
		withExactRowCount(0))

	var result *AppendResult
	// curOffset tracks the number of rows appended so far; it feeds the final
	// row-count validation at the bottom of the test.
	var curOffset int64
	// latestRow retains the last marshaled row so it can be re-sent while
	// polling for the schema-update notification below.
	var latestRow []byte
	for k, mesg := range testSimpleData {
		b, err := proto.Marshal(mesg)
		if err != nil {
			t.Errorf("failed to marshal message %d: %v", k, err)
		}
		latestRow = b
		data := [][]byte{b}
		result, err = ms.AppendRows(ctx, data)
		if err != nil {
			t.Errorf("single-row append %d failed: %v", k, err)
		}
		curOffset = curOffset + int64(len(data))
	}
	// Wait for the result to indicate ready, then validate.
	_, err = result.GetResult(ctx)
	if err != nil {
		t.Errorf("error on append: %v", err)
	}

	validateTableConstraints(ctx, t, bqClient, testTable, "after send",
		withExactRowCount(int64(len(testSimpleData))))

	// Now, evolve the underlying table schema.
	_, err = testTable.Update(ctx, bigquery.TableMetadataToUpdate{Schema: testdata.SimpleMessageEvolvedSchema}, "")
	if err != nil {
		t.Errorf("failed to evolve table schema: %v", err)
	}

	// Resend latest row until we get a new schema notification.
	// It _should_ be possible to send duplicates, but this currently will not propagate the schema error.
	// Internal issue: b/211899346
	//
	// The alternative here would be to block on GetWriteStream until we get a different write stream, but
	// this subjects us to a possible race, as the backend that services GetWriteStream isn't necessarily the
	// one in charge of the stream, and thus may report ready early.
	//
	// NOTE(review): this loop has no upper bound on iterations; it relies on the
	// backend eventually reporting the updated schema (and on ctx cancellation
	// failing the appends otherwise) — confirm this is acceptable.
	for {
		resp, err := ms.AppendRows(ctx, [][]byte{latestRow})
		if err != nil {
			t.Errorf("got error on dupe append: %v", err)
			break
		}
		curOffset = curOffset + 1
		s, err := resp.UpdatedSchema(ctx)
		if err != nil {
			t.Errorf("getting schema error: %v", err)
		}
		if s != nil {
			break
		}
	}

	// ready evolved message and descriptor
	m2 := &testdata.SimpleMessageEvolvedProto2{
		Name:  proto.String("evolved"),
		Value: proto.Int64(180),
		Other: proto.String("hello evolution"),
	}
	descriptorProto = protodesc.ToDescriptorProto(m2.ProtoReflect().Descriptor())
	b, err := proto.Marshal(m2)
	if err != nil {
		t.Errorf("failed to marshal evolved message: %v", err)
	}
	// Send an append with an evolved schema
	res, err := ms.AppendRows(ctx, [][]byte{b}, UpdateSchemaDescriptor(descriptorProto))
	if err != nil {
		t.Errorf("failed evolved append: %v", err)
	}
	_, err = res.GetResult(ctx)
	if err != nil {
		t.Errorf("error on evolved append: %v", err)
	}
	curOffset = curOffset + 1

	// Try to force connection errors from concurrent appends.
	// We drop setting of offset to avoid commingling out-of-order append errors.
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		id := i
		wg.Add(1)
		go func() {
			res, err := ms.AppendRows(ctx, [][]byte{b})
			if err != nil {
				t.Errorf("failed concurrent append %d: %v", id, err)
			}
			_, err = res.GetResult(ctx)
			if err != nil {
				t.Errorf("error on concurrent append %d: %v", id, err)
			}
			wg.Done()
		}()
	}
	wg.Wait()

	// Expected non-null "other" values: the single explicit evolved append plus
	// the 5 concurrent appends of the same evolved row.
	validateTableConstraints(ctx, t, bqClient, testTable, "after evolved records send",
		withExactRowCount(int64(curOffset+5)),
		withNullCount("name", 0),
		withNonNullCount("other", 6),
	)
}
| |
| func testDefaultValueHandling(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset, opts ...WriterOption) { |
| testTable := dataset.Table(tableIDs.New()) |
| if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.DefaultValueSchema}); err != nil { |
| t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err) |
| } |
| |
| m := &testdata.DefaultValuesPartialSchema{ |
| // We only populate the id, as remaining fields are used to test default values. |
| Id: proto.String("someval"), |
| } |
| var data []byte |
| var err error |
| if data, err = proto.Marshal(m); err != nil { |
| t.Fatalf("failed to marshal test row data") |
| } |
| descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor()) |
| |
| // setup a new stream. |
| opts = append(opts, WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID))) |
| opts = append(opts, WithSchemaDescriptor(descriptorProto)) |
| ms, err := mwClient.NewManagedStream(ctx, opts...) |
| if err != nil { |
| t.Fatalf("NewManagedStream: %v", err) |
| } |
| validateTableConstraints(ctx, t, bqClient, testTable, "before send", |
| withExactRowCount(0)) |
| |
| var result *AppendResult |
| |
| // Send one row, verify default values were set as expected. |
| |
| result, err = ms.AppendRows(ctx, [][]byte{data}) |
| if err != nil { |
| t.Errorf("append failed: %v", err) |
| } |
| // Wait for the result to indicate ready, then validate. |
| _, err = result.GetResult(ctx) |
| if err != nil { |
| t.Errorf("error on append: %v", err) |
| } |
| |
| validateTableConstraints(ctx, t, bqClient, testTable, "after first row", |
| withExactRowCount(1), |
| withNonNullCount("id", 1), |
| withNullCount("strcol_withdef", 1), |
| withNullCount("intcol_withdef", 1), |
| withNullCount("otherstr_withdef", 0)) // not part of partial schema |
| |
| // Change default MVI to use nulls. |
| // We expect the fields in the partial schema to leverage nulls rather than default values. |
| // The fields outside the partial schema continue to obey default values. |
| result, err = ms.AppendRows(ctx, [][]byte{data}, UpdateDefaultMissingValueInterpretation(storagepb.AppendRowsRequest_DEFAULT_VALUE)) |
| if err != nil { |
| t.Errorf("append failed: %v", err) |
| } |
| // Wait for the result to indicate ready, then validate. |
| _, err = result.GetResult(ctx) |
| if err != nil { |
| t.Errorf("error on append: %v", err) |
| } |
| |
| validateTableConstraints(ctx, t, bqClient, testTable, "after second row (default mvi is DEFAULT_VALUE)", |
| withExactRowCount(2), |
| withNullCount("strcol_withdef", 1), // doesn't increment, as it gets default value |
| withNullCount("intcol_withdef", 1)) // doesn't increment, as it gets default value |
| |
| // Change per-column MVI to use default value |
| result, err = ms.AppendRows(ctx, [][]byte{data}, |
| UpdateMissingValueInterpretations(map[string]storagepb.AppendRowsRequest_MissingValueInterpretation{ |
| "strcol_withdef": storagepb.AppendRowsRequest_NULL_VALUE, |
| })) |
| if err != nil { |
| t.Errorf("append failed: %v", err) |
| } |
| // Wait for the result to indicate ready, then validate. |
| _, err = result.GetResult(ctx) |
| if err != nil { |
| t.Errorf("error on append: %v", err) |
| } |
| |
| validateTableConstraints(ctx, t, bqClient, testTable, "after third row (explicit column mvi)", |
| withExactRowCount(3), |
| withNullCount("strcol_withdef", 2), // increments as it's null for this column |
| withNullCount("intcol_withdef", 1), // doesn't increment, still default value |
| withNonNullCount("otherstr_withdef", 3), // not part of descriptor, always gets default value |
| withNullCount("otherstr", 3), // not part of descriptor, always gets null |
| withNullCount("strcol", 3), // no default value defined, always gets null |
| withNullCount("intcol", 3), // no default value defined, always gets null |
| ) |
| } |
| |
| func TestIntegration_DetectProjectID(t *testing.T) { |
| ctx := context.Background() |
| testCreds := testutil.Credentials(ctx) |
| if testCreds == nil { |
| t.Skip("test credentials not present, skipping") |
| } |
| |
| if _, err := NewClient(ctx, DetectProjectID, option.WithCredentials(testCreds)); err != nil { |
| t.Errorf("test NewClient: %v", err) |
| } |
| |
| badTS := testutil.ErroringTokenSource{} |
| |
| if badClient, err := NewClient(ctx, DetectProjectID, option.WithTokenSource(badTS)); err == nil { |
| t.Errorf("expected error from bad token source, NewClient succeeded with project: %s", badClient.projectID) |
| } |
| } |
| |
| func TestIntegration_ProtoNormalization(t *testing.T) { |
| mwClient, bqClient := getTestClients(context.Background(), t) |
| defer mwClient.Close() |
| defer bqClient.Close() |
| |
| dataset, cleanup, err := setupTestDataset(context.Background(), t, bqClient, "us-east1") |
| if err != nil { |
| t.Fatalf("failed to init test dataset: %v", err) |
| } |
| defer cleanup() |
| |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| |
| t.Run("group", func(t *testing.T) { |
| t.Run("ComplexType", func(t *testing.T) { |
| t.Parallel() |
| schema := testdata.ComplexTypeSchema |
| mesg := &testdata.ComplexType{ |
| NestedRepeatedType: []*testdata.NestedType{ |
| { |
| InnerType: []*testdata.InnerType{ |
| {Value: []string{"a", "b", "c"}}, |
| {Value: []string{"x", "y", "z"}}, |
| }, |
| }, |
| }, |
| InnerType: &testdata.InnerType{ |
| Value: []string{"top"}, |
| }, |
| RangeType: &testdata.RangeTypeTimestamp{ |
| End: proto.Int64(time.Now().UnixNano()), |
| }, |
| } |
| b, err := proto.Marshal(mesg) |
| if err != nil { |
| t.Fatalf("proto.Marshal: %v", err) |
| } |
| descriptor := (mesg).ProtoReflect().Descriptor() |
| testProtoNormalization(ctx, t, mwClient, bqClient, dataset, schema, descriptor, b) |
| }) |
| t.Run("WithWellKnownTypes", func(t *testing.T) { |
| t.Parallel() |
| schema := testdata.WithWellKnownTypesSchema |
| mesg := &testdata.WithWellKnownTypes{ |
| Int64Value: proto.Int64(123), |
| WrappedInt64: &wrapperspb.Int64Value{ |
| Value: 456, |
| }, |
| StringValue: []string{"a", "b"}, |
| WrappedString: []*wrapperspb.StringValue{ |
| {Value: "foo"}, |
| {Value: "bar"}, |
| }, |
| } |
| b, err := proto.Marshal(mesg) |
| if err != nil { |
| t.Fatalf("proto.Marshal: %v", err) |
| } |
| descriptor := (mesg).ProtoReflect().Descriptor() |
| testProtoNormalization(ctx, t, mwClient, bqClient, dataset, schema, descriptor, b) |
| }) |
| t.Run("WithExternalEnum", func(t *testing.T) { |
| t.Parallel() |
| schema := testdata.ExternalEnumMessageSchema |
| mesg := &testdata.ExternalEnumMessage{ |
| MsgA: &testdata.EnumMsgA{ |
| Foo: proto.String("foo"), |
| Bar: testdata.ExtEnum_THING.Enum(), |
| }, |
| MsgB: &testdata.EnumMsgB{ |
| Baz: testdata.ExtEnum_OTHER_THING.Enum(), |
| }, |
| } |
| b, err := proto.Marshal(mesg) |
| if err != nil { |
| t.Fatalf("proto.Marshal: %v", err) |
| } |
| descriptor := (mesg).ProtoReflect().Descriptor() |
| testProtoNormalization(ctx, t, mwClient, bqClient, dataset, schema, descriptor, b) |
| }) |
| }) |
| } |
| |
| func testProtoNormalization(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset, schema bigquery.Schema, descriptor protoreflect.MessageDescriptor, sampleRow []byte) { |
| testTable := dataset.Table(tableIDs.New()) |
| if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: schema}); err != nil { |
| t.Fatalf("failed to create test table %q: %v", testTable.FullyQualifiedName(), err) |
| } |
| |
| dp, err := adapt.NormalizeDescriptor(descriptor) |
| if err != nil { |
| t.Fatalf("NormalizeDescriptor: %v", err) |
| } |
| |
| // setup a new stream. |
| ms, err := mwClient.NewManagedStream(ctx, |
| WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)), |
| WithType(DefaultStream), |
| WithSchemaDescriptor(dp), |
| ) |
| if err != nil { |
| t.Fatalf("NewManagedStream: %v", err) |
| } |
| result, err := ms.AppendRows(ctx, [][]byte{sampleRow}) |
| if err != nil { |
| t.Errorf("append failed: %v", err) |
| } |
| |
| _, err = result.GetResult(ctx) |
| if err != nil { |
| t.Errorf("error in response: %v", err) |
| } |
| } |
| |
| func TestIntegration_MultiplexWrites(t *testing.T) { |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| mwClient, bqClient := getTestClients(ctx, t, |
| WithMultiplexing(), |
| WithMultiplexPoolLimit(2), |
| ) |
| defer mwClient.Close() |
| defer bqClient.Close() |
| |
| dataset, cleanup, err := setupTestDataset(ctx, t, bqClient, "us-east1") |
| if err != nil { |
| t.Fatalf("failed to init test dataset: %v", err) |
| } |
| defer cleanup() |
| |
| wantWrites := 10 |
| |
| testTables := []struct { |
| tbl *bigquery.Table |
| schema bigquery.Schema |
| dp *descriptorpb.DescriptorProto |
| sampleRow []byte |
| constraints []constraintOption |
| }{ |
| { |
| tbl: dataset.Table(tableIDs.New()), |
| schema: testdata.SimpleMessageSchema, |
| dp: func() *descriptorpb.DescriptorProto { |
| m := &testdata.SimpleMessageProto2{} |
| dp, _ := adapt.NormalizeDescriptor(m.ProtoReflect().Descriptor()) |
| return dp |
| }(), |
| sampleRow: func() []byte { |
| msg := &testdata.SimpleMessageProto2{ |
| Name: proto.String("sample_name"), |
| Value: proto.Int64(1001), |
| } |
| b, _ := proto.Marshal(msg) |
| return b |
| }(), |
| constraints: []constraintOption{ |
| withExactRowCount(int64(wantWrites)), |
| withStringValueCount("name", "sample_name", int64(wantWrites)), |
| }, |
| }, |
| { |
| tbl: dataset.Table(tableIDs.New()), |
| schema: testdata.ValidationBaseSchema, |
| dp: func() *descriptorpb.DescriptorProto { |
| m := &testdata.ValidationP2Optional{} |
| dp, _ := adapt.NormalizeDescriptor(m.ProtoReflect().Descriptor()) |
| return dp |
| }(), |
| sampleRow: func() []byte { |
| msg := &testdata.ValidationP2Optional{ |
| Int64Field: proto.Int64(69), |
| StringField: proto.String("validation_string"), |
| } |
| b, _ := proto.Marshal(msg) |
| return b |
| }(), |
| constraints: []constraintOption{ |
| withExactRowCount(int64(wantWrites)), |
| withStringValueCount("string_field", "validation_string", int64(wantWrites)), |
| }, |
| }, |
| { |
| tbl: dataset.Table(tableIDs.New()), |
| schema: testdata.GithubArchiveSchema, |
| dp: func() *descriptorpb.DescriptorProto { |
| m := &testdata.GithubArchiveMessageProto2{} |
| dp, _ := adapt.NormalizeDescriptor(m.ProtoReflect().Descriptor()) |
| return dp |
| }(), |
| sampleRow: func() []byte { |
| msg := &testdata.GithubArchiveMessageProto2{ |
| Payload: proto.String("payload_string"), |
| Id: proto.String("some_id"), |
| } |
| b, _ := proto.Marshal(msg) |
| return b |
| }(), |
| constraints: []constraintOption{ |
| withExactRowCount(int64(wantWrites)), |
| withStringValueCount("payload", "payload_string", int64(wantWrites)), |
| }, |
| }, |
| } |
| |
| // setup tables |
| for _, testTable := range testTables { |
| if err := testTable.tbl.Create(ctx, &bigquery.TableMetadata{Schema: testTable.schema}); err != nil { |
| t.Fatalf("failed to create test table %q: %v", testTable.tbl.FullyQualifiedName(), err) |
| } |
| } |
| |
| var gotFirstPool *connectionPool |
| var results []*AppendResult |
| for i := 0; i < wantWrites; i++ { |
| for k, testTable := range testTables { |
| // create a writer and send a single append |
| ms, err := mwClient.NewManagedStream(ctx, |
| WithDestinationTable(TableParentFromParts(testTable.tbl.ProjectID, testTable.tbl.DatasetID, testTable.tbl.TableID)), |
| WithType(DefaultStream), |
| WithSchemaDescriptor(testTable.dp), |
| EnableWriteRetries(true), |
| ) |
| if err != nil { |
| t.Fatalf("NewManagedStream %d: %v", k, err) |
| } |
| if i == 0 && k == 0 { |
| if ms.pool == nil { |
| t.Errorf("expected a non-nil pool reference for first writer") |
| } |
| gotFirstPool = ms.pool |
| } else { |
| if ms.pool != gotFirstPool { |
| t.Errorf("expected same pool reference, got a different pool") |
| } |
| } |
| defer ms.Close() // we won't clean these up until the end of the test, rather than per use. |
| if err != nil { |
| t.Fatalf("failed to create ManagedStream for table %d on iteration %d: %v", k, i, err) |
| } |
| res, err := ms.AppendRows(ctx, [][]byte{testTable.sampleRow}) |
| if err != nil { |
| t.Fatalf("failed to append to table %d on iteration %d: %v", k, i, err) |
| } |
| results = append(results, res) |
| } |
| } |
| |
| // drain results |
| for k, res := range results { |
| if _, err := res.GetResult(ctx); err != nil { |
| t.Errorf("result %d yielded error: %v", k, err) |
| } |
| } |
| |
| // validate the tables |
| for _, testTable := range testTables { |
| validateTableConstraints(ctx, t, bqClient, testTable.tbl, "", testTable.constraints...) |
| } |
| |
| } |
| |
// TestIntegration_MingledContexts verifies writer behavior when multiple
// multiplexed writers are created with distinct cancellable contexts: a
// cancelled writer context fails that writer's appends even with a good
// request context, while a cancelled request context fails the individual
// request on an otherwise healthy writer.
func TestIntegration_MingledContexts(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	mwClient, bqClient := getTestClients(ctx, t,
		WithMultiplexing(),
		WithMultiplexPoolLimit(2),
	)
	defer mwClient.Close()
	defer bqClient.Close()

	wantLocation := "us-east4"

	dataset, cleanup, err := setupTestDataset(ctx, t, bqClient, wantLocation)
	if err != nil {
		t.Fatalf("failed to init test dataset: %v", err)
	}
	defer cleanup()

	testTable := dataset.Table(tableIDs.New())
	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
		t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
	}

	m := &testdata.SimpleMessageProto2{}
	descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())

	// Create one writer per context so individual writer contexts can be
	// cancelled independently below.
	numWriters := 4
	contexts := make([]context.Context, numWriters)
	cancels := make([]context.CancelFunc, numWriters)
	writers := make([]*ManagedStream, numWriters)
	for i := 0; i < numWriters; i++ {
		contexts[i], cancels[i] = context.WithCancel(ctx)
		ms, err := mwClient.NewManagedStream(contexts[i],
			WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
			WithType(DefaultStream),
			WithSchemaDescriptor(descriptorProto),
		)
		if err != nil {
			t.Fatalf("instantating writer %d failed: %v", i, err)
		}
		writers[i] = ms
	}

	sampleRow, err := proto.Marshal(&testdata.SimpleMessageProto2{
		Name:  proto.String("datafield"),
		Value: proto.Int64(1234),
	})
	if err != nil {
		t.Fatalf("failed to generate sample row")
	}

	// Baseline: every writer should accept a write with its own context.
	for i := 0; i < numWriters; i++ {
		res, err := writers[i].AppendRows(contexts[i], [][]byte{sampleRow})
		if err != nil {
			t.Errorf("initial write on %d failed: %v", i, err)
		} else {
			if _, err := res.GetResult(contexts[i]); err != nil {
				t.Errorf("GetResult initial write %d: %v", i, err)
			}
		}
	}

	// cancel the first context
	cancels[0]()
	// repeat writes on all other writers with the second context
	for i := 1; i < numWriters; i++ {
		res, err := writers[i].AppendRows(contexts[i], [][]byte{sampleRow})
		if err != nil {
			t.Errorf("second write on %d failed: %v", i, err)
		} else {
			// NOTE(review): GetResult uses contexts[1] here while AppendRows uses
			// contexts[i]; the comment above suggests using the second context is
			// intentional, but confirm this isn't a typo for contexts[i].
			if _, err := res.GetResult(contexts[1]); err != nil {
				t.Errorf("GetResult err on second write %d: %v", i, err)
			}
		}
	}

	// check that writes to the first writer should fail, even with a valid request context.
	if _, err := writers[0].AppendRows(contexts[1], [][]byte{sampleRow}); err == nil {
		t.Errorf("write succeeded on first writer when it should have failed")
	}

	// cancel the second context as well, ensure writer created with good context and bad request context fails
	cancels[1]()
	if _, err := writers[2].AppendRows(contexts[1], [][]byte{sampleRow}); err == nil {
		t.Errorf("write succeeded on third writer with a bad request context")
	}

	// repeat writes on remaining good writers/contexts
	for i := 2; i < numWriters; i++ {
		res, err := writers[i].AppendRows(contexts[i], [][]byte{sampleRow})
		if err != nil {
			t.Errorf("second write on %d failed: %v", i, err)
		} else {
			if _, err := res.GetResult(contexts[i]); err != nil {
				t.Errorf("GetResult err on second write %d: %v", i, err)
			}
		}
	}
}