blob: 18f9881238439e1382cc0f51cac2c90d7df8d809 [file] [log] [blame]
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package managedwriter
import (
"cloud.google.com/go/bigquery/storage/apiv1/storagepb"
"github.com/googleapis/gax-go/v2"
"google.golang.org/api/option"
"google.golang.org/api/option/internaloption"
"google.golang.org/protobuf/types/descriptorpb"
"google.golang.org/protobuf/types/known/wrapperspb"
)
// writerClientConfig encapsulates custom client-level config settings.
// It is populated by newWriterClientConfig from the package-specific
// ClientOptions supplied when the client is built.
type writerClientConfig struct {
// useMultiplex records whether WithMultiplexing was supplied.
useMultiplex bool
// maxMultiplexPoolSize is the value from WithMultiplexPoolLimit. When
// useMultiplex is set, newWriterClientConfig raises it to at least 1.
maxMultiplexPoolSize int
// defaultInflightRequests is the value from WithDefaultInflightRequests.
// newWriterClientConfig clamps negative values to 0.
defaultInflightRequests int
// defaultInflightBytes is the value from WithDefaultInflightBytes.
// newWriterClientConfig clamps negative values to 0.
defaultInflightBytes int
// defaultAppendRowsCallOptions accumulates one entry per
// WithDefaultAppendRowsCallOption supplied, in the order applied.
defaultAppendRowsCallOptions []gax.CallOption
}
// newWriterClientConfig assembles a writerClientConfig from the supplied
// ClientOptions. Only options that implement writerClientOption contribute to
// the result; every other option is skipped. After all options are applied the
// values are normalized: the multiplex pool size is raised to at least 1 when
// multiplexing is enabled, and negative inflight limits are reset to 0.
func newWriterClientConfig(opts ...option.ClientOption) *writerClientConfig {
	cfg := new(writerClientConfig)
	for i := range opts {
		writerOpt, isWriterOpt := opts[i].(writerClientOption)
		if !isWriterOpt {
			// Not a writer-specific option; nothing to record here.
			continue
		}
		writerOpt.ApplyWriterOpt(cfg)
	}

	// Normalize the config so downstream code only sees sane values.
	if cfg.useMultiplex && cfg.maxMultiplexPoolSize < 1 {
		cfg.maxMultiplexPoolSize = 1
	}
	if cfg.defaultInflightRequests < 0 {
		cfg.defaultInflightRequests = 0
	}
	if cfg.defaultInflightBytes < 0 {
		cfg.defaultInflightBytes = 0
	}
	return cfg
}
// writerClientOption allows us to extend ClientOptions for client-specific needs.
// newWriterClientConfig detects options that implement this interface and
// invokes ApplyWriterOpt on each of them.
type writerClientOption interface {
option.ClientOption
// ApplyWriterOpt records the option's setting on the supplied config.
ApplyWriterOpt(*writerClientConfig)
}
// WithMultiplexing is an EXPERIMENTAL option that controls connection sharing
// when instantiating the Client. Only writes to default streams can leverage the
// multiplex pool. Internally, the client maintains a pool of connections per BigQuery
// destination region, and will grow the pool to its maximum allowed size if there's
// sufficient traffic on the shared connection(s).
//
// This ClientOption is EXPERIMENTAL and subject to change.
func WithMultiplexing() option.ClientOption {
	setting := new(enableMultiplexSetting)
	setting.useMultiplex = true
	return setting
}
// enableMultiplexSetting is the ClientOption value returned by WithMultiplexing.
type enableMultiplexSetting struct {
	internaloption.EmbeddableAdapter
	useMultiplex bool
}

// ApplyWriterOpt copies the multiplexing flag onto the writer client config.
func (s *enableMultiplexSetting) ApplyWriterOpt(cfg *writerClientConfig) {
	cfg.useMultiplex = s.useMultiplex
}
// WithMultiplexPoolLimit is an EXPERIMENTAL option that sets the maximum
// size of the shared multiplex pool when instantiating the Client. The
// setting is ignored if multiplexing is not enabled. By default, the limit
// is a single shared connection. The limit is applied per destination region.
//
// This ClientOption is EXPERIMENTAL and subject to change.
func WithMultiplexPoolLimit(maxSize int) option.ClientOption {
	setting := new(maxMultiplexPoolSizeSetting)
	setting.maxSize = maxSize
	return setting
}
// maxMultiplexPoolSizeSetting is the ClientOption value returned by
// WithMultiplexPoolLimit.
type maxMultiplexPoolSizeSetting struct {
	internaloption.EmbeddableAdapter
	maxSize int
}

// ApplyWriterOpt stores the requested pool limit on the writer client config.
// The value is stored as-is; newWriterClientConfig normalizes it afterwards.
func (s *maxMultiplexPoolSizeSetting) ApplyWriterOpt(cfg *writerClientConfig) {
	cfg.maxMultiplexPoolSize = s.maxSize
}
// WithDefaultInflightRequests is an EXPERIMENTAL ClientOption that sets the
// default limit on how many individual AppendRows write requests can be in
// flight on a connection at a time. The limit is enforced on all connections
// created by the instantiated Client.
//
// Note: the WithMaxInflightRequests WriterOption can still be used to control
// the behavior for individual ManagedStream writers when not using multiplexing.
//
// This ClientOption is EXPERIMENTAL and subject to change.
func WithDefaultInflightRequests(n int) option.ClientOption {
	setting := new(defaultInflightRequestsSetting)
	setting.maxRequests = n
	return setting
}
// defaultInflightRequestsSetting is the ClientOption value returned by
// WithDefaultInflightRequests.
type defaultInflightRequestsSetting struct {
	internaloption.EmbeddableAdapter
	maxRequests int
}

// ApplyWriterOpt stores the default inflight request limit on the writer
// client config.
func (s *defaultInflightRequestsSetting) ApplyWriterOpt(cfg *writerClientConfig) {
	cfg.defaultInflightRequests = s.maxRequests
}
// WithDefaultInflightBytes is an EXPERIMENTAL ClientOption that sets the
// default byte limit for AppendRows write requests that can be in flight on a
// connection at a time. The limit is enforced on all connections created by
// the instantiated Client.
//
// Note: the WithMaxInflightBytes WriterOption can still be used to control
// the behavior for individual ManagedStream writers when not using multiplexing.
//
// This ClientOption is EXPERIMENTAL and subject to change.
func WithDefaultInflightBytes(n int) option.ClientOption {
	setting := new(defaultInflightBytesSetting)
	setting.maxBytes = n
	return setting
}
// defaultInflightBytesSetting is the ClientOption value returned by
// WithDefaultInflightBytes.
type defaultInflightBytesSetting struct {
	internaloption.EmbeddableAdapter
	maxBytes int
}

// ApplyWriterOpt stores the default inflight byte limit on the writer client
// config.
func (s *defaultInflightBytesSetting) ApplyWriterOpt(cfg *writerClientConfig) {
	cfg.defaultInflightBytes = s.maxBytes
}
// WithDefaultAppendRowsCallOption is an EXPERIMENTAL ClientOption that supplies
// a gax.CallOption to be passed when opening the underlying AppendRows bidi
// stream connections this library uses to communicate with the BigQuery
// Storage service. The option is propagated to all connections created by the
// instantiated Client.
//
// Note: the WithAppendRowsCallOption WriterOption can still be used to control
// the behavior for individual ManagedStream writers that don't participate
// in multiplexing.
//
// This ClientOption is EXPERIMENTAL and subject to change.
func WithDefaultAppendRowsCallOption(o gax.CallOption) option.ClientOption {
	setting := new(defaultAppendRowsCallOptionSetting)
	setting.opt = o
	return setting
}
// defaultAppendRowsCallOptionSetting is the ClientOption value returned by
// WithDefaultAppendRowsCallOption.
type defaultAppendRowsCallOptionSetting struct {
	internaloption.EmbeddableAdapter
	opt gax.CallOption
}

// ApplyWriterOpt appends the call option to the config's list of default
// AppendRows call options, so repeated options accumulate rather than replace.
func (s *defaultAppendRowsCallOptionSetting) ApplyWriterOpt(cfg *writerClientConfig) {
	existing := cfg.defaultAppendRowsCallOptions
	cfg.defaultAppendRowsCallOptions = append(existing, s.opt)
}
// WriterOption is an option used to configure a ManagedStream instance.
// Each option is a function that receives the ManagedStream being configured.
type WriterOption func(*ManagedStream)
// WithType selects the stream type used by the managed stream.
func WithType(st StreamType) WriterOption {
	return func(stream *ManagedStream) {
		stream.streamSettings.streamType = st
	}
}
// WithStreamName explicitly sets the name of the stream this writer will
// append to. By default, the managed client will create the stream when
// instantiated if necessary.
//
// Note: Supplying this option causes other options which affect stream construction,
// such as WithType and WithDestinationTable, to be ignored.
func WithStreamName(name string) WriterOption {
	return func(stream *ManagedStream) {
		stream.streamSettings.streamID = name
	}
}
// WithDestinationTable names the destination table to which a created
// stream will append rows. The table is expressed in the form:
//
//	projects/{projectid}/datasets/{dataset}/tables/{table}
func WithDestinationTable(destTable string) WriterOption {
	return func(stream *ManagedStream) {
		stream.streamSettings.destinationTable = destTable
	}
}
// WithMaxInflightRequests limits the number of inflight appends on the write
// connection.
//
// Note: See the WithDefaultInflightRequests ClientOption for setting a default
// when instantiating a client, rather than setting this limit per-writer.
// This WriterOption is ignored for ManagedStreams that participate in multiplexing.
func WithMaxInflightRequests(n int) WriterOption {
	return func(stream *ManagedStream) {
		stream.streamSettings.MaxInflightRequests = n
	}
}
// WithMaxInflightBytes limits the inflight append request bytes on the write
// connection.
//
// Note: See the WithDefaultInflightBytes ClientOption for setting a default
// when instantiating a client, rather than setting this limit per-writer.
// This WriterOption is ignored for ManagedStreams that participate in multiplexing.
func WithMaxInflightBytes(n int) WriterOption {
	return func(stream *ManagedStream) {
		stream.streamSettings.MaxInflightBytes = n
	}
}
// WithTraceID instruments requests to the service with a custom trace prefix.
// This is generally for diagnostic purposes only.
func WithTraceID(traceID string) WriterOption {
	return func(stream *ManagedStream) {
		stream.streamSettings.TraceID = traceID
	}
}
// WithSchemaDescriptor supplies the descriptor that describes the format of the
// serialized data being sent by AppendRows calls on the stream. The descriptor
// is applied as a revision to the stream's current request template.
func WithSchemaDescriptor(dp *descriptorpb.DescriptorProto) WriterOption {
	return func(stream *ManagedStream) {
		schemaRevision := reviseProtoSchema(dp)
		stream.curTemplate = stream.curTemplate.revise(schemaRevision)
	}
}
// WithMissingValueInterpretations controls how missing values are interpreted
// for individual columns.
//
// You must provide a map indicating how to interpret missing values for some
// fields. Missing values are fields present in the user schema but missing in
// rows. The key is the field name. The value is the interpretation of missing
// values for the field.
//
// For example, the following option would indicate that missing values in the "foo"
// column are interpreted as null, whereas missing values in the "bar" column are
// treated as the default value:
//
//	WithMissingValueInterpretations(map[string]storagepb.AppendRowsRequest_MissingValueInterpretation{
//		"foo": storagepb.AppendRowsRequest_NULL_VALUE,
//		"bar": storagepb.AppendRowsRequest_DEFAULT_VALUE,
//	})
//
// If a field is not in this map and has missing values, the missing values
// in this field are interpreted as NULL unless overridden with a default missing
// value interpretation.
//
// Currently, a field name can only be a top-level column name; it can't be a
// struct field path like 'foo.bar'.
func WithMissingValueInterpretations(mvi map[string]storagepb.AppendRowsRequest_MissingValueInterpretation) WriterOption {
	return func(stream *ManagedStream) {
		columnRevision := reviseMissingValueInterpretations(mvi)
		stream.curTemplate = stream.curTemplate.revise(columnRevision)
	}
}
// WithDefaultMissingValueInterpretation controls how missing values are
// interpreted for a given stream. See WithMissingValueInterpretations for more
// information about missing values.
//
// Interpretations set for individual columns with WithMissingValueInterpretations
// can override the default chosen with this option.
//
// For example, if you want to write
// `NULL` instead of using default values for some columns, you can set
// `default_missing_value_interpretation` to `DEFAULT_VALUE` and at the same
// time, set `missing_value_interpretations` to `NULL_VALUE` on those columns.
func WithDefaultMissingValueInterpretation(def storagepb.AppendRowsRequest_MissingValueInterpretation) WriterOption {
	return func(stream *ManagedStream) {
		defaultRevision := reviseDefaultMissingValueInterpretation(def)
		stream.curTemplate = stream.curTemplate.revise(defaultRevision)
	}
}
// WithDataOrigin attaches an origin context to the instrumentation metrics
// emitted by the library.
func WithDataOrigin(dataOrigin string) WriterOption {
	return func(stream *ManagedStream) {
		stream.streamSettings.dataOrigin = dataOrigin
	}
}
// WithAppendRowsCallOption supplies an additional call option for the
// ManagedStream to use when it opens the underlying append stream. Each use of
// this option adds to the stream's list of call options.
//
// Note: See the WithDefaultAppendRowsCallOption ClientOption for setting defaults
// when instantiating a client, rather than setting call options per-writer. This
// WriterOption is ignored for ManagedStream writers that participate in multiplexing.
func WithAppendRowsCallOption(o gax.CallOption) WriterOption {
	return func(stream *ManagedStream) {
		settings := stream.streamSettings
		settings.appendCallOptions = append(settings.appendCallOptions, o)
	}
}
// EnableWriteRetries lets a ManagedStream automatically retry failed appends.
//
// Enabling retries is best suited for cases where users want to achieve at-least-once
// append semantics. Use of automatic retries may complicate patterns where the user
// is designing for exactly-once append semantics.
func EnableWriteRetries(enable bool) WriterOption {
	return func(stream *ManagedStream) {
		if !enable {
			// Passing false is a no-op: the stream's existing retry
			// configuration, whatever it is, is left untouched.
			return
		}
		stream.retry = newStatelessRetryer()
	}
}
// AppendOption is an option that can be passed when appending data with a
// managed stream instance. Each option is a function that receives the
// pendingWrite for the append.
type AppendOption func(*pendingWrite)
// UpdateSchemaDescriptor updates the descriptor message schema associated
// with a given stream, by revising the request template of the pending write.
func UpdateSchemaDescriptor(schema *descriptorpb.DescriptorProto) AppendOption {
	return func(pending *pendingWrite) {
		schemaRevision := reviseProtoSchema(schema)
		pending.reqTmpl = pending.reqTmpl.revise(schemaRevision)
	}
}
// UpdateMissingValueInterpretations updates the per-column missing-value
// interpretation settings, and is retained for subsequent writes. See the
// WithMissingValueInterpretations WriterOption for more details.
func UpdateMissingValueInterpretations(mvi map[string]storagepb.AppendRowsRequest_MissingValueInterpretation) AppendOption {
	return func(pending *pendingWrite) {
		columnRevision := reviseMissingValueInterpretations(mvi)
		pending.reqTmpl = pending.reqTmpl.revise(columnRevision)
	}
}
// UpdateDefaultMissingValueInterpretation updates the default missing-value
// interpretation setting for the stream, and is retained for subsequent writes.
// See the WithDefaultMissingValueInterpretation WriterOption for more details.
func UpdateDefaultMissingValueInterpretation(def storagepb.AppendRowsRequest_MissingValueInterpretation) AppendOption {
	return func(pending *pendingWrite) {
		defaultRevision := reviseDefaultMissingValueInterpretation(def)
		pending.reqTmpl = pending.reqTmpl.revise(defaultRevision)
	}
}
// WithOffset attaches an explicit offset value to this append request.
func WithOffset(offset int64) AppendOption {
	return func(pending *pendingWrite) {
		// wrapperspb.Int64 builds an Int64Value wrapper holding the offset.
		pending.req.Offset = wrapperspb.Int64(offset)
	}
}