blob: f403e3671519afd70f740e932485f55b20cf6ab3 [file] [edit]
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package managedwriter
import (
"context"
"fmt"
"math/rand"
"testing"
"time"
"cloud.google.com/go/bigquery/storage/apiv1/storagepb"
"github.com/googleapis/gax-go/v2"
)
// TestSimpleRouter verifies routing behavior of the simpleRouter:
// picking a connection fails before any writer is attached, succeeds
// while a writer is attached, and fails again once the writer is removed.
func TestSimpleRouter(t *testing.T) {
	ctx := context.Background()
	pool := &connectionPool{
		ctx: ctx,
		open: func(ctx context.Context, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
			return &testAppendRowsClient{}, nil
		},
	}
	router := newSimpleRouter("")
	if err := pool.activateRouter(router); err != nil {
		t.Errorf("activateRouter: %v", err)
	}
	ms := &ManagedStream{
		ctx:   ctx,
		retry: newStatelessRetryer(),
	}
	pw := newPendingWrite(ctx, ms, &storagepb.AppendRowsRequest{}, nil, "", "")
	// Picking before attaching should yield error.
	if _, err := pool.router.pickConnection(pw); err == nil {
		t.Errorf("pickConnection: expected error, got success")
	}
	writer := &ManagedStream{
		id: "writer",
	}
	if err := pool.addWriter(writer); err != nil {
		t.Errorf("addWriter: %v", err)
	}
	// With a writer attached, routing should succeed.
	if _, err := pool.router.pickConnection(pw); err != nil {
		t.Errorf("pickConnection error: %v", err)
	}
	if err := pool.removeWriter(writer); err != nil {
		t.Errorf("removeWriter: %v", err)
	}
	// With the sole writer removed, routing should fail again.
	if _, err := pool.router.pickConnection(pw); err == nil {
		t.Errorf("pickConnection: expected error, got success")
	}
}
// TestSharedRouter_Basic verifies the shared router in non-multiplex mode:
// no connections exist at activation, attaching an id-less writer fails,
// and routing only succeeds while an identified writer remains attached.
func TestSharedRouter_Basic(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	pool := &connectionPool{
		ctx:    ctx,
		cancel: cancel,
		open: func(ctx context.Context, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
			return &testAppendRowsClient{}, nil
		},
	}
	router := newSharedRouter(false, 0)
	if err := pool.activateRouter(router); err != nil {
		t.Errorf("activateRouter: %v", err)
	}
	if gotConns := len(router.exclusiveConns); gotConns != 0 {
		t.Errorf("expected zero connections at start, got %d", gotConns)
	}
	ms := &ManagedStream{
		ctx:   ctx,
		retry: newStatelessRetryer(),
	}
	pw := newPendingWrite(ctx, ms, &storagepb.AppendRowsRequest{}, nil, "", "")
	// Picking before attaching should yield error.
	if _, err := pool.router.pickConnection(pw); err == nil {
		t.Errorf("pickConnection: expected error, got success")
	}
	// Attaching a writer without an ID should error.
	if err := pool.addWriter(ms); err == nil {
		t.Errorf("expected id-less addWriter to fail")
	}
	ms.id = "writer"
	if err := pool.addWriter(ms); err != nil {
		t.Errorf("addWriter: %v", err)
	}
	// With a writer attached, routing should succeed.
	if _, err := pool.router.pickConnection(pw); err != nil {
		t.Errorf("pickConnection error: %v", err)
	}
	if err := pool.removeWriter(ms); err != nil {
		t.Errorf("removeWriter: %v", err)
	}
	// With the sole writer removed, routing should fail again.
	if _, err := pool.router.pickConnection(pw); err == nil {
		t.Errorf("pickConnection: expected error, got success")
	}
}
// TestSharedRouter_Multiplex verifies multiplex-mode routing: default-stream
// writers share a single connection until it becomes loaded, at which point
// the watchdog grows the connection set and rebalances writers across it.
func TestSharedRouter_Multiplex(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	pool := &connectionPool{
		id:     newUUID(poolIDPrefix),
		ctx:    ctx,
		cancel: cancel,
		open: func(ctx context.Context, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
			return &testAppendRowsClient{}, nil
		},
		// Small flow-control limits so a couple of acquires mark a conn loaded.
		baseFlowController: newFlowController(2, 10),
	}
	defer pool.Close()
	router := newSharedRouter(true, 3)
	if err := pool.activateRouter(router); err != nil {
		t.Errorf("activateRouter: %v", err)
	}
	wantConnCount := 0
	if got := len(router.multiConns); wantConnCount != got {
		t.Errorf("wanted %d conns, got %d", wantConnCount, got)
	}
	writerA := &ManagedStream{
		id:             newUUID(writerIDPrefix),
		streamSettings: &streamSettings{streamID: "projects/foo/datasets/bar/tables/baz/streams/_default"},
		ctx:            ctx,
		cancel:         cancel,
	}
	if err := pool.router.writerAttach(writerA); err != nil {
		t.Fatalf("writerA attach: %v", err)
	}
	// After a writer attached, we expect one conn.
	wantConnCount = 1
	if got := len(router.multiConns); wantConnCount != got {
		t.Errorf("wanted %d conns, got %d", wantConnCount, got)
	}
	writerB := &ManagedStream{
		id:             newUUID(writerIDPrefix),
		streamSettings: &streamSettings{streamID: "projects/foo/datasets/bar/tables/baz/streams/_default"},
		ctx:            ctx,
		cancel:         cancel,
	}
	if err := pool.router.writerAttach(writerB); err != nil {
		t.Fatalf("writerB attach: %v", err)
	}
	writerC := &ManagedStream{
		id:             newUUID(writerIDPrefix),
		streamSettings: &streamSettings{streamID: "projects/foo/datasets/bar/tables/baz/streams/_default"},
		ctx:            ctx,
		cancel:         cancel,
	}
	if err := pool.router.writerAttach(writerC); err != nil {
		t.Fatalf("writerC attach: %v", err)
	}
	// Additional writers on an unloaded conn should not grow the conn count.
	wantConnCount = 1
	if got := len(router.multiConns); wantConnCount != got {
		t.Fatalf("wanted %d conns, got %d", wantConnCount, got)
	}
	pw := newPendingWrite(ctx, writerA, &storagepb.AppendRowsRequest{}, nil, "", "")
	conn, err := router.pickConnection(pw)
	if err != nil {
		t.Fatalf("pickConnection writerA: %v", err)
	}
	// Generate fake load on the conn associated with writer A.
	if err := conn.fc.acquire(ctx, 1); err != nil {
		t.Fatalf("fc.acquire: %v", err)
	}
	if err := conn.fc.acquire(ctx, 1); err != nil {
		t.Fatalf("fc.acquire: %v", err)
	}
	if !conn.isLoaded() {
		t.Errorf("expected conn to be loaded, was not")
	}
	// Wait for a watchdog interval so the router reacts to the load.
	time.Sleep(watchDogInterval * 2)
	wantConnCount = 2
	// Grab read lock so we can assert internal state of the router.
	router.multiMu.RLock()
	defer router.multiMu.RUnlock()
	if got := len(router.multiConns); wantConnCount != got {
		t.Fatalf("wanted %d conns, got %d", wantConnCount, got)
	}
	gotLoad0 := router.multiConns[0].curLoad()
	gotLoad1 := router.multiConns[1].curLoad()
	if gotLoad0 > gotLoad1 {
		t.Errorf("expected connections to be ordered by load, got %f, %f", gotLoad0, gotLoad1)
	}
	// Verify that rebalance occurred: writers should be spread across conns.
	connsWithWriters := 0
	for _, v := range router.invertedMultiMap {
		if len(v) > 0 {
			connsWithWriters++
		}
	}
	if connsWithWriters < wantConnCount {
		t.Errorf("wanted at least %d connections to have writers attached, got %d", wantConnCount, connsWithWriters)
	}
}
// BenchmarkRoutingParallel measures pickConnection throughput for the
// available router implementations, across varying numbers of explicit and
// default-stream writers, in both serial and concurrent (RunParallel) modes.
func BenchmarkRoutingParallel(b *testing.B) {
	for _, bm := range []struct {
		desc              string
		router            poolRouter
		numWriters        int
		numDefaultWriters int
	}{
		{
			desc:              "SimpleRouter",
			router:            newSimpleRouter(""),
			numWriters:        1,
			numDefaultWriters: 1,
		},
		{
			desc:              "SimpleRouter",
			router:            newSimpleRouter(""),
			numWriters:        10,
			numDefaultWriters: 10,
		},
		{
			desc:              "SharedRouter_NoMultiplex",
			router:            newSharedRouter(false, 0),
			numWriters:        1,
			numDefaultWriters: 1,
		},
		{
			desc:              "SharedRouter_NoMultiplex",
			router:            newSharedRouter(false, 0),
			numWriters:        10,
			numDefaultWriters: 10,
		},
		{
			desc:              "SharedRouter_Multiplex1conn",
			router:            newSharedRouter(true, 1),
			numWriters:        1,
			numDefaultWriters: 1,
		},
		{
			desc:              "SharedRouter_Multiplex1conn",
			router:            newSharedRouter(true, 1),
			numWriters:        10,
			numDefaultWriters: 10,
		},
		{
			desc:              "SharedRouter_Multiplex1conn",
			router:            newSharedRouter(true, 1),
			numWriters:        50,
			numDefaultWriters: 50,
		},
		{
			desc:              "SharedRouter_Multiplex10conn",
			router:            newSharedRouter(true, 10),
			numWriters:        50,
			numDefaultWriters: 50,
		},
	} {
		ctx, cancel := context.WithCancel(context.Background())
		pool := &connectionPool{
			ctx:    ctx,
			cancel: cancel,
			open: func(ctx context.Context, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
				return &testAppendRowsClient{}, nil
			},
		}
		if err := pool.activateRouter(bm.router); err != nil {
			b.Errorf("%q: activateRouter: %v", bm.desc, err)
		}
		// Setup both explicit and default stream writers.
		var explicitWriters []*ManagedStream
		var defaultWriters []*ManagedStream
		for i := 0; i < bm.numWriters; i++ {
			wCtx, wCancel := context.WithCancel(ctx)
			writer := &ManagedStream{
				id:             newUUID(writerIDPrefix),
				streamSettings: &streamSettings{streamID: "projects/foo/datasets/bar/tables/baz/streams/abc123"},
				ctx:            wCtx,
				cancel:         wCancel,
				retry:          newStatelessRetryer(),
			}
			explicitWriters = append(explicitWriters, writer)
		}
		for i := 0; i < bm.numDefaultWriters; i++ {
			wCtx, wCancel := context.WithCancel(ctx)
			writer := &ManagedStream{
				id:             newUUID(writerIDPrefix),
				streamSettings: &streamSettings{streamID: "projects/foo/datasets/bar/tables/baz/streams/_default"},
				ctx:            wCtx,
				cancel:         wCancel,
				retry:          newStatelessRetryer(),
			}
			defaultWriters = append(defaultWriters, writer)
		}
		// Attach all writers to the router.
		for k, writer := range explicitWriters {
			if err := pool.addWriter(writer); err != nil {
				b.Errorf("addWriter %d: %v", k, err)
			}
		}
		for k, writer := range defaultWriters {
			if err := pool.addWriter(writer); err != nil {
				b.Errorf("addWriter %d: %v", k, err)
			}
		}
		baseBenchName := fmt.Sprintf("%s_%dexwriters_%dmpwriters", bm.desc, bm.numWriters, bm.numDefaultWriters)
		// Benchmark routing for explicit writers.
		if bm.numWriters > 0 {
			benchName := fmt.Sprintf("%s_explicitwriters", baseBenchName)
			b.Run(benchName, func(b *testing.B) {
				// Fixed seed keeps writer selection reproducible across runs.
				r := rand.New(rand.NewSource(1))
				b.ResetTimer()
				for i := 0; i < b.N; i++ {
					// Pick a random explicit writer each time.
					writer := explicitWriters[r.Intn(bm.numWriters)]
					pw := newPendingWrite(context.Background(), writer, &storagepb.AppendRowsRequest{}, nil, "", "")
					if _, err := bm.router.pickConnection(pw); err != nil {
						b.Errorf("pickConnection: %v", err)
					}
				}
			})
		}
		// Benchmark concurrent routing for explicit writers.
		if bm.numWriters > 0 {
			benchName := fmt.Sprintf("%s_explicitwriters_concurrent", baseBenchName)
			b.Run(benchName, func(b *testing.B) {
				b.RunParallel(func(pb *testing.PB) {
					// Per-goroutine source: rand.Rand is not safe for concurrent use.
					r := rand.New(rand.NewSource(1))
					for pb.Next() {
						writer := explicitWriters[r.Intn(bm.numWriters)]
						pw := newPendingWrite(context.Background(), writer, &storagepb.AppendRowsRequest{}, nil, "", "")
						if _, err := bm.router.pickConnection(pw); err != nil {
							b.Errorf("pickConnection: %v", err)
						}
					}
				})
			})
		}
		// Benchmark routing for default writers.
		if bm.numDefaultWriters > 0 {
			benchName := fmt.Sprintf("%s_defaultwriters", baseBenchName)
			b.Run(benchName, func(b *testing.B) {
				r := rand.New(rand.NewSource(1))
				b.ResetTimer()
				for i := 0; i < b.N; i++ {
					// Pick a random default writer each time.
					writer := defaultWriters[r.Intn(bm.numDefaultWriters)]
					pw := newPendingWrite(context.Background(), writer, &storagepb.AppendRowsRequest{}, nil, "", "")
					if _, err := bm.router.pickConnection(pw); err != nil {
						b.Errorf("pickConnection: %v", err)
					}
				}
			})
		}
		// Benchmark concurrent routing for default writers.
		if bm.numDefaultWriters > 0 {
			benchName := fmt.Sprintf("%s_defaultwriters_concurrent", baseBenchName)
			b.Run(benchName, func(b *testing.B) {
				b.RunParallel(func(pb *testing.PB) {
					// Per-goroutine source: rand.Rand is not safe for concurrent use.
					r := rand.New(rand.NewSource(1))
					for pb.Next() {
						writer := defaultWriters[r.Intn(bm.numDefaultWriters)]
						pw := newPendingWrite(context.Background(), writer, &storagepb.AppendRowsRequest{}, nil, "", "")
						if _, err := bm.router.pickConnection(pw); err != nil {
							b.Errorf("pickConnection: %v", err)
						}
					}
				})
			})
		}
		// Tear down writers and the pool before the next configuration.
		for _, writer := range explicitWriters {
			writer.Close()
		}
		for _, writer := range defaultWriters {
			writer.Close()
		}
		pool.Close()
	}
}
// BenchmarkWatchdogPulse measures the cost of a single shared-router
// watchdogPulse invocation across varying writer and connection counts.
// The router's internal state (flow-control counters and writer/conn maps)
// is reset to a predetermined configuration before each timed pulse.
func BenchmarkWatchdogPulse(b *testing.B) {
	maxFlowInserts := 100
	maxFlowBytes := 1024
	for _, numWriters := range []int{1, 2, 5, 10, 50, 100, 250} {
		for _, numConnections := range []int{1, 2, 4} {
			ctx, cancel := context.WithCancel(context.Background())
			// We build the router manually so we can control the watchdog for this benchmark.
			router := newSharedRouter(false, numConnections)
			pool := &connectionPool{
				ctx:    ctx,
				cancel: cancel,
				open: func(ctx context.Context, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
					return &testAppendRowsClient{}, nil
				},
				baseFlowController: newFlowController(maxFlowInserts, maxFlowBytes),
			}
			if err := pool.activateRouter(router); err != nil {
				b.Fatalf("(@%d-@%d): activateRouter: %v", numWriters, numConnections, err)
			}
			// Now, set router as multiplex. We do this after activation to avoid
			// router activation starting the watchdog in a separate goroutine,
			// which would race with the manual watchdogPulse calls below.
			router.multiplex = true
			var writers []*ManagedStream
			for i := 0; i < numWriters; i++ {
				wCtx, wCancel := context.WithCancel(ctx)
				writer := &ManagedStream{
					id:             newUUID(writerIDPrefix),
					streamSettings: &streamSettings{streamID: "projects/foo/datasets/bar/tables/baz/streams/_default"},
					ctx:            wCtx,
					cancel:         wCancel,
					retry:          newStatelessRetryer(),
				}
				writers = append(writers, writer)
				if err := pool.addWriter(writer); err != nil {
					b.Fatalf("addWriter %d (@%d-@%d): %v", i, numWriters, numConnections, err)
				}
			}
			// Generate fake load for all connections: random, but fixed for the
			// lifetime of this benchmark configuration so each pulse sees the
			// same starting load.
			r := rand.New(rand.NewSource(time.Now().UnixNano()))
			countLoad := make([]int, numConnections)
			byteLoad := make([]int, numConnections)
			for i := 0; i < numConnections; i++ {
				countLoad[i] = r.Intn(maxFlowInserts)
				byteLoad[i] = r.Intn(maxFlowBytes)
			}
			benchName := fmt.Sprintf("%dwriters_%dconns", numWriters, numConnections)
			b.Run(benchName, func(b *testing.B) {
				if b.N > 9999 {
					b.Skip("benchmark unstable, only run with -benchtime=NNNNx")
				}
				for i := 0; i < b.N; i++ {
					b.StopTimer()
					// Each iteration, we reset the loads to the predetermined values, and repoint
					// all writers to the first connection, so every pulse starts from a
					// maximally-unbalanced state.
					for c := 0; c < len(router.multiConns); c++ {
						router.multiConns[c].fc.countTracked = int64(countLoad[c])
						router.multiConns[c].fc.bytesTracked = int64(byteLoad[c])
					}
					for k := range router.multiMap {
						router.multiMap[k] = router.multiConns[0]
					}
					// Rebuild the inverted map so all writers hang off conn 0.
					router.invertedMultiMap = make(map[string][]*ManagedStream)
					writerSlice := make([]*ManagedStream, len(writers))
					copy(writerSlice, writers)
					router.invertedMultiMap[router.multiConns[0].id] = writerSlice
					b.StartTimer()
					router.watchdogPulse()
				}
			})
			for _, writer := range writers {
				writer.Close()
			}
			pool.Close()
		}
	}
}