blob: 2163ac4919f7a0fc56503ca9555772eea8f03b56 [file] [log] [blame]
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package managedwriter
import (
"context"
"testing"
"github.com/googleapis/gax-go/v2"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
func TestTableParentFromStreamName(t *testing.T) {
testCases := []struct {
in string
want string
}{
{
"bad",
"bad",
},
{
"projects/foo/datasets/bar/tables/baz",
"projects/foo/datasets/bar/tables/baz",
},
{
"projects/foo/datasets/bar/tables/baz/zip/zam/zoomie",
"projects/foo/datasets/bar/tables/baz",
},
{
"projects/foo/datasets/bar/tables/baz/_default",
"projects/foo/datasets/bar/tables/baz",
},
}
for _, tc := range testCases {
got := TableParentFromStreamName(tc.in)
if got != tc.want {
t.Errorf("mismatch on %s: got %s want %s", tc.in, got, tc.want)
}
}
}
func TestCreatePool_Location(t *testing.T) {
t.Skip("skipping until new write_location is allowed")
c := &Client{
cfg: &writerClientConfig{},
ctx: context.Background(),
projectID: "myproj",
}
pool, err := c.createPool("foo", nil)
if err != nil {
t.Fatalf("createPool: %v", err)
}
meta, ok := metadata.FromOutgoingContext(pool.ctx)
if !ok {
t.Fatalf("no metadata in outgoing context")
}
vals, ok := meta["x-goog-request-params"]
if !ok {
t.Fatalf("metadata key not present")
}
found := false
for _, v := range vals {
if v == "write_location=projects/myproj/locations/foo" {
found = true
break
}
}
if !found {
t.Fatal("expected location header not found")
}
}
// TestCreatePool tests the result of calling createPool with different combinations
// of global configuration and per-writer configuration.
func TestCreatePool(t *testing.T) {
testCases := []struct {
desc string
cfg *writerClientConfig
settings *streamSettings
wantMaxBytes int
wantMaxRequests int
wantCallOptions int
wantPoolCallOptions int
}{
{
desc: "cfg, no settings",
cfg: &writerClientConfig{
defaultInflightRequests: 12,
defaultInflightBytes: 2048,
},
wantMaxBytes: 2048,
wantMaxRequests: 12,
},
{
desc: "empty cfg, w/settings",
cfg: &writerClientConfig{},
settings: &streamSettings{
MaxInflightRequests: 99,
MaxInflightBytes: 1024,
appendCallOptions: []gax.CallOption{gax.WithPath("foo")},
},
wantMaxBytes: 1024,
wantMaxRequests: 99,
wantCallOptions: 1,
},
{
desc: "both cfg and settings",
cfg: &writerClientConfig{
defaultInflightRequests: 123,
defaultInflightBytes: 456,
defaultAppendRowsCallOptions: []gax.CallOption{gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(999))},
},
settings: &streamSettings{
MaxInflightRequests: 99,
MaxInflightBytes: 1024,
},
wantMaxBytes: 1024,
wantMaxRequests: 99,
wantPoolCallOptions: 1,
},
{
desc: "merge defaults and settings",
cfg: &writerClientConfig{
defaultInflightRequests: 123,
defaultInflightBytes: 456,
defaultAppendRowsCallOptions: []gax.CallOption{gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(999))},
},
settings: &streamSettings{
MaxInflightBytes: 1024,
appendCallOptions: []gax.CallOption{gax.WithPath("foo")},
},
wantMaxBytes: 1024,
wantMaxRequests: 123,
wantCallOptions: 1,
wantPoolCallOptions: 1,
},
}
for _, tc := range testCases {
c := &Client{
cfg: tc.cfg,
ctx: context.Background(),
}
pool, err := c.createPool("", nil)
if err != nil {
t.Errorf("case %q: createPool errored unexpectedly: %v", tc.desc, err)
continue
}
writer := &ManagedStream{
id: "foo",
streamSettings: tc.settings,
}
if err = pool.addWriter(writer); err != nil {
t.Errorf("case %q: addWriter: %v", tc.desc, err)
}
pw := newPendingWrite(context.Background(), writer, nil, nil, "", "")
gotConn, err := pool.selectConn(pw)
if err != nil {
t.Errorf("case %q: selectConn: %v", tc.desc, err)
}
// too many go-cmp overrides needed to quickly diff here, look at the interesting fields explicitly.
if gotVal := gotConn.fc.maxInsertBytes; gotVal != tc.wantMaxBytes {
t.Errorf("case %q: flowController maxInsertBytes mismatch, got %d want %d", tc.desc, gotVal, tc.wantMaxBytes)
}
if gotVal := gotConn.fc.maxInsertCount; gotVal != tc.wantMaxRequests {
t.Errorf("case %q: flowController maxInsertCount mismatch, got %d want %d", tc.desc, gotVal, tc.wantMaxRequests)
}
if gotVal := len(gotConn.callOptions); gotVal != tc.wantCallOptions {
t.Errorf("case %q: calloption count mismatch, got %d want %d", tc.desc, gotVal, tc.wantCallOptions)
}
if gotVal := len(pool.callOptions); gotVal != tc.wantPoolCallOptions {
t.Errorf("case %q: POOL calloption count mismatch, got %d want %d", tc.desc, gotVal, tc.wantPoolCallOptions)
}
}
}