blob: 8feb9d2dee3e0bfdf389e747634162e78ac956c2 [file] [log] [blame]
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package managedwriter
import (
"bytes"
"encoding/binary"
"hash/crc32"
"time"
"cloud.google.com/go/bigquery/storage/apiv1/storagepb"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/descriptorpb"
)
// sendOptimizer handles the general task of optimizing AppendRowsRequest messages sent to the backend.
//
// The general premise is that the ordering of AppendRowsRequests on a connection provides some opportunities
// to reduce payload size, thus potentially increasing throughput. Care must be taken, however, as deep inspection
// of requests is potentially more costly (in terms of CPU usage) than gains from reducing request sizes.
type sendOptimizer interface {
	// signalReset is used to signal to the optimizer that the connection is freshly (re)opened, or that a previous
	// send yielded an error.
	signalReset()
	// optimizeSend handles possible manipulation of a request, and triggers the send.
	optimizeSend(arc storagepb.BigQueryWrite_AppendRowsClient, pw *pendingWrite) error
	// isMultiplexing tracks if we've actually sent writes to more than a single stream on this connection.
	isMultiplexing() bool
}
// verboseOptimizer is primarily a testing optimizer; it performs no optimization
// and always sends the complete request.
type verboseOptimizer struct {
}

// signalReset is a no-op, as this optimizer carries no state.
func (vo *verboseOptimizer) signalReset() {}

// optimizeSend populates a full request (including trace ID) every time.
func (vo *verboseOptimizer) optimizeSend(arc storagepb.BigQueryWrite_AppendRowsClient, pw *pendingWrite) error {
	return arc.Send(pw.constructFullRequest(true))
}

// isMultiplexing always reports false, which ensures we always reconnect on schema changes.
func (vo *verboseOptimizer) isMultiplexing() bool {
	return false
}
// simplexOptimizer is used for connections bearing AppendRowsRequest for only a single stream.
//
// The optimizations here are straightforward:
// * The first request on a connection is unmodified.
// * Subsequent requests can redact WriteStream, WriterSchema, and TraceID.
//
// Behavior of schema evolution differs based on the type of stream.
// * For an explicit stream, the connection must reconnect to signal schema change (handled in connection).
// * For default streams, the new descriptor (inside WriterSchema) can simply be sent.
type simplexOptimizer struct {
	// haveSent records whether the most recent send on this connection succeeded.
	haveSent bool
}

// signalReset clears the sent state, so the next send emits a full request.
func (so *simplexOptimizer) signalReset() {
	so.haveSent = false
}

// optimizeSend emits a complete request for the first (successful) send on a
// connection, and the caller's already-minimal request thereafter.
func (so *simplexOptimizer) optimizeSend(arc storagepb.BigQueryWrite_AppendRowsClient, pw *pendingWrite) error {
	if !so.haveSent {
		// First request on the connection; build and send a full request.
		if err := arc.Send(pw.constructFullRequest(true)); err != nil {
			return err
		}
		so.haveSent = true
		return nil
	}
	// A previous send succeeded; the request can go out unmodified.
	if err := arc.Send(pw.req); err != nil {
		// A failed send resets state so the next attempt is a full request.
		so.haveSent = false
		return err
	}
	return nil
}

// isMultiplexing reports false; a simplex optimizer is not designed for multiplexing.
func (so *simplexOptimizer) isMultiplexing() bool {
	return false
}
// multiplexOptimizer is used for connections where requests for multiple default streams are sent on a common
// connection. Only default streams can currently be multiplexed.
//
// In this case, the optimizations are as follows:
// * We must send the WriteStream on all requests.
// * For sequential requests to the same stream, schema can be redacted after the first request.
// * Trace ID can be redacted from all requests after the first.
//
// Schema evolution is simply a case of sending the new WriterSchema as part of the request(s). No explicit
// reconnection is necessary.
type multiplexOptimizer struct {
	// prevStream is the stream ID of the most recent successful send ("" before any send).
	prevStream string
	// prevTemplate is the request template in effect at the most recent successful send.
	prevTemplate *versionedTemplate
	// multiplexStreams is set once requests for more than one stream have been sent.
	multiplexStreams bool
}
// signalReset returns the optimizer to its initial state, used when the connection
// is (re)opened or a previous send failed.
func (mo *multiplexOptimizer) signalReset() {
	mo.prevTemplate = nil
	mo.prevStream = ""
	mo.multiplexStreams = false
}
// optimizeSend chooses how much of the request must be sent based on what was
// previously sent on this connection, and updates tracking state on success.
func (mo *multiplexOptimizer) optimizeSend(arc storagepb.BigQueryWrite_AppendRowsClient, pw *pendingWrite) error {
	switch {
	case mo.prevStream == "":
		// Startup case: nothing sent yet, so emit a full request (with trace ID).
		req := pw.constructFullRequest(true)
		if err := arc.Send(req); err != nil {
			return err
		}
		// Send succeeded; record this stream and template as the previous state.
		mo.prevStream = req.GetWriteStream()
		mo.prevTemplate = pw.reqTmpl
		return nil
	case mo.prevStream == pw.writeStreamID:
		// Same stream as the previous send. Multiplex-optimization still wants the
		// stream ID present on the request.
		if pw.req.GetWriteStream() == "" {
			pw.req.WriteStream = pw.writeStreamID
		}
		req := pw.req
		// templateChanged tracks if we must resend schema info and later update versions.
		templateChanged := mo.prevTemplate != nil && !mo.prevTemplate.Compatible(pw.reqTmpl)
		if templateChanged {
			// Template drift: send a full request, minus trace ID.
			req = pw.constructFullRequest(false)
		}
		err := arc.Send(req)
		if err == nil && templateChanged {
			mo.prevTemplate = pw.reqTmpl
		}
		return err
	default:
		// The previous send was for a different stream. Send a full request, minus trace ID.
		req := pw.constructFullRequest(false)
		err := arc.Send(req)
		if err == nil {
			// Send successful. Update state to reflect this send is now the "previous" state.
			mo.prevStream = pw.writeStreamID
			mo.prevTemplate = pw.reqTmpl
		}
		// Also, note that we've sent traffic for multiple streams, which means the backend
		// recognizes this is a multiplex stream as well.
		mo.multiplexStreams = true
		return err
	}
}
// isMultiplexing reports whether requests for more than one stream have been
// sent on this connection.
func (mo *multiplexOptimizer) isMultiplexing() bool {
	return mo.multiplexStreams
}
// versionedTemplate is used for faster comparison of the templated part of
// an AppendRowsRequest, which bears settings-like fields related to schema
// and default value configuration. Direct proto comparison through something
// like proto.Equal is far too expensive, so versionedTemplate leverages a faster
// hash-based comparison to avoid the deep equality checks.
type versionedTemplate struct {
	// versionTime records when this revision was created.
	versionTime time.Time
	// hashVal is the CRC32 checksum of the serialized template, used by Compatible.
	hashVal uint32
	// tmpl holds the template request fields themselves.
	tmpl *storagepb.AppendRowsRequest
}
// newVersionedTemplate constructs an empty template revision, stamped with the
// current time and with its comparison hash precomputed.
func newVersionedTemplate() *versionedTemplate {
	out := &versionedTemplate{
		tmpl:        &storagepb.AppendRowsRequest{},
		versionTime: time.Now(),
	}
	out.computeHash()
	return out
}
// computeHash is an internal utility function for calculating the hash value
// for faster comparison.
func (vt *versionedTemplate) computeHash() {
buf := new(bytes.Buffer)
if b, err := proto.Marshal(vt.tmpl); err == nil {
buf.Write(b)
} else {
// if we fail to serialize the proto (unlikely), consume the timestamp for input instead.
binary.Write(buf, binary.LittleEndian, vt.versionTime.UnixNano())
}
vt.hashVal = crc32.ChecksumIEEE(buf.Bytes())
}
type templateRevisionF func(m *storagepb.AppendRowsRequest)
// revise makes a new versionedTemplate from the existing template, applying any changes.
// The original revision is returned if there's no effective difference after changes are
// applied.
func (vt *versionedTemplate) revise(changes ...templateRevisionF) *versionedTemplate {
	base := vt
	if base == nil {
		base = newVersionedTemplate()
	}
	if len(changes) == 0 {
		// No mutations requested; hand back the base revision immediately.
		return base
	}
	revised := &versionedTemplate{
		versionTime: time.Now(),
		tmpl:        proto.Clone(base.tmpl).(*storagepb.AppendRowsRequest),
	}
	for _, change := range changes {
		change(revised.tmpl)
	}
	revised.computeHash()
	if revised.Compatible(base) {
		// The changes didn't yield a measured difference. Return the base revision to
		// avoid possible connection churn from no-op revisions.
		return base
	}
	return revised
}
// Compatible is effectively a fast equality check, that relies on the hash value
// and avoids the potentially very costly deep comparison of the proto message templates.
//
// Either side may be nil: two nil templates are compatible, and a nil template is
// incompatible with a non-nil one.
func (vt *versionedTemplate) Compatible(other *versionedTemplate) bool {
	// Guard both receiver and argument. The previous form only checked other,
	// so a nil receiver paired with a non-nil argument would panic on the
	// vt.hashVal dereference below.
	if vt == nil || other == nil {
		return vt == other
	}
	return vt.hashVal == other.hashVal
}
// reviseProtoSchema returns a revision function that replaces the template's row
// data with a writer schema based on the provided descriptor.
func reviseProtoSchema(newSchema *descriptorpb.DescriptorProto) templateRevisionF {
	return func(m *storagepb.AppendRowsRequest) {
		if m == nil {
			return
		}
		// Clone the descriptor so the template owns its own copy.
		dp := proto.Clone(newSchema).(*descriptorpb.DescriptorProto)
		m.Rows = &storagepb.AppendRowsRequest_ProtoRows{
			ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
				WriterSchema: &storagepb.ProtoSchema{
					ProtoDescriptor: dp,
				},
			},
		}
	}
}
// reviseMissingValueInterpretations returns a revision function that sets the
// per-column missing value interpretation map on the template.
func reviseMissingValueInterpretations(vi map[string]storagepb.AppendRowsRequest_MissingValueInterpretation) templateRevisionF {
	return func(m *storagepb.AppendRowsRequest) {
		if m == nil {
			return
		}
		m.MissingValueInterpretations = vi
	}
}
// reviseDefaultMissingValueInterpretation returns a revision function that sets the
// default missing value interpretation on the template.
func reviseDefaultMissingValueInterpretation(def storagepb.AppendRowsRequest_MissingValueInterpretation) templateRevisionF {
	return func(m *storagepb.AppendRowsRequest) {
		if m == nil {
			return
		}
		m.DefaultMissingValueInterpretation = def
	}
}