// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package managedwriter

import (
"context"
"fmt"
"runtime"
"strings"
"sync"
"cloud.google.com/go/bigquery/internal"
storage "cloud.google.com/go/bigquery/storage/apiv1"
"cloud.google.com/go/bigquery/storage/apiv1/storagepb"
"cloud.google.com/go/internal/detect"
"github.com/google/uuid"
"github.com/googleapis/gax-go/v2"
"google.golang.org/api/option"
"google.golang.org/grpc/metadata"
)
// DetectProjectID is a sentinel value that instructs NewClient to detect the
// project ID. It is given in place of the projectID argument. NewClient will
// use the project ID from the given credentials or the default credentials
// (https://developers.google.com/accounts/docs/application-default-credentials)
// if no credentials were provided. When credentials are provided, not all
// options allow NewClient to extract the project ID; specifically, a JWT
// does not have the project ID encoded.
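//
// A minimal sketch of detection, assuming ambient Application Default
// Credentials that carry a project ID:
//
//	client, err := managedwriter.NewClient(ctx, managedwriter.DetectProjectID)
//	if err != nil {
//		// TODO: handle error.
//	}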
const DetectProjectID = "*detect-project-id*"
// Client is a managed BigQuery Storage write client scoped to a single project.
type Client struct {
rawClient *storage.BigQueryWriteClient
projectID string
	// Retained context, used primarily for connection management and the
	// underlying client.
ctx context.Context
cancel context.CancelFunc
// cfg retains general settings (custom ClientOptions).
cfg *writerClientConfig
// mu guards access to shared connectionPool instances.
mu sync.Mutex
	// pools retains connectionPool instances keyed by location (region).
	// Both exclusive and multiplexed writers attach to these shared pools.
pools map[string]*connectionPool
}
// NewClient instantiates a new client.
//
// The context provided here is retained and used for background connection management
// between the client and the BigQuery Storage service.
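//
// A short usage sketch; the project ID here is a placeholder:
//
//	ctx := context.Background()
//	client, err := managedwriter.NewClient(ctx, "myproject")
//	if err != nil {
//		// TODO: handle error.
//	}
//	defer client.Close()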
func NewClient(ctx context.Context, projectID string, opts ...option.ClientOption) (c *Client, err error) {
// Set a reasonable default for the gRPC connection pool size.
numConns := runtime.GOMAXPROCS(0)
if numConns > 4 {
numConns = 4
}
o := []option.ClientOption{
option.WithGRPCConnectionPool(numConns),
}
o = append(o, opts...)
cCtx, cancel := context.WithCancel(ctx)
rawClient, err := storage.NewBigQueryWriteClient(cCtx, o...)
if err != nil {
cancel()
return nil, err
}
rawClient.SetGoogleClientInfo("gccl", internal.Version)
// Handle project autodetection.
projectID, err = detect.ProjectID(ctx, projectID, "", opts...)
if err != nil {
cancel()
return nil, err
}
return &Client{
rawClient: rawClient,
projectID: projectID,
ctx: cCtx,
cancel: cancel,
cfg: newWriterClientConfig(opts...),
pools: make(map[string]*connectionPool),
}, nil
}
// Close releases resources held by the client.
func (c *Client) Close() error {
	// Shut down the per-region pools.
c.mu.Lock()
defer c.mu.Unlock()
var firstErr error
for _, pool := range c.pools {
if err := pool.Close(); err != nil && firstErr == nil {
firstErr = err
}
}
// Close the underlying client stub.
if err := c.rawClient.Close(); err != nil && firstErr == nil {
firstErr = err
}
// Cancel the retained client context.
if c.cancel != nil {
c.cancel()
}
return firstErr
}
// NewManagedStream establishes a new managed stream for appending data into a table.
//
// Context here is retained for use by the underlying streaming connections the managed stream may create.
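//
// An illustrative sketch; client is assumed to come from NewClient, the
// table identifiers are placeholders, and descriptorProto is a schema
// descriptor prepared elsewhere (for example, via the adapt subpackage):
//
//	table := managedwriter.TableParentFromParts("myproject", "mydataset", "mytable")
//	ms, err := client.NewManagedStream(ctx,
//		managedwriter.WithDestinationTable(table),
//		managedwriter.WithType(managedwriter.DefaultStream),
//		managedwriter.WithSchemaDescriptor(descriptorProto),
//	)
//	if err != nil {
//		// TODO: handle error.
//	}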
func (c *Client) NewManagedStream(ctx context.Context, opts ...WriterOption) (*ManagedStream, error) {
return c.buildManagedStream(ctx, c.rawClient.AppendRows, false, opts...)
}
// createOpenF builds the opener function we need to access the AppendRows bidi stream.
func createOpenF(streamFunc streamClientFunc, routingHeader string) func(ctx context.Context, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
return func(ctx context.Context, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
if routingHeader != "" {
ctx = metadata.AppendToOutgoingContext(ctx, "x-goog-request-params", routingHeader)
}
arc, err := streamFunc(ctx, opts...)
if err != nil {
return nil, err
}
return arc, nil
}
}
func (c *Client) buildManagedStream(ctx context.Context, streamFunc streamClientFunc, skipSetup bool, opts ...WriterOption) (*ManagedStream, error) {
// First, we create a minimal managed stream.
writer := &ManagedStream{
id: newUUID(writerIDPrefix),
c: c,
streamSettings: defaultStreamSettings(),
curTemplate: newVersionedTemplate(),
}
// apply writer options.
for _, opt := range opts {
opt(writer)
}
	// skipSetup allows tests to bypass validation and stream creation.
	// Otherwise, validate the writer's configuration and resolve the stream
	// it will append to.
if !skipSetup {
if err := c.validateOptions(ctx, writer); err != nil {
return nil, err
}
if writer.streamSettings.streamID == "" {
			// Not instantiated with an existing stream; construct one.
streamName := fmt.Sprintf("%s/streams/_default", writer.streamSettings.destinationTable)
if writer.streamSettings.streamType != DefaultStream {
// For everything but a default stream, we create a new stream on behalf of the user.
req := &storagepb.CreateWriteStreamRequest{
Parent: writer.streamSettings.destinationTable,
WriteStream: &storagepb.WriteStream{
Type: streamTypeToEnum(writer.streamSettings.streamType),
}}
resp, err := writer.c.rawClient.CreateWriteStream(ctx, req)
if err != nil {
return nil, fmt.Errorf("couldn't create write stream: %w", err)
}
streamName = resp.GetName()
}
writer.streamSettings.streamID = streamName
}
}
// we maintain a pool per region, and attach all exclusive and multiplex writers to that pool.
pool, err := c.resolvePool(ctx, writer.streamSettings, streamFunc)
if err != nil {
return nil, err
}
// Add the writer to the pool.
if err := pool.addWriter(writer); err != nil {
return nil, err
}
writer.ctx, writer.cancel = context.WithCancel(ctx)
// Attach any tag keys to the context on the writer, so instrumentation works as expected.
writer.ctx = setupWriterStatContext(writer)
return writer, nil
}
// validateOptions checks that we received a sane/compatible set of WriterOptions
// for constructing a new managed stream.
func (c *Client) validateOptions(ctx context.Context, ms *ManagedStream) error {
if ms == nil {
return fmt.Errorf("no managed stream definition")
}
if ms.streamSettings.streamID != "" {
		// The user supplied a stream; verify that it exists.
info, err := c.getWriteStream(ctx, ms.streamSettings.streamID, false)
if err != nil {
return fmt.Errorf("a streamname was specified, but lookup of stream failed: %v", err)
}
// update type and destination based on stream metadata
ms.streamSettings.streamType = StreamType(info.Type.String())
ms.streamSettings.destinationTable = TableParentFromStreamName(ms.streamSettings.streamID)
}
if ms.streamSettings.destinationTable == "" {
return fmt.Errorf("no destination table specified")
}
// we could auto-select DEFAULT here, but let's force users to be specific for now.
if ms.StreamType() == "" {
return fmt.Errorf("stream type wasn't specified")
}
return nil
}
// resolvePool either returns an existing connectionPool, or returns a new pool if this is the first writer in a given region.
func (c *Client) resolvePool(ctx context.Context, settings *streamSettings, streamFunc streamClientFunc) (*connectionPool, error) {
c.mu.Lock()
defer c.mu.Unlock()
resp, err := c.getWriteStream(ctx, settings.streamID, false)
if err != nil {
return nil, err
}
loc := resp.GetLocation()
if pool, ok := c.pools[loc]; ok {
return pool, nil
}
// No existing pool available, create one for the location and add to shared pools.
pool, err := c.createPool(loc, streamFunc)
if err != nil {
return nil, err
}
c.pools[loc] = pool
return pool, nil
}
// createPool builds a connectionPool.
func (c *Client) createPool(location string, streamFunc streamClientFunc) (*connectionPool, error) {
cCtx, cancel := context.WithCancel(c.ctx)
if c.cfg == nil {
cancel()
return nil, fmt.Errorf("missing client config")
}
var routingHeader string
/*
* TODO: set once backend respects the new routing header
* if location != "" && c.projectID != "" {
* routingHeader = fmt.Sprintf("write_location=projects/%s/locations/%s", c.projectID, location)
* }
*/
pool := &connectionPool{
id: newUUID(poolIDPrefix),
location: location,
ctx: cCtx,
cancel: cancel,
open: createOpenF(streamFunc, routingHeader),
callOptions: c.cfg.defaultAppendRowsCallOptions,
baseFlowController: newFlowController(c.cfg.defaultInflightRequests, c.cfg.defaultInflightBytes),
}
router := newSharedRouter(c.cfg.useMultiplex, c.cfg.maxMultiplexPoolSize)
	if err := pool.activateRouter(router); err != nil {
		// Cancel the pool's context so it isn't leaked when setup fails.
		cancel()
		return nil, err
	}
return pool, nil
}
// BatchCommitWriteStreams atomically commits a group of PENDING streams that belong to the same
// parent table.
//
// Streams must be finalized before commit and cannot be committed multiple
// times. Once a stream is committed, data in the stream becomes available
// for read operations.
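//
// A sketch of committing a single finalized PENDING stream; table and
// streamName are placeholders:
//
//	resp, err := client.BatchCommitWriteStreams(ctx, &storagepb.BatchCommitWriteStreamsRequest{
//		Parent:       table,
//		WriteStreams: []string{streamName},
//	})
//	if err != nil {
//		// TODO: handle error.
//	}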
func (c *Client) BatchCommitWriteStreams(ctx context.Context, req *storagepb.BatchCommitWriteStreamsRequest, opts ...gax.CallOption) (*storagepb.BatchCommitWriteStreamsResponse, error) {
return c.rawClient.BatchCommitWriteStreams(ctx, req, opts...)
}
// CreateWriteStream creates a write stream to the given table.
// Additionally, every table has a special stream named ‘_default’
// to which data can be written. This stream doesn’t need to be created using
// CreateWriteStream. It is a stream that can be used simultaneously by any
// number of clients. Data written to this stream is considered committed as
// soon as an acknowledgement is received.
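//
// A sketch of creating a PENDING stream; table is a placeholder:
//
//	ws, err := client.CreateWriteStream(ctx, &storagepb.CreateWriteStreamRequest{
//		Parent: table,
//		WriteStream: &storagepb.WriteStream{
//			Type: storagepb.WriteStream_PENDING,
//		},
//	})
//	if err != nil {
//		// TODO: handle error.
//	}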
func (c *Client) CreateWriteStream(ctx context.Context, req *storagepb.CreateWriteStreamRequest, opts ...gax.CallOption) (*storagepb.WriteStream, error) {
return c.rawClient.CreateWriteStream(ctx, req, opts...)
}
// GetWriteStream returns information about a given WriteStream.
func (c *Client) GetWriteStream(ctx context.Context, req *storagepb.GetWriteStreamRequest, opts ...gax.CallOption) (*storagepb.WriteStream, error) {
return c.rawClient.GetWriteStream(ctx, req, opts...)
}
// getWriteStream is an internal version of GetWriteStream used for writer setup and validation.
func (c *Client) getWriteStream(ctx context.Context, streamName string, fullView bool) (*storagepb.WriteStream, error) {
req := &storagepb.GetWriteStreamRequest{
Name: streamName,
}
if fullView {
req.View = storagepb.WriteStreamView_FULL
}
return c.rawClient.GetWriteStream(ctx, req)
}
// TableParentFromStreamName is a utility function for extracting the parent table
// prefix from a stream name. When an invalid stream ID is passed, this simply returns
// the original stream name.
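//
// For example, given a hypothetical stream name:
//
//	TableParentFromStreamName("projects/p/datasets/d/tables/t/streams/s")
//	// returns "projects/p/datasets/d/tables/t"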
func TableParentFromStreamName(streamName string) string {
	// Stream names have the following form:
	// projects/{project}/datasets/{dataset}/tables/{table}/streams/{stream}
parts := strings.SplitN(streamName, "/", 7)
if len(parts) < 7 {
// invalid; just pass back the input
return streamName
}
return strings.Join(parts[:6], "/")
}
// TableParentFromParts constructs a table identifier using individual identifiers and
// returns a string in the form "projects/{project}/datasets/{dataset}/tables/{table}".
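//
// For example:
//
//	TableParentFromParts("myproject", "mydataset", "mytable")
//	// returns "projects/myproject/datasets/mydataset/tables/mytable"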
func TableParentFromParts(projectID, datasetID, tableID string) string {
return fmt.Sprintf("projects/%s/datasets/%s/tables/%s", projectID, datasetID, tableID)
}
// newUUID simplifies generating UUIDs for internal resources.
func newUUID(prefix string) string {
id := uuid.New()
return fmt.Sprintf("%s_%s", prefix, id.String())
}
// canMultiplex returns true if the input identifier supports multiplexing. Currently,
// the only stream type that supports multiplexing is the default stream.
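//
// For example, with hypothetical stream identifiers:
//
//	canMultiplex("projects/p/datasets/d/tables/t/streams/_default") // true
//	canMultiplex("projects/p/datasets/d/tables/t/streams/mystream") // false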
func canMultiplex(in string) bool {
// TODO: strengthen validation
return strings.HasSuffix(in, "default")
}