| // Copyright 2023 Google LLC |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // https://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package managedwriter |
| |
| import ( |
| "bytes" |
| "encoding/binary" |
| "hash/crc32" |
| "time" |
| |
| "cloud.google.com/go/bigquery/storage/apiv1/storagepb" |
| "google.golang.org/protobuf/proto" |
| "google.golang.org/protobuf/types/descriptorpb" |
| ) |
| |
| // sendOptimizer handles the general task of optimizing AppendRowsRequest messages sent to the backend. |
| // |
| // The general premise is that the ordering of AppendRowsRequests on a connection provides some opportunities |
| // to reduce payload size, thus potentially increasing throughput. Care must be taken, however, as deep inspection |
| // of requests is potentially more costly (in terms of CPU usage) than gains from reducing request sizes. |
| type sendOptimizer interface { |
| // signalReset is used to signal to the optimizer that the connection is freshly (re)opened, or that a previous |
| // send yielded an error. |
| signalReset() |
| |
| // optimizeSend handles possible manipulation of a request, and triggers the send. |
| optimizeSend(arc storagepb.BigQueryWrite_AppendRowsClient, pw *pendingWrite) error |
| |
| // isMultiplexing reports whether we've actually sent writes to more than a single stream on this connection. |
| isMultiplexing() bool |
| } |
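| |
| // chooseOptimizerSketch is an illustrative sketch (not part of the production send |
| // path) of how a caller might select a sendOptimizer implementation. The verbose and |
| // multiplex parameters are hypothetical inputs used only for illustration; the real |
| // selection logic lives in the connection handling elsewhere in this package. |
| func chooseOptimizerSketch(verbose, multiplex bool) sendOptimizer { |
| 	if verbose { |
| 		// Always send the full request; primarily useful for testing and debugging. |
| 		return &verboseOptimizer{} |
| 	} |
| 	if multiplex { |
| 		// The connection may carry appends for multiple default streams. |
| 		return &multiplexOptimizer{} |
| 	} |
| 	// The connection bears appends for a single stream. |
| 	return &simplexOptimizer{} |
| } |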
| |
| // verboseOptimizer is primarily a testing optimizer that always sends the full request. |
| type verboseOptimizer struct { |
| } |
| |
| func (vo *verboseOptimizer) signalReset() { |
| // This optimizer is stateless. |
| } |
| |
| // optimizeSend populates a full request every time. |
| func (vo *verboseOptimizer) optimizeSend(arc storagepb.BigQueryWrite_AppendRowsClient, pw *pendingWrite) error { |
| return arc.Send(pw.constructFullRequest(true)) |
| } |
| |
| func (vo *verboseOptimizer) isMultiplexing() bool { |
| // We declare this false to ensure we always reconnect on schema changes. |
| return false |
| } |
| |
| // simplexOptimizer is used for connections bearing AppendRowsRequest for only a single stream. |
| // |
| // The optimizations here are straightforward: |
| // * The first request on a connection is unmodified. |
| // * Subsequent requests can redact WriteStream, WriterSchema, and TraceID. |
| // |
| // Behavior of schema evolution differs based on the type of stream. |
| // * For an explicit stream, the connection must reconnect to signal schema change (handled in connection). |
| // * For default streams, the new descriptor (inside WriterSchema) can simply be sent. |
| type simplexOptimizer struct { |
| haveSent bool |
| } |
| |
| func (so *simplexOptimizer) signalReset() { |
| so.haveSent = false |
| } |
| |
| func (so *simplexOptimizer) optimizeSend(arc storagepb.BigQueryWrite_AppendRowsClient, pw *pendingWrite) error { |
| var err error |
| if so.haveSent { |
| // subsequent send, we can send the request unmodified. |
| err = arc.Send(pw.req) |
| } else { |
| // first request, build a full request. |
| err = arc.Send(pw.constructFullRequest(true)) |
| } |
| so.haveSent = err == nil |
| return err |
| } |
| |
| func (so *simplexOptimizer) isMultiplexing() bool { |
| // A simplex optimizer is not designed for multiplexing. |
| return false |
| } |
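| |
| // simplexRedactionSketch is an illustrative sketch of the payload savings described |
| // above. It hand-builds a full first request and a redacted follow-up request rather |
| // than using a real pendingWrite; the stream name, descriptor, and serialized rows |
| // are placeholders supplied by the caller, and the trace ID is a dummy value. |
| func simplexRedactionSketch(streamID string, schema *descriptorpb.DescriptorProto, serializedRows [][]byte) (first, subsequent *storagepb.AppendRowsRequest) { |
| 	// The first request on a connection carries the stream, writer schema, and trace ID. |
| 	first = &storagepb.AppendRowsRequest{ |
| 		WriteStream: streamID, |
| 		TraceId:     "example-trace-id", // placeholder trace ID |
| 		Rows: &storagepb.AppendRowsRequest_ProtoRows{ |
| 			ProtoRows: &storagepb.AppendRowsRequest_ProtoData{ |
| 				WriterSchema: &storagepb.ProtoSchema{ |
| 					ProtoDescriptor: proto.Clone(schema).(*descriptorpb.DescriptorProto), |
| 				}, |
| 				Rows: &storagepb.ProtoRows{SerializedRows: serializedRows}, |
| 			}, |
| 		}, |
| 	} |
| 	// Subsequent requests on the same connection only need the row data. |
| 	subsequent = &storagepb.AppendRowsRequest{ |
| 		Rows: &storagepb.AppendRowsRequest_ProtoRows{ |
| 			ProtoRows: &storagepb.AppendRowsRequest_ProtoData{ |
| 				Rows: &storagepb.ProtoRows{SerializedRows: serializedRows}, |
| 			}, |
| 		}, |
| 	} |
| 	return first, subsequent |
| } |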
| |
| // multiplexOptimizer is used for connections where requests for multiple default streams are sent on a common |
| // connection. Only default streams can currently be multiplexed. |
| // |
| // In this case, the optimizations are as follows: |
| // * We must send the WriteStream on all requests. |
| // * For sequential requests to the same stream, schema can be redacted after the first request. |
| // * Trace ID can be redacted from all requests after the first. |
| // |
| // Schema evolution is simply a case of sending the new WriterSchema as part of the request(s). No explicit |
| // reconnection is necessary. |
| type multiplexOptimizer struct { |
| prevStream string |
| prevTemplate *versionedTemplate |
| multiplexStreams bool |
| } |
| |
| func (mo *multiplexOptimizer) signalReset() { |
| mo.prevStream = "" |
| mo.multiplexStreams = false |
| mo.prevTemplate = nil |
| } |
| |
| func (mo *multiplexOptimizer) optimizeSend(arc storagepb.BigQueryWrite_AppendRowsClient, pw *pendingWrite) error { |
| var err error |
| if mo.prevStream == "" { |
| // startup case, send a full request (with traceID). |
| req := pw.constructFullRequest(true) |
| err = arc.Send(req) |
| if err == nil { |
| mo.prevStream = req.GetWriteStream() |
| mo.prevTemplate = pw.reqTmpl |
| } |
| } else { |
| // We have a previous send. Determine if it's the same stream or a different one. |
| if mo.prevStream == pw.writeStreamID { |
| // Add the stream ID to the optimized request, as the multiplex optimization requires it on every request. |
| if pw.req.GetWriteStream() == "" { |
| pw.req.WriteStream = pw.writeStreamID |
| } |
| // swapOnSuccess tracks if we need to update schema versions on successful send. |
| swapOnSuccess := false |
| req := pw.req |
| if mo.prevTemplate != nil { |
| if !mo.prevTemplate.Compatible(pw.reqTmpl) { |
| swapOnSuccess = true |
| req = pw.constructFullRequest(false) // full request minus traceID. |
| } |
| } |
| err = arc.Send(req) |
| if err == nil && swapOnSuccess { |
| mo.prevTemplate = pw.reqTmpl |
| } |
| } else { |
| // The previous send was for a different stream. Send a full request, minus traceID. |
| req := pw.constructFullRequest(false) |
| err = arc.Send(req) |
| if err == nil { |
| // Send successful. Update state to reflect this send is now the "previous" state. |
| mo.prevStream = pw.writeStreamID |
| mo.prevTemplate = pw.reqTmpl |
| } |
| // Also, note that we've now sent traffic for more than one stream, which means the backend |
| // also recognizes this connection as multiplexed. |
| mo.multiplexStreams = true |
| } |
| } |
| return err |
| } |
| |
| func (mo *multiplexOptimizer) isMultiplexing() bool { |
| return mo.multiplexStreams |
| } |
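| |
| // needsReconnectSketch is an illustrative sketch of the decision the connection layer |
| // makes when the writer schema changes: non-multiplexed traffic (explicit streams) |
| // signals the change by reconnecting, while multiplexed default-stream traffic can |
| // carry the new WriterSchema in-band. The schemaChanged parameter is a hypothetical |
| // input used only for illustration. |
| func needsReconnectSketch(opt sendOptimizer, schemaChanged bool) bool { |
| 	return schemaChanged && !opt.isMultiplexing() |
| } |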
| |
| // versionedTemplate is used for faster comparison of the templated part of |
| // an AppendRowsRequest, which bears settings-like fields related to schema |
| // and default value configuration. Direct proto comparison through something |
| // like proto.Equal is far too expensive, so versionedTemplate leverages a faster |
| // hash-based comparison to avoid the deep equality checks. |
| type versionedTemplate struct { |
| versionTime time.Time |
| hashVal uint32 |
| tmpl *storagepb.AppendRowsRequest |
| } |
| |
| func newVersionedTemplate() *versionedTemplate { |
| vt := &versionedTemplate{ |
| versionTime: time.Now(), |
| tmpl: &storagepb.AppendRowsRequest{}, |
| } |
| vt.computeHash() |
| return vt |
| } |
| |
| // computeHash is an internal utility function for calculating the hash value |
| // for faster comparison. |
| func (vt *versionedTemplate) computeHash() { |
| buf := new(bytes.Buffer) |
| if b, err := proto.Marshal(vt.tmpl); err == nil { |
| buf.Write(b) |
| } else { |
| // If we fail to serialize the proto (unlikely), use the timestamp as the hash input instead. |
| binary.Write(buf, binary.LittleEndian, vt.versionTime.UnixNano()) |
| } |
| vt.hashVal = crc32.ChecksumIEEE(buf.Bytes()) |
| } |
| |
| type templateRevisionF func(m *storagepb.AppendRowsRequest) |
| |
| // revise makes a new versionedTemplate from the existing template, applying any changes. |
| // The original revision is returned if there's no effective difference after changes are |
| // applied. |
| func (vt *versionedTemplate) revise(changes ...templateRevisionF) *versionedTemplate { |
| before := vt |
| if before == nil { |
| before = newVersionedTemplate() |
| } |
| if len(changes) == 0 { |
| // If there are no changes, return the base revision immediately. |
| return before |
| } |
| out := &versionedTemplate{ |
| versionTime: time.Now(), |
| tmpl: proto.Clone(before.tmpl).(*storagepb.AppendRowsRequest), |
| } |
| for _, r := range changes { |
| r(out.tmpl) |
| } |
| out.computeHash() |
| if out.Compatible(before) { |
| // The changes didn't yield a measurable difference. Return the base revision to avoid |
| // possible connection churn from no-op revisions. |
| return before |
| } |
| return out |
| } |
| |
| // Compatible is effectively a fast equality check that relies on the hash value |
| // and avoids the potentially very costly deep comparison of the proto message templates. |
| func (vt *versionedTemplate) Compatible(other *versionedTemplate) bool { |
| if other == nil { |
| return vt == nil |
| } |
| return vt.hashVal == other.hashVal |
| } |
| |
| func reviseProtoSchema(newSchema *descriptorpb.DescriptorProto) templateRevisionF { |
| return func(m *storagepb.AppendRowsRequest) { |
| if m != nil { |
| m.Rows = &storagepb.AppendRowsRequest_ProtoRows{ |
| ProtoRows: &storagepb.AppendRowsRequest_ProtoData{ |
| WriterSchema: &storagepb.ProtoSchema{ |
| ProtoDescriptor: proto.Clone(newSchema).(*descriptorpb.DescriptorProto), |
| }, |
| }, |
| } |
| } |
| } |
| } |
| |
| func reviseMissingValueInterpretations(vi map[string]storagepb.AppendRowsRequest_MissingValueInterpretation) templateRevisionF { |
| return func(m *storagepb.AppendRowsRequest) { |
| if m != nil { |
| m.MissingValueInterpretations = vi |
| } |
| } |
| } |
| |
| func reviseDefaultMissingValueInterpretation(def storagepb.AppendRowsRequest_MissingValueInterpretation) templateRevisionF { |
| return func(m *storagepb.AppendRowsRequest) { |
| if m != nil { |
| m.DefaultMissingValueInterpretation = def |
| } |
| } |
| } |
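| |
| // templateRevisionSketch is an illustrative sketch of how the versionedTemplate |
| // machinery above fits together: start from a base template, apply a revision (here |
| // a hypothetical updated descriptor), and use the cheap hash-based Compatible check |
| // to decide whether the templated portion of the request effectively changed. |
| func templateRevisionSketch(updated *descriptorpb.DescriptorProto) bool { |
| 	base := newVersionedTemplate() |
| 	revised := base.revise(reviseProtoSchema(updated)) |
| 	// Compatible compares precomputed CRC32 hashes rather than performing a deep proto |
| 	// comparison. A false result indicates the revision produced an effective change |
| 	// (such as a schema evolution) that callers may need to propagate. |
| 	return base.Compatible(revised) |
| } |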