blob: 2d21b3eec408a8776c0c5453a33215fe6bbbee00 [file] [log] [blame]
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package managedwriter
import (
"context"
"fmt"
"runtime"
"strings"
storage "cloud.google.com/go/bigquery/storage/apiv1"
"cloud.google.com/go/internal/detect"
"github.com/googleapis/gax-go/v2"
"google.golang.org/api/option"
storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
// DetectProjectID is a sentinel value that instructs NewClient to detect the
// project ID. It is given in place of the projectID argument. NewClient will
// use the project ID from the given credentials or the default credentials
// (https://developers.google.com/accounts/docs/application-default-credentials)
// if no credentials were provided. When providing credentials, not all
// options will allow NewClient to extract the project ID. Specifically a JWT
// does not have the project ID encoded.
const DetectProjectID = "*detect-project-id*"
// Client is a managed BigQuery Storage write client scoped to a single project.
type Client struct {
rawClient *storage.BigQueryWriteClient
projectID string
}
// NewClient instantiates a new client.
func NewClient(ctx context.Context, projectID string, opts ...option.ClientOption) (c *Client, err error) {
numConns := runtime.GOMAXPROCS(0)
if numConns > 4 {
numConns = 4
}
o := []option.ClientOption{
option.WithGRPCConnectionPool(numConns),
}
o = append(o, opts...)
rawClient, err := storage.NewBigQueryWriteClient(ctx, o...)
if err != nil {
return nil, err
}
// Handle project autodetection.
projectID, err = detect.ProjectID(ctx, projectID, "", opts...)
if err != nil {
return nil, err
}
return &Client{
rawClient: rawClient,
projectID: projectID,
}, nil
}
// Close releases resources held by the client.
func (c *Client) Close() error {
// TODO: consider if we should propagate a cancellation from client to all associated managed streams.
if c.rawClient == nil {
return fmt.Errorf("already closed")
}
c.rawClient.Close()
c.rawClient = nil
return nil
}
// NewManagedStream establishes a new managed stream for appending data into a table.
//
// Context here is retained for use by the underlying streaming connections the managed stream may create.
func (c *Client) NewManagedStream(ctx context.Context, opts ...WriterOption) (*ManagedStream, error) {
return c.buildManagedStream(ctx, c.rawClient.AppendRows, false, opts...)
}
func (c *Client) buildManagedStream(ctx context.Context, streamFunc streamClientFunc, skipSetup bool, opts ...WriterOption) (*ManagedStream, error) {
ctx, cancel := context.WithCancel(ctx)
ms := &ManagedStream{
streamSettings: defaultStreamSettings(),
c: c,
ctx: ctx,
cancel: cancel,
callOptions: []gax.CallOption{
gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(10 * 1024 * 1024)),
},
open: func(streamID string, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
arc, err := streamFunc(
// Bidi Streaming doesn't append stream ID as request metadata, so we must inject it manually.
metadata.AppendToOutgoingContext(ctx, "x-goog-request-params", fmt.Sprintf("write_stream=%s", streamID)))
if err != nil {
return nil, err
}
return arc, nil
},
}
// apply writer options
for _, opt := range opts {
opt(ms)
}
// skipSetup exists for testing scenarios.
if !skipSetup {
if err := c.validateOptions(ctx, ms); err != nil {
return nil, err
}
if ms.streamSettings.streamID == "" {
// not instantiated with a stream, construct one.
streamName := fmt.Sprintf("%s/_default", ms.destinationTable)
if ms.streamSettings.streamType != DefaultStream {
// For everything but a default stream, we create a new stream on behalf of the user.
req := &storagepb.CreateWriteStreamRequest{
Parent: ms.destinationTable,
WriteStream: &storagepb.WriteStream{
Type: streamTypeToEnum(ms.streamSettings.streamType),
}}
resp, err := ms.c.rawClient.CreateWriteStream(ctx, req)
if err != nil {
return nil, fmt.Errorf("couldn't create write stream: %v", err)
}
streamName = resp.GetName()
}
ms.streamSettings.streamID = streamName
}
}
if ms.streamSettings != nil {
if ms.ctx != nil {
ms.ctx = keyContextWithTags(ms.ctx, ms.streamSettings.streamID, ms.streamSettings.dataOrigin)
}
ms.fc = newFlowController(ms.streamSettings.MaxInflightRequests, ms.streamSettings.MaxInflightBytes)
} else {
ms.fc = newFlowController(0, 0)
}
return ms, nil
}
// validateOptions is used to validate that we received a sane/compatible set of WriterOptions
// for constructing a new managed stream.
func (c *Client) validateOptions(ctx context.Context, ms *ManagedStream) error {
if ms == nil {
return fmt.Errorf("no managed stream definition")
}
if ms.streamSettings.streamID != "" {
// User supplied a stream, we need to verify it exists.
info, err := c.getWriteStream(ctx, ms.streamSettings.streamID)
if err != nil {
return fmt.Errorf("a streamname was specified, but lookup of stream failed: %v", err)
}
// update type and destination based on stream metadata
ms.streamSettings.streamType = StreamType(info.Type.String())
ms.destinationTable = TableParentFromStreamName(ms.streamSettings.streamID)
}
if ms.destinationTable == "" {
return fmt.Errorf("no destination table specified")
}
// we could auto-select DEFAULT here, but let's force users to be specific for now.
if ms.StreamType() == "" {
return fmt.Errorf("stream type wasn't specified")
}
return nil
}
// BatchCommitWriteStreams atomically commits a group of PENDING streams that belong to the same
// parent table.
//
// Streams must be finalized before commit and cannot be committed multiple
// times. Once a stream is committed, data in the stream becomes available
// for read operations.
func (c *Client) BatchCommitWriteStreams(ctx context.Context, req *storagepb.BatchCommitWriteStreamsRequest, opts ...gax.CallOption) (*storagepb.BatchCommitWriteStreamsResponse, error) {
return c.rawClient.BatchCommitWriteStreams(ctx, req, opts...)
}
// CreateWriteStream creates a write stream to the given table.
// Additionally, every table has a special stream named ‘_default’
// to which data can be written. This stream doesn’t need to be created using
// CreateWriteStream. It is a stream that can be used simultaneously by any
// number of clients. Data written to this stream is considered committed as
// soon as an acknowledgement is received.
func (c *Client) CreateWriteStream(ctx context.Context, req *storagepb.CreateWriteStreamRequest, opts ...gax.CallOption) (*storagepb.WriteStream, error) {
return c.rawClient.CreateWriteStream(ctx, req, opts...)
}
// getWriteStream returns information about a given write stream.
//
// It's primarily used for setup validation, and not exposed directly to end users.
func (c *Client) getWriteStream(ctx context.Context, streamName string) (*storagepb.WriteStream, error) {
req := &storagepb.GetWriteStreamRequest{
Name: streamName,
}
return c.rawClient.GetWriteStream(ctx, req)
}
// TableParentFromStreamName is a utility function for extracting the parent table
// prefix from a stream name. When an invalid stream ID is passed, this simply returns
// the original stream name.
func TableParentFromStreamName(streamName string) string {
// Stream IDs have the following prefix:
// projects/{project}/datasets/{dataset}/tables/{table}/blah
parts := strings.SplitN(streamName, "/", 7)
if len(parts) < 7 {
// invalid; just pass back the input
return streamName
}
return strings.Join(parts[:6], "/")
}