| // Copyright 2023 Google LLC |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package bigquery |
| |
| import ( |
| "context" |
| "errors" |
| "fmt" |
| "testing" |
| "time" |
| |
| "cloud.google.com/go/internal/testutil" |
| "github.com/apache/arrow/go/v15/arrow" |
| "github.com/apache/arrow/go/v15/arrow/array" |
| "github.com/apache/arrow/go/v15/arrow/ipc" |
| "github.com/apache/arrow/go/v15/arrow/math" |
| "github.com/apache/arrow/go/v15/arrow/memory" |
| "github.com/google/go-cmp/cmp" |
| "google.golang.org/api/iterator" |
| ) |
| |
| func TestIntegration_StorageReadBasicTypes(t *testing.T) { |
| if client == nil { |
| t.Skip("Integration tests skipped") |
| } |
| ctx := context.Background() |
| |
| initQueryParameterTestCases() |
| |
| for _, c := range queryParameterTestCases { |
| t.Run(c.name, func(t *testing.T) { |
| q := storageOptimizedClient.Query(c.query) |
| q.Parameters = c.parameters |
| q.forceStorageAPI = true |
| it, err := q.Read(ctx) |
| if err != nil { |
| t.Fatal(err) |
| } |
| err = checkIteratorRead(it, c.wantRow) |
| if err != nil { |
| t.Fatalf("%s: error on query `%s`[%v]: %v", it.SourceJob().ID(), c.query, c.parameters, err) |
| } |
| if !it.IsAccelerated() { |
| t.Fatalf("%s: expected storage api to be used", it.SourceJob().ID()) |
| } |
| }) |
| } |
| } |
| |
| func TestIntegration_StorageReadEmptyResultSet(t *testing.T) { |
| if client == nil { |
| t.Skip("Integration tests skipped") |
| } |
| ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) |
| defer cancel() |
| |
| table := storageOptimizedClient.Dataset(dataset.DatasetID).Table(tableIDs.New()) |
| err := table.Create(ctx, &TableMetadata{ |
| Schema: Schema{ |
| {Name: "name", Type: StringFieldType, Required: true}, |
| }, |
| }) |
| if err != nil { |
| t.Fatal(err) |
| } |
| defer table.Delete(ctx) |
| |
| it := table.Read(ctx) |
| err = checkIteratorRead(it, []Value{}) |
| if err != nil { |
| t.Fatalf("failed to read empty table: %v", err) |
| } |
| if !it.IsAccelerated() { |
| t.Fatal("expected storage api to be used") |
| } |
| } |
| |
| func TestIntegration_StorageReadFromSources(t *testing.T) { |
| if client == nil { |
| t.Skip("Integration tests skipped") |
| } |
| ctx := context.Background() |
| |
| dstTable := dataset.Table(tableIDs.New()) |
| dstTable.c = storageOptimizedClient |
| |
| sql := `SELECT 1 as num, 'one' as str |
| UNION ALL |
| SELECT 2 as num, 'two' as str |
| UNION ALL |
| SELECT 3 as num, 'three' as str |
| ORDER BY num` |
| q := storageOptimizedClient.Query(sql) |
| q.Dst = dstTable |
| job, err := q.Run(ctx) |
| if err != nil { |
| t.Fatal(err) |
| } |
| status, err := job.Wait(ctx) |
| if err != nil { |
| t.Fatal(err) |
| } |
| if err := status.Err(); err != nil { |
| t.Fatal(err) |
| } |
| expectedRows := [][]Value{ |
| {int64(1), "one"}, |
| {int64(2), "two"}, |
| {int64(3), "three"}, |
| } |
| tableRowIt := dstTable.Read(ctx) |
| if err = checkRowsRead(tableRowIt, expectedRows); err != nil { |
| t.Fatalf("checkRowsRead(table): %v", err) |
| } |
| if !tableRowIt.IsAccelerated() { |
| t.Fatalf("reading from table should use Storage API") |
| } |
| jobRowIt, err := job.Read(ctx) |
| if err != nil { |
| t.Fatalf("ReadJobResults(job): %v", err) |
| } |
| if err = checkRowsRead(jobRowIt, expectedRows); err != nil { |
| t.Fatalf("checkRowsRead(job): %v", err) |
| } |
| if !jobRowIt.IsAccelerated() { |
| t.Fatalf("reading job should use Storage API") |
| } |
| q.Dst = nil |
| q.forceStorageAPI = true |
| qRowIt, err := q.Read(ctx) |
| if err != nil { |
| t.Fatalf("ReadQuery(query): %v", err) |
| } |
| if !qRowIt.IsAccelerated() { |
| t.Fatalf("reading query should use Storage API") |
| } |
| if err = checkRowsRead(qRowIt, expectedRows); err != nil { |
| t.Fatalf("checkRowsRead(query): %v", err) |
| } |
| } |
| |
| func TestIntegration_StorageReadScriptJob(t *testing.T) { |
| if client == nil { |
| t.Skip("Integration tests skipped") |
| } |
| tableID := tableIDs.New() |
| ctx := context.Background() |
| |
| sql := fmt.Sprintf(` |
| -- Statement 0 |
| DECLARE x INT64; |
| SET x = 4; |
| -- Statement 1 |
| SELECT 1 as foo; |
| -- Statement 2 |
| SELECT 1 as num, 'one' as str |
| UNION ALL |
| SELECT 2 as num, 'two' as str; |
| -- Statement 3 |
| SELECT 1 as num, 'one' as str |
| UNION ALL |
| SELECT 2 as num, 'two' as str |
| UNION ALL |
| SELECT 3 as num, 'three' as str |
| UNION ALL |
| SELECT x as num, 'four' as str |
| ORDER BY num; |
| -- Statement 4 |
| CREATE TABLE %s.%s ( num INT64, str STRING ); |
| -- Statement 5 |
| DROP TABLE %s.%s; |
| `, dataset.DatasetID, tableID, dataset.DatasetID, tableID) |
| q := storageOptimizedClient.Query(sql) |
| q.forceStorageAPI = true |
| it, err := q.Read(ctx) |
| if err != nil { |
| t.Fatal(err) |
| } |
| expectedRows := [][]Value{ |
| {int64(1), "one"}, |
| {int64(2), "two"}, |
| {int64(3), "three"}, |
| {int64(4), "four"}, |
| } |
| if err = checkRowsRead(it, expectedRows); err != nil { |
| t.Fatalf("checkRowsRead(it): %v", err) |
| } |
| if !it.IsAccelerated() { |
| t.Fatalf("reading job should use Storage API") |
| } |
| } |
| |
| func TestIntegration_StorageReadQueryOrdering(t *testing.T) { |
| if client == nil { |
| t.Skip("Integration tests skipped") |
| } |
| ctx := context.Background() |
| |
| table := "`bigquery-public-data.usa_names.usa_1910_current`" |
| testCases := []struct { |
| name string |
| query string |
| maxExpectedStreams int |
| }{ |
| { |
| name: "Non_Ordered_Query", |
| query: fmt.Sprintf(`SELECT name, number, state FROM %s`, table), |
| maxExpectedStreams: -1, // No limit |
| }, |
| { |
| name: "Ordered_Query", |
| query: fmt.Sprintf(`SELECT name, number, state FROM %s order by name`, table), |
| maxExpectedStreams: 1, |
| }, |
| } |
| |
| type S struct { |
| Name string |
| Number int |
| State string |
| } |
| |
| for _, tc := range testCases { |
| q := storageOptimizedClient.Query(tc.query) |
| q.forceStorageAPI = true |
| |
| it, err := q.Read(ctx) |
| if err != nil { |
| t.Fatal(err) |
| } |
| |
| var firstValue S |
| err = it.Next(&firstValue) |
| if err != nil { |
| t.Fatal(err) |
| } |
| |
| if cmp.Equal(firstValue, S{}) { |
| t.Fatalf("user defined struct was not filled with data") |
| } |
| |
| total, err := countIteratorRows(it) |
| if err != nil { |
| t.Fatal(err) |
| } |
| total++ // as we read the first value separately |
| |
| session := it.arrowIterator.(*storageArrowIterator).session |
| bqSession := session.bqSession |
| if len(bqSession.Streams) == 0 { |
| t.Fatalf("%s: expected to use at least one stream but found %d", tc.name, len(bqSession.Streams)) |
| } |
| streamSettings := session.settings.maxStreamCount |
| if tc.maxExpectedStreams > 0 { |
| if streamSettings > tc.maxExpectedStreams { |
| t.Fatalf("%s: expected stream settings to be at most %d streams but found %d", tc.name, tc.maxExpectedStreams, streamSettings) |
| } |
| if len(bqSession.Streams) > tc.maxExpectedStreams { |
| t.Fatalf("%s: expected server to set up at most %d streams but found %d", tc.name, tc.maxExpectedStreams, len(bqSession.Streams)) |
| } |
| } else { |
| if streamSettings != 0 { |
| t.Fatalf("%s: expected stream settings to be 0 (server defines amount of stream) but found %d", tc.name, streamSettings) |
| } |
| } |
| if total != it.TotalRows { |
| t.Fatalf("%s: should have read %d rows, but read %d", tc.name, it.TotalRows, total) |
| } |
| if !it.IsAccelerated() { |
| t.Fatalf("%s: expected query to be accelerated by Storage API", tc.name) |
| } |
| } |
| } |
| |
| func TestIntegration_StorageReadQueryStruct(t *testing.T) { |
| if client == nil { |
| t.Skip("Integration tests skipped") |
| } |
| ctx := context.Background() |
| table := "`bigquery-public-data.samples.wikipedia`" |
| sql := fmt.Sprintf(`SELECT id, title, timestamp, comment FROM %s LIMIT 1000`, table) |
| q := storageOptimizedClient.Query(sql) |
| q.forceStorageAPI = true |
| q.DisableQueryCache = true |
| it, err := q.Read(ctx) |
| if err != nil { |
| t.Fatal(err) |
| } |
| if !it.IsAccelerated() { |
| t.Fatal("expected query to use Storage API") |
| } |
| |
| type S struct { |
| ID int64 |
| Title string |
| Timestamp int64 |
| Comment NullString |
| } |
| |
| total := uint64(0) |
| for { |
| var dst S |
| err := it.Next(&dst) |
| if err == iterator.Done { |
| break |
| } |
| if err != nil { |
| t.Fatalf("failed to fetch via storage API: %v", err) |
| } |
| if cmp.Equal(dst, S{}) { |
| t.Fatalf("user defined struct was not filled with data") |
| } |
| total++ |
| } |
| |
| bqSession := it.arrowIterator.(*storageArrowIterator).session.bqSession |
| if len(bqSession.Streams) == 0 { |
| t.Fatalf("should use more than one stream but found %d", len(bqSession.Streams)) |
| } |
| if total != it.TotalRows { |
| t.Fatalf("should have read %d rows, but read %d", it.TotalRows, total) |
| } |
| } |
| |
| func TestIntegration_StorageReadQueryMorePages(t *testing.T) { |
| if client == nil { |
| t.Skip("Integration tests skipped") |
| } |
| ctx := context.Background() |
| table := "`bigquery-public-data.samples.github_timeline`" |
| sql := fmt.Sprintf(`SELECT repository_url as url, repository_owner as owner, repository_forks as forks FROM %s`, table) |
| // Don't forceStorageAPI usage and still see internally Storage API is selected |
| q := storageOptimizedClient.Query(sql) |
| q.DisableQueryCache = true |
| it, err := q.Read(ctx) |
| if err != nil { |
| t.Fatal(err) |
| } |
| if !it.IsAccelerated() { |
| t.Fatal("expected query to use Storage API") |
| } |
| |
| type S struct { |
| URL NullString |
| Owner NullString |
| Forks NullInt64 |
| } |
| |
| var firstValue S |
| err = it.Next(&firstValue) |
| if err != nil { |
| t.Fatal(err) |
| } |
| |
| if cmp.Equal(firstValue, S{}) { |
| t.Fatalf("user defined struct was not filled with data") |
| } |
| |
| total, err := countIteratorRows(it) |
| if err != nil { |
| t.Fatal(err) |
| } |
| total++ // as we read the first value separately |
| |
| bqSession := it.arrowIterator.(*storageArrowIterator).session.bqSession |
| if len(bqSession.Streams) == 0 { |
| t.Fatalf("should use more than one stream but found %d", len(bqSession.Streams)) |
| } |
| if total != it.TotalRows { |
| t.Fatalf("should have read %d rows, but read %d", it.TotalRows, total) |
| } |
| } |
| |
| func TestIntegration_StorageReadCancel(t *testing.T) { |
| if client == nil { |
| t.Skip("Integration tests skipped") |
| } |
| ctx := context.Background() |
| ctx, cancel := context.WithCancel(ctx) |
| defer cancel() |
| table := "`bigquery-public-data.samples.github_timeline`" |
| sql := fmt.Sprintf(`SELECT repository_url as url, repository_owner as owner, repository_forks as forks FROM %s`, table) |
| storageOptimizedClient.rc.settings.maxWorkerCount = 1 |
| q := storageOptimizedClient.Query(sql) |
| q.DisableQueryCache = true |
| q.forceStorageAPI = true |
| it, err := q.Read(ctx) |
| if err != nil { |
| t.Fatal(err) |
| } |
| if !it.IsAccelerated() { |
| t.Fatal("expected query to use Storage API") |
| } |
| |
| // Cancel read after readings 1000 rows |
| rowsRead := 0 |
| for { |
| var dst []Value |
| err := it.Next(&dst) |
| if err == iterator.Done { |
| break |
| } |
| if err != nil { |
| if errors.Is(err, context.DeadlineExceeded) || |
| errors.Is(err, context.Canceled) { |
| break |
| } |
| t.Fatalf("failed to fetch via storage API: %v", err) |
| } |
| rowsRead++ |
| if rowsRead > 1000 { |
| cancel() |
| } |
| } |
| // resources are cleaned asynchronously |
| time.Sleep(time.Second) |
| arrowIt := it.arrowIterator.(*storageArrowIterator) |
| if !arrowIt.isDone() { |
| t.Fatal("expected stream to be done") |
| } |
| } |
| |
| func TestIntegration_StorageReadArrow(t *testing.T) { |
| if client == nil { |
| t.Skip("Integration tests skipped") |
| } |
| ctx := context.Background() |
| table := "`bigquery-public-data.usa_names.usa_1910_current`" |
| sql := fmt.Sprintf(`SELECT name, number, state FROM %s where state = "CA"`, table) |
| |
| q := storageOptimizedClient.Query(sql) |
| job, err := q.Run(ctx) // force usage of Storage API by skipping fast paths |
| if err != nil { |
| t.Fatal(err) |
| } |
| it, err := job.Read(ctx) |
| if err != nil { |
| t.Fatal(err) |
| } |
| |
| checkedAllocator := memory.NewCheckedAllocator(memory.DefaultAllocator) |
| it.arrowDecoder.allocator = checkedAllocator |
| defer checkedAllocator.AssertSize(t, 0) |
| |
| arrowIt, err := it.ArrowIterator() |
| if err != nil { |
| t.Fatalf("expected iterator to be accelerated: %v", err) |
| } |
| arrowItReader := NewArrowIteratorReader(arrowIt) |
| |
| records := []arrow.Record{} |
| r, err := ipc.NewReader(arrowItReader, ipc.WithAllocator(checkedAllocator)) |
| numrec := 0 |
| for r.Next() { |
| rec := r.Record() |
| rec.Retain() |
| defer rec.Release() |
| records = append(records, rec) |
| numrec += int(rec.NumRows()) |
| } |
| r.Release() |
| |
| arrowSchema := r.Schema() |
| arrowTable := array.NewTableFromRecords(arrowSchema, records) |
| defer arrowTable.Release() |
| if arrowTable.NumRows() != int64(it.TotalRows) { |
| t.Fatalf("should have a table with %d rows, but found %d", it.TotalRows, arrowTable.NumRows()) |
| } |
| if arrowTable.NumCols() != 3 { |
| t.Fatalf("should have a table with 3 columns, but found %d", arrowTable.NumCols()) |
| } |
| |
| sumSQL := fmt.Sprintf(`SELECT sum(number) as total FROM %s where state = "CA"`, table) |
| sumQuery := client.Query(sumSQL) |
| sumIt, err := sumQuery.Read(ctx) |
| if err != nil { |
| t.Fatal(err) |
| } |
| sumValues := []Value{} |
| err = sumIt.Next(&sumValues) |
| if err != nil { |
| t.Fatal(err) |
| } |
| totalFromSQL := sumValues[0].(int64) |
| |
| tr := array.NewTableReader(arrowTable, arrowTable.NumRows()) |
| defer tr.Release() |
| var totalFromArrow int64 |
| for tr.Next() { |
| rec := tr.Record() |
| vec := rec.Column(1).(*array.Int64) |
| totalFromArrow += math.Int64.Sum(vec) |
| } |
| if totalFromArrow != totalFromSQL { |
| t.Fatalf("expected total to be %d, but with arrow we got %d", totalFromSQL, totalFromArrow) |
| } |
| } |
| |
| func countIteratorRows(it *RowIterator) (total uint64, err error) { |
| for { |
| var dst []Value |
| err := it.Next(&dst) |
| if err == iterator.Done { |
| break |
| } |
| if err != nil { |
| return total, fmt.Errorf("failed to fetch via storage API: %w", err) |
| } |
| total++ |
| } |
| return total, err |
| } |
| |
| func checkRowsRead(it *RowIterator, expectedRows [][]Value) error { |
| if int(it.TotalRows) != len(expectedRows) { |
| return fmt.Errorf("expected %d rows, found %d", len(expectedRows), it.TotalRows) |
| } |
| for _, row := range expectedRows { |
| err := checkIteratorRead(it, row) |
| if err != nil { |
| return err |
| } |
| } |
| return nil |
| } |
| |
| func checkIteratorRead(it *RowIterator, expectedRow []Value) error { |
| var outRow []Value |
| err := it.Next(&outRow) |
| if err == iterator.Done { |
| return nil |
| } |
| if err != nil { |
| return fmt.Errorf("failed to fetch via storage API: %v", err) |
| } |
| if len(outRow) != len(expectedRow) { |
| return fmt.Errorf("expected %d columns, but got %d", len(expectedRow), len(outRow)) |
| } |
| if !testutil.Equal(outRow, expectedRow) { |
| return fmt.Errorf("got %v, want %v", outRow, expectedRow) |
| } |
| return nil |
| } |