blob: ceb1201e4088fcc9fd50182eaae02ee326b42444 [file] [log] [blame]
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package managedwriter
import (
"context"
"errors"
"fmt"
"io"
"sync"
"cloud.google.com/go/bigquery/storage/apiv1/storagepb"
"github.com/googleapis/gax-go/v2"
"go.opencensus.io/tag"
"google.golang.org/grpc/codes"
grpcstatus "google.golang.org/grpc/status"
)
// Prefixes used when minting unique identifiers (via newUUID) for pools,
// connections, and writers.
const (
	poolIDPrefix   string = "connectionpool"
	connIDPrefix   string = "connection"
	writerIDPrefix string = "writer"
)
var (
	// errNoRouterForPool is returned when pool operations (selection, writer
	// attach/detach) are attempted before a router has been activated.
	errNoRouterForPool = errors.New("no router for connection pool")
)
// connectionPool represents a pooled set of connections.
//
// The pool retains references to connections, and maintains the mapping between writers
// and connections.
type connectionPool struct {
	id       string
	location string // BQ region associated with this pool.

	// The pool retains the long-lived context responsible for opening/maintaining bidi connections.
	ctx    context.Context
	cancel context.CancelFunc

	baseFlowController *flowController // template flow controller used for building connections.

	// We centralize the open function on the pool, rather than having an instance of the open func on every
	// connection. Opening the connection is a stateless operation.
	open func(ctx context.Context, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error)

	// We specify default calloptions for the pool.
	// Explicit connections may have their own calloptions as well.
	callOptions []gax.CallOption

	router poolRouter // poolManager makes the decisions about connections and routing.

	retry *statelessRetryer // default retryer for the pool.
}
// activateRouter handles wiring up a connection pool and its router.
// Activation is one-shot: a pool that already has a router rejects further
// activation attempts.
func (pool *connectionPool) activateRouter(rtr poolRouter) error {
	if pool.router != nil {
		return fmt.Errorf("router already activated")
	}
	err := rtr.poolAttach(pool)
	if err != nil {
		return fmt.Errorf("router rejected attach: %w", err)
	}
	pool.router = rtr
	return nil
}
// Close shuts down the pool: the router (if any) is detached, and the pool's
// retained context is canceled, which propagates to all attached writers.
// The router's detach error, if any, is returned.
func (pool *connectionPool) Close() error {
	var detachErr error
	if rtr := pool.router; rtr != nil {
		detachErr = rtr.poolDetach()
	}
	if pool.cancel != nil {
		pool.cancel()
	}
	return detachErr
}
// selectConn is used by writers to select a connection for a pending write,
// delegating the routing decision to the pool's router.
func (pool *connectionPool) selectConn(pw *pendingWrite) (*connection, error) {
	rtr := pool.router
	if rtr == nil {
		return nil, errNoRouterForPool
	}
	return rtr.pickConnection(pw)
}
// addWriter attaches a writer to this pool via the router. A writer may only
// belong to a single pool; the writer's pool reference is set on success.
func (pool *connectionPool) addWriter(writer *ManagedStream) error {
	if existing := writer.pool; existing != nil {
		return fmt.Errorf("writer already attached to pool %q", existing.id)
	}
	rtr := pool.router
	if rtr == nil {
		return errNoRouterForPool
	}
	if err := rtr.writerAttach(writer); err != nil {
		return err
	}
	writer.pool = pool
	return nil
}
// removeWriter detaches a writer from the pool's router, returning the
// router's detach error (if any).
func (pool *connectionPool) removeWriter(writer *ManagedStream) error {
	if pool.router == nil {
		return errNoRouterForPool
	}
	return pool.router.writerDetach(writer)
}
// mergeCallOptions combines the pool's default call options with any
// connection-specific options. Connection options are appended after the
// pool's so they appear later in the merged slice.
func (cp *connectionPool) mergeCallOptions(co *connection) []gax.CallOption {
	if co == nil {
		return cp.callOptions
	}
	merged := make([]gax.CallOption, 0, len(cp.callOptions)+len(co.callOptions))
	merged = append(merged, cp.callOptions...)
	merged = append(merged, co.callOptions...)
	return merged
}
// openWithRetry establishes a new bidi stream and channel pair. It is used by connection objects
// when (re)opening the network connection to the backend.
//
// The connection.getStream() func should be the only consumer of this.
func (cp *connectionPool) openWithRetry(co *connection) (storagepb.BigQueryWrite_AppendRowsClient, chan *pendingWrite, error) {
	r := &unaryRetryer{}
	for {
		arc, err := cp.open(co.ctx, cp.mergeCallOptions(co)...)
		metricCtx := cp.ctx // start with the pool's (untagged) metric context.
		if err == nil {
			// accumulate AppendClientOpenCount for the success case.
			recordStat(metricCtx, AppendClientOpenCount, 1)
		}
		if err != nil {
			// Tag the metric context with the gRPC status code of the failure;
			// fall back to the untagged context if tagging itself fails.
			if tagCtx, tagErr := tag.New(cp.ctx, tag.Insert(keyError, grpcstatus.Code(err).String())); tagErr == nil {
				metricCtx = tagCtx
			}
			// accumulate AppendClientOpenCount for the error case.
			recordStat(metricCtx, AppendClientOpenCount, 1)
			bo, shouldRetry := r.Retry(err)
			if shouldRetry {
				recordStat(cp.ctx, AppendClientOpenRetryCount, 1)
				// Sleep honors pool context cancellation; abandon the retry
				// loop if the pool is shutting down.
				if err := gax.Sleep(cp.ctx, bo); err != nil {
					return nil, nil, err
				}
				continue
			} else {
				// non-retriable error while opening
				return nil, nil, err
			}
		}
		// The channel relationship with its ARC is 1:1. If we get a new ARC, create a new pending
		// write channel and fire up the associated receive processor. The channel ensures that
		// responses for a connection are processed in the same order that appends were sent.
		depth := 1000 // default backend queue limit
		if d := co.fc.maxInsertCount; d > 0 {
			depth = d
		}
		ch := make(chan *pendingWrite, depth)
		go connRecvProcessor(co.ctx, co, arc, ch)
		return arc, ch, nil
	}
}
// defaultRetryer returns the stateless default retryer for the pool. If one's
// not set (re-enqueue retries disabled), it returns a retryer that only
// permits single attempts.
func (cp *connectionPool) defaultRetryer() *statelessRetryer {
	if r := cp.retry; r != nil {
		return r
	}
	return &statelessRetryer{maxAttempts: 1}
}
// connection models the underlying AppendRows grpc bidi connection used for writing
// data and receiving acknowledgements. It is responsible for enqueuing writes and processing
// responses from the backend.
type connection struct {
	id   string
	pool *connectionPool // each connection retains a reference to its owning pool.
	fc   *flowController // each connection has its own flow controller.

	callOptions []gax.CallOption // custom calloptions for this connection.

	ctx    context.Context // retained context for maintaining the connection, derived from the owning pool.
	cancel context.CancelFunc

	retry     *statelessRetryer
	optimizer sendOptimizer

	mu        sync.Mutex
	arc       *storagepb.BigQueryWrite_AppendRowsClient // reference to the grpc connection (send, recv, close)
	reconnect bool                                      // set when the stream appears unhealthy; forces reopen on next getStream.
	err       error                                     // terminal connection error
	pending   chan *pendingWrite

	// Load thresholds used by isLoaded/curLoad to signal when the connection
	// is busy enough to warrant scaling; derived in computeLoadThresholds.
	loadBytesThreshold int
	loadCountThreshold int
}
// connectionMode indicates how a connection is used, and selects the send
// optimizer the connection is constructed with (see optimizer()).
type connectionMode string

const (
	multiplexConnectionMode connectionMode = "MULTIPLEX" // uses the multiplex send optimizer.
	simplexConnectionMode   connectionMode = "SIMPLEX"   // uses the simplex send optimizer.
	verboseConnectionMode   connectionMode = "VERBOSE"   // uses the verbose send optimizer.
)
// newConnection constructs a connection owned by the given pool. Flow control
// limits start from the pool's template and may be overridden by per-stream
// settings; call options come from the stream settings when present.
// Returns nil when no pool is supplied.
func newConnection(pool *connectionPool, mode connectionMode, settings *streamSettings) *connection {
	if pool == nil {
		return nil
	}
	// Derive a cancellable context from the pool's long-lived context.
	connCtx, cancel := context.WithCancel(pool.ctx)
	// Resolve local overrides for flow control and call options.
	var (
		maxRequests int
		maxBytes    int
		opts        []gax.CallOption
	)
	if base := pool.baseFlowController; base != nil {
		maxRequests = base.maxInsertCount
		maxBytes = base.maxInsertBytes
	}
	if settings != nil {
		if settings.MaxInflightRequests > 0 {
			maxRequests = settings.MaxInflightRequests
		}
		if settings.MaxInflightBytes > 0 {
			maxBytes = settings.MaxInflightBytes
		}
		opts = settings.appendCallOptions
	}
	fc := newFlowController(maxRequests, maxBytes)
	countLimit, byteLimit := computeLoadThresholds(fc)
	return &connection{
		id:                 newUUID(connIDPrefix),
		pool:               pool,
		fc:                 fc,
		ctx:                connCtx,
		cancel:             cancel,
		optimizer:          optimizer(mode),
		loadBytesThreshold: byteLimit,
		loadCountThreshold: countLimit,
		callOptions:        opts,
	}
}
// computeLoadThresholds derives the load-signaling thresholds for a connection
// from a flow controller's limits. Without a controller (or with no limits
// configured) the defaults are a count threshold of 1000 and no byte
// threshold.
func computeLoadThresholds(fc *flowController) (countLimit, byteLimit int) {
	countLimit = 1000
	byteLimit = 0
	if fc == nil {
		return countLimit, byteLimit
	}
	if fc.maxInsertBytes > 0 {
		// Signal load at 20% of the byte limit.
		byteLimit = int(float64(fc.maxInsertBytes) * 0.2)
	}
	if fc.maxInsertCount > 0 {
		// Signal load at 20% of the insert limit, with a floor of one
		// (i.e. MAX(1, 20% of insert limit)).
		countLimit = int(float64(fc.maxInsertCount) * 0.2)
		if countLimit < 1 {
			countLimit = 1
		}
	}
	return countLimit, byteLimit
}
// optimizer maps a connectionMode to its corresponding sendOptimizer
// implementation, or nil when the mode has no associated optimizer.
func optimizer(mode connectionMode) sendOptimizer {
	switch mode {
	case simplexConnectionMode:
		return &simplexOptimizer{}
	case multiplexConnectionMode:
		return &multiplexOptimizer{}
	case verboseConnectionMode:
		return &verboseOptimizer{}
	default:
		return nil
	}
}
// release is used to signal flow control release when a write is no longer in
// flight, refunding the capacity acquired for the write's request size.
func (co *connection) release(pw *pendingWrite) {
	co.fc.release(pw.reqSize)
}
// isLoaded reports whether multiplex traffic on this connection is high
// enough to warrant adding more connections. A connection is considered
// loaded when either configured threshold (in-flight count or bytes) is
// exceeded; a zero threshold disables that check.
func (co *connection) isLoaded() bool {
	if co.loadCountThreshold > 0 && co.fc.count() > co.loadCountThreshold {
		return true
	}
	return co.loadBytesThreshold > 0 && co.fc.bytes() > co.loadBytesThreshold
}
// curLoad is a representation of connection load.
// Its primary purpose is comparing the load of different connections.
//
// Count-based load is normalized against the count threshold (+1 guards
// against division by zero). When a byte limit is configured, byte-based
// load is folded in and the two components are averaged.
func (co *connection) curLoad() float64 {
	load := float64(co.fc.count()) / float64(co.loadCountThreshold+1)
	if co.fc.maxInsertBytes > 0 {
		load += (float64(co.fc.bytes()) / float64(co.loadBytesThreshold+1))
		load = load / 2
	}
	return load
}
// close closes a connection: it cancels the retained context, closes the
// send side of any live ARC stream, records a terminal error, and closes the
// pending-write channel to signal the receive processor.
//
// Callers must not hold co.mu; it is acquired here.
func (co *connection) close() {
	co.mu.Lock()
	defer co.mu.Unlock()
	// first, cancel the retained context.
	if co.cancel != nil {
		co.cancel()
		co.cancel = nil
	}
	// close sending if we have a real ARC.
	if co.arc != nil && (*co.arc) != (storagepb.BigQueryWrite_AppendRowsClient)(nil) {
		(*co.arc).CloseSend()
		co.arc = nil
	}
	// Mark terminal error if not already set. (Previously the condition was
	// inverted, which clobbered an existing terminal error and left co.err
	// nil when there was none; getStream relies on co.err being non-nil to
	// reject use of a closed connection.)
	if co.err == nil {
		co.err = io.EOF
	}
	// signal pending channel close.
	if co.pending != nil {
		close(co.pending)
	}
}
// lockingAppend handles a single append request on a given connection.
//
// It acquires flow-control capacity, then enters the connection's critical
// section to (re)acquire the bidi stream, send the request, and enqueue the
// pending write on the connection's channel so the receive processor can pair
// it with its response in order.
func (co *connection) lockingAppend(pw *pendingWrite) error {
	// Don't bother calling/retrying if this append's context is already expired.
	if err := pw.reqCtx.Err(); err != nil {
		return err
	}
	if err := co.fc.acquire(pw.reqCtx, pw.reqSize); err != nil {
		// We've failed to acquire. This may get retried on a different connection, so marking the write done is incorrect.
		return err
	}
	var statsOnExit func(ctx context.Context)
	// critical section: Things that need to happen inside the critical section:
	//
	// * get/open connection
	// * issue the append
	// * add the pending write to the channel for the connection (ordering for the response)
	co.mu.Lock()
	defer func() {
		// Capture the connection context while still holding the lock, then
		// record stats (if any were staged) after the lock is released.
		sCtx := co.ctx
		co.mu.Unlock()
		if statsOnExit != nil && sCtx != nil {
			statsOnExit(sCtx)
		}
	}()
	var arc *storagepb.BigQueryWrite_AppendRowsClient
	var ch chan *pendingWrite
	var err error
	// Handle promotion of per-request schema to default schema in the case of updates.
	// Additionally, we check multiplex status as schema changes for explicit streams
	// require reconnect, whereas multiplex does not.
	forceReconnect := false
	promoted := false
	if pw.writer != nil && pw.reqTmpl != nil {
		if !pw.reqTmpl.Compatible(pw.writer.curTemplate) {
			if pw.writer.curTemplate == nil {
				// promote because there's no current template
				pw.writer.curTemplate = pw.reqTmpl
				promoted = true
			} else {
				// Promote only when the request's template is newer than the current one.
				if pw.writer.curTemplate.versionTime.Before(pw.reqTmpl.versionTime) {
					pw.writer.curTemplate = pw.reqTmpl
					promoted = true
				}
			}
		}
	}
	if promoted {
		// A promoted template forces reconnect unless the optimizer reports
		// the connection is multiplexing.
		if co.optimizer == nil {
			forceReconnect = true
		} else {
			if !co.optimizer.isMultiplexing() {
				forceReconnect = true
			}
		}
	}
	arc, ch, err = co.getStream(arc, forceReconnect)
	if err != nil {
		return err
	}
	pw.attemptCount = pw.attemptCount + 1
	if co.optimizer != nil {
		err = co.optimizer.optimizeSend((*arc), pw)
		if err != nil {
			// Reset optimizer state on error.
			co.optimizer.signalReset()
		}
	} else {
		// No optimizer present, send a fully populated request.
		err = (*arc).Send(pw.constructFullRequest(true))
	}
	if err != nil {
		// Refund the flow controller immediately, as there's nothing to refund on the receiver.
		co.fc.release(pw.reqSize)
		if shouldReconnect(err) {
			metricCtx := co.ctx // start with the ctx that must be present
			if pw.writer != nil {
				metricCtx = pw.writer.ctx // the writer ctx bears the stream/origin tagging, so prefer it.
			}
			if tagCtx, tagErr := tag.New(metricCtx, tag.Insert(keyError, grpcstatus.Code(err).String())); tagErr == nil {
				metricCtx = tagCtx
			}
			recordStat(metricCtx, AppendRequestReconnects, 1)
			// if we think this connection is unhealthy, force a reconnect on the next send.
			co.reconnect = true
		}
		return err
	}
	// Compute numRows, once we pass ownership to the channel the request may be
	// cleared.
	var numRows int64
	if r := pw.req.GetProtoRows(); r != nil {
		if pr := r.GetRows(); pr != nil {
			numRows = int64(len(pr.GetSerializedRows()))
		}
	}
	statsOnExit = func(ctx context.Context) {
		// these will get recorded once we exit the critical section.
		// TODO: resolve open questions around what labels should be attached (connection, streamID, etc)
		recordStat(ctx, AppendRequestRows, numRows)
		recordStat(ctx, AppendRequests, 1)
		recordStat(ctx, AppendRequestBytes, int64(pw.reqSize))
	}
	ch <- pw
	return nil
}
// getStream returns either a valid ARC client stream or permanent error.
//
// Any calls to getStream should do so in possession of the critical section lock.
func (co *connection) getStream(arc *storagepb.BigQueryWrite_AppendRowsClient, forceReconnect bool) (*storagepb.BigQueryWrite_AppendRowsClient, chan *pendingWrite, error) {
	// A previously recorded terminal error permanently fails the connection.
	if co.err != nil {
		return nil, nil, co.err
	}
	// A canceled/expired connection context is also terminal; latch it.
	co.err = co.ctx.Err()
	if co.err != nil {
		return nil, nil, co.err
	}
	// Previous activity on the stream indicated it is not healthy, so propagate that as a reconnect.
	if co.reconnect {
		forceReconnect = true
		co.reconnect = false
	}
	// Always return the retained ARC if the arg differs.
	if arc != co.arc && !forceReconnect {
		return co.arc, co.pending, nil
	}
	// We need to (re)open a connection. Cleanup previous connection, channel, and context if they are present.
	if co.arc != nil && (*co.arc) != (storagepb.BigQueryWrite_AppendRowsClient)(nil) {
		(*co.arc).CloseSend()
	}
	if co.pending != nil {
		close(co.pending)
	}
	if co.cancel != nil {
		// Cancel the prior connection context and derive a fresh one from the pool.
		co.cancel()
		co.ctx, co.cancel = context.WithCancel(co.pool.ctx)
	}
	co.arc = new(storagepb.BigQueryWrite_AppendRowsClient)
	// We're going to (re)open the connection, so clear any optimizer state.
	if co.optimizer != nil {
		co.optimizer.signalReset()
	}
	*co.arc, co.pending, co.err = co.pool.openWithRetry(co)
	return co.arc, co.pending, co.err
}
// streamClientFunc is the signature of the raw AppendRows stream factory,
// declared as a named type to enable testing.
type streamClientFunc func(context.Context, ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error)

// errConnectionCanceled is the rpc-status form of a canceled client connection
// context, used so cancellation during reconnection can flow through retry
// predicates instead of surfacing as a raw context error.
var errConnectionCanceled = grpcstatus.Error(codes.Canceled, "client connection context was canceled")
// connRecvProcessor is used to propagate append responses back up with the originating write requests.
// It runs as a goroutine. A connection object allows for reconnection, and each reconnection establishes a new
// context, processing goroutine and backing channel.
func connRecvProcessor(ctx context.Context, co *connection, arc storagepb.BigQueryWrite_AppendRowsClient, ch <-chan *pendingWrite) {
	for {
		select {
		case <-ctx.Done():
			// Channel context is done, which means we're not getting further updates on in flight appends and should
			// process everything left in the existing channel/connection.
			doneErr := ctx.Err()
			if doneErr == context.Canceled {
				// This is a special case. Connection recovery ends up cancelling a context as part of a reconnection, and with
				// request retrying enabled we can possibly re-enqueue writes. To allow graceful retry for this behavior, we
				// translate this to an rpc status error to avoid doing things like introducing context errors as part of the retry predicate.
				//
				// The tradeoff here is that write retries may roundtrip multiple times for something like a pool shutdown, even though the final
				// outcome would result in an error.
				doneErr = errConnectionCanceled
			}
			// Drain the channel completely; it will be closed by the sender.
			for {
				pw, ok := <-ch
				if !ok {
					return
				}
				// This connection will not recover, but still attempt to keep flow controller state consistent.
				co.release(pw)
				// TODO: Determine if/how we should report this case, as we have no viable context for propagating.
				// Because we can't tell locally if this write is done, we pass it back to the retrier for possible re-enqueue.
				pw.writer.processRetry(pw, co, nil, doneErr)
			}
		case nextWrite, ok := <-ch:
			if !ok {
				// Channel closed, all elements processed.
				return
			}
			// block until we get a corresponding response or err from stream.
			resp, err := arc.Recv()
			co.release(nextWrite)
			if err != nil {
				// The Recv() itself yielded an error. We increment AppendResponseErrors by one, tagged by the status
				// code.
				status := grpcstatus.Convert(err)
				metricCtx := ctx
				if tagCtx, tagErr := tag.New(ctx, tag.Insert(keyError, codes.Code(status.Code()).String())); tagErr == nil {
					metricCtx = tagCtx
				}
				recordStat(metricCtx, AppendResponseErrors, 1)
				nextWrite.writer.processRetry(nextWrite, co, nil, err)
				continue
			}
			// Record that we did in fact get a response from the backend.
			recordStat(ctx, AppendResponses, 1)
			if status := resp.GetError(); status != nil {
				// The response was received successfully, but the response embeds a status error in the payload.
				// Increment AppendResponseErrors, tagged by status code.
				metricCtx := ctx
				if tagCtx, tagErr := tag.New(ctx, tag.Insert(keyError, codes.Code(status.GetCode()).String())); tagErr == nil {
					metricCtx = tagCtx
				}
				recordStat(metricCtx, AppendResponseErrors, 1)
				respErr := grpcstatus.ErrorProto(status)
				nextWrite.writer.processRetry(nextWrite, co, resp, respErr)
				continue
			}
			// We had no error in the receive or in the response. Mark the write done.
			nextWrite.markDone(resp, nil)
		}
	}
}