blob: 10948c5c10227e1d323d3be5a76345d0295155bd [file] [log] [blame]
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package managedwriter
import (
"context"
"io"
"testing"
"cloud.google.com/go/bigquery/storage/apiv1/storagepb"
"cloud.google.com/go/bigquery/storage/managedwriter/testdata"
"github.com/google/go-cmp/cmp"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/testing/protocmp"
"google.golang.org/protobuf/types/descriptorpb"
)
func TestSendOptimizer(t *testing.T) {
exampleReq := &storagepb.AppendRowsRequest{
Rows: &storagepb.AppendRowsRequest_ProtoRows{
ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
Rows: &storagepb.ProtoRows{
SerializedRows: [][]byte{[]byte("row_data")},
},
},
},
}
exampleStreamID := "foo"
exampleTraceID := "trace_id"
exampleReqFull := proto.Clone(exampleReq).(*storagepb.AppendRowsRequest)
exampleReqFull.WriteStream = exampleStreamID
exampleReqFull.TraceId = buildTraceID(&streamSettings{TraceID: exampleTraceID})
exampleDP := &descriptorpb.DescriptorProto{Name: proto.String("schema")}
exampleReqFull.GetProtoRows().WriterSchema = &storagepb.ProtoSchema{
ProtoDescriptor: proto.Clone(exampleDP).(*descriptorpb.DescriptorProto),
}
ctx := context.Background()
var testCases = []struct {
description string
optimizer sendOptimizer
reqs []*pendingWrite
sendResults []error
wantReqs []*storagepb.AppendRowsRequest
}{
{
description: "verbose-optimizer",
optimizer: &verboseOptimizer{},
reqs: func() []*pendingWrite {
tmpl := newVersionedTemplate().revise(reviseProtoSchema(exampleDP))
return []*pendingWrite{
newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID),
newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID),
newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID),
}
}(),
sendResults: []error{
nil,
io.EOF,
io.EOF,
},
wantReqs: []*storagepb.AppendRowsRequest{
proto.Clone(exampleReqFull).(*storagepb.AppendRowsRequest),
proto.Clone(exampleReqFull).(*storagepb.AppendRowsRequest),
proto.Clone(exampleReqFull).(*storagepb.AppendRowsRequest),
},
},
{
description: "simplex no errors",
optimizer: &simplexOptimizer{},
reqs: func() []*pendingWrite {
tmpl := newVersionedTemplate().revise(reviseProtoSchema(exampleDP))
return []*pendingWrite{
newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID),
newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID),
newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID),
}
}(),
sendResults: []error{
nil,
nil,
nil,
},
wantReqs: func() []*storagepb.AppendRowsRequest {
want := make([]*storagepb.AppendRowsRequest, 3)
// first has no redactions.
want[0] = proto.Clone(exampleReqFull).(*storagepb.AppendRowsRequest)
req := proto.Clone(want[0]).(*storagepb.AppendRowsRequest)
req.GetProtoRows().WriterSchema = nil
req.TraceId = ""
req.WriteStream = ""
// second and third are optimized.
want[1] = req
want[2] = req
return want
}(),
},
{
description: "simplex w/partial errors",
optimizer: &simplexOptimizer{},
reqs: func() []*pendingWrite {
tmpl := newVersionedTemplate().revise(reviseProtoSchema(exampleDP))
return []*pendingWrite{
newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID),
newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID),
newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID),
}
}(),
sendResults: []error{
nil,
io.EOF,
nil,
},
wantReqs: func() []*storagepb.AppendRowsRequest {
want := make([]*storagepb.AppendRowsRequest, 3)
want[0] = proto.Clone(exampleReqFull).(*storagepb.AppendRowsRequest)
req := proto.Clone(want[0]).(*storagepb.AppendRowsRequest)
req.GetProtoRows().WriterSchema = nil
req.TraceId = ""
req.WriteStream = ""
// second request is optimized
want[1] = req
// error causes third request to be full again.
want[2] = want[0]
return want
}(),
},
{
description: "multiplex single all errors",
optimizer: &multiplexOptimizer{},
reqs: func() []*pendingWrite {
tmpl := newVersionedTemplate().revise(reviseProtoSchema(exampleDP))
return []*pendingWrite{
newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID),
newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID),
newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID),
}
}(),
sendResults: []error{
io.EOF,
io.EOF,
io.EOF,
},
wantReqs: []*storagepb.AppendRowsRequest{
proto.Clone(exampleReqFull).(*storagepb.AppendRowsRequest),
proto.Clone(exampleReqFull).(*storagepb.AppendRowsRequest),
proto.Clone(exampleReqFull).(*storagepb.AppendRowsRequest),
},
},
{
description: "multiplex single no errors",
optimizer: &multiplexOptimizer{},
reqs: func() []*pendingWrite {
tmpl := newVersionedTemplate().revise(reviseProtoSchema(exampleDP))
return []*pendingWrite{
newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID),
newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID),
newPendingWrite(ctx, nil, proto.Clone(exampleReq).(*storagepb.AppendRowsRequest), tmpl, exampleStreamID, exampleTraceID),
}
}(),
sendResults: []error{
nil,
nil,
nil,
},
wantReqs: func() []*storagepb.AppendRowsRequest {
want := make([]*storagepb.AppendRowsRequest, 3)
want[0] = proto.Clone(exampleReqFull).(*storagepb.AppendRowsRequest)
req := proto.Clone(want[0]).(*storagepb.AppendRowsRequest)
req.GetProtoRows().WriterSchema = nil
req.TraceId = ""
want[1] = req
want[2] = req
return want
}(),
},
{
description: "multiplex interleave",
optimizer: &multiplexOptimizer{},
reqs: func() []*pendingWrite {
tmplA := newVersionedTemplate().revise(reviseProtoSchema(exampleDP))
tmplB := newVersionedTemplate().revise(reviseProtoSchema(protodesc.ToDescriptorProto((&testdata.AllSupportedTypes{}).ProtoReflect().Descriptor())))
reqA := proto.Clone(exampleReq).(*storagepb.AppendRowsRequest)
reqA.WriteStream = "alpha"
reqB := proto.Clone(exampleReq).(*storagepb.AppendRowsRequest)
reqB.WriteStream = "beta"
writes := make([]*pendingWrite, 10)
writes[0] = newPendingWrite(ctx, nil, reqA, tmplA, reqA.GetWriteStream(), exampleTraceID)
writes[1] = newPendingWrite(ctx, nil, reqA, tmplA, reqA.GetWriteStream(), exampleTraceID)
writes[2] = newPendingWrite(ctx, nil, reqB, tmplB, reqB.GetWriteStream(), exampleTraceID)
writes[3] = newPendingWrite(ctx, nil, reqA, tmplA, reqA.GetWriteStream(), exampleTraceID)
writes[4] = newPendingWrite(ctx, nil, reqB, tmplB, reqB.GetWriteStream(), exampleTraceID)
writes[5] = newPendingWrite(ctx, nil, reqB, tmplB, reqB.GetWriteStream(), exampleTraceID)
writes[6] = newPendingWrite(ctx, nil, reqB, tmplB, reqB.GetWriteStream(), exampleTraceID)
writes[7] = newPendingWrite(ctx, nil, reqB, tmplB, reqB.GetWriteStream(), exampleTraceID)
writes[8] = newPendingWrite(ctx, nil, reqA, tmplA, reqA.GetWriteStream(), exampleTraceID)
writes[9] = newPendingWrite(ctx, nil, reqA, tmplA, reqA.GetWriteStream(), exampleTraceID)
return writes
}(),
sendResults: []error{
nil,
nil,
nil,
nil,
nil,
io.EOF,
nil,
nil,
nil,
io.EOF,
},
wantReqs: func() []*storagepb.AppendRowsRequest {
want := make([]*storagepb.AppendRowsRequest, 10)
wantReqAFull := proto.Clone(exampleReqFull).(*storagepb.AppendRowsRequest)
wantReqAFull.WriteStream = "alpha"
wantReqANoTrace := proto.Clone(wantReqAFull).(*storagepb.AppendRowsRequest)
wantReqANoTrace.TraceId = ""
wantReqAOpt := proto.Clone(wantReqAFull).(*storagepb.AppendRowsRequest)
wantReqAOpt.GetProtoRows().WriterSchema = nil
wantReqAOpt.TraceId = ""
wantReqBFull := proto.Clone(exampleReqFull).(*storagepb.AppendRowsRequest)
wantReqBFull.WriteStream = "beta"
wantReqBFull.GetProtoRows().GetWriterSchema().ProtoDescriptor = protodesc.ToDescriptorProto((&testdata.AllSupportedTypes{}).ProtoReflect().Descriptor())
wantReqBNoTrace := proto.Clone(wantReqBFull).(*storagepb.AppendRowsRequest)
wantReqBNoTrace.TraceId = ""
wantReqBOpt := proto.Clone(wantReqBFull).(*storagepb.AppendRowsRequest)
wantReqBOpt.GetProtoRows().WriterSchema = nil
wantReqBOpt.TraceId = ""
want[0] = wantReqAFull
want[1] = wantReqAOpt
want[2] = wantReqBNoTrace
want[3] = wantReqANoTrace
want[4] = wantReqBNoTrace
want[5] = wantReqBOpt
want[6] = wantReqBFull
want[7] = wantReqBOpt
want[8] = wantReqANoTrace
want[9] = wantReqAOpt
return want
}(),
},
{
description: "multiplex w/evolution",
optimizer: &multiplexOptimizer{},
reqs: func() []*pendingWrite {
tmplOld := newVersionedTemplate().revise(reviseProtoSchema(exampleDP))
tmplNew := tmplOld.revise(reviseProtoSchema(&descriptorpb.DescriptorProto{Name: proto.String("new")}))
example := proto.Clone(exampleReq).(*storagepb.AppendRowsRequest)
writes := make([]*pendingWrite, 4)
writes[0] = newPendingWrite(ctx, nil, example, tmplOld, exampleStreamID, exampleTraceID)
writes[1] = newPendingWrite(ctx, nil, example, tmplOld, exampleStreamID, exampleTraceID)
writes[2] = newPendingWrite(ctx, nil, example, tmplNew, exampleStreamID, exampleTraceID)
writes[3] = newPendingWrite(ctx, nil, example, tmplNew, exampleStreamID, exampleTraceID)
return writes
}(),
sendResults: []error{
nil,
nil,
nil,
nil,
},
wantReqs: func() []*storagepb.AppendRowsRequest {
want := make([]*storagepb.AppendRowsRequest, 4)
wantBaseReqFull := proto.Clone(exampleReqFull).(*storagepb.AppendRowsRequest)
wantBaseReqOpt := proto.Clone(wantBaseReqFull).(*storagepb.AppendRowsRequest)
wantBaseReqOpt.TraceId = ""
wantBaseReqOpt.GetProtoRows().WriterSchema = nil
wantEvolved := proto.Clone(wantBaseReqOpt).(*storagepb.AppendRowsRequest)
wantEvolved.GetProtoRows().WriterSchema = &storagepb.ProtoSchema{
ProtoDescriptor: &descriptorpb.DescriptorProto{Name: proto.String("new")},
}
want[0] = wantBaseReqFull
want[1] = wantBaseReqOpt
want[2] = wantEvolved
want[3] = wantBaseReqOpt
return want
}(),
},
}
for _, tc := range testCases {
testARC := &testAppendRowsClient{}
testARC.sendF = func(req *storagepb.AppendRowsRequest) error {
testARC.requests = append(testARC.requests, proto.Clone(req).(*storagepb.AppendRowsRequest))
respErr := tc.sendResults[0]
tc.sendResults = tc.sendResults[1:]
return respErr
}
for _, req := range tc.reqs {
err := tc.optimizer.optimizeSend(testARC, req)
if err != nil {
tc.optimizer.signalReset()
}
}
// now, compare.
for k, wr := range tc.wantReqs {
if diff := cmp.Diff(testARC.requests[k], wr, protocmp.Transform()); diff != "" {
t.Errorf("%s (req %d) mismatch: -got, +want:\n%s", tc.description, k, diff)
}
}
}
}
func TestVersionedTemplate(t *testing.T) {
testCases := []struct {
desc string
inputTmpl *storagepb.AppendRowsRequest
changes []templateRevisionF
wantCompatible bool
}{
{
desc: "nil template",
wantCompatible: true,
},
{
desc: "no changes",
inputTmpl: &storagepb.AppendRowsRequest{},
wantCompatible: true,
},
{
desc: "empty schema",
inputTmpl: &storagepb.AppendRowsRequest{},
changes: []templateRevisionF{
reviseProtoSchema(nil),
},
wantCompatible: false,
},
{
desc: "same default mvi",
inputTmpl: &storagepb.AppendRowsRequest{
DefaultMissingValueInterpretation: storagepb.AppendRowsRequest_NULL_VALUE,
},
changes: []templateRevisionF{
reviseDefaultMissingValueInterpretation(storagepb.AppendRowsRequest_NULL_VALUE),
},
wantCompatible: true,
},
{
desc: "differing default mvi",
inputTmpl: &storagepb.AppendRowsRequest{
DefaultMissingValueInterpretation: storagepb.AppendRowsRequest_NULL_VALUE,
},
changes: []templateRevisionF{
reviseDefaultMissingValueInterpretation(storagepb.AppendRowsRequest_DEFAULT_VALUE),
},
wantCompatible: false,
},
}
for _, tc := range testCases {
orig := newVersionedTemplate()
orig.tmpl = tc.inputTmpl
orig.computeHash()
rev := orig.revise(tc.changes...)
if orig.Compatible(rev) != rev.Compatible(orig) {
t.Errorf("case %q: inconsistent compatibility, orig %t rev %t", tc.desc, orig.Compatible(rev), rev.Compatible(orig))
}
if got := orig.Compatible(rev); tc.wantCompatible != got {
t.Errorf("case %q: Compatible mismatch, got %t want %t", tc.desc, got, tc.wantCompatible)
}
}
}