// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package bigquery

import (
	"context"
	"errors"
	"fmt"
	"testing"
	"time"

	"cloud.google.com/go/internal/testutil"
	"github.com/google/go-cmp/cmp"
	"google.golang.org/api/iterator"
)
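
// TestIntegration_StorageReadBasicTypes runs each query-parameter test case
// through the Storage API read path and verifies the expected row comes back.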
func TestIntegration_StorageReadBasicTypes(t *testing.T) {
	if client == nil {
		t.Skip("Integration tests skipped")
	}
	ctx := context.Background()

	initQueryParameterTestCases()

	for _, c := range queryParameterTestCases {
		q := storageOptimizedClient.Query(c.query)
		q.Parameters = c.parameters
		q.forceStorageAPI = true
		it, err := q.Read(ctx)
		if err != nil {
			t.Fatal(err)
		}
		err = checkIteratorRead(it, c.wantRow)
		if err != nil {
			t.Fatalf("%s: error on query `%s`[%v]: %v", it.SourceJob().ID(), c.query, c.parameters, err)
		}
		if !it.IsAccelerated() {
			t.Fatalf("%s: expected storage api to be used", it.SourceJob().ID())
		}
	}
}
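
// TestIntegration_StorageReadEmptyResultSet verifies that reading a freshly
// created, empty table through the Storage API yields no rows and no error.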
func TestIntegration_StorageReadEmptyResultSet(t *testing.T) {
	if client == nil {
		t.Skip("Integration tests skipped")
	}
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	table := storageOptimizedClient.Dataset(dataset.DatasetID).Table(tableIDs.New())
	err := table.Create(ctx, &TableMetadata{
		Schema: Schema{
			{Name: "name", Type: StringFieldType, Required: true},
		},
	})
	if err != nil {
		t.Fatal(err)
	}
	defer table.Delete(ctx)

	it := table.Read(ctx)
	err = checkIteratorRead(it, []Value{})
	if err != nil {
		t.Fatalf("failed to read empty table: %v", err)
	}
	if !it.IsAccelerated() {
		t.Fatal("expected storage api to be used")
	}
}
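
// TestIntegration_StorageReadFromSources verifies that table reads, job reads,
// and query reads of the same result set all go through the Storage API and
// return identical rows.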
func TestIntegration_StorageReadFromSources(t *testing.T) {
	if client == nil {
		t.Skip("Integration tests skipped")
	}
	ctx := context.Background()

	dstTable := dataset.Table(tableIDs.New())
	dstTable.c = storageOptimizedClient

	sql := `SELECT 1 as num, 'one' as str
	UNION ALL
	SELECT 2 as num, 'two' as str
	UNION ALL
	SELECT 3 as num, 'three' as str
	ORDER BY num`
	q := storageOptimizedClient.Query(sql)
	q.Dst = dstTable
	job, err := q.Run(ctx)
	if err != nil {
		t.Fatal(err)
	}
	status, err := job.Wait(ctx)
	if err != nil {
		t.Fatal(err)
	}
	if err := status.Err(); err != nil {
		t.Fatal(err)
	}
	expectedRows := [][]Value{
		{int64(1), "one"},
		{int64(2), "two"},
		{int64(3), "three"},
	}

	tableRowIt := dstTable.Read(ctx)
	if err = checkRowsRead(tableRowIt, expectedRows); err != nil {
		t.Fatalf("checkRowsRead(table): %v", err)
	}
	if !tableRowIt.IsAccelerated() {
		t.Fatalf("reading from table should use Storage API")
	}

	jobRowIt, err := job.Read(ctx)
	if err != nil {
		t.Fatalf("ReadJobResults(job): %v", err)
	}
	if err = checkRowsRead(jobRowIt, expectedRows); err != nil {
		t.Fatalf("checkRowsRead(job): %v", err)
	}
	if !jobRowIt.IsAccelerated() {
		t.Fatalf("reading job should use Storage API")
	}

	q.Dst = nil
	q.forceStorageAPI = true
	qRowIt, err := q.Read(ctx)
	if err != nil {
		t.Fatalf("ReadQuery(query): %v", err)
	}
	if !qRowIt.IsAccelerated() {
		t.Fatalf("reading query should use Storage API")
	}
	if err = checkRowsRead(qRowIt, expectedRows); err != nil {
		t.Fatalf("checkRowsRead(query): %v", err)
	}
}
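
// TestIntegration_StorageReadScriptJob verifies that for a multi-statement
// script, reading the job results returns the rows of the last SELECT.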
func TestIntegration_StorageReadScriptJob(t *testing.T) {
	if client == nil {
		t.Skip("Integration tests skipped")
	}
	tableID := tableIDs.New()
	ctx := context.Background()

	sql := fmt.Sprintf(`
	-- Statement 0
	DECLARE x INT64;
	SET x = 4;
	-- Statement 1
	SELECT 1 as foo;
	-- Statement 2
	SELECT 1 as num, 'one' as str
	UNION ALL
	SELECT 2 as num, 'two' as str;
	-- Statement 3
	SELECT 1 as num, 'one' as str
	UNION ALL
	SELECT 2 as num, 'two' as str
	UNION ALL
	SELECT 3 as num, 'three' as str
	UNION ALL
	SELECT x as num, 'four' as str
	ORDER BY num;
	-- Statement 4
	CREATE TABLE %s.%s ( num INT64, str STRING );
	-- Statement 5
	DROP TABLE %s.%s;
	`, dataset.DatasetID, tableID, dataset.DatasetID, tableID)
	q := storageOptimizedClient.Query(sql)
	q.forceStorageAPI = true
	it, err := q.Read(ctx)
	if err != nil {
		t.Fatal(err)
	}
	expectedRows := [][]Value{
		{int64(1), "one"},
		{int64(2), "two"},
		{int64(3), "three"},
		{int64(4), "four"},
	}
	if err = checkRowsRead(it, expectedRows); err != nil {
		t.Fatalf("checkRowsRead(it): %v", err)
	}
	if !it.IsAccelerated() {
		t.Fatalf("reading job should use Storage API")
	}
}
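
// TestIntegration_StorageReadQueryOrdering verifies that ordered queries are
// constrained to a single read stream, while unordered queries leave the
// stream count up to the server.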
func TestIntegration_StorageReadQueryOrdering(t *testing.T) {
	if client == nil {
		t.Skip("Integration tests skipped")
	}
	ctx := context.Background()

	table := "`bigquery-public-data.usa_names.usa_1910_current`"
	testCases := []struct {
		name               string
		query              string
		maxExpectedStreams int
	}{
		{
			name:               "Non_Ordered_Query",
			query:              fmt.Sprintf(`SELECT name, number, state FROM %s`, table),
			maxExpectedStreams: -1, // No limit
		},
		{
			name:               "Ordered_Query",
			query:              fmt.Sprintf(`SELECT name, number, state FROM %s order by name`, table),
			maxExpectedStreams: 1,
		},
	}

	type S struct {
		Name   string
		Number int
		State  string
	}

	for _, tc := range testCases {
		q := storageOptimizedClient.Query(tc.query)
		q.forceStorageAPI = true
		it, err := q.Read(ctx)
		if err != nil {
			t.Fatal(err)
		}

		var firstValue S
		err = it.Next(&firstValue)
		if err != nil {
			t.Fatal(err)
		}
		if cmp.Equal(firstValue, S{}) {
			t.Fatalf("user-defined struct was not filled with data")
		}

		total, err := countIteratorRows(it)
		if err != nil {
			t.Fatal(err)
		}
		total++ // as we read the first value separately

		bqSession := it.arrowIterator.session.bqSession
		if len(bqSession.Streams) == 0 {
			t.Fatalf("%s: expected to use at least one stream but found %d", tc.name, len(bqSession.Streams))
		}
		streamSettings := it.arrowIterator.session.settings.maxStreamCount
		if tc.maxExpectedStreams > 0 {
			if streamSettings > tc.maxExpectedStreams {
				t.Fatalf("%s: expected stream settings to be at most %d streams but found %d", tc.name, tc.maxExpectedStreams, streamSettings)
			}
			if len(bqSession.Streams) > tc.maxExpectedStreams {
				t.Fatalf("%s: expected server to set up at most %d streams but found %d", tc.name, tc.maxExpectedStreams, len(bqSession.Streams))
			}
		} else {
			if streamSettings != 0 {
				t.Fatalf("%s: expected stream settings to be 0 (server decides the number of streams) but found %d", tc.name, streamSettings)
			}
		}
		if total != it.TotalRows {
			t.Fatalf("%s: should have read %d rows, but read %d", tc.name, it.TotalRows, total)
		}
		if !it.IsAccelerated() {
			t.Fatalf("%s: expected query to be accelerated by Storage API", tc.name)
		}
	}
}
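
// TestIntegration_StorageReadQueryStruct reads rows from a public table into a
// user-defined struct and checks that every row is populated.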
func TestIntegration_StorageReadQueryStruct(t *testing.T) {
	if client == nil {
		t.Skip("Integration tests skipped")
	}
	ctx := context.Background()
	table := "`bigquery-public-data.samples.wikipedia`"
	sql := fmt.Sprintf(`SELECT id, title, timestamp, comment FROM %s LIMIT 1000`, table)

	q := storageOptimizedClient.Query(sql)
	q.forceStorageAPI = true
	q.DisableQueryCache = true
	it, err := q.Read(ctx)
	if err != nil {
		t.Fatal(err)
	}
	if !it.IsAccelerated() {
		t.Fatal("expected query to use Storage API")
	}

	type S struct {
		ID        int64
		Title     string
		Timestamp int64
		Comment   NullString
	}

	total := uint64(0)
	for {
		var dst S
		err := it.Next(&dst)
		if err == iterator.Done {
			break
		}
		if err != nil {
			t.Fatalf("failed to fetch via storage API: %v", err)
		}
		if cmp.Equal(dst, S{}) {
			t.Fatalf("user-defined struct was not filled with data")
		}
		total++
	}

	bqSession := it.arrowIterator.session.bqSession
	if len(bqSession.Streams) == 0 {
		t.Fatalf("should use at least one stream but found %d", len(bqSession.Streams))
	}
	if total != it.TotalRows {
		t.Fatalf("should have read %d rows, but read %d", it.TotalRows, total)
	}
}
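
// TestIntegration_StorageReadQueryMorePages checks that a large query result
// is transparently routed through the Storage API even when not forced.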
func TestIntegration_StorageReadQueryMorePages(t *testing.T) {
	if client == nil {
		t.Skip("Integration tests skipped")
	}
	ctx := context.Background()
	table := "`bigquery-public-data.samples.github_timeline`"
	sql := fmt.Sprintf(`SELECT repository_url as url, repository_owner as owner, repository_forks as forks FROM %s`, table)
	// Don't set forceStorageAPI; the Storage API should still be selected internally.
	q := storageOptimizedClient.Query(sql)
	q.DisableQueryCache = true
	it, err := q.Read(ctx)
	if err != nil {
		t.Fatal(err)
	}
	if !it.IsAccelerated() {
		t.Fatal("expected query to use Storage API")
	}

	type S struct {
		URL   NullString
		Owner NullString
		Forks NullInt64
	}

	var firstValue S
	err = it.Next(&firstValue)
	if err != nil {
		t.Fatal(err)
	}
	if cmp.Equal(firstValue, S{}) {
		t.Fatalf("user-defined struct was not filled with data")
	}

	total, err := countIteratorRows(it)
	if err != nil {
		t.Fatal(err)
	}
	total++ // as we read the first value separately

	bqSession := it.arrowIterator.session.bqSession
	if len(bqSession.Streams) == 0 {
		t.Fatalf("should use at least one stream but found %d", len(bqSession.Streams))
	}
	if total != it.TotalRows {
		t.Fatalf("should have read %d rows, but read %d", it.TotalRows, total)
	}
}
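
// TestIntegration_StorageReadCancel cancels the read mid-stream and verifies
// that the underlying Arrow iterator shuts down.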
func TestIntegration_StorageReadCancel(t *testing.T) {
	if client == nil {
		t.Skip("Integration tests skipped")
	}
	ctx := context.Background()
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	table := "`bigquery-public-data.samples.github_timeline`"
	sql := fmt.Sprintf(`SELECT repository_url as url, repository_owner as owner, repository_forks as forks FROM %s`, table)
	storageOptimizedClient.rc.settings.maxWorkerCount = 1
	q := storageOptimizedClient.Query(sql)
	q.DisableQueryCache = true
	q.forceStorageAPI = true
	it, err := q.Read(ctx)
	if err != nil {
		t.Fatal(err)
	}
	if !it.IsAccelerated() {
		t.Fatal("expected query to use Storage API")
	}

	// Cancel the read after reading 1000 rows.
	rowsRead := 0
	for {
		var dst []Value
		err := it.Next(&dst)
		if err == iterator.Done {
			break
		}
		if err != nil {
			if errors.Is(err, context.DeadlineExceeded) ||
				errors.Is(err, context.Canceled) {
				break
			}
			t.Fatalf("failed to fetch via storage API: %v", err)
		}
		rowsRead++
		if rowsRead > 1000 {
			cancel()
		}
	}

	// Resources are cleaned up asynchronously.
	time.Sleep(time.Second)
	if !it.arrowIterator.isDone() {
		t.Fatal("expected stream to be done")
	}
}
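
// countIteratorRows drains the iterator and returns the number of rows read.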
func countIteratorRows(it *RowIterator) (total uint64, err error) {
	for {
		var dst []Value
		err := it.Next(&dst)
		if err == iterator.Done {
			break
		}
		if err != nil {
			return total, fmt.Errorf("failed to fetch via storage API: %w", err)
		}
		total++
	}
	return total, nil
}
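
// checkRowsRead verifies that the iterator reports the expected row count and
// that each row matches the expected values, in order.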
func checkRowsRead(it *RowIterator, expectedRows [][]Value) error {
	if int(it.TotalRows) != len(expectedRows) {
		return fmt.Errorf("expected %d rows, found %d", len(expectedRows), it.TotalRows)
	}
	for _, row := range expectedRows {
		err := checkIteratorRead(it, row)
		if err != nil {
			return err
		}
	}
	return nil
}
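
// checkIteratorRead reads a single row from the iterator and compares it
// against expectedRow. iterator.Done is treated as success, which also covers
// the empty-result case.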
func checkIteratorRead(it *RowIterator, expectedRow []Value) error {
	var outRow []Value
	err := it.Next(&outRow)
	if err == iterator.Done {
		return nil
	}
	if err != nil {
		return fmt.Errorf("failed to fetch via storage API: %w", err)
	}
	if len(outRow) != len(expectedRow) {
		return fmt.Errorf("expected %d columns, but got %d", len(expectedRow), len(outRow))
	}
	if !testutil.Equal(outRow, expectedRow) {
		return fmt.Errorf("got %v, want %v", outRow, expectedRow)
	}
	return nil
}