bigquery: support jobs.list
Add Client.Jobs, which returns an iterator over JobInfos.
Support filtering by project ID, all users, and job state.
A JobInfo includes a Job, holding the immutable parts of a job (ID,
config), and a JobStatus. A later CL will expose the job config.
At present we always read the job config. The "minimal" view, which
omits the config, is only a performance optimization; we can add support
for it if we get feedback that the full view is too slow.
Change-Id: Id90ee0efdbab6c5123fdd0110ab6c3ad80d02122
Reviewed-on: https://code-review.googlesource.com/16910
Reviewed-by: kokoro <noreply+kokoro@google.com>
Reviewed-by: Michael Darakananda <pongad@google.com>
Reviewed-by: Jeff Jolma <jjolma@google.com>
Reviewed-by: Tim Swast <swast@google.com>
Reviewed-by: Jonathan Amsterdam <jba@google.com>
diff --git a/bigquery/integration_test.go b/bigquery/integration_test.go
index 31b729f..a0fcb5d 100644
--- a/bigquery/integration_test.go
+++ b/bigquery/integration_test.go
@@ -1117,6 +1117,38 @@
}
}
+func TestIntegration_ListJobs(t *testing.T) {
+ // It's difficult to test the list of jobs, because we can't easily
+ // control what's in it. Also, there are many jobs in the test project,
+ // and it takes considerable time to list them all.
+ if client == nil {
+ t.Skip("Integration tests skipped")
+ }
+ ctx := context.Background()
+
+ // About all we can do is list a few jobs.
+ const max = 20
+ var jis []JobInfo
+ it := client.Jobs(ctx)
+ for {
+ ji, err := it.Next()
+ if err == iterator.Done {
+ break
+ }
+ if err != nil {
+ t.Fatal(err)
+ }
+ jis = append(jis, ji)
+ if len(jis) >= max {
+ break
+ }
+ }
+ // We expect that there is at least one job in the last few months.
+ if len(jis) == 0 {
+ t.Fatal("did not get any jobs")
+ }
+}
+
// Creates a new, temporary table with a unique name and the given schema.
func newTable(t *testing.T, s Schema) *Table {
name := fmt.Sprintf("t%d", time.Now().UnixNano())
diff --git a/bigquery/job.go b/bigquery/job.go
index e32161a..34a7bc8 100644
--- a/bigquery/job.go
+++ b/bigquery/job.go
@@ -16,6 +16,7 @@
import (
"errors"
+ "fmt"
"math/rand"
"os"
"sync"
@@ -25,6 +26,7 @@
gax "github.com/googleapis/gax-go"
"golang.org/x/net/context"
bq "google.golang.org/api/bigquery/v2"
+ "google.golang.org/api/iterator"
)
// A Job represents an operation which has been submitted to BigQuery for processing.
@@ -58,7 +60,8 @@
type State int
const (
- Pending State = iota
+ StateUnspecified State = iota // used only as a default in JobIterator
+ Pending
Running
Done
)
@@ -121,20 +124,25 @@
return s.err
}
+// setClient fills in the client field of Tables in the statistics.
+func (s *JobStatus) setClient(c *Client) {
+ if s.Statistics == nil {
+ return
+ }
+ if qs, ok := s.Statistics.Details.(*QueryStatistics); ok {
+ for _, t := range qs.ReferencedTables {
+ t.c = c
+ }
+ }
+}
+
// Status returns the current status of the job. It fails if the Status could not be determined.
func (j *Job) Status(ctx context.Context) (*JobStatus, error) {
js, err := j.c.service.jobStatus(ctx, j.projectID, j.jobID)
if err != nil {
return nil, err
}
- // Fill in the client field of Tables in the statistics.
- if js.Statistics != nil {
- if qs, ok := js.Statistics.Details.(*QueryStatistics); ok {
- for _, t := range qs.ReferencedTables {
- t.c = j.c
- }
- }
- }
+ js.setClient(j.c)
return js, nil
}
@@ -346,3 +354,73 @@
func (*ExtractStatistics) implementsStatistics() {}
func (*LoadStatistics) implementsStatistics() {}
func (*QueryStatistics) implementsStatistics() {}
+
+// Jobs lists jobs within a project.
+func (c *Client) Jobs(ctx context.Context) *JobIterator {
+ it := &JobIterator{
+ ctx: ctx,
+ c: c,
+ ProjectID: c.projectID,
+ }
+ it.pageInfo, it.nextFunc = iterator.NewPageInfo(
+ it.fetch,
+ func() int { return len(it.items) },
+ func() interface{} { b := it.items; it.items = nil; return b })
+ return it
+}
+
+// A JobInfo consists of a Job and a JobStatus.
+type JobInfo struct {
+ Job *Job
+ Status *JobStatus
+}
+
+// JobIterator iterates over jobs in a project.
+type JobIterator struct {
+ ProjectID string // Project ID of the jobs to list. Default is the client's project.
+ AllUsers bool // Whether to list jobs owned by all users in the project, or just the current caller.
+ State State // List only jobs in the given state. Defaults to all states.
+
+ ctx context.Context
+ c *Client
+ pageInfo *iterator.PageInfo
+ nextFunc func() error
+ items []JobInfo
+}
+
+func (it *JobIterator) PageInfo() *iterator.PageInfo { return it.pageInfo }
+
+func (it *JobIterator) Next() (JobInfo, error) {
+ if err := it.nextFunc(); err != nil {
+ return JobInfo{}, err
+ }
+ item := it.items[0]
+ it.items = it.items[1:]
+ return item, nil
+}
+
+func (it *JobIterator) fetch(pageSize int, pageToken string) (string, error) {
+ var st string
+ switch it.State {
+ case StateUnspecified:
+ st = ""
+ case Pending:
+ st = "pending"
+ case Running:
+ st = "running"
+ case Done:
+ st = "done"
+ default:
+ return "", fmt.Errorf("bigquery: invalid value for JobIterator.State: %d", it.State)
+ }
+ jobInfos, nextPageToken, err := it.c.service.listJobs(it.ctx, it.ProjectID, pageSize, pageToken, it.AllUsers, st)
+ if err != nil {
+ return "", err
+ }
+ for _, ji := range jobInfos {
+ ji.Job.c = it.c
+ ji.Status.setClient(it.c)
+ it.items = append(it.items, ji)
+ }
+ return nextPageToken, nil
+}
diff --git a/bigquery/service.go b/bigquery/service.go
index 2134d80..d4e3a70 100644
--- a/bigquery/service.go
+++ b/bigquery/service.go
@@ -41,6 +41,7 @@
getJob(ctx context.Context, projectId, jobID string) (*Job, error)
jobCancel(ctx context.Context, projectId, jobID string) error
jobStatus(ctx context.Context, projectId, jobID string) (*JobStatus, error)
+ listJobs(ctx context.Context, projectId string, maxResults int, pageToken string, all bool, state string) ([]JobInfo, string, error)
// Tables
createTable(ctx context.Context, conf *createTableConf) error
@@ -313,22 +314,11 @@
}
func (s *bigqueryService) getJob(ctx context.Context, projectID, jobID string) (*Job, error) {
- job, err := s.getJobInternal(ctx, projectID, jobID, "configuration")
+ bqjob, err := s.getJobInternal(ctx, projectID, jobID, "configuration", "jobReference") // jobFromProtos dereferences both fields
if err != nil {
return nil, err
}
- var isQuery bool
- var dest *bq.TableReference
- if job.Configuration.Query != nil {
- isQuery = true
- dest = job.Configuration.Query.DestinationTable
- }
- return &Job{
- projectID: projectID,
- jobID: jobID,
- isQuery: isQuery,
- destinationTable: dest,
- }, nil
+ return jobFromProtos(bqjob.JobReference, bqjob.Configuration), nil
}
func (s *bigqueryService) jobStatus(ctx context.Context, projectID, jobID string) (*JobStatus, error) {
@@ -346,9 +336,10 @@
func (s *bigqueryService) getJobInternal(ctx context.Context, projectID, jobID string, fields ...googleapi.Field) (*bq.Job, error) {
var job *bq.Job
- call := s.s.Jobs.Get(projectID, jobID).
- Fields(fields...).
- Context(ctx)
+ call := s.s.Jobs.Get(projectID, jobID).Context(ctx)
+ if len(fields) > 0 {
+ call = call.Fields(fields...)
+ }
setClientHeader(call.Header())
err := runWithRetry(ctx, func() (err error) {
job, err = call.Do()
@@ -376,6 +367,21 @@
})
}
+func jobFromProtos(jr *bq.JobReference, config *bq.JobConfiguration) *Job {
+ var isQuery bool
+ var dest *bq.TableReference
+ if config.Query != nil {
+ isQuery = true
+ dest = config.Query.DestinationTable
+ }
+ return &Job{
+ projectID: jr.ProjectId,
+ jobID: jr.JobId,
+ isQuery: isQuery,
+ destinationTable: dest,
+ }
+}
+
var stateMap = map[string]State{"PENDING": Pending, "RUNNING": Running, "DONE": Done}
func jobStatusFromProto(status *bq.JobStatus) (*JobStatus, error) {
@@ -807,6 +813,46 @@
}
}
+func (s *bigqueryService) listJobs(ctx context.Context, projectID string, maxResults int, pageToken string, all bool, state string) ([]JobInfo, string, error) {
+ req := s.s.Jobs.List(projectID).
+ Context(ctx).
+ PageToken(pageToken).
+ Projection("full").
+ AllUsers(all)
+ if state != "" {
+ req.StateFilter(state)
+ }
+ setClientHeader(req.Header())
+ if maxResults > 0 {
+ req.MaxResults(int64(maxResults))
+ }
+ res, err := req.Do()
+ if err != nil {
+ return nil, "", err
+ }
+ var jobInfos []JobInfo
+ for _, j := range res.Jobs {
+ ji, err := s.convertListedJob(j)
+ if err != nil {
+ return nil, "", err
+ }
+ jobInfos = append(jobInfos, ji)
+ }
+ return jobInfos, res.NextPageToken, nil
+}
+
+func (s *bigqueryService) convertListedJob(j *bq.JobListJobs) (JobInfo, error) {
+ st, err := jobStatusFromProto(j.Status)
+ if err != nil {
+ return JobInfo{}, err
+ }
+ st.Statistics = jobStatisticsFromProto(j.Statistics)
+ return JobInfo{
+ Job: jobFromProtos(j.JobReference, j.Configuration),
+ Status: st,
+ }, nil
+}
+
// runWithRetry calls the function until it returns nil or a non-retryable error, or
// the context is done.
// See the similar function in ../storage/invoke.go. The main difference is the