blob: ced180295ef7f5883ad04f90de9b634ea26c0640 [file] [log] [blame]
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package managedwriter
import (
"context"
"fmt"
"testing"
"time"
"cloud.google.com/go/bigquery/storage/apiv1/storagepb"
"github.com/google/go-cmp/cmp"
"google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/testing/protocmp"
"google.golang.org/protobuf/types/descriptorpb"
"google.golang.org/protobuf/types/known/wrapperspb"
)
func TestPendingWrite(t *testing.T) {
ctx := context.Background()
wantReq := &storagepb.AppendRowsRequest{
Rows: &storagepb.AppendRowsRequest_ProtoRows{
ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
Rows: &storagepb.ProtoRows{
SerializedRows: [][]byte{
[]byte("row1"),
[]byte("row2"),
[]byte("row3"),
},
},
},
},
}
// verify no offset behavior
pending := newPendingWrite(ctx, nil, wantReq, nil, "", "")
if pending.req.GetOffset() != nil {
t.Errorf("request should have no offset, but is present: %q", pending.req.GetOffset().GetValue())
}
if diff := cmp.Diff(pending.req, wantReq, protocmp.Transform()); diff != "" {
t.Errorf("request mismatch: -got, +want:\n%s", diff)
}
// Verify request is not acknowledged.
select {
case <-pending.result.Ready():
t.Errorf("got Ready() on incomplete AppendResult")
case <-time.After(100 * time.Millisecond):
}
// Mark completed, verify result.
pending.markDone(&storage.AppendRowsResponse{}, nil)
if gotOff := pending.result.offset(ctx); gotOff != NoStreamOffset {
t.Errorf("mismatch on completed AppendResult without offset: got %d want %d", gotOff, NoStreamOffset)
}
if pending.result.err != nil {
t.Errorf("mismatch in error on AppendResult, got %v want nil", pending.result.err)
}
// Create new write to verify error result.
pending = newPendingWrite(ctx, nil, wantReq, nil, "", "")
// Manually invoke option to apply offset to request.
// This would normally be appied as part of the AppendRows() method on the managed stream.
wantOffset := int64(101)
f := WithOffset(wantOffset)
f(pending)
if pending.req.GetOffset() == nil {
t.Errorf("expected offset, got none")
}
if pending.req.GetOffset().GetValue() != wantOffset {
t.Errorf("offset mismatch, got %d wanted %d", pending.req.GetOffset().GetValue(), wantOffset)
}
// Verify completion behavior with an error.
wantErr := fmt.Errorf("foo")
testResp := &storagepb.AppendRowsResponse{
Response: &storagepb.AppendRowsResponse_AppendResult_{
AppendResult: &storagepb.AppendRowsResponse_AppendResult{
Offset: &wrapperspb.Int64Value{
Value: wantOffset,
},
},
},
}
pending.markDone(testResp, wantErr)
if pending.req != nil {
t.Errorf("expected request to be cleared, is present: %#v", pending.req)
}
select {
case <-time.After(100 * time.Millisecond):
t.Errorf("possible blocking on completed AppendResult")
case <-pending.result.Ready():
gotOffset, gotErr := pending.result.GetResult(ctx)
if gotOffset != wantOffset {
t.Errorf("GetResult: mismatch on completed AppendResult offset: got %d want %d", gotOffset, wantOffset)
}
if gotErr != wantErr {
t.Errorf("GetResult: mismatch in errors, got %v want %v", gotErr, wantErr)
}
// Now, check FullResponse.
gotResp, gotErr := pending.result.FullResponse(ctx)
if gotErr != wantErr {
t.Errorf("FullResponse: mismatch in errors, got %v want %v", gotErr, wantErr)
}
if diff := cmp.Diff(gotResp, testResp, protocmp.Transform()); diff != "" {
t.Errorf("FullResponse diff: %s", diff)
}
}
}
func TestPendingWrite_ConstructFullRequest(t *testing.T) {
testDP := &descriptorpb.DescriptorProto{Name: proto.String("foo")}
testTmpl := newVersionedTemplate().revise(reviseProtoSchema(testDP))
testEmptyTraceID := buildTraceID(&streamSettings{})
for _, tc := range []struct {
desc string
pw *pendingWrite
addTrace bool
want *storagepb.AppendRowsRequest
}{
{
desc: "nil request",
pw: &pendingWrite{
reqTmpl: testTmpl,
},
want: &storagepb.AppendRowsRequest{
Rows: &storagepb.AppendRowsRequest_ProtoRows{
ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
WriterSchema: &storagepb.ProtoSchema{
ProtoDescriptor: testDP,
},
},
},
},
},
{
desc: "empty req w/trace",
pw: &pendingWrite{
req: &storagepb.AppendRowsRequest{},
reqTmpl: testTmpl,
},
addTrace: true,
want: &storagepb.AppendRowsRequest{
Rows: &storagepb.AppendRowsRequest_ProtoRows{
ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
WriterSchema: &storagepb.ProtoSchema{
ProtoDescriptor: testDP,
},
},
},
TraceId: testEmptyTraceID,
},
},
{
desc: "basic req",
pw: &pendingWrite{
req: &storagepb.AppendRowsRequest{},
reqTmpl: testTmpl,
},
want: &storagepb.AppendRowsRequest{
Rows: &storagepb.AppendRowsRequest_ProtoRows{
ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
WriterSchema: &storagepb.ProtoSchema{
ProtoDescriptor: testDP,
},
},
},
},
},
{
desc: "everything w/trace",
pw: &pendingWrite{
req: &storagepb.AppendRowsRequest{},
reqTmpl: testTmpl,
traceID: "foo",
writeStreamID: "streamid",
},
addTrace: true,
want: &storagepb.AppendRowsRequest{
WriteStream: "streamid",
Rows: &storagepb.AppendRowsRequest_ProtoRows{
ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
WriterSchema: &storagepb.ProtoSchema{
ProtoDescriptor: testDP,
},
},
},
TraceId: buildTraceID(&streamSettings{TraceID: "foo"}),
},
},
} {
got := tc.pw.constructFullRequest(tc.addTrace)
if diff := cmp.Diff(got, tc.want, protocmp.Transform()); diff != "" {
t.Errorf("%s diff: %s", tc.desc, diff)
}
}
}