| // Copyright 2021 Google LLC |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // https://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package managedwriter |
| |
| import ( |
| "context" |
| |
| "cloud.google.com/go/bigquery/storage/apiv1/storagepb" |
| "github.com/googleapis/gax-go/v2/apierror" |
| grpcstatus "google.golang.org/grpc/status" |
| "google.golang.org/protobuf/proto" |
| ) |
| |
| // NoStreamOffset is a sentinel value for signalling we're not tracking |
| // stream offset (e.g. a default stream which allows simultaneous append streams). |
| const NoStreamOffset int64 = -1 |
| |
| // AppendResult tracks the status of a batch of data rows. |
| type AppendResult struct { |
| ready chan struct{} |
| |
| // if the append failed without a response, this will retain a reference to the error. |
| err error |
| |
| // retains the original response. |
| response *storagepb.AppendRowsResponse |
| |
| // retains the number of times this individual write was enqueued. |
| totalAttempts int |
| } |
| |
| func newAppendResult() *AppendResult { |
| return &AppendResult{ |
| ready: make(chan struct{}), |
| } |
| } |
| |
| // Ready blocks until the append request has reached a completed state, |
| // which may be a successful append or an error. |
| func (ar *AppendResult) Ready() <-chan struct{} { return ar.ready } |
| |
| // GetResult returns the optional offset of this row, as well as any error encountered while |
| // processing the append. |
| // |
| // This call blocks until the result is ready, or context is no longer valid. |
| func (ar *AppendResult) GetResult(ctx context.Context) (int64, error) { |
| select { |
| case <-ctx.Done(): |
| return NoStreamOffset, ctx.Err() |
| case <-ar.Ready(): |
| full, err := ar.FullResponse(ctx) |
| offset := NoStreamOffset |
| if full != nil { |
| if result := full.GetAppendResult(); result != nil { |
| if off := result.GetOffset(); off != nil { |
| offset = off.GetValue() |
| } |
| } |
| } |
| return offset, err |
| } |
| } |
| |
| // FullResponse returns the full content of the AppendRowsResponse, and any error encountered while |
| // processing the append. |
| // |
| // The AppendRowResponse may contain an embedded error. An embedded error in the response will be |
| // converted and returned as the error response, so this method may return both the |
| // AppendRowsResponse and an error. |
| // |
| // This call blocks until the result is ready, or context is no longer valid. |
| func (ar *AppendResult) FullResponse(ctx context.Context) (*storagepb.AppendRowsResponse, error) { |
| select { |
| case <-ctx.Done(): |
| return nil, ctx.Err() |
| case <-ar.Ready(): |
| var err error |
| if ar.err != nil { |
| err = ar.err |
| } else { |
| if ar.response != nil { |
| if status := ar.response.GetError(); status != nil { |
| statusErr := grpcstatus.ErrorProto(status) |
| // Provide an APIError if possible. |
| if apiErr, ok := apierror.FromError(statusErr); ok { |
| err = apiErr |
| } else { |
| err = statusErr |
| } |
| } |
| } |
| } |
| if ar.response != nil { |
| return proto.Clone(ar.response).(*storagepb.AppendRowsResponse), err |
| } |
| return nil, err |
| } |
| } |
| |
| func (ar *AppendResult) offset(ctx context.Context) int64 { |
| select { |
| case <-ctx.Done(): |
| return NoStreamOffset |
| case <-ar.Ready(): |
| if ar.response != nil { |
| if result := ar.response.GetAppendResult(); result != nil { |
| if off := result.GetOffset(); off != nil { |
| return off.GetValue() |
| } |
| } |
| } |
| return NoStreamOffset |
| } |
| } |
| |
| // UpdatedSchema returns the updated schema for a table if supplied by the backend as part |
| // of the append response. |
| // |
| // This call blocks until the result is ready, or context is no longer valid. |
| func (ar *AppendResult) UpdatedSchema(ctx context.Context) (*storagepb.TableSchema, error) { |
| select { |
| case <-ctx.Done(): |
| return nil, ctx.Err() |
| case <-ar.Ready(): |
| if ar.response != nil { |
| if schema := ar.response.GetUpdatedSchema(); schema != nil { |
| return proto.Clone(schema).(*storagepb.TableSchema), nil |
| } |
| } |
| return nil, nil |
| } |
| } |
| |
| // TotalAttempts returns the number of times this write was attempted. |
| // |
| // This call blocks until the result is ready, or context is no longer valid. |
| func (ar *AppendResult) TotalAttempts(ctx context.Context) (int, error) { |
| select { |
| case <-ctx.Done(): |
| return 0, ctx.Err() |
| case <-ar.Ready(): |
| return ar.totalAttempts, nil |
| } |
| } |
| |
| // pendingWrite tracks state for a set of rows that are part of a single |
| // append request. |
| type pendingWrite struct { |
| // writer retains a reference to the origin of a pending write. Primary |
| // used is to inform routing decisions. |
| writer *ManagedStream |
| |
| // We store the request as it's simplex-optimized form, as statistically that's the most |
| // likely outcome when processing requests and it allows us to be efficient on send. |
| // We retain the additional information to build the complete request in the related fields. |
| req *storagepb.AppendRowsRequest |
| reqTmpl *versionedTemplate // request template at time of creation |
| traceID string |
| writeStreamID string |
| |
| // Reference to the AppendResult which is exposed to the user. |
| result *AppendResult |
| |
| // Flow control is based on the unoptimized request size. |
| reqSize int |
| |
| // retains the original request context, primarily for checking against |
| // cancellation signals. |
| reqCtx context.Context |
| |
| // tracks the number of times we've attempted this append request. |
| attemptCount int |
| } |
| |
| // newPendingWrite constructs the proto request and attaches references |
| // to the pending results for later consumption. The provided context is |
| // embedded in the pending write, as the write may be retried and we want |
| // to respect the original context for expiry/cancellation etc. |
| func newPendingWrite(ctx context.Context, src *ManagedStream, req *storagepb.AppendRowsRequest, reqTmpl *versionedTemplate, writeStreamID, traceID string) *pendingWrite { |
| pw := &pendingWrite{ |
| writer: src, |
| result: newAppendResult(), |
| reqCtx: ctx, |
| |
| req: req, // minimal req, typically just row data |
| reqTmpl: reqTmpl, // remainder of templated request |
| writeStreamID: writeStreamID, |
| traceID: traceID, |
| } |
| // Compute the approx size for flow control purposes. |
| pw.reqSize = proto.Size(pw.req) + len(writeStreamID) + len(traceID) |
| if pw.reqTmpl != nil { |
| pw.reqSize += proto.Size(pw.reqTmpl.tmpl) |
| } |
| return pw |
| } |
| |
| // markDone propagates finalization of an append request to the associated |
| // AppendResult. |
| func (pw *pendingWrite) markDone(resp *storagepb.AppendRowsResponse, err error) { |
| // First, propagate necessary state from the pendingWrite to the final result. |
| if resp != nil { |
| pw.result.response = resp |
| } |
| pw.result.err = err |
| pw.result.totalAttempts = pw.attemptCount |
| |
| // Close the result's ready channel. |
| close(pw.result.ready) |
| // Cleanup references remaining on the write explicitly. |
| pw.req = nil |
| pw.reqTmpl = nil |
| pw.writer = nil |
| pw.reqCtx = nil |
| } |
| |
| func (pw *pendingWrite) constructFullRequest(addTrace bool) *storagepb.AppendRowsRequest { |
| req := &storagepb.AppendRowsRequest{} |
| if pw.reqTmpl != nil { |
| req = proto.Clone(pw.reqTmpl.tmpl).(*storagepb.AppendRowsRequest) |
| } |
| if pw.req != nil { |
| proto.Merge(req, pw.req) |
| } |
| if addTrace { |
| req.TraceId = buildTraceID(&streamSettings{TraceID: pw.traceID}) |
| } |
| req.WriteStream = pw.writeStreamID |
| return req |
| } |