| // Copyright 2022 Google LLC |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // https://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package managedwriter |
| |
| import ( |
| "context" |
| "errors" |
| "fmt" |
| "io" |
| "sync" |
| |
| "cloud.google.com/go/bigquery/storage/apiv1/storagepb" |
| "github.com/googleapis/gax-go/v2" |
| "go.opencensus.io/tag" |
| "google.golang.org/grpc/codes" |
| grpcstatus "google.golang.org/grpc/status" |
| ) |
| |
| const ( |
| poolIDPrefix string = "connectionpool" |
| connIDPrefix string = "connection" |
| writerIDPrefix string = "writer" |
| ) |
| |
| var ( |
| errNoRouterForPool = errors.New("no router for connection pool") |
| ) |
| |
| // connectionPool represents a pooled set of connections. |
| // |
| // The pool retains references to connections, and maintains the mapping between writers |
| // and connections. |
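| // |
| // A minimal sketch of the expected call order (names are illustrative; a poolRouter |
| // implementation and pool construction are handled elsewhere in this package): |
| // |
| //    if err := pool.activateRouter(rtr); err != nil { /* handle error */ } |
| //    if err := pool.addWriter(writer); err != nil { /* handle error */ } |
| //    conn, err := pool.selectConn(pw) // writers pick a connection per pending write |
| //    defer pool.Close() |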
| type connectionPool struct { |
| id string |
| location string // BQ region associated with this pool. |
| |
| // the pool retains the long-lived context responsible for opening/maintaining bidi connections. |
| ctx context.Context |
| cancel context.CancelFunc |
| |
| baseFlowController *flowController // template flow controller used for building connections. |
| |
| // We centralize the open function on the pool, rather than having an instance of the open func on every |
| // connection. Opening the connection is a stateless operation. |
| open func(ctx context.Context, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) |
| |
| // We specify default calloptions for the pool. |
| // Explicit connections may have their own calloptions as well. |
| callOptions []gax.CallOption |
| |
| router poolRouter // the router makes decisions about connections and routing. |
| |
| retry *statelessRetryer // default retryer for the pool. |
| } |
| |
| // activateRouter handles wiring up a connection pool and its router. |
| func (pool *connectionPool) activateRouter(rtr poolRouter) error { |
| if pool.router != nil { |
| return fmt.Errorf("router already activated") |
| } |
| if err := rtr.poolAttach(pool); err != nil { |
| return fmt.Errorf("router rejected attach: %w", err) |
| } |
| pool.router = rtr |
| return nil |
| } |
| |
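| // Close shuts down the pool: it detaches the router (if one is attached) and cancels |
| // the pool's retained context, which propagates to derived connections and writers. |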
| func (pool *connectionPool) Close() error { |
| // Signal router and cancel context, which should propagate to all writers. |
| var err error |
| if pool.router != nil { |
| err = pool.router.poolDetach() |
| } |
| if cancel := pool.cancel; cancel != nil { |
| cancel() |
| } |
| return err |
| } |
| |
| // selectConn is used by writers to select a connection; it delegates the choice to the pool's router. |
| func (pool *connectionPool) selectConn(pw *pendingWrite) (*connection, error) { |
| if pool.router == nil { |
| return nil, errNoRouterForPool |
| } |
| return pool.router.pickConnection(pw) |
| } |
| |
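| // addWriter attaches a writer to the pool by registering it with the pool's router. |
| // A writer may only be attached to a single pool. |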
| func (pool *connectionPool) addWriter(writer *ManagedStream) error { |
| if p := writer.pool; p != nil { |
| return fmt.Errorf("writer already attached to pool %q", p.id) |
| } |
| if pool.router == nil { |
| return errNoRouterForPool |
| } |
| if err := pool.router.writerAttach(writer); err != nil { |
| return err |
| } |
| writer.pool = pool |
| return nil |
| } |
| |
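| // removeWriter detaches a writer from the pool's router. |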
| func (pool *connectionPool) removeWriter(writer *ManagedStream) error { |
| if pool.router == nil { |
| return errNoRouterForPool |
| } |
| detachErr := pool.router.writerDetach(writer) |
| return detachErr |
| } |
| |
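| // mergeCallOptions combines the pool's default call options with any options set on |
| // the given connection, appending the connection-level options after the pool defaults. |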
| func (cp *connectionPool) mergeCallOptions(co *connection) []gax.CallOption { |
| if co == nil { |
| return cp.callOptions |
| } |
| var mergedOpts []gax.CallOption |
| mergedOpts = append(mergedOpts, cp.callOptions...) |
| mergedOpts = append(mergedOpts, co.callOptions...) |
| return mergedOpts |
| } |
| |
| // openWithRetry establishes a new bidi stream and channel pair. It is used by connection objects |
| // when (re)opening the network connection to the backend. |
| // |
| // The connection.getStream() func should be the only consumer of this. |
| func (cp *connectionPool) openWithRetry(co *connection) (storagepb.BigQueryWrite_AppendRowsClient, chan *pendingWrite, error) { |
| r := &unaryRetryer{} |
| for { |
| arc, err := cp.open(co.ctx, cp.mergeCallOptions(co)...) |
| metricCtx := cp.ctx |
| if err == nil { |
| // accumulate AppendClientOpenCount for the success case. |
| recordStat(metricCtx, AppendClientOpenCount, 1) |
| } |
| if err != nil { |
| if tagCtx, tagErr := tag.New(cp.ctx, tag.Insert(keyError, grpcstatus.Code(err).String())); tagErr == nil { |
| metricCtx = tagCtx |
| } |
| // accumulate AppendClientOpenCount for the error case. |
| recordStat(metricCtx, AppendClientOpenCount, 1) |
| bo, shouldRetry := r.Retry(err) |
| if shouldRetry { |
| recordStat(cp.ctx, AppendClientOpenRetryCount, 1) |
| if err := gax.Sleep(cp.ctx, bo); err != nil { |
| return nil, nil, err |
| } |
| continue |
| } else { |
| // non-retriable error while opening |
| return nil, nil, err |
| } |
| } |
| |
| // The channel relationship with its ARC is 1:1. If we get a new ARC, create a new pending |
| // write channel and fire up the associated receive processor. The channel ensures that |
| // responses for a connection are processed in the same order that appends were sent. |
| depth := 1000 // default backend queue limit |
| if d := co.fc.maxInsertCount; d > 0 { |
| depth = d |
| } |
| ch := make(chan *pendingWrite, depth) |
| go connRecvProcessor(co.ctx, co, arc, ch) |
| return arc, ch, nil |
| } |
| } |
| |
| // defaultRetryer returns the stateless default retryer for the pool. If one isn't set (re-enqueue retries disabled), |
| // it returns a retryer that only permits single attempts. |
| func (cp *connectionPool) defaultRetryer() *statelessRetryer { |
| if cp.retry != nil { |
| return cp.retry |
| } |
| return &statelessRetryer{ |
| maxAttempts: 1, |
| } |
| } |
| |
| // connection models the underlying AppendRows grpc bidi connection used for writing |
| // data and receiving acknowledgements. It is responsible for enqueuing writes and processing |
| // responses from the backend. |
| type connection struct { |
| id string |
| pool *connectionPool // each connection retains a reference to its owning pool. |
| |
| fc *flowController // each connection has its own flow controller. |
| callOptions []gax.CallOption // custom calloptions for this connection. |
| ctx context.Context // retained context for maintaining the connection, derived from the owning pool. |
| cancel context.CancelFunc |
| |
| retry *statelessRetryer |
| optimizer sendOptimizer |
| |
| mu sync.Mutex |
| arc *storagepb.BigQueryWrite_AppendRowsClient // reference to the grpc connection (send, recv, close) |
| reconnect bool // if true, the next send should force a stream reconnect. |
| err error // terminal connection error |
| pending chan *pendingWrite // writes awaiting responses on the current stream. |
| |
| loadBytesThreshold int |
| loadCountThreshold int |
| } |
| |
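| // connectionMode describes how a connection is expected to be used, and governs which |
| // sendOptimizer (if any) the connection employs; see the optimizer func below. |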
| type connectionMode string |
| |
| const ( |
| multiplexConnectionMode connectionMode = "MULTIPLEX" |
| simplexConnectionMode connectionMode = "SIMPLEX" |
| verboseConnectionMode connectionMode = "VERBOSE" |
| ) |
| |
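| // newConnection builds a connection attached to the given pool. Stream settings, when |
| // provided, override the pool's base flow control limits and supply connection-level |
| // call options. |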
| func newConnection(pool *connectionPool, mode connectionMode, settings *streamSettings) *connection { |
| if pool == nil { |
| return nil |
| } |
| // create and retain a cancellable context. |
| connCtx, cancel := context.WithCancel(pool.ctx) |
| |
| // Resolve local overrides for flow control and call options |
| fcRequests := 0 |
| fcBytes := 0 |
| var opts []gax.CallOption |
| |
| if pool.baseFlowController != nil { |
| fcRequests = pool.baseFlowController.maxInsertCount |
| fcBytes = pool.baseFlowController.maxInsertBytes |
| } |
| if settings != nil { |
| if settings.MaxInflightRequests > 0 { |
| fcRequests = settings.MaxInflightRequests |
| } |
| if settings.MaxInflightBytes > 0 { |
| fcBytes = settings.MaxInflightBytes |
| } |
| opts = settings.appendCallOptions |
| } |
| fc := newFlowController(fcRequests, fcBytes) |
| countLimit, byteLimit := computeLoadThresholds(fc) |
| |
| return &connection{ |
| id: newUUID(connIDPrefix), |
| pool: pool, |
| fc: fc, |
| ctx: connCtx, |
| cancel: cancel, |
| optimizer: optimizer(mode), |
| loadBytesThreshold: byteLimit, |
| loadCountThreshold: countLimit, |
| callOptions: opts, |
| } |
| } |
| |
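| // computeLoadThresholds derives the per-connection load thresholds used by isLoaded and |
| // curLoad from the flow controller's limits. As an illustrative example, limits of 1000 |
| // in-flight requests and 10,000,000 in-flight bytes yield thresholds of 200 requests and |
| // 2,000,000 bytes (20% of each limit). |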
| func computeLoadThresholds(fc *flowController) (countLimit, byteLimit int) { |
| countLimit = 1000 |
| byteLimit = 0 |
| if fc != nil { |
| if fc.maxInsertBytes > 0 { |
| // 20% of byte limit |
| byteLimit = int(float64(fc.maxInsertBytes) * 0.2) |
| } |
| if fc.maxInsertCount > 0 { |
| // 20% of insert limit, with a floor of 1. |
| countLimit = int(float64(fc.maxInsertCount) * 0.2) |
| if countLimit < 1 { |
| countLimit = 1 |
| } |
| } |
| } |
| return |
| } |
| |
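| // optimizer maps a connectionMode to its sendOptimizer implementation, or nil if the |
| // mode has no associated optimizer. |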
| func optimizer(mode connectionMode) sendOptimizer { |
| switch mode { |
| case multiplexConnectionMode: |
| return &multiplexOptimizer{} |
| case verboseConnectionMode: |
| return &verboseOptimizer{} |
| case simplexConnectionMode: |
| return &simplexOptimizer{} |
| } |
| return nil |
| } |
| |
| // release is used to signal flow control release when a write is no longer in flight. |
| func (co *connection) release(pw *pendingWrite) { |
| co.fc.release(pw.reqSize) |
| } |
| |
| // isLoaded reports whether multiplex traffic on this connection is high enough to warrant adding more connections. |
| func (co *connection) isLoaded() bool { |
| if co.loadCountThreshold > 0 && co.fc.count() > co.loadCountThreshold { |
| return true |
| } |
| if co.loadBytesThreshold > 0 && co.fc.bytes() > co.loadBytesThreshold { |
| return true |
| } |
| return false |
| } |
| |
| // curLoad is a representation of connection load. |
| // Its primary purpose is comparing the load of different connections. |
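| // As an illustrative example: with a count threshold of 200 and 50 requests in flight, the |
| // count component is 50/201 ≈ 0.25; when a byte limit is configured, the byte component is |
| // computed the same way and the two components are averaged. |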
| func (co *connection) curLoad() float64 { |
| load := float64(co.fc.count()) / float64(co.loadCountThreshold+1) |
| if co.fc.maxInsertBytes > 0 { |
| load += (float64(co.fc.bytes()) / float64(co.loadBytesThreshold+1)) |
| load = load / 2 |
| } |
| return load |
| } |
| |
| // close closes a connection. |
| func (co *connection) close() { |
| co.mu.Lock() |
| defer co.mu.Unlock() |
| // first, cancel the retained context. |
| if co.cancel != nil { |
| co.cancel() |
| co.cancel = nil |
| } |
| // close sending if we have a real ARC. |
| if co.arc != nil && (*co.arc) != (storagepb.BigQueryWrite_AppendRowsClient)(nil) { |
| (*co.arc).CloseSend() |
| co.arc = nil |
| } |
| // mark terminal error if not already set. |
| if co.err == nil { |
| co.err = io.EOF |
| } |
| // signal pending channel close. |
| if co.pending != nil { |
| close(co.pending) |
| } |
| } |
| |
| // lockingAppend handles a single append request on a given connection. |
| func (co *connection) lockingAppend(pw *pendingWrite) error { |
| // Don't bother calling/retrying if this append's context is already expired. |
| if err := pw.reqCtx.Err(); err != nil { |
| return err |
| } |
| |
| if err := co.fc.acquire(pw.reqCtx, pw.reqSize); err != nil { |
| // We've failed to acquire. This may get retried on a different connection, so marking the write done is incorrect. |
| return err |
| } |
| |
| var statsOnExit func(ctx context.Context) |
| |
| // critical section: Things that need to happen inside the critical section: |
| // |
| // * get/open the connection |
| // * issue the append |
| // * add the pending write to the channel for the connection (ordering for the response) |
| co.mu.Lock() |
| defer func() { |
| sCtx := co.ctx |
| co.mu.Unlock() |
| if statsOnExit != nil && sCtx != nil { |
| statsOnExit(sCtx) |
| } |
| }() |
| |
| var arc *storagepb.BigQueryWrite_AppendRowsClient |
| var ch chan *pendingWrite |
| var err error |
| |
| // Handle promotion of per-request schema to default schema in the case of updates. |
| // Additionally, we check multiplex status as schema changes for explicit streams |
| // require reconnect, whereas multiplex does not. |
| forceReconnect := false |
| promoted := false |
| if pw.writer != nil && pw.reqTmpl != nil { |
| if !pw.reqTmpl.Compatible(pw.writer.curTemplate) { |
| if pw.writer.curTemplate == nil { |
| // promote because there's no current template |
| pw.writer.curTemplate = pw.reqTmpl |
| promoted = true |
| } else { |
| if pw.writer.curTemplate.versionTime.Before(pw.reqTmpl.versionTime) { |
| pw.writer.curTemplate = pw.reqTmpl |
| promoted = true |
| } |
| } |
| } |
| } |
| if promoted { |
| if co.optimizer == nil { |
| forceReconnect = true |
| } else { |
| if !co.optimizer.isMultiplexing() { |
| forceReconnect = true |
| } |
| } |
| } |
| |
| arc, ch, err = co.getStream(arc, forceReconnect) |
| if err != nil { |
| return err |
| } |
| |
| pw.attemptCount++ |
| if co.optimizer != nil { |
| err = co.optimizer.optimizeSend((*arc), pw) |
| if err != nil { |
| // Reset optimizer state on error. |
| co.optimizer.signalReset() |
| } |
| } else { |
| // No optimizer present, send a fully populated request. |
| err = (*arc).Send(pw.constructFullRequest(true)) |
| } |
| if err != nil { |
| // Refund the flow controller immediately, as there's nothing to refund on the receiver. |
| co.fc.release(pw.reqSize) |
| if shouldReconnect(err) { |
| metricCtx := co.ctx // start with the ctx that must be present |
| if pw.writer != nil { |
| metricCtx = pw.writer.ctx // the writer ctx bears the stream/origin tagging, so prefer it. |
| } |
| if tagCtx, tagErr := tag.New(metricCtx, tag.Insert(keyError, grpcstatus.Code(err).String())); tagErr == nil { |
| metricCtx = tagCtx |
| } |
| recordStat(metricCtx, AppendRequestReconnects, 1) |
| // if we think this connection is unhealthy, force a reconnect on the next send. |
| co.reconnect = true |
| } |
| return err |
| } |
| |
| // Compute numRows now; once we pass ownership to the channel the request may be |
| // cleared. |
| var numRows int64 |
| if r := pw.req.GetProtoRows(); r != nil { |
| if pr := r.GetRows(); pr != nil { |
| numRows = int64(len(pr.GetSerializedRows())) |
| } |
| } |
| statsOnExit = func(ctx context.Context) { |
| // these will get recorded once we exit the critical section. |
| // TODO: resolve open questions around what labels should be attached (connection, streamID, etc) |
| recordStat(ctx, AppendRequestRows, numRows) |
| recordStat(ctx, AppendRequests, 1) |
| recordStat(ctx, AppendRequestBytes, int64(pw.reqSize)) |
| } |
| ch <- pw |
| return nil |
| } |
| |
| // getStream returns either a valid ARC client stream or permanent error. |
| // |
| // Any calls to getStream should do so in possession of the critical section lock. |
| func (co *connection) getStream(arc *storagepb.BigQueryWrite_AppendRowsClient, forceReconnect bool) (*storagepb.BigQueryWrite_AppendRowsClient, chan *pendingWrite, error) { |
| if co.err != nil { |
| return nil, nil, co.err |
| } |
| co.err = co.ctx.Err() |
| if co.err != nil { |
| return nil, nil, co.err |
| } |
| |
| // Previous activity on the stream indicated it is not healthy, so propagate that as a reconnect. |
| if co.reconnect { |
| forceReconnect = true |
| co.reconnect = false |
| } |
| // If the caller's ARC is stale (it differs from the retained one) and we're not forcing a reconnect, return the retained ARC. |
| if arc != co.arc && !forceReconnect { |
| return co.arc, co.pending, nil |
| } |
| // We need to (re)open a connection. Clean up the previous connection, channel, and context if they are present. |
| if co.arc != nil && (*co.arc) != (storagepb.BigQueryWrite_AppendRowsClient)(nil) { |
| (*co.arc).CloseSend() |
| } |
| if co.pending != nil { |
| close(co.pending) |
| } |
| if co.cancel != nil { |
| co.cancel() |
| co.ctx, co.cancel = context.WithCancel(co.pool.ctx) |
| } |
| |
| co.arc = new(storagepb.BigQueryWrite_AppendRowsClient) |
| // We're going to (re)open the connection, so clear any optimizer state. |
| if co.optimizer != nil { |
| co.optimizer.signalReset() |
| } |
| *co.arc, co.pending, co.err = co.pool.openWithRetry(co) |
| return co.arc, co.pending, co.err |
| } |
| |
| // streamClientFunc is the signature for opening an AppendRows bidi stream; abstracting it enables substituting fakes in testing. |
| type streamClientFunc func(context.Context, ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) |
| |
| var errConnectionCanceled = grpcstatus.Error(codes.Canceled, "client connection context was canceled") |
| |
| // connRecvProcessor is used to propagate append responses back up with the originating write requests. |
| // It runs as a goroutine. A connection object allows for reconnection, and each reconnection establishes a new |
| // context, processing goroutine and backing channel. |
| func connRecvProcessor(ctx context.Context, co *connection, arc storagepb.BigQueryWrite_AppendRowsClient, ch <-chan *pendingWrite) { |
| for { |
| select { |
| case <-ctx.Done(): |
| // Channel context is done, which means we're not getting further updates on in flight appends and should |
| // process everything left in the existing channel/connection. |
| doneErr := ctx.Err() |
| if doneErr == context.Canceled { |
| // This is a special case. Connection recovery ends up cancelling a context as part of a reconnection, and with |
| // request retrying enabled we can possibly re-enqueue writes. To allow graceful retry for this behavior, we |
| // translate this to an rpc status error to avoid doing things like introducing context errors as part of the retry predicate. |
| // |
| // The tradeoff here is that write retries may roundtrip multiple times for something like a pool shutdown, even though the final |
| // outcome would result in an error. |
| doneErr = errConnectionCanceled |
| } |
| for { |
| pw, ok := <-ch |
| if !ok { |
| return |
| } |
| // This connection will not recover, but still attempt to keep flow controller state consistent. |
| co.release(pw) |
| |
| // TODO: Determine if/how we should report this case, as we have no viable context for propagating. |
| |
| // Because we can't tell locally if this write is done, we pass it back to the retrier for possible re-enqueue. |
| pw.writer.processRetry(pw, co, nil, doneErr) |
| } |
| case nextWrite, ok := <-ch: |
| if !ok { |
| // Channel closed, all elements processed. |
| return |
| } |
| // block until we get a corresponding response or err from stream. |
| resp, err := arc.Recv() |
| co.release(nextWrite) |
| if err != nil { |
| // The Recv() itself yielded an error. We increment AppendResponseErrors by one, tagged by the status |
| // code. |
| status := grpcstatus.Convert(err) |
| metricCtx := ctx |
| if tagCtx, tagErr := tag.New(ctx, tag.Insert(keyError, codes.Code(status.Code()).String())); tagErr == nil { |
| metricCtx = tagCtx |
| } |
| recordStat(metricCtx, AppendResponseErrors, 1) |
| |
| nextWrite.writer.processRetry(nextWrite, co, nil, err) |
| continue |
| } |
| // Record that we did in fact get a response from the backend. |
| recordStat(ctx, AppendResponses, 1) |
| |
| if status := resp.GetError(); status != nil { |
| // The response was received successfully, but the response embeds a status error in the payload. |
| // Increment AppendResponseErrors, tagged by status code. |
| metricCtx := ctx |
| if tagCtx, tagErr := tag.New(ctx, tag.Insert(keyError, codes.Code(status.GetCode()).String())); tagErr == nil { |
| metricCtx = tagCtx |
| } |
| recordStat(metricCtx, AppendResponseErrors, 1) |
| respErr := grpcstatus.ErrorProto(status) |
| |
| nextWrite.writer.processRetry(nextWrite, co, resp, respErr) |
| |
| continue |
| } |
| // We had no error in the receive or in the response. Mark the write done. |
| nextWrite.markDone(resp, nil) |
| } |
| } |
| } |