bigquery: refactor RowIterator to support fast query path (#2484)

This CL significantly revamps the rowiterator.

Previously, row iteration necessitated a reference to a Table object.
For jobs, this meant we would consult job statistics to find the
reference to the destination table.

With this change, we switch to using the jobs.getQueryResults RPC to get
rows from a job, while still allowing table reads to use the existing
tabledata.list mechanism.

This change also includes support for a caching abstraction in the row
iterator, which is added but not yet leveraged.  This will be wired up
in subsequent PRs that change the query execution path.

Additionally, this PR address some other reported issues against the library as the use of jobs.getQueryResults allows the service to make decisions about the source of results (e.g. in cases where a scripting/multistatement query returns resultsets from a child job).

Fixes: #1960, #1974
diff --git a/bigquery/integration_test.go b/bigquery/integration_test.go
index 7590f13..77ee8f2 100644
--- a/bigquery/integration_test.go
+++ b/bigquery/integration_test.go
@@ -954,12 +954,12 @@
 			want:        [][]Value{},
 		},
 		{
-			// Note: currently CTAS returns the rows due to the destination table reference,
-			// but it's not clear that it should.
-			// https://github.com/googleapis/google-cloud-go/issues/1467 for followup.
+			// Previously this would return rows due to the destination reference being present
+			// in the job config, but switching to relying on jobs.getQueryResults allows the
+			// service to decide the behavior.
 			description: "ctas ddl",
 			query:       fmt.Sprintf("CREATE TABLE %s.%s AS SELECT 17 as foo", dataset.DatasetID, tableIDs.New()),
-			want:        [][]Value{{int64(17)}},
+			want:        nil,
 		},
 	}
 	for _, tc := range testCases {
@@ -976,6 +976,48 @@
 	}
 }
 
+func TestIntegration_RoutineStoredProcedure(t *testing.T) {
+	// Verifies we're exhibiting documented behavior, where we're expected
+	// to return the last resultset in a script as the response from a script
+	// job.
+	// https://github.com/googleapis/google-cloud-go/issues/1974
+	if client == nil {
+		t.Skip("Integration tests skipped")
+	}
+	ctx := context.Background()
+
+	// Define a simple stored procedure via DDL.
+	routineID := routineIDs.New()
+	routine := dataset.Routine(routineID)
+	sql := fmt.Sprintf(`
+		CREATE OR REPLACE PROCEDURE `+"`%s`"+`(val INT64)
+		BEGIN
+			SELECT CURRENT_TIMESTAMP() as ts;
+			SELECT val * 2 as f2;
+		END`,
+		routine.FullyQualifiedName())
+
+	if err := runQueryJob(ctx, sql); err != nil {
+		t.Fatal(err)
+	}
+	defer routine.Delete(ctx)
+
+	// Invoke the stored procedure.
+	sql = fmt.Sprintf(`
+	CALL `+"`%s`"+`(5)`,
+		routine.FullyQualifiedName())
+
+	q := client.Query(sql)
+	it, err := q.Read(ctx)
+	if err != nil {
+		t.Fatalf("query.Read: %v", err)
+	}
+
+	checkReadAndTotalRows(t,
+		"expect result set from procedure",
+		it, [][]Value{{int64(10)}})
+}
+
 func TestIntegration_InsertAndRead(t *testing.T) {
 	if client == nil {
 		t.Skip("Integration tests skipped")
diff --git a/bigquery/iterator.go b/bigquery/iterator.go
index 298143c..6d28ea0 100644
--- a/bigquery/iterator.go
+++ b/bigquery/iterator.go
@@ -16,37 +16,37 @@
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"reflect"
 
 	bq "google.golang.org/api/bigquery/v2"
+	"google.golang.org/api/googleapi"
 	"google.golang.org/api/iterator"
 )
 
 // Construct a RowIterator.
-// If pf is nil, there are no rows in the result set.
-func newRowIterator(ctx context.Context, t *Table, pf pageFetcher) *RowIterator {
+func newRowIterator(ctx context.Context, src *rowSource, pf pageFetcher) *RowIterator {
 	it := &RowIterator{
-		ctx:   ctx,
-		table: t,
-		pf:    pf,
+		ctx: ctx,
+		src: src,
+		pf:  pf,
 	}
-	if pf != nil {
-		it.pageInfo, it.nextFunc = iterator.NewPageInfo(
-			it.fetch,
-			func() int { return len(it.rows) },
-			func() interface{} { r := it.rows; it.rows = nil; return r })
-	}
+	it.pageInfo, it.nextFunc = iterator.NewPageInfo(
+		it.fetch,
+		func() int { return len(it.rows) },
+		func() interface{} { r := it.rows; it.rows = nil; return r })
 	return it
 }
 
 // A RowIterator provides access to the result of a BigQuery lookup.
 type RowIterator struct {
-	ctx      context.Context
-	table    *Table
-	pf       pageFetcher
+	ctx context.Context
+	src *rowSource
+
 	pageInfo *iterator.PageInfo
 	nextFunc func() error
+	pf       pageFetcher
 
 	// StartIndex can be set before the first call to Next. If PageInfo().Token
 	// is also set, StartIndex is ignored.
@@ -63,6 +63,11 @@
 	structLoader structLoader // used to populate a pointer to a struct
 }
 
+// We declare a function signature for fetching results.  The primary reason
+// for this is to enable us to swap out the fetch function with alternate
+// implementations (e.g. to enable testing).
+type pageFetcher func(ctx context.Context, _ *rowSource, _ Schema, startIndex uint64, pageSize int64, pageToken string) (*fetchPageResult, error)
+
 // Next loads the next row into dst. Its return value is iterator.Done if there
 // are no more results. Once Next returns iterator.Done, all subsequent calls
 // will return iterator.Done.
@@ -103,9 +108,6 @@
 // NullDateTime. You can also use a *[]Value or *map[string]Value to read from a
 // table with NULLs.
 func (it *RowIterator) Next(dst interface{}) error {
-	if it.pf == nil { // There are no rows in the result set.
-		return iterator.Done
-	}
 	var vl ValueLoader
 	switch dst := dst.(type) {
 	case ValueLoader:
@@ -145,19 +147,42 @@
 func (it *RowIterator) PageInfo() *iterator.PageInfo { return it.pageInfo }
 
 func (it *RowIterator) fetch(pageSize int, pageToken string) (string, error) {
-	res, err := it.pf(it.ctx, it.table, it.Schema, it.StartIndex, int64(pageSize), pageToken)
+	res, err := it.pf(it.ctx, it.src, it.Schema, it.StartIndex, int64(pageSize), pageToken)
 	if err != nil {
 		return "", err
 	}
 	it.rows = append(it.rows, res.rows...)
-	it.Schema = res.schema
+	if it.Schema == nil {
+		it.Schema = res.schema
+	}
 	it.TotalRows = res.totalRows
 	return res.pageToken, nil
 }
 
-// A pageFetcher returns a page of rows from a destination table.
-type pageFetcher func(ctx context.Context, _ *Table, _ Schema, startIndex uint64, pageSize int64, pageToken string) (*fetchPageResult, error)
+// rowSource represents one of the multiple sources of data for a row iterator.
+// Rows can be read directly from a BigQuery table or from a job reference.
+// If a job is present, that's treated as the authoritative source.
+//
+// rowSource can also cache results for special situations, primarily for the
+// fast execution query path which can return status, rows, and schema all at
+// once.  Our cache data expectations are as follows:
+//
+// * We can only cache data from the start of a source.
+// * We need to cache schema, rows, and next page token to effective service
+//   a request from cache.
+// * cache references are destroyed as soon as they're interrogated.  We don't
+//   want to retain the data unnecessarily, and we expect that the backend
+//   can always provide them if needed.
+type rowSource struct {
+	j *Job
+	t *Table
 
+	cachedRows      []*bq.TableRow
+	cachedSchema    *bq.TableSchema
+	cachedNextToken string
+}
+
+// fetchPageResult represents a page of rows returned from the backend.
 type fetchPageResult struct {
 	pageToken string
 	rows      [][]Value
@@ -165,8 +190,26 @@
 	schema    Schema
 }
 
-// fetchPage gets a page of rows from t.
-func fetchPage(ctx context.Context, t *Table, schema Schema, startIndex uint64, pageSize int64, pageToken string) (*fetchPageResult, error) {
+// fetchPage is our generalized fetch mechanism.  It interrogates from cache, and
+// then dispatches to either the appropriate job or table-based backend mechanism
+// as needed.
+func fetchPage(ctx context.Context, src *rowSource, schema Schema, startIndex uint64, pageSize int64, pageToken string) (*fetchPageResult, error) {
+	result, err := fetchCachedPage(ctx, src, schema, startIndex, pageSize, pageToken)
+	if err != nil {
+		if err != errNoCacheData {
+			// This likely means something more severe, like a problem with schema.
+			return nil, err
+		}
+		// If we failed to fet data from cache, invoke the appropriate service method.
+		if src.j != nil {
+			return fetchJobResultPage(ctx, src, schema, startIndex, pageSize, pageToken)
+		}
+		return fetchTableResultPage(ctx, src, schema, startIndex, pageSize, pageToken)
+	}
+	return result, nil
+}
+
+func fetchTableResultPage(ctx context.Context, src *rowSource, schema Schema, startIndex uint64, pageSize int64, pageToken string) (*fetchPageResult, error) {
 	// Fetch the table schema in the background, if necessary.
 	errc := make(chan error, 1)
 	if schema != nil {
@@ -175,7 +218,7 @@
 		go func() {
 			var bqt *bq.Table
 			err := runWithRetry(ctx, func() (err error) {
-				bqt, err = t.c.bqs.Tables.Get(t.ProjectID, t.DatasetID, t.TableID).
+				bqt, err = src.t.c.bqs.Tables.Get(src.t.ProjectID, src.t.DatasetID, src.t.TableID).
 					Fields("schema").
 					Context(ctx).
 					Do()
@@ -187,7 +230,7 @@
 			errc <- err
 		}()
 	}
-	call := t.c.bqs.Tabledata.List(t.ProjectID, t.DatasetID, t.TableID)
+	call := src.t.c.bqs.Tabledata.List(src.t.ProjectID, src.t.DatasetID, src.t.TableID)
 	setClientHeader(call.Header())
 	if pageToken != "" {
 		call.PageToken(pageToken)
@@ -220,3 +263,100 @@
 		schema:    schema,
 	}, nil
 }
+
+func fetchJobResultPage(ctx context.Context, src *rowSource, schema Schema, startIndex uint64, pageSize int64, pageToken string) (*fetchPageResult, error) {
+	// reduce data transfered by leveraging api projections
+	projectedFields := []googleapi.Field{"rows", "pageToken", "totalRows"}
+	call := src.j.c.bqs.Jobs.GetQueryResults(src.j.projectID, src.j.jobID).Location(src.j.location)
+	call = call.Fields(projectedFields...)
+	if schema == nil {
+		// only project schema if we weren't supplied one.
+		call = call.Fields("schema")
+	}
+	setClientHeader(call.Header())
+	if pageToken != "" {
+		call.PageToken(pageToken)
+	} else {
+		call.StartIndex(startIndex)
+	}
+	if pageSize > 0 {
+		call.MaxResults(pageSize)
+	}
+	var res *bq.GetQueryResultsResponse
+	err := runWithRetry(ctx, func() (err error) {
+		res, err = call.Context(ctx).Do()
+		return err
+	})
+	if err != nil {
+		return nil, err
+	}
+	// Populate schema in the rowsource if it's missing
+	if schema == nil {
+		schema = bqToSchema(res.Schema)
+	}
+
+	rows, err := convertRows(res.Rows, schema)
+	if err != nil {
+		return nil, err
+	}
+	return &fetchPageResult{
+		pageToken: res.PageToken,
+		rows:      rows,
+		totalRows: uint64(res.TotalRows),
+		schema:    schema,
+	}, nil
+}
+
+var errNoCacheData = errors.New("No rows in rowSource cache")
+
+// fetchCachedPage attempts to service the first page of results.  For the jobs path specifically, we have an
+// opportunity to fetch rows before the iterator is constructed, and thus serve that data as the first request
+// without an unnecessary network round trip.
+func fetchCachedPage(ctx context.Context, src *rowSource, schema Schema, startIndex uint64, pageSize int64, pageToken string) (*fetchPageResult, error) {
+	// we have no cached data
+	if src.cachedRows == nil {
+		return nil, errNoCacheData
+	}
+	// we have no schema for decoding.  convert from the cached representation if available.
+	if schema == nil {
+		if src.cachedSchema == nil {
+			// We can't progress with no schema, destroy references and return a miss.
+			src.cachedRows = nil
+			src.cachedNextToken = ""
+			return nil, errNoCacheData
+		}
+		schema = bqToSchema(src.cachedSchema)
+	}
+	// Only serve from cache where we're confident we know someone's asking for the first page
+	// without having to align data.
+	//
+	// Future consideration: we could service pagesizes smaller than the cache if we're willing to handle generation
+	// of pageTokens for the cache.
+	if pageToken == "" &&
+		startIndex == 0 &&
+		(pageSize == 0 || pageSize == int64(len(src.cachedRows))) {
+		converted, err := convertRows(src.cachedRows, schema)
+		if err != nil {
+			// destroy cache references and return error
+			src.cachedRows = nil
+			src.cachedSchema = nil
+			src.cachedNextToken = ""
+			return nil, err
+		}
+		result := &fetchPageResult{
+			pageToken: src.cachedNextToken,
+			rows:      converted,
+			schema:    schema,
+			totalRows: uint64(len(converted)),
+		}
+		// clear cache references and return response.
+		src.cachedRows = nil
+		src.cachedNextToken = ""
+		return result, nil
+	}
+	// All other cases are invalid.  Destroy any cache references on the way out the door.
+	src.cachedRows = nil
+	src.cachedSchema = nil
+	src.cachedNextToken = ""
+	return nil, errNoCacheData
+}
diff --git a/bigquery/iterator_test.go b/bigquery/iterator_test.go
index 125796d..14a2111 100644
--- a/bigquery/iterator_test.go
+++ b/bigquery/iterator_test.go
@@ -21,6 +21,8 @@
 	"testing"
 
 	"cloud.google.com/go/internal/testutil"
+	"github.com/google/go-cmp/cmp"
+	bq "google.golang.org/api/bigquery/v2"
 	"google.golang.org/api/iterator"
 )
 
@@ -35,7 +37,7 @@
 	err            error
 }
 
-func (pf *pageFetcherStub) fetchPage(ctx context.Context, _ *Table, _ Schema, _ uint64, _ int64, pageToken string) (*fetchPageResult, error) {
+func (pf *pageFetcherStub) fetchPage(ctx context.Context, _ *rowSource, _ Schema, _ uint64, _ int64, pageToken string) (*fetchPageResult, error) {
 	call, ok := pf.fetchResponses[pageToken]
 	if !ok {
 		pf.err = fmt.Errorf("Unexpected page token: %q", pageToken)
@@ -43,6 +45,115 @@
 	return call.result, call.err
 }
 
+func TestRowIteratorCacheBehavior(t *testing.T) {
+
+	testSchema := &bq.TableSchema{
+		Fields: []*bq.TableFieldSchema{
+			{Type: "INTEGER", Name: "field1"},
+			{Type: "STRING", Name: "field2"},
+		},
+	}
+	testRows := []*bq.TableRow{
+		{F: []*bq.TableCell{
+			{V: "1"},
+			{V: "foo"},
+		},
+		},
+	}
+	convertedSchema := bqToSchema(testSchema)
+
+	convertedRows, _ := convertRows(testRows, convertedSchema)
+
+	testCases := []struct {
+		inSource     *rowSource
+		inSchema     Schema
+		inStartIndex uint64
+		inPageSize   int64
+		inPageToken  string
+		wantErr      error
+		wantResult   *fetchPageResult
+	}{
+		{
+			inSource: &rowSource{},
+			wantErr:  errNoCacheData,
+		},
+		{
+			// primary success case: schema in cache
+			inSource: &rowSource{
+				cachedSchema: testSchema,
+				cachedRows:   testRows,
+			},
+			wantResult: &fetchPageResult{
+				totalRows: uint64(len(convertedRows)),
+				schema:    convertedSchema,
+				rows:      convertedRows,
+			},
+		},
+		{
+			// secondary success case: schema provided
+			inSource: &rowSource{
+				cachedRows:      testRows,
+				cachedNextToken: "foo",
+			},
+			inSchema: convertedSchema,
+			wantResult: &fetchPageResult{
+				totalRows: uint64(len(convertedRows)),
+				schema:    convertedSchema,
+				rows:      convertedRows,
+				pageToken: "foo",
+			},
+		},
+		{
+			// misaligned page size.
+			inSource: &rowSource{
+				cachedSchema: testSchema,
+				cachedRows:   testRows,
+			},
+			inPageSize: 99,
+			wantErr:    errNoCacheData,
+		},
+		{
+			// nonzero start.
+			inSource: &rowSource{
+				cachedSchema: testSchema,
+				cachedRows:   testRows,
+			},
+			inStartIndex: 1,
+			wantErr:      errNoCacheData,
+		},
+		{
+			// data without schema
+			inSource: &rowSource{
+				cachedSchema: testSchema,
+				cachedRows:   testRows,
+			},
+			inStartIndex: 1,
+			wantErr:      errNoCacheData,
+		},
+		{
+			// data without schema
+			inSource: &rowSource{
+				cachedSchema: testSchema,
+				cachedRows:   testRows,
+			},
+			inStartIndex: 1,
+			wantErr:      errNoCacheData,
+		},
+	}
+	for _, tc := range testCases {
+		gotResp, gotErr := fetchCachedPage(context.Background(), tc.inSource, tc.inSchema, tc.inStartIndex, tc.inPageSize, tc.inPageToken)
+		if gotErr != tc.wantErr {
+			t.Errorf("err mismatch.  got %v, want %v", gotErr, tc.wantErr)
+		} else {
+			if diff := testutil.Diff(gotResp, tc.wantResult,
+				cmp.AllowUnexported(fetchPageResult{}, rowSource{}, Job{}, Client{}, Table{})); diff != "" {
+				t.Errorf("response diff (got=-, want=+):\n%s", diff)
+			}
+		}
+	}
+
+}
+
 func TestIterator(t *testing.T) {
 	var (
 		iiSchema = Schema{
diff --git a/bigquery/job.go b/bigquery/job.go
index 6a51973..edaa136 100644
--- a/bigquery/job.go
+++ b/bigquery/job.go
@@ -284,21 +284,18 @@
 	if !j.isQuery() {
 		return nil, errors.New("bigquery: cannot read from a non-query job")
 	}
-	destTable := j.config.Query.DestinationTable
-	// The destination table should only be nil if there was a query error.
-	projectID := j.projectID
-	if destTable != nil && projectID != destTable.ProjectId {
-		return nil, fmt.Errorf("bigquery: job project ID is %q, but destination table's is %q", projectID, destTable.ProjectId)
-	}
-	schema, totalRows, err := waitForQuery(ctx, projectID)
+	schema, totalRows, err := waitForQuery(ctx, j.projectID)
 	if err != nil {
 		return nil, err
 	}
-	if destTable == nil {
-		return nil, errors.New("bigquery: query job missing destination table")
+	// Shave off some potential overhead by only retaining the minimal job representation in the iterator.
+	itJob := &Job{
+		c:         j.c,
+		projectID: j.projectID,
+		jobID:     j.jobID,
+		location:  j.location,
 	}
-	dt := bqToTable(destTable, j.c)
-	it := newRowIterator(ctx, dt, pf)
+	it := newRowIterator(ctx, &rowSource{j: itJob}, pf)
 	it.Schema = schema
 	it.TotalRows = totalRows
 	return it, nil
diff --git a/bigquery/read_test.go b/bigquery/read_test.go
index 883430a..66f6136 100644
--- a/bigquery/read_test.go
+++ b/bigquery/read_test.go
@@ -26,7 +26,7 @@
 )
 
 type pageFetcherArgs struct {
-	table      *Table
+	src        *rowSource
 	schema     Schema
 	startIndex uint64
 	pageSize   int64
@@ -43,9 +43,9 @@
 	calls []pageFetcherArgs
 }
 
-func (s *pageFetcherReadStub) fetchPage(ctx context.Context, t *Table, schema Schema, startIndex uint64, pageSize int64, pageToken string) (*fetchPageResult, error) {
+func (s *pageFetcherReadStub) fetchPage(ctx context.Context, src *rowSource, schema Schema, startIndex uint64, pageSize int64, pageToken string) (*fetchPageResult, error) {
 	s.calls = append(s.calls,
-		pageFetcherArgs{t, schema, startIndex, pageSize, pageToken})
+		pageFetcherArgs{src, schema, startIndex, pageSize, pageToken})
 	result := &fetchPageResult{
 		pageToken: s.pageTokens[pageToken],
 		rows:      s.values[0],
@@ -156,7 +156,7 @@
 
 var errBang = errors.New("bang")
 
-func errorFetchPage(context.Context, *Table, Schema, uint64, int64, string) (*fetchPageResult, error) {
+func errorFetchPage(context.Context, *rowSource, Schema, uint64, int64, string) (*fetchPageResult, error) {
 	return nil, errBang
 }
 
@@ -184,11 +184,13 @@
 		t.Fatal(err)
 	}
 	want := []pageFetcherArgs{{
-		table:     tr,
+		src: &rowSource{
+			t: tr,
+		},
 		pageSize:  5,
 		pageToken: "",
 	}}
-	if diff := testutil.Diff(s.calls, want, cmp.AllowUnexported(pageFetcherArgs{}, pageFetcherReadStub{}, Table{}, Client{})); diff != "" {
+	if diff := testutil.Diff(s.calls, want, cmp.AllowUnexported(pageFetcherArgs{}, pageFetcherReadStub{}, rowSource{}, Table{}, Client{})); diff != "" {
 		t.Errorf("reading (got=-, want=+):\n%s", diff)
 	}
 }
@@ -223,11 +225,18 @@
 	}
 
 	want := []pageFetcherArgs{{
-		table:     bqToTable(tr, c),
+		src: &rowSource{
+			j: &Job{
+				c:         queryJob.c,
+				jobID:     queryJob.jobID,
+				projectID: queryJob.projectID,
+				location:  queryJob.location,
+			},
+		},
 		pageSize:  5,
 		pageToken: "",
 	}}
-	if !testutil.Equal(pf.calls, want, cmp.AllowUnexported(pageFetcherArgs{}, Table{}, Client{})) {
+	if !testutil.Equal(pf.calls, want, cmp.AllowUnexported(pageFetcherArgs{}, rowSource{}, Job{}, Client{})) {
 		t.Errorf("reading: got:\n%v\nwant:\n%v", pf.calls, want)
 	}
 }
diff --git a/bigquery/table.go b/bigquery/table.go
index b06d2ff..05e1270 100644
--- a/bigquery/table.go
+++ b/bigquery/table.go
@@ -614,7 +614,7 @@
 }
 
 func (t *Table) read(ctx context.Context, pf pageFetcher) *RowIterator {
-	return newRowIterator(ctx, t, pf)
+	return newRowIterator(ctx, &rowSource{t: t}, pf)
 }
 
 // NeverExpire is a sentinel value used to remove a table'e expiration time.