bigquery: support jobs.list

Add Client.Jobs, which returns an iterator over JobInfos.

Support filtering by project ID, all users, and job state.

A JobInfo includes a Job, holding the immutable parts of a job (ID,
config), and a JobStatus. A later CL will expose the job config.

At present we always read the job config. The "minimal" view, which
omits the config, is a performance optimization. We can include it
when we get feedback that the full view is too slow.

Change-Id: Id90ee0efdbab6c5123fdd0110ab6c3ad80d02122
Reviewed-on: https://code-review.googlesource.com/16910
Reviewed-by: kokoro <noreply+kokoro@google.com>
Reviewed-by: Michael Darakananda <pongad@google.com>
Reviewed-by: Jeff Jolma <jjolma@google.com>
Reviewed-by: Tim Swast <swast@google.com>
Reviewed-by: Jonathan Amsterdam <jba@google.com>
diff --git a/bigquery/integration_test.go b/bigquery/integration_test.go
index 31b729f..a0fcb5d 100644
--- a/bigquery/integration_test.go
+++ b/bigquery/integration_test.go
@@ -1117,6 +1117,38 @@
 	}
 }
 
+func TestIntegration_ListJobs(t *testing.T) {
+	// It's difficult to test the list of jobs, because we can't easily
+	// control what's in it. Also, there are many jobs in the test project,
+	// and it takes considerable time to list them all.
+	if client == nil {
+		t.Skip("Integration tests skipped")
+	}
+	ctx := context.Background()
+
+	// About all we can do is list a few jobs.
+	const max = 20
+	var jis []JobInfo
+	it := client.Jobs(ctx)
+	for {
+		ji, err := it.Next()
+		if err == iterator.Done {
+			break
+		}
+		if err != nil {
+			t.Fatal(err)
+		}
+		jis = append(jis, ji)
+		if len(jis) >= max {
+			break
+		}
+	}
+	// We expect that there is at least one job in the last few months.
+	if len(jis) == 0 {
+		t.Fatal("did not get any jobs")
+	}
+}
+
 // Creates a new, temporary table with a unique name and the given schema.
 func newTable(t *testing.T, s Schema) *Table {
 	name := fmt.Sprintf("t%d", time.Now().UnixNano())
diff --git a/bigquery/job.go b/bigquery/job.go
index e32161a..34a7bc8 100644
--- a/bigquery/job.go
+++ b/bigquery/job.go
@@ -16,6 +16,7 @@
 
 import (
 	"errors"
+	"fmt"
 	"math/rand"
 	"os"
 	"sync"
@@ -25,6 +26,7 @@
 	gax "github.com/googleapis/gax-go"
 	"golang.org/x/net/context"
 	bq "google.golang.org/api/bigquery/v2"
+	"google.golang.org/api/iterator"
 )
 
 // A Job represents an operation which has been submitted to BigQuery for processing.
@@ -58,7 +60,8 @@
 type State int
 
 const (
-	Pending State = iota
+	StateUnspecified State = iota // used only as a default in JobIterator
+	Pending
 	Running
 	Done
 )
@@ -121,20 +124,25 @@
 	return s.err
 }
 
+// Fill in the client field of Tables in the statistics.
+func (s *JobStatus) setClient(c *Client) {
+	if s.Statistics == nil {
+		return
+	}
+	if qs, ok := s.Statistics.Details.(*QueryStatistics); ok {
+		for _, t := range qs.ReferencedTables {
+			t.c = c
+		}
+	}
+}
+
 // Status returns the current status of the job. It fails if the Status could not be determined.
 func (j *Job) Status(ctx context.Context) (*JobStatus, error) {
 	js, err := j.c.service.jobStatus(ctx, j.projectID, j.jobID)
 	if err != nil {
 		return nil, err
 	}
-	// Fill in the client field of Tables in the statistics.
-	if js.Statistics != nil {
-		if qs, ok := js.Statistics.Details.(*QueryStatistics); ok {
-			for _, t := range qs.ReferencedTables {
-				t.c = j.c
-			}
-		}
-	}
+	js.setClient(j.c)
 	return js, nil
 }
 
@@ -346,3 +354,73 @@
 func (*ExtractStatistics) implementsStatistics() {}
 func (*LoadStatistics) implementsStatistics()    {}
 func (*QueryStatistics) implementsStatistics()   {}
+
+// Jobs lists jobs within a project.
+func (c *Client) Jobs(ctx context.Context) *JobIterator {
+	it := &JobIterator{
+		ctx:       ctx,
+		c:         c,
+		ProjectID: c.projectID,
+	}
+	it.pageInfo, it.nextFunc = iterator.NewPageInfo(
+		it.fetch,
+		func() int { return len(it.items) },
+		func() interface{} { b := it.items; it.items = nil; return b })
+	return it
+}
+
+// A JobInfo consists of a Job and a JobStatus.
+type JobInfo struct {
+	Job    *Job
+	Status *JobStatus
+}
+
+// JobIterator iterates over jobs in a project.
+type JobIterator struct {
+	ProjectID string // Project ID of the jobs to list. Default is the client's project.
+	AllUsers  bool   // Whether to list jobs owned by all users in the project, or just the current caller.
+	State     State  // List only jobs in the given state. Defaults to all states.
+
+	ctx      context.Context
+	c        *Client
+	pageInfo *iterator.PageInfo
+	nextFunc func() error
+	items    []JobInfo
+}
+
+func (it *JobIterator) PageInfo() *iterator.PageInfo { return it.pageInfo }
+
+func (it *JobIterator) Next() (JobInfo, error) {
+	if err := it.nextFunc(); err != nil {
+		return JobInfo{}, err
+	}
+	item := it.items[0]
+	it.items = it.items[1:]
+	return item, nil
+}
+
+func (it *JobIterator) fetch(pageSize int, pageToken string) (string, error) {
+	var st string
+	switch it.State {
+	case StateUnspecified:
+		st = ""
+	case Pending:
+		st = "pending"
+	case Running:
+		st = "running"
+	case Done:
+		st = "done"
+	default:
+		return "", fmt.Errorf("bigquery: invalid value for JobIterator.State: %d", it.State)
+	}
+	jobInfos, nextPageToken, err := it.c.service.listJobs(it.ctx, it.ProjectID, pageSize, pageToken, it.AllUsers, st)
+	if err != nil {
+		return "", err
+	}
+	for _, ji := range jobInfos {
+		ji.Job.c = it.c
+		ji.Status.setClient(it.c)
+		it.items = append(it.items, ji)
+	}
+	return nextPageToken, nil
+}
diff --git a/bigquery/service.go b/bigquery/service.go
index 2134d80..d4e3a70 100644
--- a/bigquery/service.go
+++ b/bigquery/service.go
@@ -41,6 +41,7 @@
 	getJob(ctx context.Context, projectId, jobID string) (*Job, error)
 	jobCancel(ctx context.Context, projectId, jobID string) error
 	jobStatus(ctx context.Context, projectId, jobID string) (*JobStatus, error)
+	listJobs(ctx context.Context, projectId string, maxResults int, pageToken string, all bool, state string) ([]JobInfo, string, error)
 
 	// Tables
 	createTable(ctx context.Context, conf *createTableConf) error
@@ -313,22 +314,11 @@
 }
 
 func (s *bigqueryService) getJob(ctx context.Context, projectID, jobID string) (*Job, error) {
-	job, err := s.getJobInternal(ctx, projectID, jobID, "configuration")
+	bqjob, err := s.getJobInternal(ctx, projectID, jobID, "configuration")
 	if err != nil {
 		return nil, err
 	}
-	var isQuery bool
-	var dest *bq.TableReference
-	if job.Configuration.Query != nil {
-		isQuery = true
-		dest = job.Configuration.Query.DestinationTable
-	}
-	return &Job{
-		projectID:        projectID,
-		jobID:            jobID,
-		isQuery:          isQuery,
-		destinationTable: dest,
-	}, nil
+	return jobFromProtos(bqjob.JobReference, bqjob.Configuration), nil
 }
 
 func (s *bigqueryService) jobStatus(ctx context.Context, projectID, jobID string) (*JobStatus, error) {
@@ -346,9 +336,10 @@
 
 func (s *bigqueryService) getJobInternal(ctx context.Context, projectID, jobID string, fields ...googleapi.Field) (*bq.Job, error) {
 	var job *bq.Job
-	call := s.s.Jobs.Get(projectID, jobID).
-		Fields(fields...).
-		Context(ctx)
+	call := s.s.Jobs.Get(projectID, jobID).Context(ctx)
+	if len(fields) > 0 {
+		call = call.Fields(fields...)
+	}
 	setClientHeader(call.Header())
 	err := runWithRetry(ctx, func() (err error) {
 		job, err = call.Do()
@@ -376,6 +367,21 @@
 	})
 }
 
+func jobFromProtos(jr *bq.JobReference, config *bq.JobConfiguration) *Job {
+	var isQuery bool
+	var dest *bq.TableReference
+	if config.Query != nil {
+		isQuery = true
+		dest = config.Query.DestinationTable
+	}
+	return &Job{
+		projectID:        jr.ProjectId,
+		jobID:            jr.JobId,
+		isQuery:          isQuery,
+		destinationTable: dest,
+	}
+}
+
 var stateMap = map[string]State{"PENDING": Pending, "RUNNING": Running, "DONE": Done}
 
 func jobStatusFromProto(status *bq.JobStatus) (*JobStatus, error) {
@@ -807,6 +813,46 @@
 	}
 }
 
+func (s *bigqueryService) listJobs(ctx context.Context, projectID string, maxResults int, pageToken string, all bool, state string) ([]JobInfo, string, error) {
+	req := s.s.Jobs.List(projectID).
+		Context(ctx).
+		PageToken(pageToken).
+		Projection("full").
+		AllUsers(all)
+	if state != "" {
+		req.StateFilter(state)
+	}
+	setClientHeader(req.Header())
+	if maxResults > 0 {
+		req.MaxResults(int64(maxResults))
+	}
+	res, err := req.Do()
+	if err != nil {
+		return nil, "", err
+	}
+	var jobInfos []JobInfo
+	for _, j := range res.Jobs {
+		ji, err := s.convertListedJob(j)
+		if err != nil {
+			return nil, "", err
+		}
+		jobInfos = append(jobInfos, ji)
+	}
+	return jobInfos, res.NextPageToken, nil
+}
+
+func (s *bigqueryService) convertListedJob(j *bq.JobListJobs) (JobInfo, error) {
+	st, err := jobStatusFromProto(j.Status)
+	if err != nil {
+		return JobInfo{}, err
+	}
+	st.Statistics = jobStatisticsFromProto(j.Statistics)
+	return JobInfo{
+		Job:    jobFromProtos(j.JobReference, j.Configuration),
+		Status: st,
+	}, nil
+}
+
 // runWithRetry calls the function until it returns nil or a non-retryable error, or
 // the context is done.
 // See the similar function in ../storage/invoke.go. The main difference is the