blob: fa8e4013705c241498beeba0425675ecb61082ac [file] [log] [blame]
// Copyright 2016 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"bytes"
"context"
"encoding/csv"
"fmt"
"strings"
"testing"
"time"
"cloud.google.com/go/bigtable"
"cloud.google.com/go/bigtable/bttest"
"cloud.google.com/go/internal/testutil"
"github.com/google/go-cmp/cmp"
"google.golang.org/api/option"
"google.golang.org/grpc"
)
// TestParseDuration checks parseDuration against accepted units (ms, s, m, h
// and d) and a set of inputs that must be rejected.
func TestParseDuration(t *testing.T) {
	cases := []struct {
		in string
		// out or fail are mutually exclusive
		out  time.Duration
		fail bool
	}{
		{in: "10ms", out: 10 * time.Millisecond},
		{in: "3s", out: 3 * time.Second},
		{in: "60m", out: 60 * time.Minute},
		{in: "12h", out: 12 * time.Hour},
		{in: "7d", out: 168 * time.Hour},
		{in: "", fail: true},
		{in: "0", fail: true},
		{in: "7ns", fail: true},
		{in: "14mo", fail: true},
		{in: "3.5h", fail: true},
		{in: "106752d", fail: true}, // overflow
	}
	for _, c := range cases {
		d, err := parseDuration(c.in)
		switch {
		case c.fail && err == nil:
			t.Errorf("parseDuration(%q) did not fail", c.in)
		case c.fail:
			// Failed as expected; there is no value to compare.
		case err != nil:
			t.Errorf("parseDuration(%q) unexpectedly failed: %v", c.in, err)
		case d != c.out:
			t.Errorf("parseDuration(%q) = %v, want %v", c.in, d, c.out)
		}
	}
}
// TestParseArgs checks that parseArgs turns key=value arguments into a map and
// rejects both a malformed argument and a key outside the accepted list.
func TestParseArgs(t *testing.T) {
	want := map[string]string{"a": "1", "b": "2"}
	got, err := parseArgs([]string{"a=1", "b=2"}, []string{"a", "b"})
	switch {
	case err != nil:
		t.Fatal(err)
	case !testutil.Equal(got, want):
		t.Fatalf("got %v, want %v", got, want)
	}

	// "a1" has no "=" separator.
	if _, err := parseArgs([]string{"a1"}, []string{"a1"}); err == nil {
		t.Error("malformed: got nil, want error")
	}
	// "a" is not among the accepted keys (only "b" is).
	if _, err := parseArgs([]string{"a=1"}, []string{"b"}); err == nil {
		t.Error("invalid: got nil, want error")
	}
}
// TestParseColumnsFilter checks that parseColumnsFilter maps a comma-separated
// list of [family:]column specs onto the equivalent bigtable filter tree, and
// that specs with too many ':' separators are rejected.
func TestParseColumnsFilter(t *testing.T) {
	tests := []struct {
		in   string
		out  bigtable.Filter
		fail bool
	}{
		{
			in:  "columnA",
			out: bigtable.ColumnFilter("columnA"),
		},
		{
			in:  "familyA:columnA",
			out: bigtable.ChainFilters(bigtable.FamilyFilter("familyA"), bigtable.ColumnFilter("columnA")),
		},
		{
			in:  "columnA,columnB",
			out: bigtable.InterleaveFilters(bigtable.ColumnFilter("columnA"), bigtable.ColumnFilter("columnB")),
		},
		{
			in: "familyA:columnA,columnB",
			out: bigtable.InterleaveFilters(
				bigtable.ChainFilters(bigtable.FamilyFilter("familyA"), bigtable.ColumnFilter("columnA")),
				bigtable.ColumnFilter("columnB"),
			),
		},
		{
			in: "columnA,familyB:columnB",
			out: bigtable.InterleaveFilters(
				bigtable.ColumnFilter("columnA"),
				bigtable.ChainFilters(bigtable.FamilyFilter("familyB"), bigtable.ColumnFilter("columnB")),
			),
		},
		{
			in: "familyA:columnA,familyB:columnB",
			out: bigtable.InterleaveFilters(
				bigtable.ChainFilters(bigtable.FamilyFilter("familyA"), bigtable.ColumnFilter("columnA")),
				bigtable.ChainFilters(bigtable.FamilyFilter("familyB"), bigtable.ColumnFilter("columnB")),
			),
		},
		{
			in:  "familyA:",
			out: bigtable.FamilyFilter("familyA"),
		},
		{
			in:  ":columnA",
			out: bigtable.ColumnFilter("columnA"),
		},
		{
			in: ",:columnA,,familyB:columnB,",
			out: bigtable.InterleaveFilters(
				bigtable.ColumnFilter("columnA"),
				bigtable.ChainFilters(bigtable.FamilyFilter("familyB"), bigtable.ColumnFilter("columnB")),
			),
		},
		{
			in:   "familyA:columnA:cellA",
			fail: true,
		},
		{
			in:   "familyA::columnA",
			fail: true,
		},
	}

	// cmp must be allowed to inspect the unexported fields of the chain and
	// interleave filter types. The options are the same for every case, so
	// they are built once.
	cmpOpts := cmp.Options{
		cmp.AllowUnexported(bigtable.ChainFilters([]bigtable.Filter{}...)),
		cmp.AllowUnexported(bigtable.InterleaveFilters([]bigtable.Filter{}...)),
	}
	for _, tc := range tests {
		got, err := parseColumnsFilter(tc.in)
		switch {
		case tc.fail && err == nil:
			t.Errorf("parseColumnsFilter(%q) did not fail", tc.in)
		case tc.fail:
			// Failed as expected; there is no filter to compare.
		case err != nil:
			t.Errorf("parseColumnsFilter(%q) unexpectedly failed: %v", tc.in, err)
		case !cmp.Equal(got, tc.out, cmpOpts):
			t.Errorf("parseColumnsFilter(%q) = %v, want %v", tc.in, got, tc.out)
		}
	}
}
// matchesExpectedError compares an observed error against an expected error
// substring. An empty want means no error is expected. It returns "" when the
// outcome matches: err is nil and want is empty, or want is non-empty and
// err's message contains it. Otherwise it returns a message describing the
// expected substring and, when err is non-nil, the error actually received.
func matchesExpectedError(want string, err error) string {
	if err == nil {
		if want == "" {
			return ""
		}
		return fmt.Sprintf("expected error substr:%s", want)
	}
	msg := err.Error()
	if want != "" && strings.Contains(msg, want) {
		return ""
	}
	return fmt.Sprintf("expected error substr:%s, got:%s", want, msg)
}
// TestCsvImporterArgs checks parseImporterArgs. It covers the values used when
// optional arguments are omitted or left blank, explicit overrides, and the
// error text reported for missing or out-of-range arguments.
func TestCsvImporterArgs(t *testing.T) {
	tests := []struct {
		in  []string
		out importerArgs
		err string
	}{
		{in: []string{"my-table", "my-file.csv"}, out: importerArgs{"", "", 500, 1}},
		{in: []string{"my-table", "my-file.csv", "app-profile="}, out: importerArgs{"", "", 500, 1}},
		{in: []string{"my-table", "my-file.csv", "app-profile=my-ap", "column-family=my-family", "batch-size=100", "workers=20"},
			out: importerArgs{"my-ap", "my-family", 100, 20}},
		{in: []string{}, err: "usage: cbt import <table-id> <input-file> [app-profile=<app-profile-id>] [column-family=<family-name>] [batch-size=<500>] [workers=<1>]"},
		{in: []string{"my-table", "my-file.csv", "column-family="}, err: "column-family cannot be ''"},
		{in: []string{"my-table", "my-file.csv", "batch-size=-5"}, err: "batch-size must be > 0 and <= 100000"},
		{in: []string{"my-table", "my-file.csv", "batch-size=5000000"}, err: "batch-size must be > 0 and <= 100000"},
		{in: []string{"my-table", "my-file.csv", "batch-size=nan"}, err: "batch-size must be > 0 and <= 100000"},
		{in: []string{"my-table", "my-file.csv", "batch-size="}, err: "batch-size must be > 0 and <= 100000"},
		// NOTE(review): "%!s(<nil>)" is fmt's rendering of a nil error formatted
		// with %s. This expectation pins that formatting quirk in
		// parseImporterArgs; update both together if it is fixed.
		{in: []string{"my-table", "my-file.csv", "workers=0"}, err: "workers must be > 0, err:%!s(<nil>)"},
		{in: []string{"my-table", "my-file.csv", "workers=nan"}, err: "workers must be > 0, err:strconv.Atoi: parsing \"nan\": invalid syntax"},
		{in: []string{"my-table", "my-file.csv", "workers="}, err: "workers must be > 0, err:strconv.Atoi: parsing \"\": invalid syntax"},
	}
	for _, tc := range tests {
		got, err := parseImporterArgs(context.Background(), tc.in)
		if e := matchesExpectedError(tc.err, err); e != "" {
			// Name the input so a failure identifies its case.
			t.Errorf("parseImporterArgs(%q): %s", tc.in, e)
			continue
		}
		if tc.err != "" {
			continue // received expected error, do not parse below
		}
		if got.appProfile != tc.out.appProfile ||
			got.fam != tc.out.fam ||
			got.sz != tc.out.sz ||
			got.workers != tc.out.workers {
			t.Errorf("parseImporterArgs(%q) = %+v, want %+v", tc.in, got, tc.out)
		}
	}
}
// transformToCsvBuffer encodes data as CSV, one record per inner slice, and
// returns the encoded bytes. It returns an error if data is empty or if the
// CSV writer fails.
func transformToCsvBuffer(data [][]string) ([]byte, error) {
	if len(data) == 0 {
		return nil, fmt.Errorf("data cannot be empty")
	}
	var buf bytes.Buffer
	// WriteAll writes every record, flushes, and returns any flush error, so
	// no separate Flush/Error call is needed.
	if err := csv.NewWriter(&buf).WriteAll(data); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}
// TestCsvHeaderParser checks parseCsvHeaders on in-memory CSV input. It covers
// the column-family header row (or, when a family argument is given, that
// family for every data column), the column-name row, the first data row left
// unread afterwards, and the errors reported for truncated or malformed
// headers.
func TestCsvHeaderParser(t *testing.T) {
	tests := []struct {
		label    string
		iData    [][]string
		iFam     string
		oFams    []string
		oCols    []string
		nextLine []string
		err      string
	}{
		{label: "extend-family-gap",
			iData:    [][]string{{"", "my-family", "", "my-family-2"}, {"", "col-1", "col-2", "col-3"}, {"rk-1", "A", "", ""}},
			iFam:     "",
			oFams:    []string{"", "my-family", "my-family", "my-family-2"},
			oCols:    []string{"", "col-1", "col-2", "col-3"},
			nextLine: []string{"rk-1", "A", "", ""}},
		{label: "handle-family-arg",
			iData:    [][]string{{"", "col-1", "col-2"}, {"rk-1", "A", ""}},
			iFam:     "arg-family",
			oFams:    []string{"", "arg-family", "arg-family"},
			oCols:    []string{"", "col-1", "col-2"},
			nextLine: []string{"rk-1", "A", ""}},
		{label: "eof-header-family",
			iData: [][]string{{""}},
			iFam:  "",
			err:   "family header reader error:EOF"},
		{label: "eof-header-column",
			iData: [][]string{{""}, {""}},
			iFam:  "arg-family",
			err:   "columns header reader error:EOF"},
		{label: "rowkey-in-header-row",
			iData: [][]string{{"ABC", "my-family", ""}},
			iFam:  "arg-family",
			err:   "the first column must be empty for column-family and column name rows"},
		{label: "blank-first-headers",
			iData: [][]string{{"", "", ""}},
			iFam:  "arg-family",
			err:   "the second column (first data column) must have values for column family and column name rows if present"},
	}
	for _, tc := range tests {
		// create in memory csv like file
		byteData, err := transformToCsvBuffer(tc.iData)
		if err != nil {
			t.Fatal(err)
		}
		reader := csv.NewReader(bytes.NewReader(byteData))
		fams, cols, err := parseCsvHeaders(reader, tc.iFam)
		if e := matchesExpectedError(tc.err, err); e != "" {
			t.Errorf("%s %s", tc.label, e)
			continue
		}
		if tc.err != "" {
			continue // received expected error, do not parse below
		}
		// The first data row must still be unread after the headers. Check
		// this Read's own error; the err above is already known to be nil.
		line, err := reader.Read()
		if err != nil {
			t.Errorf("%s: next line reader error, got: %q, want: %q, error: %s", tc.label, line, tc.nextLine, err)
			continue
		}
		if len(fams) != len(tc.oFams) ||
			len(cols) != len(tc.oCols) ||
			len(line) != len(tc.nextLine) {
			t.Errorf("%s: parseCsvHeaders() returned incorrect output sizes, fams: %d (want %d), cols: %d (want %d), line: %d (want %d)",
				tc.label, len(fams), len(tc.oFams), len(cols), len(tc.oCols), len(line), len(tc.nextLine))
			continue
		}
		for i, f := range fams {
			if f != tc.oFams[i] {
				t.Errorf("%s: incorrect column family idx:%d, got: %q, want %q", tc.label, i, f, tc.oFams[i])
			}
		}
		for i, c := range cols {
			if c != tc.oCols[i] {
				t.Errorf("%s: incorrect column name idx:%d, got: %q, want %q", tc.label, i, c, tc.oCols[i])
			}
		}
		for i, v := range line {
			if v != tc.nextLine[i] {
				t.Errorf("%s: incorrect next line idx:%d, got: %q, want %q", tc.label, i, v, tc.nextLine[i])
			}
		}
	}
}
// setupEmulator starts an in-process bttest emulator. In it, it creates each
// table in tables with every column family in families. It returns a
// background context and a data client connected to the emulator. The
// emulator and its gRPC connection are shut down automatically when the
// calling test (or subtest) finishes.
func setupEmulator(t *testing.T, tables, families []string) (context.Context, *bigtable.Client) {
	t.Helper()
	srv, err := bttest.NewServer("localhost:0")
	if err != nil {
		t.Fatalf("Error starting bttest server: %s", err)
	}
	// Cleanups run in LIFO order, so the connection closes before the server.
	t.Cleanup(srv.Close)
	ctx := context.Background()
	conn, err := grpc.Dial(srv.Addr, grpc.WithInsecure())
	if err != nil {
		t.Fatalf("Error %s", err)
	}
	// The admin and data clients share conn. Close it once here rather than
	// closing each client, which would close the same connection twice.
	t.Cleanup(func() { conn.Close() })
	proj, instance := "proj", "instance"
	adminClient, err := bigtable.NewAdminClient(ctx, proj, instance, option.WithGRPCConn(conn))
	if err != nil {
		t.Fatalf("Error %s", err)
	}
	for _, ta := range tables {
		if err = adminClient.CreateTable(ctx, ta); err != nil {
			t.Fatalf("Error %s", err)
		}
		for _, f := range families {
			if err = adminClient.CreateColumnFamily(ctx, ta, f); err != nil {
				t.Fatalf("Error %s", err)
			}
		}
	}
	client, err := bigtable.NewClient(ctx, proj, instance, option.WithGRPCConn(conn))
	if err != nil {
		t.Fatalf("Error %s", err)
	}
	return ctx, client
}
// validateData reads each row key in rowData back from tbl. It checks that
// every non-empty data value was written to the matching cell.
//
// Each row in rowData is a CSV data line. row[0] is the row key, and row[i]
// (i > 0) is the value expected in family fams[i], column cols[i]. Empty
// values are not checked. It returns an error if a row is empty or wider than
// the fams/cols headers, if a read fails, or if any expected value is not
// found.
func validateData(ctx context.Context, tbl *bigtable.Table, fams, cols []string, rowData [][]string) error {
	// Expected cells: valMap["rowkey:family:column"] = mutation value.
	valMap := make(map[string]string)
	for _, row := range rowData {
		if len(row) == 0 {
			return fmt.Errorf("empty data row, want at least a row key")
		}
		// Guard the fams[i]/cols[i] lookups so a malformed fixture reports an
		// error instead of panicking with an index out of range.
		if len(row) > len(fams) || len(row) > len(cols) {
			return fmt.Errorf("row %q has %d fields, but headers have %d families and %d columns",
				row[0], len(row), len(fams), len(cols))
		}
		for i, val := range row {
			if i > 0 && val != "" {
				valMap[row[0]+":"+fams[i]+":"+cols[i]] = val
			}
		}
	}
	for _, data := range rowData {
		row, err := tbl.ReadRow(ctx, data[0])
		if err != nil {
			return fmt.Errorf("reading row %q: %w", data[0], err)
		}
		for _, cf := range row {
			for _, column := range cf {
				// column.Column is "family:column", so k matches the valMap key format.
				k := data[0] + ":" + string(column.Column)
				if v, ok := valMap[k]; ok && v == string(column.Value) {
					delete(valMap, k)
				}
			}
		}
	}
	if len(valMap) != 0 {
		return fmt.Errorf("data didn't match after read, not found %v", valMap)
	}
	return nil
}
// TestCsvParseAndWrite feeds two CSV data rows through safeReader.parseAndWrite
// into an emulated table. It then verifies that every non-empty cell can be
// read back.
func TestCsvParseAndWrite(t *testing.T) {
	ctx, client := setupEmulator(t, []string{"my-table"}, []string{"my-family", "my-family-2"})
	tbl := client.Open("my-table")

	// Index 0 of each header slice lines up with the row-key column.
	fams := []string{"", "my-family", "my-family-2"}
	cols := []string{"", "col-1", "col-2"}
	rowData := [][]string{
		{"rk-0", "A", "B"},
		{"rk-1", "", "C"},
	}

	csvBytes, err := transformToCsvBuffer(rowData)
	if err != nil {
		t.Fatal(err)
	}
	sr := safeReader{r: csv.NewReader(bytes.NewReader(csvBytes))}
	if err := sr.parseAndWrite(ctx, tbl, fams, cols, 1, 1, 1); err != nil {
		t.Fatalf("parseAndWrite() failed unexpectedly, error:%s", err)
	}
	if err := validateData(ctx, tbl, fams, cols, rowData); err != nil {
		t.Fatalf("Read back validation error:%s", err)
	}
}
// TestCsvParseAndWriteBadFamily checks that parseAndWrite returns an error when
// a header names a column family ("not-my-family") that the table lacks.
func TestCsvParseAndWriteBadFamily(t *testing.T) {
	ctx, client := setupEmulator(t, []string{"my-table"}, []string{"my-family"})
	tbl := client.Open("my-table")

	fams := []string{"", "my-family", "not-my-family"}
	cols := []string{"", "col-1", "col-2"}
	rowData := [][]string{
		{"rk-0", "A", "B"},
		{"rk-1", "", "C"},
	}

	csvBytes, err := transformToCsvBuffer(rowData)
	if err != nil {
		t.Fatal(err)
	}
	sr := safeReader{r: csv.NewReader(bytes.NewReader(csvBytes))}
	if err := sr.parseAndWrite(ctx, tbl, fams, cols, 1, 1, 1); err == nil {
		t.Fatalf("parseAndWrite() should have failed with non-existant column family")
	}
}
// TestCsvParseAndWriteDuplicateRowkeys writes three CSV rows that share the row
// key "rk-0" and checks that parseAndWrite accepts them. It then checks that
// reading the row back yields exactly col-1 = "C" and col-2 = "B".
func TestCsvParseAndWriteDuplicateRowkeys(t *testing.T) {
	ctx, client := setupEmulator(t, []string{"my-table"}, []string{"my-family"})
	tbl := client.Open("my-table")
	fams := []string{"", "my-family", "my-family"}
	cols := []string{"", "col-1", "col-2"}
	rowData := [][]string{
		{"rk-0", "A", ""},
		{"rk-0", "", "B"},
		{"rk-0", "C", ""},
	}
	byteData, err := transformToCsvBuffer(rowData)
	if err != nil {
		t.Fatal(err)
	}
	reader := csv.NewReader(bytes.NewReader(byteData))
	sr := safeReader{r: reader}
	if err = sr.parseAndWrite(ctx, tbl, fams, cols, 1, 1, 1); err != nil {
		t.Fatalf("parseAndWrite() should not have failed for duplicate rowkeys: %s", err)
	}
	// the "A" not present result is expected, the emulator only keeps 1 version
	// Keys have the form "rowkey:family:column:value".
	valMap := map[string]bool{"rk-0:my-family:col-2:B": true, "rk-0:my-family:col-1:C": true}
	row, err := tbl.ReadRow(ctx, "rk-0")
	if err != nil {
		// Without the row there is nothing meaningful left to compare.
		t.Fatalf("ReadRow(%q) error: %s", "rk-0", err)
	}
	for _, cf := range row { // each column family in row
		for _, column := range cf { // each cf:column, aka each mutation
			k := "rk-0:" + string(column.Column) + ":" + string(column.Value)
			if _, ok := valMap[k]; !ok {
				t.Errorf("row data not found for %s", k)
				continue
			}
			delete(valMap, k)
		}
	}
	if len(valMap) != 0 {
		t.Fatalf("values were not present in table: %v", valMap)
	}
}
// TestCsvToCbt runs importCSV end to end against a fresh emulator for each case.
// It checks that every non-empty data cell can be read back.
//
// Each case's CSV has an optional column-family header row, then a
// column-name header row at index dataStartIdx-1, then the data rows starting
// at dataStartIdx. Each case runs as a named subtest so a failure identifies
// its case.
func TestCsvToCbt(t *testing.T) {
	tests := []struct {
		label        string
		ia           importerArgs
		csvData      [][]string
		expectedFams []string
		dataStartIdx int
	}{
		{
			label: "has-column-families",
			ia:    importerArgs{fam: "", sz: 1, workers: 1},
			csvData: [][]string{
				{"", "my-family", ""},
				{"", "col-1", "col-2"},
				{"rk-0", "A", ""},
				{"rk-1", "", "B"},
				{"rk-2", "", ""},
				{"rk-3", "C", ""},
			},
			expectedFams: []string{"", "my-family", "my-family"},
			dataStartIdx: 2,
		},
		{
			label: "no-column-families",
			ia:    importerArgs{fam: "arg-family", sz: 1, workers: 1},
			csvData: [][]string{
				{"", "col-1", "col-2"},
				{"rk-0", "A", ""},
				{"rk-1", "", "B"},
				{"rk-2", "", ""},
				{"rk-3", "C", "D"},
			},
			expectedFams: []string{"", "arg-family", "arg-family"},
			dataStartIdx: 1,
		},
		{
			label: "larger-batches",
			ia:    importerArgs{fam: "arg-family", sz: 100, workers: 1},
			csvData: [][]string{
				{"", "col-1", "col-2"},
				{"rk-0", "A", ""},
				{"rk-1", "", "B"},
				{"rk-2", "", ""},
				{"rk-3", "C", "D"},
			},
			expectedFams: []string{"", "arg-family", "arg-family"},
			dataStartIdx: 1,
		},
		{
			label: "many-workers",
			ia:    importerArgs{fam: "arg-family", sz: 1, workers: 20},
			csvData: [][]string{
				{"", "col-1", "col-2"},
				{"rk-0", "A", ""},
				{"rk-1", "", "B"},
				{"rk-2", "", ""},
				{"rk-3", "C", "D"},
			},
			expectedFams: []string{"", "arg-family", "arg-family"},
			dataStartIdx: 1,
		},
	}
	for _, tc := range tests {
		// t.Run is synchronous here (no t.Parallel), so capturing tc is safe.
		t.Run(tc.label, func(t *testing.T) {
			ctx, client := setupEmulator(t, []string{"my-table"}, []string{"my-family", "arg-family"})
			tbl := client.Open("my-table")
			byteData, err := transformToCsvBuffer(tc.csvData)
			if err != nil {
				t.Fatal(err)
			}
			reader := csv.NewReader(bytes.NewReader(byteData))
			importCSV(ctx, tbl, reader, tc.ia)
			// The column-name header row sits immediately before the first data row.
			if err := validateData(ctx, tbl, tc.expectedFams, tc.csvData[tc.dataStartIdx-1], tc.csvData[tc.dataStartIdx:]); err != nil {
				t.Fatalf("Read back validation error: %s", err)
			}
		})
	}
}