| // Copyright 2015 Google Inc. All Rights Reserved. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package bigquery |
| |
| import ( |
| "errors" |
| "fmt" |
| "io" |
| "net/http" |
| "time" |
| |
| "cloud.google.com/go/internal" |
| "cloud.google.com/go/internal/optional" |
| "cloud.google.com/go/internal/version" |
| gax "github.com/googleapis/gax-go" |
| |
| "golang.org/x/net/context" |
| bq "google.golang.org/api/bigquery/v2" |
| "google.golang.org/api/googleapi" |
| ) |
| |
| // service provides an internal abstraction to isolate the generated |
| // BigQuery API; the rest of this package uses this interface instead of |
| // calling the generated API directly. The single implementation, |
| // *bigqueryService, contains all the knowledge of the generated BigQuery API. |
| type service interface { |
| // Jobs |
| insertJob(ctx context.Context, projectID string, conf *insertJobConf) (*Job, error) |
| getJob(ctx context.Context, projectID, jobID string) (*Job, error) |
| jobCancel(ctx context.Context, projectID, jobID string) error |
| jobStatus(ctx context.Context, projectID, jobID string) (*JobStatus, error) |
| listJobs(ctx context.Context, projectID string, maxResults int, pageToken string, all bool, state string) ([]JobInfo, string, error) |
| |
| // Tables |
| createTable(ctx context.Context, projectID, datasetID, tableID string, tm *TableMetadata) error |
| getTableMetadata(ctx context.Context, projectID, datasetID, tableID string) (*TableMetadata, error) |
| deleteTable(ctx context.Context, projectID, datasetID, tableID string) error |
| |
| // listTables returns a page of Tables and a next page token. Note: the Tables do not have their client (c) field populated. |
| listTables(ctx context.Context, projectID, datasetID string, pageSize int, pageToken string) ([]*Table, string, error) |
| patchTable(ctx context.Context, projectID, datasetID, tableID string, conf *patchTableConf, etag string) (*TableMetadata, error) |
| |
| // Table data |
| readTabledata(ctx context.Context, conf *readTableConf, pageToken string) (*readDataResult, error) |
| insertRows(ctx context.Context, projectID, datasetID, tableID string, rows []*insertionRow, conf *insertRowsConf) error |
| |
| // Datasets |
| insertDataset(ctx context.Context, datasetID, projectID string, dm *DatasetMetadata) error |
| deleteDataset(ctx context.Context, datasetID, projectID string) error |
| getDatasetMetadata(ctx context.Context, projectID, datasetID string) (*DatasetMetadata, error) |
| patchDataset(ctx context.Context, projectID, datasetID string, dm *DatasetMetadataToUpdate, etag string) (*DatasetMetadata, error) |
| |
| // Misc |
| |
| // waitForQuery waits for a query to complete and returns its schema. |
| waitForQuery(ctx context.Context, projectID, jobID string) (Schema, error) |
| |
| // listDatasets returns a page of Datasets and a next page token. Note: the Datasets do not have their client (c) field populated. |
| listDatasets(ctx context.Context, projectID string, maxResults int, pageToken string, all bool, filter string) ([]*Dataset, string, error) |
| } |
| |
| var xGoogHeader = fmt.Sprintf("gl-go/%s gccl/%s", version.Go(), version.Repo) |
| |
| func setClientHeader(headers http.Header) { |
| headers.Set("x-goog-api-client", xGoogHeader) |
| } |
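| |
| // For illustration, with hypothetical version values Go() == "1.8" and |
| // Repo == "0.10.0", every request would carry a header like: |
| // |
| //   x-goog-api-client: gl-go/1.8 gccl/0.10.0 |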
| |
| type bigqueryService struct { |
| s *bq.Service |
| } |
| |
| func newBigqueryService(client *http.Client, endpoint string) (*bigqueryService, error) { |
| s, err := bq.New(client) |
| if err != nil { |
| return nil, fmt.Errorf("constructing bigquery client: %v", err) |
| } |
| s.BasePath = endpoint |
| |
| return &bigqueryService{s: s}, nil |
| } |
| |
| // getPages calls the supplied getPage function repeatedly until there are no pages left to get. |
| // token is the token of the initial page to start from. Use an empty string to start from the beginning. |
| func getPages(token string, getPage func(token string) (nextToken string, err error)) error { |
| for { |
| var err error |
| token, err = getPage(token) |
| if err != nil { |
| return err |
| } |
| if token == "" { |
| return nil |
| } |
| } |
| } |
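| |
| // A minimal sketch of driving getPages, assuming a service value s, a |
| // context ctx, and illustrative project and dataset IDs; listTables is the |
| // page-fetching call: |
| // |
| //   var all []*Table |
| //   err := getPages("", func(tok string) (string, error) { |
| //       tables, next, err := s.listTables(ctx, "my-project", "my-dataset", 0, tok) |
| //       if err != nil { |
| //           return "", err |
| //       } |
| //       all = append(all, tables...) |
| //       return next, nil |
| //   }) |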
| |
| type insertJobConf struct { |
| job *bq.Job |
| media io.Reader |
| } |
| |
| // insertJob calls the Jobs.Insert RPC and returns a Job. Callers must set |
| // the returned Job's client. |
| func (s *bigqueryService) insertJob(ctx context.Context, projectID string, conf *insertJobConf) (*Job, error) { |
| call := s.s.Jobs.Insert(projectID, conf.job).Context(ctx) |
| setClientHeader(call.Header()) |
| if conf.media != nil { |
| call.Media(conf.media) |
| } |
| var res *bq.Job |
| var err error |
| invoke := func() error { |
| res, err = call.Do() |
| return err |
| } |
| // A job with a client-generated ID can be retried; the presence of the |
| // ID makes the insert operation idempotent. |
| // We don't retry if there is media, because it is an io.Reader. We'd |
| // have to read the contents and keep it in memory, and that could be expensive. |
| // TODO(jba): Look into retrying if media != nil. |
| if conf.job.JobReference != nil && conf.media == nil { |
| err = runWithRetry(ctx, invoke) |
| } else { |
| err = invoke() |
| } |
| if err != nil { |
| return nil, err |
| } |
| |
| var dt *bq.TableReference |
| if qc := res.Configuration.Query; qc != nil { |
| dt = qc.DestinationTable |
| } |
| return &Job{ |
| projectID: projectID, |
| jobID: res.JobReference.JobId, |
| destinationTable: dt, |
| }, nil |
| } |
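| |
| // For example (IDs and query are illustrative; the uuid package is an |
| // assumed dependency), an insert that runWithRetry can safely repeat |
| // because the job ID is client-generated: |
| // |
| //   conf := &insertJobConf{ |
| //       job: &bq.Job{ |
| //           JobReference:  &bq.JobReference{ProjectId: "my-project", JobId: uuid.New().String()}, |
| //           Configuration: &bq.JobConfiguration{Query: &bq.JobConfigurationQuery{Query: "SELECT 17"}}, |
| //       }, |
| //   } |
| //   job, err := s.insertJob(ctx, "my-project", conf) |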
| |
| type pagingConf struct { |
| recordsPerRequest int64 |
| setRecordsPerRequest bool |
| |
| startIndex uint64 |
| } |
| |
| type readTableConf struct { |
| projectID, datasetID, tableID string |
| paging pagingConf |
| schema Schema // lazily initialized when the first page of data is fetched. |
| } |
| |
| func (conf *readTableConf) fetch(ctx context.Context, s service, token string) (*readDataResult, error) { |
| return s.readTabledata(ctx, conf, token) |
| } |
| |
| func (conf *readTableConf) setPaging(pc *pagingConf) { conf.paging = *pc } |
| |
| type readDataResult struct { |
| pageToken string |
| rows [][]Value |
| totalRows uint64 |
| schema Schema |
| } |
| |
| func (s *bigqueryService) readTabledata(ctx context.Context, conf *readTableConf, pageToken string) (*readDataResult, error) { |
| // Prepare request to fetch one page of table data. |
| req := s.s.Tabledata.List(conf.projectID, conf.datasetID, conf.tableID) |
| setClientHeader(req.Header()) |
| if pageToken != "" { |
| req.PageToken(pageToken) |
| } else { |
| req.StartIndex(conf.paging.startIndex) |
| } |
| |
| if conf.paging.setRecordsPerRequest { |
| req.MaxResults(conf.paging.recordsPerRequest) |
| } |
| |
| // Fetch the table schema in the background, if necessary. |
| errc := make(chan error, 1) |
| if conf.schema != nil { |
| errc <- nil |
| } else { |
| go func() { |
| var t *bq.Table |
| err := runWithRetry(ctx, func() (err error) { |
| t, err = s.s.Tables.Get(conf.projectID, conf.datasetID, conf.tableID). |
| Fields("schema"). |
| Context(ctx). |
| Do() |
| return err |
| }) |
| if err == nil && t.Schema != nil { |
| conf.schema = convertTableSchema(t.Schema) |
| } |
| errc <- err |
| }() |
| } |
| var res *bq.TableDataList |
| err := runWithRetry(ctx, func() (err error) { |
| res, err = req.Context(ctx).Do() |
| return err |
| }) |
| if err != nil { |
| return nil, err |
| } |
| err = <-errc |
| if err != nil { |
| return nil, err |
| } |
| result := &readDataResult{ |
| pageToken: res.PageToken, |
| totalRows: uint64(res.TotalRows), |
| schema: conf.schema, |
| } |
| result.rows, err = convertRows(res.Rows, conf.schema) |
| if err != nil { |
| return nil, err |
| } |
| return result, nil |
| } |
| |
| func (s *bigqueryService) waitForQuery(ctx context.Context, projectID, jobID string) (Schema, error) { |
| // Use GetQueryResults only to wait for completion, not to read results. |
| req := s.s.Jobs.GetQueryResults(projectID, jobID).Context(ctx).MaxResults(0) |
| setClientHeader(req.Header()) |
| backoff := gax.Backoff{ |
| Initial: 1 * time.Second, |
| Multiplier: 2, |
| Max: 60 * time.Second, |
| } |
| var res *bq.GetQueryResultsResponse |
| err := internal.Retry(ctx, backoff, func() (stop bool, err error) { |
| res, err = req.Do() |
| if err != nil { |
| return !retryableError(err), err |
| } |
| if !res.JobComplete { // GetQueryResults may return early without error; retry. |
| return false, nil |
| } |
| return true, nil |
| }) |
| if err != nil { |
| return nil, err |
| } |
| return convertTableSchema(res.Schema), nil |
| } |
| |
| type insertRowsConf struct { |
| templateSuffix string |
| ignoreUnknownValues bool |
| skipInvalidRows bool |
| } |
| |
| func (s *bigqueryService) insertRows(ctx context.Context, projectID, datasetID, tableID string, rows []*insertionRow, conf *insertRowsConf) error { |
| req := &bq.TableDataInsertAllRequest{ |
| TemplateSuffix: conf.templateSuffix, |
| IgnoreUnknownValues: conf.ignoreUnknownValues, |
| SkipInvalidRows: conf.skipInvalidRows, |
| } |
| for _, row := range rows { |
| m := make(map[string]bq.JsonValue) |
| for k, v := range row.Row { |
| m[k] = bq.JsonValue(v) |
| } |
| req.Rows = append(req.Rows, &bq.TableDataInsertAllRequestRows{ |
| InsertId: row.InsertID, |
| Json: m, |
| }) |
| } |
| call := s.s.Tabledata.InsertAll(projectID, datasetID, tableID, req).Context(ctx) |
| setClientHeader(call.Header()) |
| var res *bq.TableDataInsertAllResponse |
| err := runWithRetry(ctx, func() (err error) { |
| res, err = call.Do() |
| return err |
| }) |
| if err != nil { |
| return err |
| } |
| if len(res.InsertErrors) == 0 { |
| return nil |
| } |
| |
| var errs PutMultiError |
| for _, e := range res.InsertErrors { |
| if int(e.Index) >= len(rows) { |
| return fmt.Errorf("internal error: unexpected row index: %v", e.Index) |
| } |
| rie := RowInsertionError{ |
| InsertID: rows[e.Index].InsertID, |
| RowIndex: int(e.Index), |
| } |
| for _, errp := range e.Errors { |
| rie.Errors = append(rie.Errors, errorFromErrorProto(errp)) |
| } |
| errs = append(errs, rie) |
| } |
| return errs |
| } |
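| |
| // A sketch of calling insertRows and unpacking per-row failures; the row |
| // contents and IDs are illustrative: |
| // |
| //   rows := []*insertionRow{ |
| //       {InsertID: "row-1", Row: map[string]Value{"name": "n1", "count": 7}}, |
| //   } |
| //   err := s.insertRows(ctx, "my-project", "my-dataset", "my-table", rows, &insertRowsConf{}) |
| //   if multi, ok := err.(PutMultiError); ok { |
| //       for _, rie := range multi { |
| //           // rie.RowIndex and rie.InsertID identify the failed row; |
| //           // rie.Errors holds the per-row error details. |
| //       } |
| //   } |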
| |
| func (s *bigqueryService) getJob(ctx context.Context, projectID, jobID string) (*Job, error) { |
| bqjob, err := s.getJobInternal(ctx, projectID, jobID, "configuration", "jobReference") |
| if err != nil { |
| return nil, err |
| } |
| return jobFromProtos(bqjob.JobReference, bqjob.Configuration), nil |
| } |
| |
| func (s *bigqueryService) jobStatus(ctx context.Context, projectID, jobID string) (*JobStatus, error) { |
| job, err := s.getJobInternal(ctx, projectID, jobID, "status", "statistics") |
| if err != nil { |
| return nil, err |
| } |
| st, err := jobStatusFromProto(job.Status) |
| if err != nil { |
| return nil, err |
| } |
| st.Statistics = jobStatisticsFromProto(job.Statistics) |
| return st, nil |
| } |
| |
| func (s *bigqueryService) getJobInternal(ctx context.Context, projectID, jobID string, fields ...googleapi.Field) (*bq.Job, error) { |
| var job *bq.Job |
| call := s.s.Jobs.Get(projectID, jobID).Context(ctx) |
| if len(fields) > 0 { |
| call = call.Fields(fields...) |
| } |
| setClientHeader(call.Header()) |
| err := runWithRetry(ctx, func() (err error) { |
| job, err = call.Do() |
| return err |
| }) |
| if err != nil { |
| return nil, err |
| } |
| return job, nil |
| } |
| |
| func (s *bigqueryService) jobCancel(ctx context.Context, projectID, jobID string) error { |
| // Jobs.Cancel returns a job entity, but the only relevant piece of |
| // data it may contain (the status of the job) is unreliable. From the |
| // docs: "This call will return immediately, and the client will need |
| // to poll for the job status to see if the cancel completed |
| // successfully". So it would be misleading to return a status. |
| call := s.s.Jobs.Cancel(projectID, jobID). |
| Fields(). // We don't need any of the response data. |
| Context(ctx) |
| setClientHeader(call.Header()) |
| return runWithRetry(ctx, func() error { |
| _, err := call.Do() |
| return err |
| }) |
| } |
| |
| func jobFromProtos(jr *bq.JobReference, config *bq.JobConfiguration) *Job { |
| var isQuery bool |
| var dest *bq.TableReference |
| if config.Query != nil { |
| isQuery = true |
| dest = config.Query.DestinationTable |
| } |
| return &Job{ |
| projectID: jr.ProjectId, |
| jobID: jr.JobId, |
| isQuery: isQuery, |
| destinationTable: dest, |
| } |
| } |
| |
| var stateMap = map[string]State{"PENDING": Pending, "RUNNING": Running, "DONE": Done} |
| |
| func jobStatusFromProto(status *bq.JobStatus) (*JobStatus, error) { |
| state, ok := stateMap[status.State] |
| if !ok { |
| return nil, fmt.Errorf("unexpected job state: %v", status.State) |
| } |
| |
| newStatus := &JobStatus{ |
| State: state, |
| err: nil, |
| } |
| if err := errorFromErrorProto(status.ErrorResult); state == Done && err != nil { |
| newStatus.err = err |
| } |
| |
| for _, ep := range status.Errors { |
| newStatus.Errors = append(newStatus.Errors, errorFromErrorProto(ep)) |
| } |
| return newStatus, nil |
| } |
| |
| func jobStatisticsFromProto(s *bq.JobStatistics) *JobStatistics { |
| js := &JobStatistics{ |
| CreationTime: unixMillisToTime(s.CreationTime), |
| StartTime: unixMillisToTime(s.StartTime), |
| EndTime: unixMillisToTime(s.EndTime), |
| TotalBytesProcessed: s.TotalBytesProcessed, |
| } |
| switch { |
| case s.Extract != nil: |
| js.Details = &ExtractStatistics{ |
| DestinationURIFileCounts: []int64(s.Extract.DestinationUriFileCounts), |
| } |
| case s.Load != nil: |
| js.Details = &LoadStatistics{ |
| InputFileBytes: s.Load.InputFileBytes, |
| InputFiles: s.Load.InputFiles, |
| OutputBytes: s.Load.OutputBytes, |
| OutputRows: s.Load.OutputRows, |
| } |
| case s.Query != nil: |
| var names []string |
| for _, qp := range s.Query.UndeclaredQueryParameters { |
| names = append(names, qp.Name) |
| } |
| var tables []*Table |
| for _, tr := range s.Query.ReferencedTables { |
| tables = append(tables, convertTableReference(tr)) |
| } |
| js.Details = &QueryStatistics{ |
| BillingTier: s.Query.BillingTier, |
| CacheHit: s.Query.CacheHit, |
| StatementType: s.Query.StatementType, |
| TotalBytesBilled: s.Query.TotalBytesBilled, |
| TotalBytesProcessed: s.Query.TotalBytesProcessed, |
| NumDMLAffectedRows: s.Query.NumDmlAffectedRows, |
| QueryPlan: queryPlanFromProto(s.Query.QueryPlan), |
| Schema: convertTableSchema(s.Query.Schema), |
| ReferencedTables: tables, |
| UndeclaredQueryParameterNames: names, |
| } |
| } |
| return js |
| } |
| |
| func queryPlanFromProto(stages []*bq.ExplainQueryStage) []*ExplainQueryStage { |
| var res []*ExplainQueryStage |
| for _, s := range stages { |
| var steps []*ExplainQueryStep |
| for _, p := range s.Steps { |
| steps = append(steps, &ExplainQueryStep{ |
| Kind: p.Kind, |
| Substeps: p.Substeps, |
| }) |
| } |
| res = append(res, &ExplainQueryStage{ |
| ComputeRatioAvg: s.ComputeRatioAvg, |
| ComputeRatioMax: s.ComputeRatioMax, |
| ID: s.Id, |
| Name: s.Name, |
| ReadRatioAvg: s.ReadRatioAvg, |
| ReadRatioMax: s.ReadRatioMax, |
| RecordsRead: s.RecordsRead, |
| RecordsWritten: s.RecordsWritten, |
| Status: s.Status, |
| Steps: steps, |
| WaitRatioAvg: s.WaitRatioAvg, |
| WaitRatioMax: s.WaitRatioMax, |
| WriteRatioAvg: s.WriteRatioAvg, |
| WriteRatioMax: s.WriteRatioMax, |
| }) |
| } |
| return res |
| } |
| |
| // listTables returns a page of the tables in a dataset, and a token for fetching the next page. |
| func (s *bigqueryService) listTables(ctx context.Context, projectID, datasetID string, pageSize int, pageToken string) ([]*Table, string, error) { |
| var tables []*Table |
| req := s.s.Tables.List(projectID, datasetID). |
| PageToken(pageToken). |
| Context(ctx) |
| setClientHeader(req.Header()) |
| if pageSize > 0 { |
| req.MaxResults(int64(pageSize)) |
| } |
| var res *bq.TableList |
| err := runWithRetry(ctx, func() (err error) { |
| res, err = req.Do() |
| return err |
| }) |
| if err != nil { |
| return nil, "", err |
| } |
| for _, t := range res.Tables { |
| tables = append(tables, convertTableReference(t.TableReference)) |
| } |
| return tables, res.NextPageToken, nil |
| } |
| |
| // createTable creates a table in the BigQuery service. |
| // If tm.ViewQuery is non-empty, the created table will be of type VIEW. |
| // Note: expiration can only be set during table creation. |
| // Note: after table creation, a view can be modified only if it was originally created as a view. |
| func (s *bigqueryService) createTable(ctx context.Context, projectID, datasetID, tableID string, tm *TableMetadata) error { |
| table, err := bqTableFromMetadata(tm) |
| if err != nil { |
| return err |
| } |
| table.TableReference = &bq.TableReference{ |
| ProjectId: projectID, |
| DatasetId: datasetID, |
| TableId: tableID, |
| } |
| req := s.s.Tables.Insert(projectID, datasetID, table).Context(ctx) |
| setClientHeader(req.Header()) |
| _, err = req.Do() |
| return err |
| } |
| |
| func bqTableFromMetadata(tm *TableMetadata) (*bq.Table, error) { |
| t := &bq.Table{} |
| if tm == nil { |
| return t, nil |
| } |
| if tm.Schema != nil && tm.ViewQuery != "" { |
| return nil, errors.New("bigquery: provide Schema or ViewQuery, not both") |
| } |
| t.FriendlyName = tm.Name |
| t.Description = tm.Description |
| if tm.Schema != nil { |
| t.Schema = tm.Schema.asTableSchema() |
| } |
| if tm.ViewQuery != "" { |
| if tm.UseStandardSQL && tm.UseLegacySQL { |
| return nil, errors.New("bigquery: cannot provide both UseStandardSQL and UseLegacySQL") |
| } |
| t.View = &bq.ViewDefinition{Query: tm.ViewQuery} |
| if tm.UseLegacySQL { |
| t.View.UseLegacySql = true |
| } else { |
| t.View.UseLegacySql = false |
| t.View.ForceSendFields = append(t.View.ForceSendFields, "UseLegacySql") |
| } |
| } else if tm.UseLegacySQL || tm.UseStandardSQL { |
| return nil, errors.New("bigquery: UseLegacy/StandardSQL requires ViewQuery") |
| } |
| if tm.TimePartitioning != nil { |
| t.TimePartitioning = &bq.TimePartitioning{ |
| Type: "DAY", |
| ExpirationMs: int64(tm.TimePartitioning.Expiration / time.Millisecond), |
| } |
| } |
| if !tm.ExpirationTime.IsZero() { |
| t.ExpirationTime = tm.ExpirationTime.UnixNano() / 1e6 |
| } |
| |
| if tm.FullID != "" { |
| return nil, errors.New("bigquery: cannot set FullID on create") |
| } |
| if tm.Type != "" { |
| return nil, errors.New("bigquery: cannot set Type on create") |
| } |
| if !tm.CreationTime.IsZero() { |
| return nil, errors.New("bigquery: cannot set CreationTime on create") |
| } |
| if !tm.LastModifiedTime.IsZero() { |
| return nil, errors.New("bigquery: cannot set LastModifiedTime on create") |
| } |
| if tm.NumBytes != 0 { |
| return nil, errors.New("bigquery: cannot set NumBytes on create") |
| } |
| if tm.NumRows != 0 { |
| return nil, errors.New("bigquery: cannot set NumRows on create") |
| } |
| if tm.StreamingBuffer != nil { |
| return nil, errors.New("bigquery: cannot set StreamingBuffer on create") |
| } |
| if tm.ETag != "" { |
| return nil, errors.New("bigquery: cannot set ETag on create") |
| } |
| return t, nil |
| } |
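| |
| // For instance (names and query are illustrative), metadata for a |
| // standard-SQL view converts to a bq.Table whose View.UseLegacySql is |
| // explicitly false and force-sent, so the server does not apply its own |
| // default: |
| // |
| //   t, err := bqTableFromMetadata(&TableMetadata{ |
| //       Name:      "active_users", |
| //       ViewQuery: "SELECT name FROM `my-project.my-dataset.users`", |
| //   }) |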
| |
| func (s *bigqueryService) getTableMetadata(ctx context.Context, projectID, datasetID, tableID string) (*TableMetadata, error) { |
| req := s.s.Tables.Get(projectID, datasetID, tableID).Context(ctx) |
| setClientHeader(req.Header()) |
| var table *bq.Table |
| err := runWithRetry(ctx, func() (err error) { |
| table, err = req.Do() |
| return err |
| }) |
| if err != nil { |
| return nil, err |
| } |
| return bqTableToMetadata(table), nil |
| } |
| |
| func (s *bigqueryService) deleteTable(ctx context.Context, projectID, datasetID, tableID string) error { |
| req := s.s.Tables.Delete(projectID, datasetID, tableID).Context(ctx) |
| setClientHeader(req.Header()) |
| return runWithRetry(ctx, func() error { return req.Do() }) |
| } |
| |
| func bqTableToMetadata(t *bq.Table) *TableMetadata { |
| md := &TableMetadata{ |
| Description: t.Description, |
| Name: t.FriendlyName, |
| Type: TableType(t.Type), |
| FullID: t.Id, |
| NumBytes: t.NumBytes, |
| NumRows: t.NumRows, |
| ExpirationTime: unixMillisToTime(t.ExpirationTime), |
| CreationTime: unixMillisToTime(t.CreationTime), |
| LastModifiedTime: unixMillisToTime(int64(t.LastModifiedTime)), |
| ETag: t.Etag, |
| } |
| if t.Schema != nil { |
| md.Schema = convertTableSchema(t.Schema) |
| } |
| if t.View != nil { |
| md.ViewQuery = t.View.Query |
| md.UseLegacySQL = t.View.UseLegacySql |
| } |
| if t.TimePartitioning != nil { |
| md.TimePartitioning = &TimePartitioning{ |
| Expiration: time.Duration(t.TimePartitioning.ExpirationMs) * time.Millisecond, |
| } |
| } |
| if t.StreamingBuffer != nil { |
| md.StreamingBuffer = &StreamingBuffer{ |
| EstimatedBytes: t.StreamingBuffer.EstimatedBytes, |
| EstimatedRows: t.StreamingBuffer.EstimatedRows, |
| OldestEntryTime: unixMillisToTime(int64(t.StreamingBuffer.OldestEntryTime)), |
| } |
| } |
| return md |
| } |
| |
| func bqDatasetToMetadata(d *bq.Dataset) *DatasetMetadata { |
| // TODO(jba): access |
| return &DatasetMetadata{ |
| CreationTime: unixMillisToTime(d.CreationTime), |
| LastModifiedTime: unixMillisToTime(d.LastModifiedTime), |
| DefaultTableExpiration: time.Duration(d.DefaultTableExpirationMs) * time.Millisecond, |
| Description: d.Description, |
| Name: d.FriendlyName, |
| FullID: d.Id, |
| Location: d.Location, |
| Labels: d.Labels, |
| ETag: d.Etag, |
| } |
| } |
| |
| // unixMillisToTime converts a number of milliseconds since the Unix epoch to a time.Time. |
| // It treats an input of zero specially: it converts to the zero time, |
| // rather than the start of the epoch. |
| func unixMillisToTime(m int64) time.Time { |
| if m == 0 { |
| return time.Time{} |
| } |
| return time.Unix(0, m*1e6) |
| } |
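| |
| // For example, unixMillisToTime(1500000000000) is the same instant as |
| // time.Unix(1500000000, 0), while unixMillisToTime(0) is the zero time.Time. |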
| |
| func convertTableReference(tr *bq.TableReference) *Table { |
| return &Table{ |
| ProjectID: tr.ProjectId, |
| DatasetID: tr.DatasetId, |
| TableID: tr.TableId, |
| } |
| } |
| |
| // patchTableConf contains fields to be patched. |
| type patchTableConf struct { |
| // These fields are omitted from the patch operation if nil (or, for ExpirationTime, if zero). |
| Description *string |
| Name *string |
| Schema Schema |
| ExpirationTime time.Time |
| } |
| |
| func (s *bigqueryService) patchTable(ctx context.Context, projectID, datasetID, tableID string, conf *patchTableConf, etag string) (*TableMetadata, error) { |
| t := &bq.Table{} |
| forceSend := func(field string) { |
| t.ForceSendFields = append(t.ForceSendFields, field) |
| } |
| |
| if conf.Description != nil { |
| t.Description = *conf.Description |
| forceSend("Description") |
| } |
| if conf.Name != nil { |
| t.FriendlyName = *conf.Name |
| forceSend("FriendlyName") |
| } |
| if conf.Schema != nil { |
| t.Schema = conf.Schema.asTableSchema() |
| forceSend("Schema") |
| } |
| if !conf.ExpirationTime.IsZero() { |
| t.ExpirationTime = conf.ExpirationTime.UnixNano() / 1e6 |
| forceSend("ExpirationTime") |
| } |
| call := s.s.Tables.Patch(projectID, datasetID, tableID, t).Context(ctx) |
| setClientHeader(call.Header()) |
| if etag != "" { |
| call.Header().Set("If-Match", etag) |
| } |
| var table *bq.Table |
| if err := runWithRetry(ctx, func() (err error) { |
| table, err = call.Do() |
| return err |
| }); err != nil { |
| return nil, err |
| } |
| return bqTableToMetadata(table), nil |
| } |
| |
| func (s *bigqueryService) insertDataset(ctx context.Context, datasetID, projectID string, dm *DatasetMetadata) error { |
| // TODO(jba): retry? |
| ds, err := bqDatasetFromMetadata(dm) |
| if err != nil { |
| return err |
| } |
| ds.DatasetReference = &bq.DatasetReference{DatasetId: datasetID} |
| req := s.s.Datasets.Insert(projectID, ds).Context(ctx) |
| setClientHeader(req.Header()) |
| _, err = req.Do() |
| return err |
| } |
| |
| func (s *bigqueryService) patchDataset(ctx context.Context, projectID, datasetID string, dm *DatasetMetadataToUpdate, etag string) (*DatasetMetadata, error) { |
| ds := bqDatasetFromUpdateMetadata(dm) |
| call := s.s.Datasets.Patch(projectID, datasetID, ds).Context(ctx) |
| setClientHeader(call.Header()) |
| if etag != "" { |
| call.Header().Set("If-Match", etag) |
| } |
| var ds2 *bq.Dataset |
| if err := runWithRetry(ctx, func() (err error) { |
| ds2, err = call.Do() |
| return err |
| }); err != nil { |
| return nil, err |
| } |
| return bqDatasetToMetadata(ds2), nil |
| } |
| |
| func bqDatasetFromMetadata(dm *DatasetMetadata) (*bq.Dataset, error) { |
| ds := &bq.Dataset{} |
| if dm == nil { |
| return ds, nil |
| } |
| ds.FriendlyName = dm.Name |
| ds.Description = dm.Description |
| ds.Location = dm.Location |
| ds.DefaultTableExpirationMs = int64(dm.DefaultTableExpiration / time.Millisecond) |
| ds.Labels = dm.Labels |
| if !dm.CreationTime.IsZero() { |
| return nil, errors.New("bigquery: Dataset.CreationTime is not writable") |
| } |
| if !dm.LastModifiedTime.IsZero() { |
| return nil, errors.New("bigquery: Dataset.LastModifiedTime is not writable") |
| } |
| if dm.FullID != "" { |
| return nil, errors.New("bigquery: Dataset.FullID is not writable") |
| } |
| if dm.ETag != "" { |
| return nil, errors.New("bigquery: Dataset.ETag is not writable") |
| } |
| return ds, nil |
| } |
| |
| func bqDatasetFromUpdateMetadata(dm *DatasetMetadataToUpdate) *bq.Dataset { |
| ds := &bq.Dataset{} |
| forceSend := func(field string) { |
| ds.ForceSendFields = append(ds.ForceSendFields, field) |
| } |
| |
| if dm.Description != nil { |
| ds.Description = optional.ToString(dm.Description) |
| forceSend("Description") |
| } |
| if dm.Name != nil { |
| ds.FriendlyName = optional.ToString(dm.Name) |
| forceSend("FriendlyName") |
| } |
| if dm.DefaultTableExpiration != nil { |
| dur := optional.ToDuration(dm.DefaultTableExpiration) |
| if dur == 0 { |
| // Send a null to delete the field. |
| ds.NullFields = append(ds.NullFields, "DefaultTableExpirationMs") |
| } else { |
| ds.DefaultTableExpirationMs = int64(dur / time.Millisecond) |
| } |
| } |
| if dm.setLabels != nil || dm.deleteLabels != nil { |
| ds.Labels = map[string]string{} |
| for k, v := range dm.setLabels { |
| ds.Labels[k] = v |
| } |
| if len(ds.Labels) == 0 && len(dm.deleteLabels) > 0 { |
| forceSend("Labels") |
| } |
| for l := range dm.deleteLabels { |
| ds.NullFields = append(ds.NullFields, "Labels."+l) |
| } |
| } |
| return ds |
| } |
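| |
| // As an illustration (label names are hypothetical, and the package's |
| // SetLabel/DeleteLabel helpers on DatasetMetadataToUpdate are assumed), an |
| // update that sets one label and deletes another: |
| // |
| //   dm := &DatasetMetadataToUpdate{} |
| //   dm.SetLabel("team", "analytics") // ds.Labels["team"] = "analytics" |
| //   dm.DeleteLabel("deprecated")     // ds.NullFields = ["Labels.deprecated"] |
| //   ds := bqDatasetFromUpdateMetadata(dm) |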
| |
| func (s *bigqueryService) deleteDataset(ctx context.Context, datasetID, projectID string) error { |
| req := s.s.Datasets.Delete(projectID, datasetID).Context(ctx) |
| setClientHeader(req.Header()) |
| return runWithRetry(ctx, func() error { return req.Do() }) |
| } |
| |
| func (s *bigqueryService) getDatasetMetadata(ctx context.Context, projectID, datasetID string) (*DatasetMetadata, error) { |
| req := s.s.Datasets.Get(projectID, datasetID).Context(ctx) |
| setClientHeader(req.Header()) |
| var ds *bq.Dataset |
| if err := runWithRetry(ctx, func() (err error) { |
| ds, err = req.Do() |
| return err |
| }); err != nil { |
| return nil, err |
| } |
| return bqDatasetToMetadata(ds), nil |
| } |
| |
| func (s *bigqueryService) listDatasets(ctx context.Context, projectID string, maxResults int, pageToken string, all bool, filter string) ([]*Dataset, string, error) { |
| req := s.s.Datasets.List(projectID). |
| Context(ctx). |
| PageToken(pageToken). |
| All(all) |
| setClientHeader(req.Header()) |
| if maxResults > 0 { |
| req.MaxResults(int64(maxResults)) |
| } |
| if filter != "" { |
| req.Filter(filter) |
| } |
| var res *bq.DatasetList |
| err := runWithRetry(ctx, func() (err error) { |
| res, err = req.Do() |
| return err |
| }) |
| if err != nil { |
| return nil, "", err |
| } |
| var datasets []*Dataset |
| for _, d := range res.Datasets { |
| datasets = append(datasets, s.convertListedDataset(d)) |
| } |
| return datasets, res.NextPageToken, nil |
| } |
| |
| func (s *bigqueryService) convertListedDataset(d *bq.DatasetListDatasets) *Dataset { |
| return &Dataset{ |
| ProjectID: d.DatasetReference.ProjectId, |
| DatasetID: d.DatasetReference.DatasetId, |
| } |
| } |
| |
| func (s *bigqueryService) listJobs(ctx context.Context, projectID string, maxResults int, pageToken string, all bool, state string) ([]JobInfo, string, error) { |
| req := s.s.Jobs.List(projectID). |
| Context(ctx). |
| PageToken(pageToken). |
| Projection("full"). |
| AllUsers(all) |
| if state != "" { |
| req.StateFilter(state) |
| } |
| setClientHeader(req.Header()) |
| if maxResults > 0 { |
| req.MaxResults(int64(maxResults)) |
| } |
| var res *bq.JobList |
| err := runWithRetry(ctx, func() (err error) { |
| res, err = req.Do() |
| return err |
| }) |
| if err != nil { |
| return nil, "", err |
| } |
| var jobInfos []JobInfo |
| for _, j := range res.Jobs { |
| ji, err := s.convertListedJob(j) |
| if err != nil { |
| return nil, "", err |
| } |
| jobInfos = append(jobInfos, ji) |
| } |
| return jobInfos, res.NextPageToken, nil |
| } |
| |
| func (s *bigqueryService) convertListedJob(j *bq.JobListJobs) (JobInfo, error) { |
| st, err := jobStatusFromProto(j.Status) |
| if err != nil { |
| return JobInfo{}, err |
| } |
| st.Statistics = jobStatisticsFromProto(j.Statistics) |
| return JobInfo{ |
| Job: jobFromProtos(j.JobReference, j.Configuration), |
| Status: st, |
| }, nil |
| } |
| |
| // runWithRetry calls the function until it returns nil or a non-retryable error, or |
| // the context is done. |
| // See the similar function in ../storage/invoke.go. The main difference is the |
| // reason for retrying. |
| func runWithRetry(ctx context.Context, call func() error) error { |
| // These parameters match the suggestions in https://cloud.google.com/bigquery/sla. |
| backoff := gax.Backoff{ |
| Initial: 1 * time.Second, |
| Max: 32 * time.Second, |
| Multiplier: 2, |
| } |
| return internal.Retry(ctx, backoff, func() (stop bool, err error) { |
| err = call() |
| if err == nil { |
| return true, nil |
| } |
| return !retryableError(err), err |
| }) |
| } |
| |
| // retryableError reports whether err should be retried, using the definition |
| // of retryable provided by the BigQuery team. |
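| // |
| // For illustration, an error shaped like this would be retried: |
| // |
| //   err := &googleapi.Error{ |
| //       Code:   503, |
| //       Errors: []googleapi.ErrorItem{{Reason: "backendError"}}, |
| //   } |
| //   retryableError(err) // true |
| // |
| // while any non-googleapi error, or one whose first reason does not match, |
| // is not retried. |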
| func retryableError(err error) bool { |
| e, ok := err.(*googleapi.Error) |
| if !ok { |
| return false |
| } |
| var reason string |
| if len(e.Errors) > 0 { |
| reason = e.Errors[0].Reason |
| } |
| return reason == "backendError" || reason == "rateLimitExceeded" |
| } |