blob: 090933d7e70641d72487d12b48f5b5780bd6527b [file] [log] [blame]
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package managedwriter
import (
"context"
"fmt"
"io"
"sync"
"time"
"cloud.google.com/go/bigquery/internal"
"cloud.google.com/go/bigquery/storage/apiv1/storagepb"
"github.com/googleapis/gax-go/v2"
"go.opencensus.io/tag"
"google.golang.org/grpc"
grpcstatus "google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/wrapperspb"
)
// StreamType indicates the type of stream this write client is managing.
type StreamType string
var (
// DefaultStream most closely mimics the legacy bigquery
// tabledata.insertAll semantics. Successful inserts are
// committed immediately, and there's no tracking offsets as
// all writes go into a "default" stream that always exists
// for a table.
DefaultStream StreamType = "DEFAULT"
// CommittedStream appends data immediately, but creates a
// discrete stream for the work so that offset tracking can
// be used to track writes.
CommittedStream StreamType = "COMMITTED"
// BufferedStream is a form of checkpointed stream, that allows
// you to advance the offset of visible rows via Flush operations.
BufferedStream StreamType = "BUFFERED"
// PendingStream is a stream in which no data is made visible to
// readers until the stream is finalized and committed explicitly.
PendingStream StreamType = "PENDING"
)
func streamTypeToEnum(t StreamType) storagepb.WriteStream_Type {
switch t {
case CommittedStream:
return storagepb.WriteStream_COMMITTED
case PendingStream:
return storagepb.WriteStream_PENDING
case BufferedStream:
return storagepb.WriteStream_BUFFERED
default:
return storagepb.WriteStream_TYPE_UNSPECIFIED
}
}
// ManagedStream is the abstraction over a single write stream.
type ManagedStream struct {
// Unique id for the managedstream instance.
id string
// pool retains a reference to the writer's pool. A writer is only associated to a single pool.
pool *connectionPool
streamSettings *streamSettings
// retains the current descriptor for the stream.
curTemplate *versionedTemplate
c *Client
retry *statelessRetryer
// writer state
mu sync.Mutex
ctx context.Context // used for stats/instrumentation, and to check the writer is live.
cancel context.CancelFunc
err error // retains any terminal error (writer was closed)
}
// streamSettings is for capturing configuration and option information.
type streamSettings struct {
// streamID contains the reference to the destination stream.
streamID string
// streamType governs behavior of the client, such as how
// offset handling is managed.
streamType StreamType
// MaxInflightRequests governs how many unacknowledged
// append writes can be outstanding into the system.
MaxInflightRequests int
// MaxInflightBytes governs how many unacknowledged
// request bytes can be outstanding into the system.
MaxInflightBytes int
// TraceID can be set when appending data on a stream. It's
// purpose is to aid in debug and diagnostic scenarios.
TraceID string
// dataOrigin can be set for classifying metrics generated
// by a stream.
dataOrigin string
// retains reference to the target table when resolving settings
destinationTable string
appendCallOptions []gax.CallOption
// enable multiplex?
multiplex bool
// retain a copy of the stream client func.
streamFunc streamClientFunc
}
func defaultStreamSettings() *streamSettings {
return &streamSettings{
streamType: DefaultStream,
MaxInflightRequests: 1000,
MaxInflightBytes: 0,
appendCallOptions: []gax.CallOption{
gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(10 * 1024 * 1024)),
},
}
}
// buildTraceID handles prefixing of a user-supplied trace ID with a client identifier.
func buildTraceID(s *streamSettings) string {
base := fmt.Sprintf("go-managedwriter:%s", internal.Version)
if s != nil && s.TraceID != "" {
return fmt.Sprintf("%s %s", base, s.TraceID)
}
return base
}
// StreamName returns the corresponding write stream ID being managed by this writer.
func (ms *ManagedStream) StreamName() string {
return ms.streamSettings.streamID
}
// StreamType returns the configured type for this stream.
func (ms *ManagedStream) StreamType() StreamType {
return ms.streamSettings.streamType
}
// FlushRows advances the offset at which rows in a BufferedStream are visible. Calling
// this method for other stream types yields an error.
func (ms *ManagedStream) FlushRows(ctx context.Context, offset int64, opts ...gax.CallOption) (int64, error) {
req := &storagepb.FlushRowsRequest{
WriteStream: ms.streamSettings.streamID,
Offset: &wrapperspb.Int64Value{
Value: offset,
},
}
resp, err := ms.c.rawClient.FlushRows(ctx, req, opts...)
recordWriterStat(ms, FlushRequests, 1)
if err != nil {
return 0, err
}
return resp.GetOffset(), nil
}
// Finalize is used to mark a stream as complete, and thus ensure no further data can
// be appended to the stream. You cannot finalize a DefaultStream, as it always exists.
//
// Finalizing does not advance the current offset of a BufferedStream, nor does it commit
// data in a PendingStream.
func (ms *ManagedStream) Finalize(ctx context.Context, opts ...gax.CallOption) (int64, error) {
// TODO: consider blocking for in-flight appends once we have an appendStream plumbed in.
req := &storagepb.FinalizeWriteStreamRequest{
Name: ms.streamSettings.streamID,
}
resp, err := ms.c.rawClient.FinalizeWriteStream(ctx, req, opts...)
if err != nil {
return 0, err
}
return resp.GetRowCount(), nil
}
// appendWithRetry handles the details of adding sending an append request on a stream. Appends are sent on a long
// lived bidirectional network stream, with it's own managed context (ms.ctx), and there's a per-request context
// attached to the pendingWrite.
func (ms *ManagedStream) appendWithRetry(pw *pendingWrite, opts ...gax.CallOption) error {
for {
ms.mu.Lock()
err := ms.err
ms.mu.Unlock()
if err != nil {
return err
}
conn, err := ms.pool.selectConn(pw)
if err != nil {
pw.markDone(nil, err)
return err
}
appendErr := conn.lockingAppend(pw)
if appendErr != nil {
// Append yielded an error. Retry by continuing or return.
status := grpcstatus.Convert(appendErr)
if status != nil {
recordCtx := ms.ctx
if ctx, err := tag.New(ms.ctx, tag.Insert(keyError, status.Code().String())); err == nil {
recordCtx = ctx
}
recordStat(recordCtx, AppendRequestErrors, 1)
}
bo, shouldRetry := ms.statelessRetryer().Retry(appendErr, pw.attemptCount)
if shouldRetry {
if err := gax.Sleep(ms.ctx, bo); err != nil {
return err
}
continue
}
// This append cannot be retried locally. It is not the responsibility of this function to finalize the pending
// write however, as that's handled by callers.
// Related: https://github.com/googleapis/google-cloud-go/issues/7380
return appendErr
}
return nil
}
}
// Close closes a managed stream.
func (ms *ManagedStream) Close() error {
ms.mu.Lock()
defer ms.mu.Unlock()
var returned error
if ms.pool != nil {
if err := ms.pool.removeWriter(ms); err != nil {
returned = err
}
}
// Cancel the underlying context for the stream, we don't allow re-open.
if ms.cancel != nil {
ms.cancel()
ms.cancel = nil
}
// For normal operation, mark the stream error as io.EOF.
if ms.err == nil {
ms.err = io.EOF
}
if returned == nil {
returned = ms.err
}
return returned
}
// buildRequest constructs an optimized AppendRowsRequest.
// Offset (if specified) is applied later.
func (ms *ManagedStream) buildRequest(data [][]byte) *storagepb.AppendRowsRequest {
return &storagepb.AppendRowsRequest{
Rows: &storagepb.AppendRowsRequest_ProtoRows{
ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
Rows: &storagepb.ProtoRows{
SerializedRows: data,
},
},
},
}
}
// AppendRows sends the append requests to the service, and returns a single AppendResult for tracking
// the set of data.
//
// The format of the row data is binary serialized protocol buffer bytes. The message must be compatible
// with the schema currently set for the stream.
//
// Use the WithOffset() AppendOption to set an explicit offset for this append. Setting an offset for
// a default stream is unsupported.
//
// The size of a single request must be less than 10 MB in size.
// Requests larger than this return an error, typically `INVALID_ARGUMENT`.
func (ms *ManagedStream) AppendRows(ctx context.Context, data [][]byte, opts ...AppendOption) (*AppendResult, error) {
// before we do anything, ensure the writer isn't closed.
ms.mu.Lock()
err := ms.err
ms.mu.Unlock()
if err != nil {
return nil, err
}
// Ensure we build the request and pending write with a consistent schema version.
curTemplate := ms.curTemplate
req := ms.buildRequest(data)
pw := newPendingWrite(ctx, ms, req, curTemplate, ms.streamSettings.streamID, ms.streamSettings.TraceID)
// apply AppendOption opts
for _, opt := range opts {
opt(pw)
}
// Post-request fixup after options are applied.
if pw.reqTmpl != nil {
if pw.reqTmpl.tmpl != nil {
// MVIs must be set on each request, but _default_ MVIs persist across the stream lifetime. Sigh.
pw.req.MissingValueInterpretations = pw.reqTmpl.tmpl.GetMissingValueInterpretations()
}
}
// Call the underlying append. The stream has it's own retained context and will surface expiry on
// it's own, but we also need to respect any deadline for the provided context.
errCh := make(chan error)
var appendErr error
go func() {
select {
case errCh <- ms.appendWithRetry(pw):
case <-ctx.Done():
case <-ms.ctx.Done():
}
close(errCh)
}()
select {
case <-ctx.Done():
// It is incorrect to simply mark the request done, as it's potentially in flight in the bidi stream
// where we can't propagate a cancellation. Our options are to return the pending write even though
// it's in an ambiguous state, or to return the error and simply drop the pending write on the floor.
//
// This API expresses request idempotency through offset management, so users who care to use offsets
// can deal with the dropped request.
return nil, ctx.Err()
case <-ms.ctx.Done():
// Same as the request context being done, this indicates the writer context expired. For this case,
// we also attempt to close the writer.
ms.mu.Lock()
if ms.err == nil {
ms.err = ms.ctx.Err()
}
ms.mu.Unlock()
ms.Close()
// Don't relock to fetch the writer terminal error, as we've already ensured that the writer is closed.
return nil, ms.err
case appendErr = <-errCh:
if appendErr != nil {
return nil, appendErr
}
return pw.result, nil
}
}
// processRetry is responsible for evaluating and re-enqueing an append.
// If the append is not retried, it is marked complete.
func (ms *ManagedStream) processRetry(pw *pendingWrite, srcConn *connection, appendResp *storagepb.AppendRowsResponse, initialErr error) {
err := initialErr
for {
pause, shouldRetry := ms.statelessRetryer().Retry(err, pw.attemptCount)
if !shouldRetry {
// Should not attempt to re-append.
pw.markDone(appendResp, err)
return
}
time.Sleep(pause)
err = ms.appendWithRetry(pw)
if err != nil {
// Re-enqueue failed, send it through the loop again.
continue
}
// Break out of the loop, we were successful and the write has been
// re-inserted.
recordWriterStat(ms, AppendRetryCount, 1)
break
}
}
// returns the stateless retryer. If one's not set (re-enqueue retries disabled),
// it returns a retryer that only permits single attempts.
func (ms *ManagedStream) statelessRetryer() *statelessRetryer {
if ms.retry != nil {
return ms.retry
}
if ms.pool != nil {
return ms.pool.defaultRetryer()
}
return &statelessRetryer{
maxAttempts: 1,
}
}