| // Copyright 2023 Google LLC |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // https://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package bigquery |
| |
import (
	"context"
	"errors"
	"fmt"
	"runtime"

	"cloud.google.com/go/bigquery/internal"
	storage "cloud.google.com/go/bigquery/storage/apiv1"
	"cloud.google.com/go/bigquery/storage/apiv1/storagepb"
	"cloud.google.com/go/internal/detect"
	gax "github.com/googleapis/gax-go/v2"
	"google.golang.org/api/option"
	"google.golang.org/grpc"
)
| |
| // readClient is a managed BigQuery Storage read client scoped to a single project. |
| type readClient struct { |
| rawClient *storage.BigQueryReadClient |
| projectID string |
| |
| settings readClientSettings |
| } |
| |
// readClientSettings tunes how read sessions created by a readClient
// fan out their work.
type readClientSettings struct {
	// maxStreamCount is the stream count requested per session;
	// zero lets the server choose a value.
	maxStreamCount int
	// maxWorkerCount bounds concurrent consumption of the session's streams.
	maxWorkerCount int
}
| |
| func defaultReadClientSettings() readClientSettings { |
| maxWorkerCount := runtime.GOMAXPROCS(0) |
| return readClientSettings{ |
| // with zero, the server will provide a value of streams so as to produce reasonable throughput |
| maxStreamCount: 0, |
| maxWorkerCount: maxWorkerCount, |
| } |
| } |
| |
| // newReadClient instantiates a new storage read client. |
| func newReadClient(ctx context.Context, projectID string, opts ...option.ClientOption) (c *readClient, err error) { |
| numConns := runtime.GOMAXPROCS(0) |
| if numConns > 4 { |
| numConns = 4 |
| } |
| o := []option.ClientOption{ |
| option.WithGRPCConnectionPool(numConns), |
| option.WithUserAgent(fmt.Sprintf("%s/%s", userAgentPrefix, internal.Version)), |
| } |
| o = append(o, opts...) |
| |
| rawClient, err := storage.NewBigQueryReadClient(ctx, o...) |
| if err != nil { |
| return nil, err |
| } |
| rawClient.SetGoogleClientInfo("gccl", internal.Version) |
| |
| // Handle project autodetection. |
| projectID, err = detect.ProjectID(ctx, projectID, "", opts...) |
| if err != nil { |
| return nil, err |
| } |
| |
| settings := defaultReadClientSettings() |
| rc := &readClient{ |
| rawClient: rawClient, |
| projectID: projectID, |
| settings: settings, |
| } |
| |
| return rc, nil |
| } |
| |
| // close releases resources held by the client. |
| func (c *readClient) close() error { |
| if c.rawClient == nil { |
| return fmt.Errorf("already closed") |
| } |
| c.rawClient.Close() |
| c.rawClient = nil |
| return nil |
| } |
| |
| // sessionForTable establishes a new session to fetch from a table using the Storage API |
| func (c *readClient) sessionForTable(ctx context.Context, table *Table, ordered bool) (*readSession, error) { |
| tableID, err := table.Identifier(StorageAPIResourceID) |
| if err != nil { |
| return nil, err |
| } |
| |
| // copy settings for a given session, to avoid overrides for all sessions |
| settings := c.settings |
| if ordered { |
| settings.maxStreamCount = 1 |
| } |
| |
| rs := &readSession{ |
| ctx: ctx, |
| table: table, |
| tableID: tableID, |
| settings: settings, |
| readRowsFunc: c.rawClient.ReadRows, |
| createReadSessionFunc: c.rawClient.CreateReadSession, |
| } |
| return rs, nil |
| } |
| |
| // ReadSession is the abstraction over a storage API read session. |
| type readSession struct { |
| settings readClientSettings |
| |
| ctx context.Context |
| table *Table |
| tableID string |
| |
| bqSession *storagepb.ReadSession |
| |
| // decouple from readClient to enable testing |
| createReadSessionFunc func(context.Context, *storagepb.CreateReadSessionRequest, ...gax.CallOption) (*storagepb.ReadSession, error) |
| readRowsFunc func(context.Context, *storagepb.ReadRowsRequest, ...gax.CallOption) (storagepb.BigQueryRead_ReadRowsClient, error) |
| } |
| |
| // Start initiates a read session |
| func (rs *readSession) start() error { |
| createReadSessionRequest := &storagepb.CreateReadSessionRequest{ |
| Parent: fmt.Sprintf("projects/%s", rs.table.ProjectID), |
| ReadSession: &storagepb.ReadSession{ |
| Table: rs.tableID, |
| DataFormat: storagepb.DataFormat_ARROW, |
| }, |
| MaxStreamCount: int32(rs.settings.maxStreamCount), |
| } |
| rpcOpts := gax.WithGRPCOptions( |
| // Read API can send batches up to 128MB |
| // https://cloud.google.com/bigquery/quotas#storage-limits |
| grpc.MaxCallRecvMsgSize(1024 * 1024 * 129), |
| ) |
| session, err := rs.createReadSessionFunc(rs.ctx, createReadSessionRequest, rpcOpts) |
| if err != nil { |
| return err |
| } |
| rs.bqSession = session |
| return nil |
| } |
| |
| // readRows returns a more direct iterators to the underlying Storage API row stream. |
| func (rs *readSession) readRows(req *storagepb.ReadRowsRequest) (storagepb.BigQueryRead_ReadRowsClient, error) { |
| if rs.bqSession == nil { |
| err := rs.start() |
| if err != nil { |
| return nil, err |
| } |
| } |
| return rs.readRowsFunc(rs.ctx, req) |
| } |