blob: 4a7f2470a6ad2483c249fdec74f0e16259308211 [file] [log] [blame] [edit]
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package managedwriter
import (
"context"
"fmt"
"math"
"sync"
"testing"
"time"
"cloud.google.com/go/bigquery"
"cloud.google.com/go/bigquery/storage/apiv1/storagepb"
"cloud.google.com/go/bigquery/storage/managedwriter/adapt"
"cloud.google.com/go/bigquery/storage/managedwriter/testdata"
"cloud.google.com/go/internal/testutil"
"cloud.google.com/go/internal/uid"
"github.com/google/go-cmp/cmp"
"github.com/googleapis/gax-go/v2/apierror"
"go.opencensus.io/stats/view"
"google.golang.org/api/option"
"google.golang.org/grpc/codes"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/testing/protocmp"
"google.golang.org/protobuf/types/descriptorpb"
"google.golang.org/protobuf/types/dynamicpb"
"google.golang.org/protobuf/types/known/wrapperspb"
)
// datasetIDs produces unique dataset names for integration test runs,
// seeded with the time at package initialization.
var datasetIDs = uid.NewSpace("managedwriter_test_dataset", &uid.Options{Sep: '_', Time: time.Now()})

// tableIDs produces unique table names within a test dataset, seeded with
// the time at package initialization.
var tableIDs = uid.NewSpace("table", &uid.Options{Sep: '_', Time: time.Now()})

// defaultTestTimeout is the deadline applied to the shared context used by
// a group of integration subtests.
var defaultTestTimeout = 90 * time.Second
// testSimpleData is a small SimpleMessageProto2 fixture. The name column has
// 5 distinct values and the value column has 3 distinct values (1, 2, 3),
// which tests rely on when validating cardinality.
var testSimpleData = []*testdata.SimpleMessageProto2{
	{Value: proto.Int64(1), Name: proto.String("one")},
	{Value: proto.Int64(2), Name: proto.String("two")},
	{Value: proto.Int64(3), Name: proto.String("three")},
	{Value: proto.Int64(1), Name: proto.String("four")},
	{Value: proto.Int64(2), Name: proto.String("five")},
}
// getTestClients returns a managedwriter client and a BigQuery client for
// integration testing. The calling test is skipped in short mode, or when no
// project ID or token source is configured for integration tests.
//
// The supplied client options, plus a token source scoped for BigQuery, are
// passed to both clients. Callers are responsible for closing both clients.
func getTestClients(ctx context.Context, t *testing.T, opts ...option.ClientOption) (*Client, *bigquery.Client) {
	t.Helper()
	if testing.Short() {
		t.Skip("Integration tests skipped in short mode")
	}
	projID := testutil.ProjID()
	if projID == "" {
		t.Skip("Integration tests skipped. See CONTRIBUTING.md for details")
	}
	ts := testutil.TokenSource(ctx, "https://www.googleapis.com/auth/bigquery")
	if ts == nil {
		t.Skip("Integration tests skipped. See CONTRIBUTING.md for details")
	}
	// Clip capacity before appending. opts may alias a caller's slice (passed
	// via opts...), and a plain append could overwrite its spare capacity.
	opts = append(opts[:len(opts):len(opts)], option.WithTokenSource(ts))
	client, err := NewClient(ctx, projID, opts...)
	if err != nil {
		t.Fatalf("couldn't create managedwriter client: %v", err)
	}
	bqClient, err := bigquery.NewClient(ctx, projID, opts...)
	if err != nil {
		// Don't leak the managedwriter client we already created.
		client.Close()
		t.Fatalf("couldn't create bigquery client: %v", err)
	}
	return client, bqClient
}
// setupTestDataset creates a uniquely named dataset in the given location.
// It returns the dataset and a cleanup func (suitable for defer) that deletes
// the dataset along with its contents. Cleanup failures are logged, not
// treated as test failures.
func setupTestDataset(ctx context.Context, t *testing.T, bqc *bigquery.Client, location string) (ds *bigquery.Dataset, cleanup func(), err error) {
	newDS := bqc.Dataset(datasetIDs.New())
	meta := &bigquery.DatasetMetadata{Location: location}
	if createErr := newDS.Create(ctx, meta); createErr != nil {
		return nil, nil, createErr
	}
	teardown := func() {
		if delErr := newDS.DeleteWithContents(ctx); delErr != nil {
			t.Logf("could not cleanup dataset %q: %v", newDS.DatasetID, delErr)
		}
	}
	return newDS, teardown, nil
}
// setupDynamicDescriptors aids testing when not using a supplied proto.
//
// It converts a BigQuery schema to a storage table schema, derives a proto2
// message descriptor from it, and returns both that descriptor and its
// normalized DescriptorProto form, which can be passed to
// WithSchemaDescriptor. Any conversion failure is fatal to the test.
func setupDynamicDescriptors(t *testing.T, schema bigquery.Schema) (protoreflect.MessageDescriptor, *descriptorpb.DescriptorProto) {
	t.Helper()
	convertedSchema, err := adapt.BQSchemaToStorageTableSchema(schema)
	if err != nil {
		t.Fatalf("adapt.BQSchemaToStorageTableSchema: %v", err)
	}
	descriptor, err := adapt.StorageSchemaToProto2Descriptor(convertedSchema, "root")
	if err != nil {
		t.Fatalf("adapt.StorageSchemaToProto2Descriptor: %v", err)
	}
	messageDescriptor, ok := descriptor.(protoreflect.MessageDescriptor)
	if !ok {
		t.Fatalf("adapted descriptor is not a message descriptor")
	}
	dp, err := adapt.NormalizeDescriptor(messageDescriptor)
	if err != nil {
		t.Fatalf("NormalizeDescriptor: %v", err)
	}
	return messageDescriptor, dp
}
// TestIntegration_ClientGetWriteStream checks the metadata returned by
// getWriteStream for the table's default stream and for an explicitly created
// pending stream. Each stream is checked in both the basic view and the full
// view.
func TestIntegration_ClientGetWriteStream(t *testing.T) {
	ctx := context.Background()
	mwClient, bqClient := getTestClients(ctx, t)
	defer mwClient.Close()
	defer bqClient.Close()

	wantLocation := "us-east1"
	dataset, cleanup, err := setupTestDataset(ctx, t, bqClient, wantLocation)
	if err != nil {
		t.Fatalf("failed to init test dataset: %v", err)
	}
	defer cleanup()

	testTable := dataset.Table(tableIDs.New())
	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
		t.Fatalf("failed to create test table %q: %v", testTable.FullyQualifiedName(), err)
	}
	apiSchema, err := adapt.BQSchemaToStorageTableSchema(testdata.SimpleMessageSchema)
	if err != nil {
		t.Fatalf("adapt.BQSchemaToStorageTableSchema: %v", err)
	}

	parent := TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)
	explicitStream, err := mwClient.CreateWriteStream(ctx, &storagepb.CreateWriteStreamRequest{
		Parent: parent,
		WriteStream: &storagepb.WriteStream{
			Type: storagepb.WriteStream_PENDING,
		},
	})
	if err != nil {
		t.Fatalf("CreateWriteStream: %v", err)
	}

	testCases := []struct {
		description string
		isDefault   bool
		streamID    string
		wantType    storagepb.WriteStream_Type
	}{
		{
			description: "default",
			isDefault:   true,
			streamID:    fmt.Sprintf("%s/streams/_default", parent),
			wantType:    storagepb.WriteStream_COMMITTED,
		},
		{
			description: "explicit pending",
			streamID:    explicitStream.Name,
			wantType:    storagepb.WriteStream_PENDING,
		},
	}
	for _, tc := range testCases {
		for _, fullView := range []bool{false, true} {
			// fullView is a bool, so it must be formatted with %t (%T would print "bool").
			label := fmt.Sprintf("%s (fullView=%t)", tc.description, fullView)
			info, err := mwClient.getWriteStream(ctx, tc.streamID, fullView)
			if err != nil {
				t.Errorf("%s: getWriteStream failed: %v", label, err)
				// Further checks would only report misleading zero values.
				continue
			}
			if info.GetType() != tc.wantType {
				t.Errorf("%s: got type %v, want type %v", label, info.GetType(), tc.wantType)
			}
			if info.GetLocation() != wantLocation {
				t.Errorf("%s: got location %s, want location %s", label, info.GetLocation(), wantLocation)
			}
			if info.GetCommitTime() != nil {
				t.Errorf("%s: expected empty commit time, got %v", label, info.GetCommitTime())
			}
			// This test expects a creation time only for explicitly created streams.
			if tc.isDefault {
				if info.GetCreateTime() != nil {
					t.Errorf("%s: expected empty time, got %v", label, info.GetCreateTime())
				}
			} else if info.GetCreateTime() == nil {
				t.Errorf("%s: expected create time, was empty", label)
			}
			// The schema should be present only in the full view.
			if fullView {
				if diff := cmp.Diff(info.GetTableSchema(), apiSchema, protocmp.Transform()); diff != "" {
					t.Errorf("%s: schema mismatch: -got, +want:\n%s", label, diff)
				}
			} else if info.GetTableSchema() != nil {
				t.Errorf("%s: basic view: expected no schema, was populated", label)
			}
		}
	}
}
// TestIntegration_ManagedWriter runs the core managedwriter scenarios against
// one shared dataset and one shared context bounded by defaultTestTimeout.
//
// Subtests marked parallel pause at t.Parallel and resume together after the
// "group" body returns. The remaining subtests run sequentially, in
// declaration order, before that happens.
func TestIntegration_ManagedWriter(t *testing.T) {
	mwClient, bqClient := getTestClients(context.Background(), t)
	defer mwClient.Close()
	defer bqClient.Close()
	dataset, cleanup, err := setupTestDataset(context.Background(), t, bqClient, "asia-east1")
	if err != nil {
		t.Fatalf("failed to init test dataset: %v", err)
	}
	defer cleanup()
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()

	subtests := []struct {
		name     string
		parallel bool
		body     func(t *testing.T)
	}{
		{"DefaultStream", true, func(t *testing.T) { testDefaultStream(ctx, t, mwClient, bqClient, dataset) }},
		{"DefaultStreamDynamicJSON", true, func(t *testing.T) { testDefaultStreamDynamicJSON(ctx, t, mwClient, bqClient, dataset) }},
		{"CommittedStream", true, func(t *testing.T) { testCommittedStream(ctx, t, mwClient, bqClient, dataset) }},
		{"ErrorBehaviors", true, func(t *testing.T) { testErrorBehaviors(ctx, t, mwClient, bqClient, dataset) }},
		{"BufferedStream", true, func(t *testing.T) { testBufferedStream(ctx, t, mwClient, bqClient, dataset) }},
		{"PendingStream", true, func(t *testing.T) { testPendingStream(ctx, t, mwClient, bqClient, dataset) }},
		{"SimpleCDC", true, func(t *testing.T) { testSimpleCDC(ctx, t, mwClient, bqClient, dataset) }},
		// Don't run this in parallel, we only want to collect stats from this subtest.
		{"Instrumentation", false, func(t *testing.T) { testInstrumentation(ctx, t, mwClient, bqClient, dataset) }},
		{"TestLargeInsertNoRetry", false, func(t *testing.T) { testLargeInsertNoRetry(ctx, t, mwClient, bqClient, dataset) }},
		{"TestLargeInsertWithRetry", false, func(t *testing.T) { testLargeInsertWithRetry(ctx, t, mwClient, bqClient, dataset) }},
		{"DefaultValueHandling", false, func(t *testing.T) { testDefaultValueHandling(ctx, t, mwClient, bqClient, dataset) }},
	}

	t.Run("group", func(t *testing.T) {
		for _, st := range subtests {
			st := st // per-iteration copy; parallel closures outlive this loop iteration
			t.Run(st.name, func(t *testing.T) {
				if st.parallel {
					t.Parallel()
				}
				st.body(t)
			})
		}
	})
}
// TestIntegration_SchemaEvolution runs testSchemaEvolution against several
// client/writer configurations: simplex committed, simplex default, and
// multiplexed default streams. Each case gets its own clients, dataset, and
// timeout-bounded context.
func TestIntegration_SchemaEvolution(t *testing.T) {
	testcases := []struct {
		desc       string
		clientOpts []option.ClientOption
		writerOpts []WriterOption
	}{
		{
			desc: "Simplex_Committed",
			writerOpts: []WriterOption{
				WithType(CommittedStream),
			},
		},
		{
			desc: "Simplex_Default",
			writerOpts: []WriterOption{
				WithType(DefaultStream),
			},
		},
		{
			desc: "Multiplex_Default",
			clientOpts: []option.ClientOption{
				WithMultiplexing(),
				WithMultiplexPoolLimit(2),
			},
			writerOpts: []WriterOption{
				WithType(DefaultStream),
			},
		},
	}
	for _, tc := range testcases {
		tc := tc
		t.Run(tc.desc, func(t *testing.T) {
			// Set up per-case resources inside the subtest so the deferred
			// cleanups run when this case finishes. In a loop body they would
			// pile up until the whole test returned.
			mwClient, bqClient := getTestClients(context.Background(), t, tc.clientOpts...)
			defer mwClient.Close()
			defer bqClient.Close()
			dataset, cleanup, err := setupTestDataset(context.Background(), t, bqClient, "asia-east1")
			if err != nil {
				t.Fatalf("failed to init test dataset: %v", err)
			}
			defer cleanup()
			ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
			defer cancel()
			testSchemaEvolution(ctx, t, mwClient, bqClient, dataset, tc.writerOpts...)
		})
	}
}
// testDefaultStream appends SimpleMessageProto2 rows to a table's default
// stream in two rounds: one row per append, then all rows in a single append.
// After each round it validates row counts and column cardinality. Appends to
// the default stream are expected to report NoStreamOffset.
func testDefaultStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
	testTable := dataset.Table(tableIDs.New())
	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
		t.Fatalf("failed to create test table %q: %v", testTable.FullyQualifiedName(), err)
	}
	// We'll use a precompiled test proto, but we need its corresponding descriptorproto representation
	// to send as the stream's schema.
	m := &testdata.SimpleMessageProto2{}
	descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())
	// setup a new stream.
	ms, err := mwClient.NewManagedStream(ctx,
		WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
		WithType(DefaultStream),
		WithSchemaDescriptor(descriptorProto),
	)
	if err != nil {
		t.Fatalf("NewManagedStream: %v", err)
	}
	defer ms.Close()
	if ms.id == "" {
		t.Errorf("managed stream is missing ID")
	}
	validateTableConstraints(ctx, t, bqClient, testTable, "before send",
		withExactRowCount(0))

	// First, send the test rows individually.
	var result *AppendResult
	for k, mesg := range testSimpleData {
		b, err := proto.Marshal(mesg)
		if err != nil {
			t.Fatalf("failed to marshal message %d: %v", k, err)
		}
		result, err = ms.AppendRows(ctx, [][]byte{b})
		if err != nil {
			// A failed append leaves result unusable; stop before GetResult dereferences it.
			t.Fatalf("single-row append %d failed: %v", k, err)
		}
	}
	// Wait for the result to indicate ready, then validate.
	o, err := result.GetResult(ctx)
	if err != nil {
		t.Errorf("result error for last send: %v", err)
	}
	if o != NoStreamOffset {
		t.Errorf("offset mismatch, got %d want %d", o, NoStreamOffset)
	}
	validateTableConstraints(ctx, t, bqClient, testTable, "after first send round",
		withExactRowCount(int64(len(testSimpleData))),
		withDistinctValues("name", int64(len(testSimpleData))))

	// Now, send the test rows grouped into a single append.
	data := make([][]byte, 0, len(testSimpleData))
	for k, mesg := range testSimpleData {
		b, err := proto.Marshal(mesg)
		if err != nil {
			t.Fatalf("failed to marshal message %d: %v", k, err)
		}
		data = append(data, b)
	}
	result, err = ms.AppendRows(ctx, data)
	if err != nil {
		t.Fatalf("grouped-row append failed: %v", err)
	}
	// Wait for the result to indicate ready, then validate again. Our total rows have increased, but
	// cardinality should not.
	o, err = result.GetResult(ctx)
	if err != nil {
		t.Errorf("result error for last send: %v", err)
	}
	if o != NoStreamOffset {
		t.Errorf("offset mismatch, got %d want %d", o, NoStreamOffset)
	}
	validateTableConstraints(ctx, t, bqClient, testTable, "after second send round",
		withExactRowCount(int64(2*len(testSimpleData))),
		withDistinctValues("name", int64(len(testSimpleData))),
		withDistinctValues("value", int64(3)),
	)
}
// testDefaultStreamDynamicJSON writes JSON rows to a default stream without a
// precompiled proto. Each row is parsed into a dynamicpb message built from
// the table schema, serialized, and appended on its own. The test then
// validates the row count and per-column cardinality.
func testDefaultStreamDynamicJSON(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
	testTable := dataset.Table(tableIDs.New())
	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.GithubArchiveSchema}); err != nil {
		t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
	}
	md, descriptorProto := setupDynamicDescriptors(t, testdata.GithubArchiveSchema)
	ms, err := mwClient.NewManagedStream(ctx,
		WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
		WithType(DefaultStream),
		WithSchemaDescriptor(descriptorProto),
	)
	if err != nil {
		t.Fatalf("NewManagedStream: %v", err)
	}
	defer ms.Close()
	validateTableConstraints(ctx, t, bqClient, testTable, "before send",
		withExactRowCount(0))

	sampleJSONData := [][]byte{
		[]byte(`{"type": "foo", "public": true, "repo": {"id": 99, "name": "repo_name_1", "url": "https://one.example.com"}}`),
		[]byte(`{"type": "bar", "public": false, "repo": {"id": 101, "name": "repo_name_2", "url": "https://two.example.com"}}`),
		[]byte(`{"type": "baz", "public": true, "repo": {"id": 456, "name": "repo_name_3", "url": "https://three.example.com"}}`),
		[]byte(`{"type": "wow", "public": false, "repo": {"id": 123, "name": "repo_name_4", "url": "https://four.example.com"}}`),
		[]byte(`{"type": "yay", "public": true, "repo": {"name": "repo_name_5", "url": "https://five.example.com"}}`),
	}
	var result *AppendResult
	for k, v := range sampleJSONData {
		message := dynamicpb.NewMessage(md)
		// First, json->proto message
		if err := protojson.Unmarshal(v, message); err != nil {
			t.Fatalf("failed to Unmarshal json message for row %d: %v", k, err)
		}
		// Then, proto message -> bytes.
		b, err := proto.Marshal(message)
		if err != nil {
			t.Fatalf("failed to marshal proto bytes for row %d: %v", k, err)
		}
		result, err = ms.AppendRows(ctx, [][]byte{b})
		if err != nil {
			// A failed append leaves result unusable; stop before GetResult dereferences it.
			t.Fatalf("single-row append %d failed: %v", k, err)
		}
	}
	// Wait for the result to indicate ready, then validate.
	o, err := result.GetResult(ctx)
	if err != nil {
		t.Errorf("result error for last send: %v", err)
	}
	if o != NoStreamOffset {
		t.Errorf("offset mismatch, got %d want %d", o, NoStreamOffset)
	}
	validateTableConstraints(ctx, t, bqClient, testTable, "after send",
		withExactRowCount(int64(len(sampleJSONData))),
		withDistinctValues("type", int64(len(sampleJSONData))),
		withDistinctValues("public", int64(2)))
}
// testBufferedStream appends rows one at a time to a buffered stream. It
// checks that a row is not visible before it is flushed, and that FlushRows
// to the row's offset makes all rows up to that offset visible.
func testBufferedStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
	testTable := dataset.Table(tableIDs.New())
	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
		t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
	}
	m := &testdata.SimpleMessageProto2{}
	descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())
	ms, err := mwClient.NewManagedStream(ctx,
		WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
		WithType(BufferedStream),
		WithSchemaDescriptor(descriptorProto),
	)
	if err != nil {
		t.Fatalf("NewManagedStream: %v", err)
	}
	defer ms.Close()

	info, err := ms.c.getWriteStream(ctx, ms.streamSettings.streamID, false)
	if err != nil {
		t.Errorf("couldn't get stream info: %v", err)
	} else if info.GetType().String() != string(ms.StreamType()) {
		t.Errorf("mismatch on stream type, got %s want %s", info.GetType(), ms.StreamType())
	}
	validateTableConstraints(ctx, t, bqClient, testTable, "before send",
		withExactRowCount(0))

	var expectedRows int64
	for k, mesg := range testSimpleData {
		b, err := proto.Marshal(mesg)
		if err != nil {
			t.Fatalf("failed to marshal message %d: %v", k, err)
		}
		results, err := ms.AppendRows(ctx, [][]byte{b})
		if err != nil {
			t.Fatalf("single-row append %d failed: %v", k, err)
		}
		// Wait for acknowledgement.
		offset, err := results.GetResult(ctx)
		if err != nil {
			t.Fatalf("got error from pending result %d: %v", k, err)
		}
		validateTableConstraints(ctx, t, bqClient, testTable, fmt.Sprintf("before flush %d", k),
			withExactRowCount(expectedRows),
			withDistinctValues("name", expectedRows))
		// move offset and re-validate.
		flushOffset, err := ms.FlushRows(ctx, offset)
		if err != nil {
			// flushOffset is meaningless on error, so expectedRows can't be derived from it.
			t.Fatalf("failed to flush offset to %d: %v", offset, err)
		}
		// Rows 0..flushOffset (inclusive) are expected to be visible now.
		expectedRows = flushOffset + 1
		validateTableConstraints(ctx, t, bqClient, testTable, fmt.Sprintf("after flush %d", k),
			withExactRowCount(expectedRows),
			withDistinctValues("name", expectedRows))
	}
}
// testCommittedStream appends rows one at a time to a committed stream using
// explicit offsets 0..n-1. It checks that the last append reports offset n-1
// and that all rows are visible.
func testCommittedStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
	testTable := dataset.Table(tableIDs.New())
	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
		t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
	}
	m := &testdata.SimpleMessageProto2{}
	descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())
	// setup a new stream.
	ms, err := mwClient.NewManagedStream(ctx,
		WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
		WithType(CommittedStream),
		WithSchemaDescriptor(descriptorProto),
	)
	if err != nil {
		t.Fatalf("NewManagedStream: %v", err)
	}
	defer ms.Close()
	validateTableConstraints(ctx, t, bqClient, testTable, "before send",
		withExactRowCount(0))

	var result *AppendResult
	for k, mesg := range testSimpleData {
		b, err := proto.Marshal(mesg)
		if err != nil {
			t.Fatalf("failed to marshal message %d: %v", k, err)
		}
		result, err = ms.AppendRows(ctx, [][]byte{b}, WithOffset(int64(k)))
		if err != nil {
			// A failed append leaves result unusable; stop before GetResult dereferences it.
			t.Fatalf("single-row append %d failed: %v", k, err)
		}
	}
	// Wait for the result to indicate ready, then validate.
	o, err := result.GetResult(ctx)
	if err != nil {
		t.Errorf("result error for last send: %v", err)
	}
	wantOffset := int64(len(testSimpleData) - 1)
	if o != wantOffset {
		t.Errorf("offset mismatch, got %d want %d", o, wantOffset)
	}
	validateTableConstraints(ctx, t, bqClient, testTable, "after send",
		withExactRowCount(int64(len(testSimpleData))))
}
// testSimpleCDC demonstrates basic Change Data Capture (CDC) functionality. We add an initial set of
// rows to a table, then use CDC to apply updates.
//
// The table is clustered on "id", which is declared as an unenforced primary
// key via DDL. Initial INSERTs go through a committed stream. An UPSERT and a
// DELETE are then applied through the default stream, and row counts are
// validated after each step.
func testSimpleCDC(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
	testTable := dataset.Table(tableIDs.New())
	if err := testTable.Create(ctx, &bigquery.TableMetadata{
		Schema: testdata.ExampleEmployeeSchema,
		Clustering: &bigquery.Clustering{
			Fields: []string{"id"},
		},
	}); err != nil {
		t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
	}
	// Mark the primary key using an ALTER TABLE DDL.
	tableIdentifier, err := testTable.Identifier(bigquery.StandardSQLID)
	if err != nil {
		t.Fatalf("failed to build table identifier: %v", err)
	}
	sql := fmt.Sprintf("ALTER TABLE %s ADD PRIMARY KEY(id) NOT ENFORCED;", tableIdentifier)
	if _, err := bqClient.Query(sql).Read(ctx); err != nil {
		t.Fatalf("failed ALTER TABLE: %v", err)
	}

	m := &testdata.ExampleEmployeeCDC{}
	descriptorProto, err := adapt.NormalizeDescriptor(m.ProtoReflect().Descriptor())
	if err != nil {
		t.Fatalf("NormalizeDescriptor: %v", err)
	}
	// Setup an initial writer for sending initial inserts.
	writer, err := mwClient.NewManagedStream(ctx,
		WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
		WithType(CommittedStream),
		WithSchemaDescriptor(descriptorProto),
	)
	if err != nil {
		t.Fatalf("NewManagedStream: %v", err)
	}
	defer writer.Close()
	validateTableConstraints(ctx, t, bqClient, testTable, "before send",
		withExactRowCount(0))

	initialEmployees := []*testdata.ExampleEmployeeCDC{
		{
			Id:           proto.Int64(1),
			Username:     proto.String("alice"),
			GivenName:    proto.String("Alice CEO"),
			Departments:  []string{"product", "support", "internal"},
			Salary:       proto.Int64(1),
			XCHANGE_TYPE: proto.String("INSERT"),
		},
		{
			Id:           proto.Int64(2),
			Username:     proto.String("bob"),
			GivenName:    proto.String("Bob Bobberson"),
			Departments:  []string{"research"},
			Salary:       proto.Int64(100000),
			XCHANGE_TYPE: proto.String("INSERT"),
		},
		{
			Id:           proto.Int64(3),
			Username:     proto.String("clarice"),
			GivenName:    proto.String("Clarice Clearwater"),
			Departments:  []string{"product"},
			Salary:       proto.Int64(100001),
			XCHANGE_TYPE: proto.String("INSERT"),
		},
	}
	// First append inserts all the initial employees.
	data := make([][]byte, len(initialEmployees))
	for k, mesg := range initialEmployees {
		b, err := proto.Marshal(mesg)
		if err != nil {
			t.Fatalf("failed to marshal record %d: %v", k, err)
		}
		data[k] = b
	}
	result, err := writer.AppendRows(ctx, data)
	if err != nil {
		// A failed append leaves result unusable; stop before GetResult dereferences it.
		t.Fatalf("initial insert failed (%s): %v", writer.StreamName(), err)
	}
	if _, err := result.GetResult(ctx); err != nil {
		// The CDC steps below assume the initial rows exist.
		t.Fatalf("result error for initial insert (%s): %v", writer.StreamName(), err)
	}
	validateTableConstraints(ctx, t, bqClient, testTable, "initial inserts",
		withExactRowCount(int64(len(initialEmployees))))

	// Create a second writer for applying modifications.
	updateWriter, err := mwClient.NewManagedStream(ctx,
		WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
		WithType(DefaultStream),
		WithSchemaDescriptor(descriptorProto),
	)
	if err != nil {
		t.Fatalf("NewManagedStream: %v", err)
	}
	defer updateWriter.Close()

	// Change bob via an UPSERT CDC
	newBob := proto.Clone(initialEmployees[1]).(*testdata.ExampleEmployeeCDC)
	newBob.Salary = proto.Int64(105000)
	newBob.Departments = []string{"research", "product"}
	newBob.XCHANGE_TYPE = proto.String("UPSERT")
	b, err := proto.Marshal(newBob)
	if err != nil {
		t.Fatalf("failed to marshal new bob: %v", err)
	}
	result, err = updateWriter.AppendRows(ctx, [][]byte{b})
	if err != nil {
		t.Fatalf("bob modification failed (%s): %v", updateWriter.StreamName(), err)
	}
	if _, err := result.GetResult(ctx); err != nil {
		t.Fatalf("result error for bob modification (%s): %v", updateWriter.StreamName(), err)
	}
	// An UPSERT on an existing key should not add a row.
	validateTableConstraints(ctx, t, bqClient, testTable, "after bob modification",
		withExactRowCount(int64(len(initialEmployees))),
		withDistinctValues("id", int64(len(initialEmployees))))

	// Remove clarice via DELETE CDC.
	removeClarice := &testdata.ExampleEmployeeCDC{
		Id:           proto.Int64(3),
		XCHANGE_TYPE: proto.String("DELETE"),
	}
	b, err = proto.Marshal(removeClarice)
	if err != nil {
		t.Fatalf("failed to marshal clarice removal: %v", err)
	}
	result, err = updateWriter.AppendRows(ctx, [][]byte{b})
	if err != nil {
		t.Fatalf("clarice removal failed (%s): %v", updateWriter.StreamName(), err)
	}
	if _, err := result.GetResult(ctx); err != nil {
		t.Fatalf("result error for clarice removal (%s): %v", updateWriter.StreamName(), err)
	}
	validateTableConstraints(ctx, t, bqClient, testTable, "after clarice removal",
		withExactRowCount(int64(len(initialEmployees))-1))
}
// testErrorBehaviors intentionally issues problematic requests to verify error behaviors.
//
// Against a committed stream it checks that:
//   - appending past the end reports OFFSET_OUT_OF_RANGE,
//   - re-appending at an already written offset reports OFFSET_ALREADY_EXISTS,
//   - appending after Finalize reports STREAM_FINALIZED.
func testErrorBehaviors(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
	testTable := dataset.Table(tableIDs.New())
	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
		t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
	}
	m := &testdata.SimpleMessageProto2{}
	descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())
	// setup a new stream.
	ms, err := mwClient.NewManagedStream(ctx,
		WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
		WithType(CommittedStream),
		WithSchemaDescriptor(descriptorProto),
	)
	if err != nil {
		t.Fatalf("NewManagedStream: %v", err)
	}
	defer ms.Close()
	validateTableConstraints(ctx, t, bqClient, testTable, "before send",
		withExactRowCount(0))

	data := make([][]byte, len(testSimpleData))
	for k, mesg := range testSimpleData {
		b, err := proto.Marshal(mesg)
		if err != nil {
			t.Fatalf("failed to marshal message %d: %v", k, err)
		}
		data[k] = b
	}

	// expectStorageError waits on result and checks that it failed with an API
	// error carrying a StorageError detail with wantCode. It returns after the
	// first failed check, so it never calls methods on a nil *APIError (which
	// FromError returns when err is nil or not an API error).
	expectStorageError := func(result *AppendResult, wantCode storagepb.StorageError_StorageErrorCode) {
		t.Helper()
		off, err := result.GetResult(ctx)
		if err == nil {
			t.Errorf("expected error, got offset %d", off)
			return
		}
		apiErr, ok := apierror.FromError(err)
		if !ok {
			t.Errorf("expected apierror, got %T: %v", err, err)
			return
		}
		se := &storagepb.StorageError{}
		if e := apiErr.Details().ExtractProtoMessage(se); e != nil {
			t.Errorf("expected storage error, but extraction failed: %v", e)
			return
		}
		if se.GetCode() != wantCode {
			t.Errorf("wanted %s, got %s", wantCode.String(), se.GetCode().String())
		}
	}

	// Send an append at an invalid offset.
	result, err := ms.AppendRows(ctx, data, WithOffset(99))
	if err != nil {
		t.Fatalf("failed to send append: %v", err)
	}
	expectStorageError(result, storagepb.StorageError_OFFSET_OUT_OF_RANGE)

	// Send "real" append to advance the offset.
	result, err = ms.AppendRows(ctx, data, WithOffset(0))
	if err != nil {
		t.Fatalf("failed to send append: %v", err)
	}
	off, err := result.GetResult(ctx)
	if err != nil {
		t.Errorf("expected offset, got error %v", err)
	}
	if wantOffset := int64(0); off != wantOffset {
		t.Errorf("offset mismatch, got %d want %d", off, wantOffset)
	}

	// Now, send at the start offset again.
	result, err = ms.AppendRows(ctx, data, WithOffset(0))
	if err != nil {
		t.Fatalf("failed to send append: %v", err)
	}
	expectStorageError(result, storagepb.StorageError_OFFSET_ALREADY_EXISTS)

	// Finalize the stream.
	if _, err := ms.Finalize(ctx); err != nil {
		t.Errorf("Finalize had error: %v", err)
	}
	// Send another append, which is disallowed for finalized streams.
	result, err = ms.AppendRows(ctx, data)
	if err != nil {
		t.Fatalf("failed to send append: %v", err)
	}
	expectStorageError(result, storagepb.StorageError_STREAM_FINALIZED)
}
// testPendingStream appends rows to a pending stream at explicit offsets and
// checks each acknowledgement. It then finalizes the stream (expecting the
// row count back), commits it with BatchCommitWriteStreams, and validates
// that all rows are visible.
func testPendingStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
	testTable := dataset.Table(tableIDs.New())
	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
		t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
	}
	m := &testdata.SimpleMessageProto2{}
	descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())
	ms, err := mwClient.NewManagedStream(ctx,
		WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
		WithType(PendingStream),
		WithSchemaDescriptor(descriptorProto),
	)
	if err != nil {
		t.Fatalf("NewManagedStream: %v", err)
	}
	defer ms.Close()
	validateTableConstraints(ctx, t, bqClient, testTable, "before send",
		withExactRowCount(0))

	// Send data.
	for k, mesg := range testSimpleData {
		b, err := proto.Marshal(mesg)
		if err != nil {
			t.Fatalf("failed to marshal message %d: %v", k, err)
		}
		result, err := ms.AppendRows(ctx, [][]byte{b}, WithOffset(int64(k)))
		if err != nil {
			// A failed append leaves result unusable; stop before GetResult dereferences it.
			t.Fatalf("single-row append %d failed: %v", k, err)
		}
		// Be explicit about waiting/checking each response.
		off, err := result.GetResult(ctx)
		if err != nil {
			t.Errorf("response %d error: %v", k, err)
		}
		if off != int64(k) {
			t.Errorf("offset mismatch, got %d want %d", off, k)
		}
	}
	wantRows := int64(len(testSimpleData))

	// Mark stream complete.
	trackedOffset, err := ms.Finalize(ctx)
	if err != nil {
		t.Errorf("Finalize: %v", err)
	}
	if trackedOffset != wantRows {
		t.Errorf("Finalize mismatched offset, got %d want %d", trackedOffset, wantRows)
	}

	// Commit stream and validate.
	req := &storagepb.BatchCommitWriteStreamsRequest{
		Parent:       TableParentFromStreamName(ms.StreamName()),
		WriteStreams: []string{ms.StreamName()},
	}
	resp, err := mwClient.BatchCommitWriteStreams(ctx, req)
	if err != nil {
		// resp is nil on error; stop before reading its fields.
		t.Fatalf("client.BatchCommit: %v", err)
	}
	if len(resp.StreamErrors) > 0 {
		t.Errorf("stream errors present: %v", resp.StreamErrors)
	}
	validateTableConstraints(ctx, t, bqClient, testTable, "after send",
		withExactRowCount(wantRows))
}
// testLargeInsertNoRetry sends a single append of roughly 11 MB to a committed
// stream, with write retries left unset. It expects that append to fail with
// InvalidArgument, and expects a following small append to succeed, leaving
// exactly one row in the table.
func testLargeInsertNoRetry(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
	testTable := dataset.Table(tableIDs.New())
	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
		t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
	}
	m := &testdata.SimpleMessageProto2{}
	descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())
	ms, err := mwClient.NewManagedStream(ctx,
		WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
		WithType(CommittedStream),
		WithSchemaDescriptor(descriptorProto),
	)
	if err != nil {
		t.Fatalf("NewManagedStream: %v", err)
	}
	defer ms.Close()
	validateTableConstraints(ctx, t, bqClient, testTable, "before send",
		withExactRowCount(0))

	// Construct a Very Large request.
	targetSize := 11 * 1024 * 1024 // 11 MB
	b, err := proto.Marshal(testSimpleData[0])
	if err != nil {
		// Continuing with an empty b would divide by zero below.
		t.Fatalf("failed to marshal message: %v", err)
	}
	numRows := targetSize / len(b)
	data := make([][]byte, numRows)
	for i := range data {
		data[i] = b
	}
	result, err := ms.AppendRows(ctx, data, WithOffset(0))
	if err != nil {
		t.Fatalf("single append failed: %v", err)
	}
	if _, err := result.GetResult(ctx); err == nil {
		t.Errorf("expected oversized append to fail, but it succeeded")
	} else if apiErr, ok := apierror.FromError(err); !ok {
		// apiErr is nil here, so it must not be dereferenced.
		t.Errorf("GetResult error was not an instance of ApiError: %v", err)
	} else if status := apiErr.GRPCStatus(); status.Code() != codes.InvalidArgument {
		t.Errorf("expected InvalidArgument status, got %v", status)
	}

	// our next append is small and should succeed.
	result, err = ms.AppendRows(ctx, [][]byte{b})
	if err != nil {
		t.Fatalf("second append failed: %v", err)
	}
	if _, err := result.GetResult(ctx); err != nil {
		t.Errorf("failure result from second append: %v", err)
	}
	validateTableConstraints(ctx, t, bqClient, testTable, "final",
		withExactRowCount(1))
}
// testLargeInsertWithRetry mirrors testLargeInsertNoRetry with write retries
// enabled. The oversized (~11 MB) append is still expected to fail with
// InvalidArgument. The following small append is expected to succeed on its
// first attempt, leaving exactly one row in the table.
func testLargeInsertWithRetry(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
	testTable := dataset.Table(tableIDs.New())
	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
		t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
	}
	m := &testdata.SimpleMessageProto2{}
	descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())
	ms, err := mwClient.NewManagedStream(ctx,
		WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
		WithType(CommittedStream),
		WithSchemaDescriptor(descriptorProto),
		EnableWriteRetries(true),
	)
	if err != nil {
		t.Fatalf("NewManagedStream: %v", err)
	}
	defer ms.Close()
	validateTableConstraints(ctx, t, bqClient, testTable, "before send",
		withExactRowCount(0))

	// Construct a Very Large request.
	targetSize := 11 * 1024 * 1024 // 11 MB
	b, err := proto.Marshal(testSimpleData[0])
	if err != nil {
		// Continuing with an empty b would divide by zero below.
		t.Fatalf("failed to marshal message: %v", err)
	}
	numRows := targetSize / len(b)
	data := make([][]byte, numRows)
	for i := range data {
		data[i] = b
	}
	result, err := ms.AppendRows(ctx, data, WithOffset(0))
	if err != nil {
		t.Fatalf("single append failed: %v", err)
	}
	if _, err := result.GetResult(ctx); err == nil {
		t.Errorf("expected oversized append to fail, but it succeeded")
	} else if apiErr, ok := apierror.FromError(err); !ok {
		// apiErr is nil here, so it must not be dereferenced.
		t.Errorf("GetResult error was not an instance of ApiError: %v", err)
	} else if status := apiErr.GRPCStatus(); status.Code() != codes.InvalidArgument {
		t.Errorf("expected InvalidArgument status, got %v", status)
	}

	// The second append will succeed.
	result, err = ms.AppendRows(ctx, [][]byte{b})
	if err != nil {
		t.Fatalf("second append expected to succeed, got error: %v", err)
	}
	if _, err := result.GetResult(ctx); err != nil {
		t.Errorf("failure result from second append: %v", err)
	}
	attempts, err := result.TotalAttempts(ctx)
	if err != nil {
		t.Errorf("TotalAttempts: %v", err)
	} else if attempts != 1 {
		t.Errorf("expected 1 attempt, got %d", attempts)
	}
	validateTableConstraints(ctx, t, bqClient, testTable, "final",
		withExactRowCount(1))
}
// testInstrumentation appends each row of testSimpleData to a default stream.
// It then checks the registered OpenCensus views. Each view must report
// exactly one row and carry the expected tag keys. Its sum must match the
// number of appends, or 1 for the client-open view.
func testInstrumentation(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
	testedViews := []*view.View{
		AppendRequestsView,
		AppendResponsesView,
		AppendClientOpenView,
	}
	if err := view.Register(testedViews...); err != nil {
		t.Fatalf("couldn't register views: %v", err)
	}
	testTable := dataset.Table(tableIDs.New())
	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
		t.Fatalf("failed to create test table %q: %v", testTable.FullyQualifiedName(), err)
	}
	m := &testdata.SimpleMessageProto2{}
	descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())
	// setup a new stream.
	ms, err := mwClient.NewManagedStream(ctx,
		WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
		WithType(DefaultStream),
		WithSchemaDescriptor(descriptorProto),
	)
	if err != nil {
		t.Fatalf("NewManagedStream: %v", err)
	}
	var result *AppendResult
	for k, mesg := range testSimpleData {
		b, err := proto.Marshal(mesg)
		if err != nil {
			t.Fatalf("failed to marshal message %d: %v", k, err)
		}
		result, err = ms.AppendRows(ctx, [][]byte{b})
		if err != nil {
			// The metric expectations below assume every append was sent, and a
			// failed append leaves no result to wait on.
			t.Fatalf("single-row append %d failed: %v", k, err)
		}
	}
	// Wait for the last append to complete. Calling Ready() on its own only
	// obtains the readiness channel without receiving from it, so it does not
	// block. GetResult blocks until the result is ready while honoring ctx.
	if _, err := result.GetResult(ctx); err != nil {
		t.Errorf("final append result: %v", err)
	}
	// Ick. Stats reporting can't force flushing, and there's a race here. Sleep to give the recv goroutine a chance
	// to report.
	time.Sleep(time.Second)
	// metric to key tag names
	wantTags := map[string][]string{
		"cloud.google.com/go/bigquery/storage/managedwriter/stream_open_count":       {"error"},
		"cloud.google.com/go/bigquery/storage/managedwriter/stream_open_retry_count": nil,
		"cloud.google.com/go/bigquery/storage/managedwriter/append_requests":         {"streamID"},
		"cloud.google.com/go/bigquery/storage/managedwriter/append_request_bytes":    {"streamID"},
		"cloud.google.com/go/bigquery/storage/managedwriter/append_request_errors":   {"streamID"},
		"cloud.google.com/go/bigquery/storage/managedwriter/append_rows":             {"streamID"},
	}
	for _, tv := range testedViews {
		// Attempt to further improve race failures by retrying metrics retrieval.
		metricData, err := func() ([]*view.Row, error) {
			attempt := 0
			for {
				data, err := view.RetrieveData(tv.Name)
				attempt = attempt + 1
				if attempt > 5 {
					return data, err
				}
				if err == nil && len(data) == 1 {
					return data, err
				}
				time.Sleep(time.Duration(attempt) * 500 * time.Millisecond)
			}
		}()
		if err != nil {
			t.Errorf("view %q RetrieveData: %v", tv.Name, err)
		}
		if mlen := len(metricData); mlen != 1 {
			t.Errorf("%q: expected 1 row of metrics, got %d", tv.Name, mlen)
			continue
		}
		if wantKeys, ok := wantTags[tv.Name]; ok {
			if wantKeys == nil {
				if n := len(tv.TagKeys); n != 0 {
					t.Errorf("expected view %q to have no keys, but %d present", tv.Name, n)
				}
			} else {
				for _, wk := range wantKeys {
					var found bool
					for _, gk := range tv.TagKeys {
						if gk.Name() == wk {
							found = true
							break
						}
					}
					if !found {
						t.Errorf("expected view %q to have key %q, but wasn't present", tv.Name, wk)
					}
				}
			}
		}
		entry := metricData[0].Data
		sum, ok := entry.(*view.SumData)
		if !ok {
			// sum is nil here; skip the value comparison rather than dereference it.
			t.Errorf("unexpected metric type: %T", entry)
			continue
		}
		got := sum.Value
		var want int64
		switch tv {
		case AppendRequestsView:
			want = int64(len(testSimpleData))
		case AppendResponsesView:
			want = int64(len(testSimpleData))
		case AppendClientOpenView:
			want = 1
		}
		// float comparison; diff more than error bound is error
		if math.Abs(got-float64(want)) > 0.1 {
			t.Errorf("%q: metric mismatch, got %f want %d", tv.Name, got, want)
		}
	}
}
// testSchemaEvolution appends testSimpleData to a stream and then widens the
// table schema. It re-sends the last row until an append reports an updated
// schema. Next it appends rows of the evolved message: one carrying the
// updated descriptor, then several concurrently. Finally it validates the
// resulting row and null counts.
func testSchemaEvolution(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset, opts ...WriterOption) {
	testTable := dataset.Table(tableIDs.New())
	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
		t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
	}
	m := &testdata.SimpleMessageProto2{}
	descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())
	// setup a new stream. opts is first clipped to its length so that append
	// allocates a fresh slice instead of writing into spare capacity of a
	// caller-supplied backing array.
	opts = append(opts[:len(opts):len(opts)],
		WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
		WithSchemaDescriptor(descriptorProto),
	)
	ms, err := mwClient.NewManagedStream(ctx, opts...)
	if err != nil {
		t.Fatalf("NewManagedStream: %v", err)
	}
	validateTableConstraints(ctx, t, bqClient, testTable, "before send",
		withExactRowCount(0))

	// curOffset counts every row appended to the stream and feeds the final
	// row-count check.
	var result *AppendResult
	var curOffset int64
	var latestRow []byte
	for k, mesg := range testSimpleData {
		b, err := proto.Marshal(mesg)
		if err != nil {
			t.Fatalf("failed to marshal message %d: %v", k, err)
		}
		latestRow = b
		data := [][]byte{b}
		result, err = ms.AppendRows(ctx, data)
		if err != nil {
			// A failed append yields no result to wait on below.
			t.Fatalf("single-row append %d failed: %v", k, err)
		}
		curOffset += int64(len(data))
	}
	// Wait for the result to indicate ready, then validate.
	if _, err := result.GetResult(ctx); err != nil {
		t.Errorf("error on append: %v", err)
	}
	validateTableConstraints(ctx, t, bqClient, testTable, "after send",
		withExactRowCount(int64(len(testSimpleData))))
	// Now, evolve the underlying table schema.
	if _, err := testTable.Update(ctx, bigquery.TableMetadataToUpdate{Schema: testdata.SimpleMessageEvolvedSchema}, ""); err != nil {
		t.Errorf("failed to evolve table schema: %v", err)
	}
	// Resend latest row until we get a new schema notification.
	// It _should_ be possible to send duplicates, but this currently will not propagate the schema error.
	// Internal issue: b/211899346
	//
	// The alternative here would be to block on GetWriteStream until we get a different write stream, but
	// this subjects us to a possible race, as the backend that services GetWriteStream isn't necessarily the
	// one in charge of the stream, and thus may report ready early.
	for {
		resp, err := ms.AppendRows(ctx, [][]byte{latestRow})
		if err != nil {
			t.Errorf("got error on dupe append: %v", err)
			break
		}
		curOffset++
		s, err := resp.UpdatedSchema(ctx)
		if err != nil {
			// Stop polling instead of spinning on a result we cannot observe.
			t.Errorf("getting schema error: %v", err)
			break
		}
		if s != nil {
			break
		}
	}
	// ready evolved message and descriptor
	m2 := &testdata.SimpleMessageEvolvedProto2{
		Name:  proto.String("evolved"),
		Value: proto.Int64(180),
		Other: proto.String("hello evolution"),
	}
	descriptorProto = protodesc.ToDescriptorProto(m2.ProtoReflect().Descriptor())
	b, err := proto.Marshal(m2)
	if err != nil {
		t.Fatalf("failed to marshal evolved message: %v", err)
	}
	// Send an append with an evolved schema
	res, err := ms.AppendRows(ctx, [][]byte{b}, UpdateSchemaDescriptor(descriptorProto))
	if err != nil {
		t.Fatalf("failed evolved append: %v", err)
	}
	if _, err := res.GetResult(ctx); err != nil {
		t.Errorf("error on evolved append: %v", err)
	}
	curOffset++
	// Try to force connection errors from concurrent appends.
	// We drop setting of offset to avoid commingling out-of-order append errors.
	const numConcurrent = 5
	var wg sync.WaitGroup
	for i := 0; i < numConcurrent; i++ {
		id := i
		wg.Add(1)
		go func() {
			defer wg.Done()
			res, err := ms.AppendRows(ctx, [][]byte{b})
			if err != nil {
				// Fatalf must not be called from a non-test goroutine; report and
				// return instead of touching the unusable result.
				t.Errorf("failed concurrent append %d: %v", id, err)
				return
			}
			if _, err := res.GetResult(ctx); err != nil {
				t.Errorf("error on concurrent append %d: %v", id, err)
			}
		}()
	}
	wg.Wait()
	// One evolved append plus numConcurrent concurrent ones populate "other".
	validateTableConstraints(ctx, t, bqClient, testTable, "after evolved records send",
		withExactRowCount(curOffset+numConcurrent),
		withNullCount("name", 0),
		withNonNullCount("other", 1+numConcurrent),
	)
}
// testDefaultValueHandling writes rows that populate only the id column. The
// target table has columns with and without default value expressions. The
// test checks how missing fields are filled under different missing value
// interpretations (MVI): none configured, a default MVI of DEFAULT_VALUE,
// and a per-column NULL_VALUE override.
func testDefaultValueHandling(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset, opts ...WriterOption) {
	testTable := dataset.Table(tableIDs.New())
	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.DefaultValueSchema}); err != nil {
		t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
	}
	m := &testdata.DefaultValuesPartialSchema{
		// We only populate the id, as remaining fields are used to test default values.
		Id: proto.String("someval"),
	}
	data, err := proto.Marshal(m)
	if err != nil {
		t.Fatalf("failed to marshal test row data: %v", err)
	}
	descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())
	// setup a new stream. opts is first clipped to its length so that append
	// allocates a fresh slice instead of writing into spare capacity of a
	// caller-supplied backing array.
	opts = append(opts[:len(opts):len(opts)],
		WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
		WithSchemaDescriptor(descriptorProto),
	)
	ms, err := mwClient.NewManagedStream(ctx, opts...)
	if err != nil {
		t.Fatalf("NewManagedStream: %v", err)
	}
	validateTableConstraints(ctx, t, bqClient, testTable, "before send",
		withExactRowCount(0))

	// First row, no MVI configured: the assertions expect strcol_withdef and
	// intcol_withdef to be NULL. otherstr_withdef, which is not part of the
	// partial schema, is expected to get its default value.
	result, err := ms.AppendRows(ctx, [][]byte{data})
	if err != nil {
		// A failed append yields no result to wait on below.
		t.Fatalf("append failed: %v", err)
	}
	// Wait for the result to indicate ready, then validate.
	if _, err := result.GetResult(ctx); err != nil {
		t.Errorf("error on append: %v", err)
	}
	validateTableConstraints(ctx, t, bqClient, testTable, "after first row",
		withExactRowCount(1),
		withNonNullCount("id", 1),
		withNullCount("strcol_withdef", 1),
		withNullCount("intcol_withdef", 1),
		withNullCount("otherstr_withdef", 0)) // not part of partial schema

	// Change the default MVI to DEFAULT_VALUE. strcol_withdef and
	// intcol_withdef are now expected to receive their default values rather
	// than NULL, so their null counts do not grow. Columns outside the partial
	// schema continue to receive default values.
	result, err = ms.AppendRows(ctx, [][]byte{data}, UpdateDefaultMissingValueInterpretation(storagepb.AppendRowsRequest_DEFAULT_VALUE))
	if err != nil {
		t.Fatalf("append failed: %v", err)
	}
	// Wait for the result to indicate ready, then validate.
	if _, err := result.GetResult(ctx); err != nil {
		t.Errorf("error on append: %v", err)
	}
	validateTableConstraints(ctx, t, bqClient, testTable, "after second row (default mvi is DEFAULT_VALUE)",
		withExactRowCount(2),
		withNullCount("strcol_withdef", 1), // doesn't increment, as it gets default value
		withNullCount("intcol_withdef", 1)) // doesn't increment, as it gets default value

	// Override the MVI of strcol_withdef alone to NULL_VALUE. The assertions
	// expect intcol_withdef to keep receiving its default value, i.e. the
	// DEFAULT_VALUE default MVI from the previous append still applies.
	result, err = ms.AppendRows(ctx, [][]byte{data},
		UpdateMissingValueInterpretations(map[string]storagepb.AppendRowsRequest_MissingValueInterpretation{
			"strcol_withdef": storagepb.AppendRowsRequest_NULL_VALUE,
		}))
	if err != nil {
		t.Fatalf("append failed: %v", err)
	}
	// Wait for the result to indicate ready, then validate.
	if _, err := result.GetResult(ctx); err != nil {
		t.Errorf("error on append: %v", err)
	}
	validateTableConstraints(ctx, t, bqClient, testTable, "after third row (explicit column mvi)",
		withExactRowCount(3),
		withNullCount("strcol_withdef", 2),       // increments as it's null for this column
		withNullCount("intcol_withdef", 1),       // doesn't increment, still default value
		withNonNullCount("otherstr_withdef", 3),  // not part of descriptor, always gets default value
		withNullCount("otherstr", 3),             // not part of descriptor, always gets null
		withNullCount("strcol", 3),               // no default value defined, always gets null
		withNullCount("intcol", 3),               // no default value defined, always gets null
	)
}
// TestIntegration_DetectProjectID checks that NewClient can auto-detect the
// project when given valid credentials. It also checks that NewClient fails
// when the token source always errors. Any client that does get created is
// closed so its resources are released.
func TestIntegration_DetectProjectID(t *testing.T) {
	ctx := context.Background()
	testCreds := testutil.Credentials(ctx)
	if testCreds == nil {
		t.Skip("test credentials not present, skipping")
	}
	goodClient, err := NewClient(ctx, DetectProjectID, option.WithCredentials(testCreds))
	if err != nil {
		t.Errorf("test NewClient: %v", err)
	} else {
		goodClient.Close()
	}

	badTS := testutil.ErroringTokenSource{}
	if badClient, err := NewClient(ctx, DetectProjectID, option.WithTokenSource(badTS)); err == nil {
		t.Errorf("expected error from bad token source, NewClient succeeded with project: %s", badClient.projectID)
		badClient.Close()
	}
}
// TestIntegration_ProtoNormalization checks a set of message shapes. For each
// one, the normalized form of its descriptor must be usable to write a sample
// row into a table with the matching BigQuery schema. The shapes are nested
// repeated types, well-known wrapper types and externally defined enums.
func TestIntegration_ProtoNormalization(t *testing.T) {
	mwClient, bqClient := getTestClients(context.Background(), t)
	defer mwClient.Close()
	defer bqClient.Close()

	dataset, cleanup, err := setupTestDataset(context.Background(), t, bqClient, "us-east1")
	if err != nil {
		t.Fatalf("failed to init test dataset: %v", err)
	}
	defer cleanup()

	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()

	// Each case pairs a table schema with a populated sample message.
	cases := []struct {
		name   string
		schema bigquery.Schema
		mesg   proto.Message
	}{
		{
			name:   "ComplexType",
			schema: testdata.ComplexTypeSchema,
			mesg: &testdata.ComplexType{
				NestedRepeatedType: []*testdata.NestedType{
					{
						InnerType: []*testdata.InnerType{
							{Value: []string{"a", "b", "c"}},
							{Value: []string{"x", "y", "z"}},
						},
					},
				},
				InnerType: &testdata.InnerType{
					Value: []string{"top"},
				},
				RangeType: &testdata.RangeTypeTimestamp{
					End: proto.Int64(time.Now().UnixNano()),
				},
			},
		},
		{
			name:   "WithWellKnownTypes",
			schema: testdata.WithWellKnownTypesSchema,
			mesg: &testdata.WithWellKnownTypes{
				Int64Value: proto.Int64(123),
				WrappedInt64: &wrapperspb.Int64Value{
					Value: 456,
				},
				StringValue: []string{"a", "b"},
				WrappedString: []*wrapperspb.StringValue{
					{Value: "foo"},
					{Value: "bar"},
				},
			},
		},
		{
			name:   "WithExternalEnum",
			schema: testdata.ExternalEnumMessageSchema,
			mesg: &testdata.ExternalEnumMessage{
				MsgA: &testdata.EnumMsgA{
					Foo: proto.String("foo"),
					Bar: testdata.ExtEnum_THING.Enum(),
				},
				MsgB: &testdata.EnumMsgB{
					Baz: testdata.ExtEnum_OTHER_THING.Enum(),
				},
			},
		},
	}

	// The parallel subtests live under one "group" subtest, which only returns
	// once all of them finish; the deferred cleanup above therefore runs last.
	t.Run("group", func(t *testing.T) {
		for _, tc := range cases {
			tc := tc // capture per-iteration copy for the parallel closure
			t.Run(tc.name, func(t *testing.T) {
				t.Parallel()
				row, err := proto.Marshal(tc.mesg)
				if err != nil {
					t.Fatalf("proto.Marshal: %v", err)
				}
				testProtoNormalization(ctx, t, mwClient, bqClient, dataset, tc.schema, tc.mesg.ProtoReflect().Descriptor(), row)
			})
		}
	})
}
// testProtoNormalization creates a table with the given schema. It opens a
// default-stream writer using the normalized form of descriptor and appends
// sampleRow. The test fails if the append is not accepted.
func testProtoNormalization(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset, schema bigquery.Schema, descriptor protoreflect.MessageDescriptor, sampleRow []byte) {
	testTable := dataset.Table(tableIDs.New())
	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: schema}); err != nil {
		t.Fatalf("failed to create test table %q: %v", testTable.FullyQualifiedName(), err)
	}
	dp, err := adapt.NormalizeDescriptor(descriptor)
	if err != nil {
		t.Fatalf("NormalizeDescriptor: %v", err)
	}
	// setup a new stream.
	ms, err := mwClient.NewManagedStream(ctx,
		WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
		WithType(DefaultStream),
		WithSchemaDescriptor(dp),
	)
	if err != nil {
		t.Fatalf("NewManagedStream: %v", err)
	}
	defer ms.Close()
	result, err := ms.AppendRows(ctx, [][]byte{sampleRow})
	if err != nil {
		// A failed append yields no result to wait on below.
		t.Fatalf("append failed: %v", err)
	}
	if _, err := result.GetResult(ctx); err != nil {
		t.Errorf("error in response: %v", err)
	}
}
// TestIntegration_MultiplexWrites enables multiplexing with a pool limit of 2.
// It opens one default-stream writer per table per iteration across three
// tables, and each writer sends a single append. The test checks that every
// writer shares the first writer's connection pool and that each table ends
// with exactly wantWrites rows.
func TestIntegration_MultiplexWrites(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	mwClient, bqClient := getTestClients(ctx, t,
		WithMultiplexing(),
		WithMultiplexPoolLimit(2),
	)
	defer mwClient.Close()
	defer bqClient.Close()
	dataset, cleanup, err := setupTestDataset(ctx, t, bqClient, "us-east1")
	if err != nil {
		t.Fatalf("failed to init test dataset: %v", err)
	}
	defer cleanup()

	// normalize and marshal build the per-table fixtures. They fail the test
	// rather than silently yielding a nil descriptor or an empty row.
	normalize := func(m proto.Message) *descriptorpb.DescriptorProto {
		dp, err := adapt.NormalizeDescriptor(m.ProtoReflect().Descriptor())
		if err != nil {
			t.Fatalf("NormalizeDescriptor(%T): %v", m, err)
		}
		return dp
	}
	marshal := func(m proto.Message) []byte {
		b, err := proto.Marshal(m)
		if err != nil {
			t.Fatalf("proto.Marshal(%T): %v", m, err)
		}
		return b
	}

	wantWrites := 10
	testTables := []struct {
		tbl         *bigquery.Table
		schema      bigquery.Schema
		dp          *descriptorpb.DescriptorProto
		sampleRow   []byte
		constraints []constraintOption
	}{
		{
			tbl:    dataset.Table(tableIDs.New()),
			schema: testdata.SimpleMessageSchema,
			dp:     normalize(&testdata.SimpleMessageProto2{}),
			sampleRow: marshal(&testdata.SimpleMessageProto2{
				Name:  proto.String("sample_name"),
				Value: proto.Int64(1001),
			}),
			constraints: []constraintOption{
				withExactRowCount(int64(wantWrites)),
				withStringValueCount("name", "sample_name", int64(wantWrites)),
			},
		},
		{
			tbl:    dataset.Table(tableIDs.New()),
			schema: testdata.ValidationBaseSchema,
			dp:     normalize(&testdata.ValidationP2Optional{}),
			sampleRow: marshal(&testdata.ValidationP2Optional{
				Int64Field:  proto.Int64(69),
				StringField: proto.String("validation_string"),
			}),
			constraints: []constraintOption{
				withExactRowCount(int64(wantWrites)),
				withStringValueCount("string_field", "validation_string", int64(wantWrites)),
			},
		},
		{
			tbl:    dataset.Table(tableIDs.New()),
			schema: testdata.GithubArchiveSchema,
			dp:     normalize(&testdata.GithubArchiveMessageProto2{}),
			sampleRow: marshal(&testdata.GithubArchiveMessageProto2{
				Payload: proto.String("payload_string"),
				Id:      proto.String("some_id"),
			}),
			constraints: []constraintOption{
				withExactRowCount(int64(wantWrites)),
				withStringValueCount("payload", "payload_string", int64(wantWrites)),
			},
		},
	}
	// setup tables
	for _, testTable := range testTables {
		if err := testTable.tbl.Create(ctx, &bigquery.TableMetadata{Schema: testTable.schema}); err != nil {
			t.Fatalf("failed to create test table %q: %v", testTable.tbl.FullyQualifiedName(), err)
		}
	}
	var gotFirstPool *connectionPool
	var results []*AppendResult
	for i := 0; i < wantWrites; i++ {
		for k, testTable := range testTables {
			// create a writer and send a single append
			ms, err := mwClient.NewManagedStream(ctx,
				WithDestinationTable(TableParentFromParts(testTable.tbl.ProjectID, testTable.tbl.DatasetID, testTable.tbl.TableID)),
				WithType(DefaultStream),
				WithSchemaDescriptor(testTable.dp),
				EnableWriteRetries(true),
			)
			if err != nil {
				t.Fatalf("failed to create ManagedStream for table %d on iteration %d: %v", k, i, err)
			}
			defer ms.Close() // we won't clean these up until the end of the test, rather than per use.
			// Every writer must land on the pool handed to the very first writer.
			if i == 0 && k == 0 {
				if ms.pool == nil {
					t.Errorf("expected a non-nil pool reference for first writer")
				}
				gotFirstPool = ms.pool
			} else if ms.pool != gotFirstPool {
				t.Errorf("expected same pool reference, got a different pool")
			}
			res, err := ms.AppendRows(ctx, [][]byte{testTable.sampleRow})
			if err != nil {
				t.Fatalf("failed to append to table %d on iteration %d: %v", k, i, err)
			}
			results = append(results, res)
		}
	}
	// drain results
	for k, res := range results {
		if _, err := res.GetResult(ctx); err != nil {
			t.Errorf("result %d yielded error: %v", k, err)
		}
	}
	// validate the tables
	for _, testTable := range testTables {
		validateTableConstraints(ctx, t, bqClient, testTable.tbl, "", testTable.constraints...)
	}
}
// TestIntegration_MingledContexts runs several multiplexed writers, each
// created with its own cancellable context. Cancelling one writer's context
// must break only that writer. An append issued with an already-cancelled
// request context must fail. The remaining writers must keep working
// throughout.
func TestIntegration_MingledContexts(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	mwClient, bqClient := getTestClients(ctx, t,
		WithMultiplexing(),
		WithMultiplexPoolLimit(2),
	)
	defer mwClient.Close()
	defer bqClient.Close()
	wantLocation := "us-east4"
	dataset, cleanup, err := setupTestDataset(ctx, t, bqClient, wantLocation)
	if err != nil {
		t.Fatalf("failed to init test dataset: %v", err)
	}
	defer cleanup()
	testTable := dataset.Table(tableIDs.New())
	if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
		t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
	}
	m := &testdata.SimpleMessageProto2{}
	descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())
	numWriters := 4
	contexts := make([]context.Context, numWriters)
	cancels := make([]context.CancelFunc, numWriters)
	writers := make([]*ManagedStream, numWriters)
	for i := 0; i < numWriters; i++ {
		contexts[i], cancels[i] = context.WithCancel(ctx)
		// Release every per-writer context when the test ends. Calling a
		// CancelFunc again after the explicit cancels below does nothing.
		defer cancels[i]()
		ms, err := mwClient.NewManagedStream(contexts[i],
			WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
			WithType(DefaultStream),
			WithSchemaDescriptor(descriptorProto),
		)
		if err != nil {
			t.Fatalf("instantiating writer %d failed: %v", i, err)
		}
		writers[i] = ms
	}
	sampleRow, err := proto.Marshal(&testdata.SimpleMessageProto2{
		Name:  proto.String("datafield"),
		Value: proto.Int64(1234),
	})
	if err != nil {
		t.Fatalf("failed to generate sample row: %v", err)
	}
	for i := 0; i < numWriters; i++ {
		res, err := writers[i].AppendRows(contexts[i], [][]byte{sampleRow})
		if err != nil {
			t.Errorf("initial write on %d failed: %v", i, err)
		} else if _, err := res.GetResult(contexts[i]); err != nil {
			t.Errorf("GetResult initial write %d: %v", i, err)
		}
	}
	// cancel the first context
	cancels[0]()
	// repeat writes on all other writers, each with its own still-valid context
	for i := 1; i < numWriters; i++ {
		res, err := writers[i].AppendRows(contexts[i], [][]byte{sampleRow})
		if err != nil {
			t.Errorf("second write on %d failed: %v", i, err)
		} else if _, err := res.GetResult(contexts[i]); err != nil {
			t.Errorf("GetResult err on second write %d: %v", i, err)
		}
	}
	// check that writes to the first writer should fail, even with a valid request context.
	if _, err := writers[0].AppendRows(contexts[1], [][]byte{sampleRow}); err == nil {
		t.Errorf("write succeeded on first writer when it should have failed")
	}
	// cancel the second context as well, ensure writer created with good context and bad request context fails
	cancels[1]()
	if _, err := writers[2].AppendRows(contexts[1], [][]byte{sampleRow}); err == nil {
		t.Errorf("write succeeded on third writer with a bad request context")
	}
	// repeat writes on remaining good writers/contexts
	for i := 2; i < numWriters; i++ {
		res, err := writers[i].AppendRows(contexts[i], [][]byte{sampleRow})
		if err != nil {
			t.Errorf("third write on %d failed: %v", i, err)
		} else if _, err := res.GetResult(contexts[i]); err != nil {
			t.Errorf("GetResult err on third write %d: %v", i, err)
		}
	}
}