blob: 439d958d05e837dec6a4de4ffa616c0af1457cde [file] [log] [blame]
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package managedwriter
import (
"bytes"
"context"
"fmt"
"testing"
"cloud.google.com/go/bigquery"
)
// validateTableConstraints is used to validate properties of a table by computing stats using the query engine.
func validateTableConstraints(ctx context.Context, t *testing.T, client *bigquery.Client, table *bigquery.Table, description string, opts ...constraintOption) {
vi := &validationInfo{
constraints: make(map[string]*constraint),
}
for _, o := range opts {
o(vi)
}
if len(vi.constraints) == 0 {
t.Errorf("%q: no constraints were specified", description)
return
}
sql := new(bytes.Buffer)
sql.WriteString("SELECT\n")
var i int
for _, c := range vi.constraints {
if i > 0 {
sql.WriteString(",")
}
sql.WriteString(c.projection)
i++
}
sql.WriteString(fmt.Sprintf("\nFROM `%s`.%s.%s", table.ProjectID, table.DatasetID, table.TableID))
q := client.Query(sql.String())
it, err := q.Read(ctx)
if err != nil {
t.Errorf("%q: failed to issue validation query: %v", description, err)
return
}
var resultrow []bigquery.Value
err = it.Next(&resultrow)
if err != nil {
t.Errorf("%q: failed to get result row: %v", description, err)
return
}
for colname, con := range vi.constraints {
off := -1
for k, v := range it.Schema {
if v.Name == colname {
off = k
break
}
}
if off == -1 {
t.Errorf("%q: missing constraint %q from results", description, colname)
continue
}
val, ok := resultrow[off].(int64)
if !ok {
t.Errorf("%q: constraint %q type mismatch", description, colname)
}
if con.allowedError == 0 {
if val != con.expectedValue {
t.Errorf("%q: constraint %q mismatch, got %d want %d", description, colname, val, con.expectedValue)
}
continue
}
res := val - con.expectedValue
if res < 0 {
res = -res
}
if res > con.allowedError {
t.Errorf("%q: constraint %q outside error bound %d, got %d want %d", description, colname, con.allowedError, val, con.expectedValue)
}
}
}
// constraint is a specific table constraint.
type constraint struct {
// sql fragment that projects a result value
projection string
// all validation constraints must eval as int64.
expectedValue int64
// if nonzero, the constraint value must be within allowedError distance of expectedValue.
allowedError int64
}
// validationInfo is keyed by the result column name.
type validationInfo struct {
constraints map[string]*constraint
}
// constraintOption is for building validation rules.
type constraintOption func(*validationInfo)
// withExactRowCount asserts the exact total row count of the table.
func withExactRowCount(totalRows int64) constraintOption {
return func(vi *validationInfo) {
resultCol := "total_rows"
vi.constraints[resultCol] = &constraint{
projection: fmt.Sprintf("COUNT(1) AS %s", resultCol),
expectedValue: totalRows,
}
}
}
// withNullCount asserts the number of null values in a column.
func withNullCount(colname string, nullcount int64) constraintOption {
return func(vi *validationInfo) {
resultCol := fmt.Sprintf("nullcol_count_%s", colname)
vi.constraints[resultCol] = &constraint{
projection: fmt.Sprintf("COUNTIF(ISNULL(%s)) AS %s", colname, resultCol),
expectedValue: nullcount,
}
}
}
// withNonNullCount asserts the number of non null values in a column.
func withNonNullCount(colname string, nullcount int64) constraintOption {
return func(vi *validationInfo) {
resultCol := fmt.Sprintf("nonnullcol_count_%s", colname)
vi.constraints[resultCol] = &constraint{
projection: fmt.Sprintf("COUNTIF(NOT ISNULL(%s)) AS %s", colname, resultCol),
expectedValue: nullcount,
}
}
}
// withDistinctValues validates the exact cardinality of a column.
func withDistinctValues(colname string, distinctVals int64) constraintOption {
return func(vi *validationInfo) {
resultCol := fmt.Sprintf("distinct_count_%s", colname)
vi.constraints[resultCol] = &constraint{
projection: fmt.Sprintf("COUNT(DISTINCT %s) AS %s", colname, resultCol),
expectedValue: distinctVals,
}
}
}
// withApproxDistinctValues validates the approximate cardinality of a column with an error bound.
func withApproxDistinctValues(colname string, approxValues int64, errorBound int64) constraintOption {
return func(vi *validationInfo) {
resultCol := fmt.Sprintf("distinct_count_%s", colname)
vi.constraints[resultCol] = &constraint{
projection: fmt.Sprintf("APPROX_COUNT_DISTINCT(%s) AS %s", colname, resultCol),
expectedValue: approxValues,
allowedError: errorBound,
}
}
}