| // Copyright 2021 Google LLC |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // https://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package managedwriter |
| |
| import ( |
| "context" |
| "fmt" |
| "io" |
| "sync" |
| "time" |
| |
| "cloud.google.com/go/bigquery/internal" |
| "cloud.google.com/go/bigquery/storage/apiv1/storagepb" |
| "github.com/googleapis/gax-go/v2" |
| "go.opencensus.io/tag" |
| "google.golang.org/grpc" |
| grpcstatus "google.golang.org/grpc/status" |
| "google.golang.org/protobuf/types/known/wrapperspb" |
| ) |
| |
| // StreamType indicates the type of stream this write client is managing. |
| type StreamType string |
| |
| var ( |
| // DefaultStream most closely mimics the legacy bigquery |
| // tabledata.insertAll semantics. Successful inserts are |
| // committed immediately, and offsets are not tracked, as |
| // all writes go into a "default" stream that always exists |
| // for a table. |
| DefaultStream StreamType = "DEFAULT" |
| |
| // CommittedStream appends data immediately, but creates a |
| // discrete stream for the work so that offset tracking can |
| // be used to track writes. |
| CommittedStream StreamType = "COMMITTED" |
| |
| // BufferedStream is a form of checkpointed stream that allows |
| // you to advance the offset of visible rows via Flush operations. |
| BufferedStream StreamType = "BUFFERED" |
| |
| // PendingStream is a stream in which no data is made visible to |
| // readers until the stream is finalized and committed explicitly. |
| PendingStream StreamType = "PENDING" |
| ) |
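| // As an illustrative sketch (assuming this package's NewManagedStream and its |
| // WriterOption helpers such as WithDestinationTable, WithType, and |
| // WithSchemaDescriptor; "tableName" and "descriptorProto" are placeholders), |
| // the stream type is chosen when the writer is constructed: |
| // |
| //   writer, err := client.NewManagedStream(ctx, |
| //       managedwriter.WithDestinationTable(tableName), |
| //       managedwriter.WithType(managedwriter.BufferedStream), |
| //       managedwriter.WithSchemaDescriptor(descriptorProto), |
| //   ) |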
| |
| func streamTypeToEnum(t StreamType) storagepb.WriteStream_Type { |
| switch t { |
| case CommittedStream: |
| return storagepb.WriteStream_COMMITTED |
| case PendingStream: |
| return storagepb.WriteStream_PENDING |
| case BufferedStream: |
| return storagepb.WriteStream_BUFFERED |
| default: |
| return storagepb.WriteStream_TYPE_UNSPECIFIED |
| } |
| } |
| |
| // ManagedStream is the abstraction over a single write stream. |
| type ManagedStream struct { |
| // Unique ID for the ManagedStream instance. |
| id string |
| |
| // pool retains a reference to the writer's pool. A writer is only associated to a single pool. |
| pool *connectionPool |
| |
| streamSettings *streamSettings |
| // retains the current descriptor for the stream. |
| curTemplate *versionedTemplate |
| c *Client |
| retry *statelessRetryer |
| |
| // writer state |
| mu sync.Mutex |
| ctx context.Context // used for stats/instrumentation, and to check the writer is live. |
| cancel context.CancelFunc |
| err error // retains any terminal error (writer was closed) |
| } |
| |
| // streamSettings is for capturing configuration and option information. |
| type streamSettings struct { |
| |
| // streamID contains the reference to the destination stream. |
| streamID string |
| |
| // streamType governs behavior of the client, such as how |
| // offset handling is managed. |
| streamType StreamType |
| |
| // MaxInflightRequests governs how many unacknowledged |
| // append writes can be outstanding to the service. |
| MaxInflightRequests int |
| |
| // MaxInflightBytes governs how many unacknowledged |
| // request bytes can be outstanding to the service. |
| MaxInflightBytes int |
| |
| // TraceID can be set when appending data on a stream. Its |
| // purpose is to aid in debugging and diagnostic scenarios. |
| TraceID string |
| |
| // dataOrigin can be set for classifying metrics generated |
| // by a stream. |
| dataOrigin string |
| |
| // retains a reference to the target table when resolving settings. |
| destinationTable string |
| |
| appendCallOptions []gax.CallOption |
| |
| // multiplex indicates whether connection multiplexing (sharing append connections across writers) is enabled. |
| multiplex bool |
| |
| // retain a copy of the stream client func. |
| streamFunc streamClientFunc |
| } |
| |
| func defaultStreamSettings() *streamSettings { |
| return &streamSettings{ |
| streamType: DefaultStream, |
| MaxInflightRequests: 1000, |
| MaxInflightBytes: 0, |
| appendCallOptions: []gax.CallOption{ |
| gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(10 * 1024 * 1024)), |
| }, |
| } |
| } |
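| // The defaults above (1000 unacknowledged requests, no byte limit, 10 MB max |
| // receive size) can be tuned per writer. A sketch, assuming this package's |
| // WithMaxInflightRequests and WithMaxInflightBytes writer options: |
| // |
| //   writer, err := client.NewManagedStream(ctx, |
| //       managedwriter.WithDestinationTable(tableName), |
| //       managedwriter.WithMaxInflightRequests(500), |
| //       managedwriter.WithMaxInflightBytes(50*1024*1024), |
| //   ) |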
| |
| // buildTraceID handles prefixing of a user-supplied trace ID with a client identifier. |
| func buildTraceID(s *streamSettings) string { |
| base := fmt.Sprintf("go-managedwriter:%s", internal.Version) |
| if s != nil && s.TraceID != "" { |
| return fmt.Sprintf("%s %s", base, s.TraceID) |
| } |
| return base |
| } |
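| // For example, buildTraceID(nil) yields something like "go-managedwriter:1.0.0" |
| // (the version string varies), while a settings value with TraceID "my-loader" |
| // yields "go-managedwriter:1.0.0 my-loader". |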
| |
| // StreamName returns the corresponding write stream ID being managed by this writer. |
| func (ms *ManagedStream) StreamName() string { |
| return ms.streamSettings.streamID |
| } |
| |
| // StreamType returns the configured type for this stream. |
| func (ms *ManagedStream) StreamType() StreamType { |
| return ms.streamSettings.streamType |
| } |
| |
| // FlushRows advances the offset at which rows in a BufferedStream are visible. Calling |
| // this method for other stream types yields an error. |
| func (ms *ManagedStream) FlushRows(ctx context.Context, offset int64, opts ...gax.CallOption) (int64, error) { |
| req := &storagepb.FlushRowsRequest{ |
| WriteStream: ms.streamSettings.streamID, |
| Offset: &wrapperspb.Int64Value{ |
| Value: offset, |
| }, |
| } |
| resp, err := ms.c.rawClient.FlushRows(ctx, req, opts...) |
| recordWriterStat(ms, FlushRequests, 1) |
| if err != nil { |
| return 0, err |
| } |
| return resp.GetOffset(), nil |
| } |
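| // Illustrative FlushRows usage against a BufferedStream writer (a sketch; error |
| // handling elided, and "writer" is assumed to be a ManagedStream created with |
| // WithType(BufferedStream)): |
| // |
| //   res, _ := writer.AppendRows(ctx, [][]byte{encodedRow}) |
| //   _, _ = res.GetResult(ctx) // wait for the append to be acknowledged |
| //   // Advance the visibility offset past the appended row. |
| //   newOffset, err := writer.FlushRows(ctx, 1) |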
| |
| // Finalize is used to mark a stream as complete, and thus ensure no further data can |
| // be appended to the stream. You cannot finalize a DefaultStream, as it always exists. |
| // |
| // Finalizing does not advance the current offset of a BufferedStream, nor does it commit |
| // data in a PendingStream. |
| func (ms *ManagedStream) Finalize(ctx context.Context, opts ...gax.CallOption) (int64, error) { |
| // TODO: consider blocking for in-flight appends once we have an appendStream plumbed in. |
| req := &storagepb.FinalizeWriteStreamRequest{ |
| Name: ms.streamSettings.streamID, |
| } |
| resp, err := ms.c.rawClient.FinalizeWriteStream(ctx, req, opts...) |
| if err != nil { |
| return 0, err |
| } |
| return resp.GetRowCount(), nil |
| } |
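| // Illustrative lifecycle for a PendingStream (a sketch; it assumes the Client's |
| // BatchCommitWriteStreams method and the storagepb request type from this |
| // module): |
| // |
| //   rowCount, err := writer.Finalize(ctx) |
| //   // Committing the finalized stream makes its data visible to readers. |
| //   resp, err := client.BatchCommitWriteStreams(ctx, &storagepb.BatchCommitWriteStreamsRequest{ |
| //       Parent: tableName, |
| //       WriteStreams: []string{writer.StreamName()}, |
| //   }) |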
| |
| // appendWithRetry handles the details of sending an append request on a stream. Appends are sent on a |
| // long-lived bidirectional network stream with its own managed context (ms.ctx), and there's a per-request |
| // context attached to the pendingWrite. |
| func (ms *ManagedStream) appendWithRetry(pw *pendingWrite, opts ...gax.CallOption) error { |
| for { |
| ms.mu.Lock() |
| err := ms.err |
| ms.mu.Unlock() |
| if err != nil { |
| return err |
| } |
| conn, err := ms.pool.selectConn(pw) |
| if err != nil { |
| pw.markDone(nil, err) |
| return err |
| } |
| appendErr := conn.lockingAppend(pw) |
| if appendErr != nil { |
| // Append yielded an error. Retry by continuing or return. |
| status := grpcstatus.Convert(appendErr) |
| if status != nil { |
| recordCtx := ms.ctx |
| if ctx, err := tag.New(ms.ctx, tag.Insert(keyError, status.Code().String())); err == nil { |
| recordCtx = ctx |
| } |
| recordStat(recordCtx, AppendRequestErrors, 1) |
| } |
| bo, shouldRetry := ms.statelessRetryer().Retry(appendErr, pw.attemptCount) |
| if shouldRetry { |
| if err := gax.Sleep(ms.ctx, bo); err != nil { |
| return err |
| } |
| continue |
| } |
| // This append cannot be retried locally. However, it is not the responsibility of this function to |
| // finalize the pending write; that's handled by callers. |
| // Related: https://github.com/googleapis/google-cloud-go/issues/7380 |
| return appendErr |
| } |
| return nil |
| } |
| } |
| |
| // Close closes a managed stream. |
| func (ms *ManagedStream) Close() error { |
| |
| ms.mu.Lock() |
| defer ms.mu.Unlock() |
| |
| var returned error |
| |
| if ms.pool != nil { |
| if err := ms.pool.removeWriter(ms); err != nil { |
| returned = err |
| } |
| } |
| |
| // Cancel the underlying context for the stream; we don't allow re-opening. |
| if ms.cancel != nil { |
| ms.cancel() |
| ms.cancel = nil |
| } |
| |
| // For normal operation, mark the stream error as io.EOF. |
| if ms.err == nil { |
| ms.err = io.EOF |
| } |
| if returned == nil { |
| returned = ms.err |
| } |
| return returned |
| } |
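| // After Close returns, the writer is terminal: a clean close retains io.EOF, so |
| // callers distinguishing abnormal shutdown can do something like: |
| // |
| //   if err := writer.Close(); err != nil && err != io.EOF { |
| //       // the writer closed with a non-EOF terminal error |
| //   } |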
| |
| // buildRequest constructs an optimized AppendRowsRequest. |
| // Offset (if specified) is applied later. |
| func (ms *ManagedStream) buildRequest(data [][]byte) *storagepb.AppendRowsRequest { |
| return &storagepb.AppendRowsRequest{ |
| Rows: &storagepb.AppendRowsRequest_ProtoRows{ |
| ProtoRows: &storagepb.AppendRowsRequest_ProtoData{ |
| Rows: &storagepb.ProtoRows{ |
| SerializedRows: data, |
| }, |
| }, |
| }, |
| } |
| } |
| |
| // AppendRows sends the append request to the service, and returns a single AppendResult for tracking |
| // the set of data. |
| // |
| // The format of the row data is binary serialized protocol buffer bytes. The message must be compatible |
| // with the schema currently set for the stream. |
| // |
| // Use the WithOffset() AppendOption to set an explicit offset for this append. Setting an offset for |
| // a default stream is unsupported. |
| // |
| // A single request must be less than 10 MB in size. |
| // Requests larger than this return an error, typically `INVALID_ARGUMENT`. |
| func (ms *ManagedStream) AppendRows(ctx context.Context, data [][]byte, opts ...AppendOption) (*AppendResult, error) { |
| // before we do anything, ensure the writer isn't closed. |
| ms.mu.Lock() |
| err := ms.err |
| ms.mu.Unlock() |
| if err != nil { |
| return nil, err |
| } |
| // Ensure we build the request and pending write with a consistent schema version. |
| curTemplate := ms.curTemplate |
| req := ms.buildRequest(data) |
| pw := newPendingWrite(ctx, ms, req, curTemplate, ms.streamSettings.streamID, ms.streamSettings.TraceID) |
| // apply AppendOption opts |
| for _, opt := range opts { |
| opt(pw) |
| } |
| // Post-request fixup after options are applied. |
| if pw.reqTmpl != nil { |
| if pw.reqTmpl.tmpl != nil { |
| // Missing value interpretations (MVIs) must be set on each request, but _default_ MVIs persist across the stream lifetime. |
| pw.req.MissingValueInterpretations = pw.reqTmpl.tmpl.GetMissingValueInterpretations() |
| } |
| } |
| |
| // Call the underlying append. The stream has its own retained context and will surface expiry on |
| // its own, but we also need to respect any deadline for the provided context. |
| errCh := make(chan error) |
| var appendErr error |
| go func() { |
| select { |
| case errCh <- ms.appendWithRetry(pw): |
| case <-ctx.Done(): |
| case <-ms.ctx.Done(): |
| } |
| close(errCh) |
| }() |
| select { |
| case <-ctx.Done(): |
| // It is incorrect to simply mark the request done, as it's potentially in flight in the bidi stream |
| // where we can't propagate a cancellation. Our options are to return the pending write even though |
| // it's in an ambiguous state, or to return the error and simply drop the pending write on the floor. |
| // |
| // This API expresses request idempotency through offset management, so users who care to use offsets |
| // can deal with the dropped request. |
| return nil, ctx.Err() |
| case <-ms.ctx.Done(): |
| // Same as the request context being done, this indicates the writer context expired. For this case, |
| // we also attempt to close the writer. |
| ms.mu.Lock() |
| if ms.err == nil { |
| ms.err = ms.ctx.Err() |
| } |
| ms.mu.Unlock() |
| ms.Close() |
| // Don't relock to fetch the writer's terminal error, as we've already ensured that the writer is closed. |
| return nil, ms.err |
| case appendErr = <-errCh: |
| if appendErr != nil { |
| return nil, appendErr |
| } |
| return pw.result, nil |
| } |
| } |
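| // Illustrative append (a sketch; "encodedRows" is assumed to be serialized |
| // protocol buffer bytes matching the stream schema, and WithOffset/GetResult |
| // are this package's append option and result accessor): |
| // |
| //   result, err := writer.AppendRows(ctx, encodedRows, managedwriter.WithOffset(0)) |
| //   if err != nil { |
| //       return err // the request was never accepted locally |
| //   } |
| //   // Block until the service acknowledges (or rejects) this append. |
| //   recvOffset, err := result.GetResult(ctx) |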
| |
| // processRetry is responsible for evaluating and re-enqueuing an append. |
| // If the append is not retried, it is marked complete. |
| func (ms *ManagedStream) processRetry(pw *pendingWrite, srcConn *connection, appendResp *storagepb.AppendRowsResponse, initialErr error) { |
| err := initialErr |
| for { |
| pause, shouldRetry := ms.statelessRetryer().Retry(err, pw.attemptCount) |
| if !shouldRetry { |
| // Should not attempt to re-append. |
| pw.markDone(appendResp, err) |
| return |
| } |
| time.Sleep(pause) |
| err = ms.appendWithRetry(pw) |
| if err != nil { |
| // Re-enqueue failed; send it through the loop again. |
| continue |
| } |
| // Break out of the loop; we were successful and the write has been |
| // re-inserted. |
| recordWriterStat(ms, AppendRetryCount, 1) |
| break |
| } |
| } |
| |
| // statelessRetryer returns the stateless retryer for this writer. If one isn't set (re-enqueue retries |
| // disabled), it returns a retryer that only permits single attempts. |
| func (ms *ManagedStream) statelessRetryer() *statelessRetryer { |
| if ms.retry != nil { |
| return ms.retry |
| } |
| if ms.pool != nil { |
| return ms.pool.defaultRetryer() |
| } |
| return &statelessRetryer{ |
| maxAttempts: 1, |
| } |
| } |
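| // Re-enqueue retries are opt-in when the writer is constructed. A sketch, |
| // assuming this package's EnableWriteRetries writer option: |
| // |
| //   writer, err := client.NewManagedStream(ctx, |
| //       managedwriter.WithDestinationTable(tableName), |
| //       managedwriter.EnableWriteRetries(true), |
| //   ) |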