| /* |
| Copyright 2019 Google LLC |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| package bigtable |
| |
| import ( |
| "context" |
| "flag" |
| "fmt" |
| "math" |
| "math/rand" |
| "sort" |
| "strings" |
| "sync" |
| "testing" |
| "time" |
| |
| "cloud.google.com/go/internal/testutil" |
| "github.com/golang/protobuf/proto" |
| "google.golang.org/api/iterator" |
| btapb "google.golang.org/genproto/googleapis/bigtable/admin/v2" |
| ) |
| |
// presidentsSocialGraph is the fixture used by populatePresidentsGraph: each
// key is a row key and each value lists the column qualifiers that are
// written for that row.
var presidentsSocialGraph = map[string][]string{
	"gwashington": {"jadams"},
	"jadams":      {"gwashington", "tjefferson"},
	"tjefferson":  {"gwashington", "jadams"},
	"wmckinley":   {"tjefferson"},
}
| |
| func populatePresidentsGraph(table *Table) error { |
| ctx := context.Background() |
| for row, ss := range presidentsSocialGraph { |
| mut := NewMutation() |
| for _, name := range ss { |
| mut.Set("follows", name, 1000, []byte("1")) |
| } |
| if err := table.Apply(ctx, row, mut); err != nil { |
| return fmt.Errorf("Mutating row %q: %v", row, err) |
| } |
| } |
| return nil |
| } |
| |
// Command-line settings, populated by the flags registered in init.
var (
	// instanceToCreate is the value of -it.instance-to-create (default "").
	instanceToCreate string
	// instanceToCreateZone is the value of -it.instance-to-create-zone.
	instanceToCreateZone string
)

// init registers the flags that control the instance-creation test.
func init() {
	const (
		instanceUsage = "The id of an instance to create, update and delete. Requires sufficient Cloud Bigtable quota. Requires that it.use-prod is true."
		zoneUsage     = "The zone in which to create the new test instance."
	)

	// Instance creation is not tested by default: it needs quota, and an
	// aborted test run could strand the created resources.
	flag.StringVar(&instanceToCreate, "it.instance-to-create", "", instanceUsage)
	flag.StringVar(&instanceToCreateZone, "it.instance-to-create-zone", "us-central1-b", zoneUsage)
}
| |
| func TestIntegration_ConditionalMutations(t *testing.T) { |
| ctx := context.Background() |
| _, _, table, _, cleanup, err := setupIntegration(ctx, t) |
| if err != nil { |
| t.Fatal(err) |
| } |
| defer cleanup() |
| |
| if err := populatePresidentsGraph(table); err != nil { |
| t.Fatal(err) |
| } |
| |
| // Do a conditional mutation with a complex filter. |
| mutTrue := NewMutation() |
| mutTrue.Set("follows", "wmckinley", 1000, []byte("1")) |
| filter := ChainFilters(ColumnFilter("gwash[iz].*"), ValueFilter(".")) |
| mut := NewCondMutation(filter, mutTrue, nil) |
| if err := table.Apply(ctx, "tjefferson", mut); err != nil { |
| t.Fatalf("Conditionally mutating row: %v", err) |
| } |
| // Do a second condition mutation with a filter that does not match, |
| // and thus no changes should be made. |
| mutTrue = NewMutation() |
| mutTrue.DeleteRow() |
| filter = ColumnFilter("snoop.dogg") |
| mut = NewCondMutation(filter, mutTrue, nil) |
| if err := table.Apply(ctx, "tjefferson", mut); err != nil { |
| t.Fatalf("Conditionally mutating row: %v", err) |
| } |
| |
| // Fetch a row. |
| row, err := table.ReadRow(ctx, "jadams") |
| if err != nil { |
| t.Fatalf("Reading a row: %v", err) |
| } |
| wantRow := Row{ |
| "follows": []ReadItem{ |
| {Row: "jadams", Column: "follows:gwashington", Timestamp: 1000, Value: []byte("1")}, |
| {Row: "jadams", Column: "follows:tjefferson", Timestamp: 1000, Value: []byte("1")}, |
| }, |
| } |
| if !testutil.Equal(row, wantRow) { |
| t.Fatalf("Read row mismatch.\n got %#v\nwant %#v", row, wantRow) |
| } |
| } |
| |
| func TestIntegration_PartialReadRows(t *testing.T) { |
| ctx := context.Background() |
| _, _, table, _, cleanup, err := setupIntegration(ctx, t) |
| if err != nil { |
| t.Fatal(err) |
| } |
| defer cleanup() |
| |
| if err := populatePresidentsGraph(table); err != nil { |
| t.Fatal(err) |
| } |
| |
| // Do a scan and stop part way through. |
| // Verify that the ReadRows callback doesn't keep running. |
| stopped := false |
| err = table.ReadRows(ctx, InfiniteRange(""), func(r Row) bool { |
| if r.Key() < "h" { |
| return true |
| } |
| if !stopped { |
| stopped = true |
| return false |
| } |
| t.Fatalf("ReadRows kept scanning to row %q after being told to stop", r.Key()) |
| return false |
| }) |
| if err != nil { |
| t.Fatalf("Partial ReadRows: %v", err) |
| } |
| } |
| |
| func TestIntegration_ReadRowList(t *testing.T) { |
| ctx := context.Background() |
| _, _, table, _, cleanup, err := setupIntegration(ctx, t) |
| if err != nil { |
| t.Fatal(err) |
| } |
| defer cleanup() |
| |
| if err := populatePresidentsGraph(table); err != nil { |
| t.Fatal(err) |
| } |
| |
| // Read a RowList |
| var elt []string |
| keys := RowList{"wmckinley", "gwashington", "jadams"} |
| want := "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,wmckinley-tjefferson-1" |
| err = table.ReadRows(ctx, keys, func(r Row) bool { |
| for _, ris := range r { |
| for _, ri := range ris { |
| elt = append(elt, formatReadItem(ri)) |
| } |
| } |
| return true |
| }) |
| if err != nil { |
| t.Fatalf("read RowList: %v", err) |
| } |
| |
| if got := strings.Join(elt, ","); got != want { |
| t.Fatalf("bulk read: wrong reads.\n got %q\nwant %q", got, want) |
| } |
| } |
| |
| func TestIntegration_DeleteRow(t *testing.T) { |
| ctx := context.Background() |
| _, _, table, _, cleanup, err := setupIntegration(ctx, t) |
| if err != nil { |
| t.Fatal(err) |
| } |
| defer cleanup() |
| |
| if err := populatePresidentsGraph(table); err != nil { |
| t.Fatal(err) |
| } |
| |
| // Delete a row and check it goes away. |
| mut := NewMutation() |
| mut.DeleteRow() |
| if err := table.Apply(ctx, "wmckinley", mut); err != nil { |
| t.Fatalf("Apply DeleteRow: %v", err) |
| } |
| row, err := table.ReadRow(ctx, "wmckinley") |
| if err != nil { |
| t.Fatalf("Reading a row after DeleteRow: %v", err) |
| } |
| if len(row) != 0 { |
| t.Fatalf("Read non-zero row after DeleteRow: %v", row) |
| } |
| } |
| |
| func TestIntegration_ReadModifyWrite(t *testing.T) { |
| ctx := context.Background() |
| _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t) |
| if err != nil { |
| t.Fatal(err) |
| } |
| defer cleanup() |
| |
| if err := populatePresidentsGraph(table); err != nil { |
| t.Fatal(err) |
| } |
| |
| if err := adminClient.CreateColumnFamily(ctx, tableName, "counter"); err != nil { |
| t.Fatalf("Creating column family: %v", err) |
| } |
| |
| appendRMW := func(b []byte) *ReadModifyWrite { |
| rmw := NewReadModifyWrite() |
| rmw.AppendValue("counter", "likes", b) |
| return rmw |
| } |
| incRMW := func(n int64) *ReadModifyWrite { |
| rmw := NewReadModifyWrite() |
| rmw.Increment("counter", "likes", n) |
| return rmw |
| } |
| rmwSeq := []struct { |
| desc string |
| rmw *ReadModifyWrite |
| want []byte |
| }{ |
| { |
| desc: "append #1", |
| rmw: appendRMW([]byte{0, 0, 0}), |
| want: []byte{0, 0, 0}, |
| }, |
| { |
| desc: "append #2", |
| rmw: appendRMW([]byte{0, 0, 0, 0, 17}), // the remaining 40 bits to make a big-endian 17 |
| want: []byte{0, 0, 0, 0, 0, 0, 0, 17}, |
| }, |
| { |
| desc: "increment", |
| rmw: incRMW(8), |
| want: []byte{0, 0, 0, 0, 0, 0, 0, 25}, |
| }, |
| } |
| for _, step := range rmwSeq { |
| row, err := table.ApplyReadModifyWrite(ctx, "gwashington", step.rmw) |
| if err != nil { |
| t.Fatalf("ApplyReadModifyWrite %+v: %v", step.rmw, err) |
| } |
| // Make sure the modified cell returned by the RMW operation has a timestamp. |
| if row["counter"][0].Timestamp == 0 { |
| t.Fatalf("RMW returned cell timestamp: got %v, want > 0", row["counter"][0].Timestamp) |
| } |
| clearTimestamps(row) |
| wantRow := Row{"counter": []ReadItem{{Row: "gwashington", Column: "counter:likes", Value: step.want}}} |
| if !testutil.Equal(row, wantRow) { |
| t.Fatalf("After %s,\n got %v\nwant %v", step.desc, row, wantRow) |
| } |
| } |
| |
| // Check for google-cloud-go/issues/723. RMWs that insert new rows should keep row order sorted in the emulator. |
| _, err = table.ApplyReadModifyWrite(ctx, "issue-723-2", appendRMW([]byte{0})) |
| if err != nil { |
| t.Fatalf("ApplyReadModifyWrite null string: %v", err) |
| } |
| _, err = table.ApplyReadModifyWrite(ctx, "issue-723-1", appendRMW([]byte{0})) |
| if err != nil { |
| t.Fatalf("ApplyReadModifyWrite null string: %v", err) |
| } |
| // Get only the correct row back on read. |
| r, err := table.ReadRow(ctx, "issue-723-1") |
| if err != nil { |
| t.Fatalf("Reading row: %v", err) |
| } |
| if r.Key() != "issue-723-1" { |
| t.Fatalf("ApplyReadModifyWrite: incorrect read after RMW,\n got %v\nwant %v", r.Key(), "issue-723-1") |
| } |
| } |
| |
| func TestIntegration_ArbitraryTimestamps(t *testing.T) { |
| ctx := context.Background() |
| _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t) |
| if err != nil { |
| t.Fatal(err) |
| } |
| defer cleanup() |
| |
| // Test arbitrary timestamps more thoroughly. |
| if err := adminClient.CreateColumnFamily(ctx, tableName, "ts"); err != nil { |
| t.Fatalf("Creating column family: %v", err) |
| } |
| const numVersions = 4 |
| mut := NewMutation() |
| for i := 1; i < numVersions; i++ { |
| // Timestamps are used in thousands because the server |
| // only permits that granularity. |
| mut.Set("ts", "col", Timestamp(i*1000), []byte(fmt.Sprintf("val-%d", i))) |
| mut.Set("ts", "col2", Timestamp(i*1000), []byte(fmt.Sprintf("val-%d", i))) |
| } |
| if err := table.Apply(ctx, "testrow", mut); err != nil { |
| t.Fatalf("Mutating row: %v", err) |
| } |
| r, err := table.ReadRow(ctx, "testrow") |
| if err != nil { |
| t.Fatalf("Reading row: %v", err) |
| } |
| wantRow := Row{"ts": []ReadItem{ |
| // These should be returned in descending timestamp order. |
| {Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")}, |
| {Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")}, |
| {Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")}, |
| {Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")}, |
| {Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")}, |
| {Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")}, |
| }} |
| if !testutil.Equal(r, wantRow) { |
| t.Fatalf("Cell with multiple versions,\n got %v\nwant %v", r, wantRow) |
| } |
| |
| // Do the same read, but filter to the latest two versions. |
| r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(2))) |
| if err != nil { |
| t.Fatalf("Reading row: %v", err) |
| } |
| wantRow = Row{"ts": []ReadItem{ |
| {Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")}, |
| {Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")}, |
| {Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")}, |
| {Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")}, |
| }} |
| if !testutil.Equal(r, wantRow) { |
| t.Fatalf("Cell with multiple versions and LatestNFilter(2),\n got %v\nwant %v", r, wantRow) |
| } |
| // Check cell offset / limit |
| r, err = table.ReadRow(ctx, "testrow", RowFilter(CellsPerRowLimitFilter(3))) |
| if err != nil { |
| t.Fatalf("Reading row: %v", err) |
| } |
| wantRow = Row{"ts": []ReadItem{ |
| {Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")}, |
| {Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")}, |
| {Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")}, |
| }} |
| if !testutil.Equal(r, wantRow) { |
| t.Fatalf("Cell with multiple versions and CellsPerRowLimitFilter(3),\n got %v\nwant %v", r, wantRow) |
| } |
| r, err = table.ReadRow(ctx, "testrow", RowFilter(CellsPerRowOffsetFilter(3))) |
| if err != nil { |
| t.Fatalf("Reading row: %v", err) |
| } |
| wantRow = Row{"ts": []ReadItem{ |
| {Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")}, |
| {Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")}, |
| {Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")}, |
| }} |
| if !testutil.Equal(r, wantRow) { |
| t.Fatalf("Cell with multiple versions and CellsPerRowOffsetFilter(3),\n got %v\nwant %v", r, wantRow) |
| } |
| // Check timestamp range filtering (with truncation) |
| r, err = table.ReadRow(ctx, "testrow", RowFilter(TimestampRangeFilterMicros(1001, 3000))) |
| if err != nil { |
| t.Fatalf("Reading row: %v", err) |
| } |
| wantRow = Row{"ts": []ReadItem{ |
| {Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")}, |
| {Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")}, |
| {Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")}, |
| {Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")}, |
| }} |
| if !testutil.Equal(r, wantRow) { |
| t.Fatalf("Cell with multiple versions and TimestampRangeFilter(1000, 3000),\n got %v\nwant %v", r, wantRow) |
| } |
| r, err = table.ReadRow(ctx, "testrow", RowFilter(TimestampRangeFilterMicros(1000, 0))) |
| if err != nil { |
| t.Fatalf("Reading row: %v", err) |
| } |
| wantRow = Row{"ts": []ReadItem{ |
| {Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")}, |
| {Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")}, |
| {Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")}, |
| {Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")}, |
| {Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")}, |
| {Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")}, |
| }} |
| if !testutil.Equal(r, wantRow) { |
| t.Fatalf("Cell with multiple versions and TimestampRangeFilter(1000, 0),\n got %v\nwant %v", r, wantRow) |
| } |
| // Delete non-existing cells, no such column family in this row |
| // Should not delete anything |
| if err := adminClient.CreateColumnFamily(ctx, tableName, "non-existing"); err != nil { |
| t.Fatalf("Creating column family: %v", err) |
| } |
| mut = NewMutation() |
| mut.DeleteTimestampRange("non-existing", "col", 2000, 3000) // half-open interval |
| if err := table.Apply(ctx, "testrow", mut); err != nil { |
| t.Fatalf("Mutating row: %v", err) |
| } |
| r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(3))) |
| if err != nil { |
| t.Fatalf("Reading row: %v", err) |
| } |
| if !testutil.Equal(r, wantRow) { |
| t.Fatalf("Cell was deleted unexpectly,\n got %v\nwant %v", r, wantRow) |
| } |
| // Delete non-existing cells, no such column in this column family |
| // Should not delete anything |
| mut = NewMutation() |
| mut.DeleteTimestampRange("ts", "non-existing", 2000, 3000) // half-open interval |
| if err := table.Apply(ctx, "testrow", mut); err != nil { |
| t.Fatalf("Mutating row: %v", err) |
| } |
| r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(3))) |
| if err != nil { |
| t.Fatalf("Reading row: %v", err) |
| } |
| if !testutil.Equal(r, wantRow) { |
| t.Fatalf("Cell was deleted unexpectly,\n got %v\nwant %v", r, wantRow) |
| } |
| // Delete the cell with timestamp 2000 and repeat the last read, |
| // checking that we get ts 3000 and ts 1000. |
| mut = NewMutation() |
| mut.DeleteTimestampRange("ts", "col", 2001, 3000) // half-open interval |
| if err := table.Apply(ctx, "testrow", mut); err != nil { |
| t.Fatalf("Mutating row: %v", err) |
| } |
| r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(2))) |
| if err != nil { |
| t.Fatalf("Reading row: %v", err) |
| } |
| wantRow = Row{"ts": []ReadItem{ |
| {Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")}, |
| {Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")}, |
| {Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")}, |
| {Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")}, |
| }} |
| if !testutil.Equal(r, wantRow) { |
| t.Fatalf("Cell with multiple versions and LatestNFilter(2), after deleting timestamp 2000,\n got %v\nwant %v", r, wantRow) |
| } |
| |
| // Check DeleteCellsInFamily |
| if err := adminClient.CreateColumnFamily(ctx, tableName, "status"); err != nil { |
| t.Fatalf("Creating column family: %v", err) |
| } |
| |
| mut = NewMutation() |
| mut.Set("status", "start", 2000, []byte("2")) |
| mut.Set("status", "end", 3000, []byte("3")) |
| mut.Set("ts", "col", 1000, []byte("1")) |
| if err := table.Apply(ctx, "row1", mut); err != nil { |
| t.Fatalf("Mutating row: %v", err) |
| } |
| if err := table.Apply(ctx, "row2", mut); err != nil { |
| t.Fatalf("Mutating row: %v", err) |
| } |
| |
| mut = NewMutation() |
| mut.DeleteCellsInFamily("status") |
| if err := table.Apply(ctx, "row1", mut); err != nil { |
| t.Fatalf("Delete cf: %v", err) |
| } |
| |
| // ColumnFamily removed |
| r, err = table.ReadRow(ctx, "row1") |
| if err != nil { |
| t.Fatalf("Reading row: %v", err) |
| } |
| wantRow = Row{"ts": []ReadItem{ |
| {Row: "row1", Column: "ts:col", Timestamp: 1000, Value: []byte("1")}, |
| }} |
| if !testutil.Equal(r, wantRow) { |
| t.Fatalf("column family was not deleted.\n got %v\n want %v", r, wantRow) |
| } |
| |
| // ColumnFamily not removed |
| r, err = table.ReadRow(ctx, "row2") |
| if err != nil { |
| t.Fatalf("Reading row: %v", err) |
| } |
| wantRow = Row{ |
| "ts": []ReadItem{ |
| {Row: "row2", Column: "ts:col", Timestamp: 1000, Value: []byte("1")}, |
| }, |
| "status": []ReadItem{ |
| {Row: "row2", Column: "status:end", Timestamp: 3000, Value: []byte("3")}, |
| {Row: "row2", Column: "status:start", Timestamp: 2000, Value: []byte("2")}, |
| }, |
| } |
| if !testutil.Equal(r, wantRow) { |
| t.Fatalf("Column family was deleted unexpectedly.\n got %v\n want %v", r, wantRow) |
| } |
| |
| // Check DeleteCellsInColumn |
| mut = NewMutation() |
| mut.Set("status", "start", 2000, []byte("2")) |
| mut.Set("status", "middle", 3000, []byte("3")) |
| mut.Set("status", "end", 1000, []byte("1")) |
| if err := table.Apply(ctx, "row3", mut); err != nil { |
| t.Fatalf("Mutating row: %v", err) |
| } |
| mut = NewMutation() |
| mut.DeleteCellsInColumn("status", "middle") |
| if err := table.Apply(ctx, "row3", mut); err != nil { |
| t.Fatalf("Delete column: %v", err) |
| } |
| r, err = table.ReadRow(ctx, "row3") |
| if err != nil { |
| t.Fatalf("Reading row: %v", err) |
| } |
| wantRow = Row{ |
| "status": []ReadItem{ |
| {Row: "row3", Column: "status:end", Timestamp: 1000, Value: []byte("1")}, |
| {Row: "row3", Column: "status:start", Timestamp: 2000, Value: []byte("2")}, |
| }, |
| } |
| if !testutil.Equal(r, wantRow) { |
| t.Fatalf("Column was not deleted.\n got %v\n want %v", r, wantRow) |
| } |
| mut = NewMutation() |
| mut.DeleteCellsInColumn("status", "start") |
| if err := table.Apply(ctx, "row3", mut); err != nil { |
| t.Fatalf("Delete column: %v", err) |
| } |
| r, err = table.ReadRow(ctx, "row3") |
| if err != nil { |
| t.Fatalf("Reading row: %v", err) |
| } |
| wantRow = Row{ |
| "status": []ReadItem{ |
| {Row: "row3", Column: "status:end", Timestamp: 1000, Value: []byte("1")}, |
| }, |
| } |
| if !testutil.Equal(r, wantRow) { |
| t.Fatalf("Column was not deleted.\n got %v\n want %v", r, wantRow) |
| } |
| mut = NewMutation() |
| mut.DeleteCellsInColumn("status", "end") |
| if err := table.Apply(ctx, "row3", mut); err != nil { |
| t.Fatalf("Delete column: %v", err) |
| } |
| r, err = table.ReadRow(ctx, "row3") |
| if err != nil { |
| t.Fatalf("Reading row: %v", err) |
| } |
| if len(r) != 0 { |
| t.Fatalf("Delete column: got %v, want empty row", r) |
| } |
| // Add same cell after delete |
| mut = NewMutation() |
| mut.Set("status", "end", 1000, []byte("1")) |
| if err := table.Apply(ctx, "row3", mut); err != nil { |
| t.Fatalf("Mutating row: %v", err) |
| } |
| r, err = table.ReadRow(ctx, "row3") |
| if err != nil { |
| t.Fatalf("Reading row: %v", err) |
| } |
| if !testutil.Equal(r, wantRow) { |
| t.Fatalf("Column was not deleted correctly.\n got %v\n want %v", r, wantRow) |
| } |
| } |
| |
| func TestIntegration_HighlyConcurrentReadsAndWrites(t *testing.T) { |
| ctx := context.Background() |
| _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t) |
| if err != nil { |
| t.Fatal(err) |
| } |
| defer cleanup() |
| |
| if err := populatePresidentsGraph(table); err != nil { |
| t.Fatal(err) |
| } |
| |
| if err := adminClient.CreateColumnFamily(ctx, tableName, "ts"); err != nil { |
| t.Fatalf("Creating column family: %v", err) |
| } |
| |
| // Do highly concurrent reads/writes. |
| const maxConcurrency = 1000 |
| var wg sync.WaitGroup |
| for i := 0; i < maxConcurrency; i++ { |
| wg.Add(1) |
| go func() { |
| defer wg.Done() |
| switch r := rand.Intn(100); { // r ∈ [0,100) |
| case 0 <= r && r < 30: |
| // Do a read. |
| _, err := table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(1))) |
| if err != nil { |
| t.Errorf("Concurrent read: %v", err) |
| } |
| case 30 <= r && r < 100: |
| // Do a write. |
| mut := NewMutation() |
| mut.Set("ts", "col", 1000, []byte("data")) |
| if err := table.Apply(ctx, "testrow", mut); err != nil { |
| t.Errorf("Concurrent write: %v", err) |
| } |
| } |
| }() |
| } |
| wg.Wait() |
| } |
| |
| func TestIntegration_LargeReadsWritesAndScans(t *testing.T) { |
| ctx := context.Background() |
| _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t) |
| if err != nil { |
| t.Fatal(err) |
| } |
| defer cleanup() |
| |
| if err := adminClient.CreateColumnFamily(ctx, tableName, "ts"); err != nil { |
| t.Fatalf("Creating column family: %v", err) |
| } |
| |
| bigBytes := make([]byte, 5<<20) // 5 MB is larger than current default gRPC max of 4 MB, but less than the max we set. |
| nonsense := []byte("lorem ipsum dolor sit amet, ") |
| fill(bigBytes, nonsense) |
| mut := NewMutation() |
| mut.Set("ts", "col", 1000, bigBytes) |
| if err := table.Apply(ctx, "bigrow", mut); err != nil { |
| t.Fatalf("Big write: %v", err) |
| } |
| r, err := table.ReadRow(ctx, "bigrow") |
| if err != nil { |
| t.Fatalf("Big read: %v", err) |
| } |
| wantRow := Row{"ts": []ReadItem{ |
| {Row: "bigrow", Column: "ts:col", Timestamp: 1000, Value: bigBytes}, |
| }} |
| if !testutil.Equal(r, wantRow) { |
| t.Fatalf("Big read returned incorrect bytes: %v", r) |
| } |
| |
| var wg sync.WaitGroup |
| // Now write 1000 rows, each with 82 KB values, then scan them all. |
| medBytes := make([]byte, 82<<10) |
| fill(medBytes, nonsense) |
| sem := make(chan int, 50) // do up to 50 mutations at a time. |
| for i := 0; i < 1000; i++ { |
| mut := NewMutation() |
| mut.Set("ts", "big-scan", 1000, medBytes) |
| row := fmt.Sprintf("row-%d", i) |
| wg.Add(1) |
| go func() { |
| defer wg.Done() |
| defer func() { <-sem }() |
| sem <- 1 |
| if err := table.Apply(ctx, row, mut); err != nil { |
| t.Errorf("Preparing large scan: %v", err) |
| } |
| }() |
| } |
| wg.Wait() |
| n := 0 |
| err = table.ReadRows(ctx, PrefixRange("row-"), func(r Row) bool { |
| for _, ris := range r { |
| for _, ri := range ris { |
| n += len(ri.Value) |
| } |
| } |
| return true |
| }, RowFilter(ColumnFilter("big-scan"))) |
| if err != nil { |
| t.Fatalf("Doing large scan: %v", err) |
| } |
| if want := 1000 * len(medBytes); n != want { |
| t.Fatalf("Large scan returned %d bytes, want %d", n, want) |
| } |
| // Scan a subset of the 1000 rows that we just created, using a LimitRows ReadOption. |
| rc := 0 |
| wantRc := 3 |
| err = table.ReadRows(ctx, PrefixRange("row-"), func(r Row) bool { |
| rc++ |
| return true |
| }, LimitRows(int64(wantRc))) |
| if err != nil { |
| t.Fatal(err) |
| } |
| if rc != wantRc { |
| t.Fatalf("Scan with row limit returned %d rows, want %d", rc, wantRc) |
| } |
| |
| // Test bulk mutations |
| if err := adminClient.CreateColumnFamily(ctx, tableName, "bulk"); err != nil { |
| t.Fatalf("Creating column family: %v", err) |
| } |
| bulkData := map[string][]string{ |
| "red sox": {"2004", "2007", "2013"}, |
| "patriots": {"2001", "2003", "2004", "2014"}, |
| "celtics": {"1981", "1984", "1986", "2008"}, |
| } |
| var rowKeys []string |
| var muts []*Mutation |
| for row, ss := range bulkData { |
| mut := NewMutation() |
| for _, name := range ss { |
| mut.Set("bulk", name, 1000, []byte("1")) |
| } |
| rowKeys = append(rowKeys, row) |
| muts = append(muts, mut) |
| } |
| status, err := table.ApplyBulk(ctx, rowKeys, muts) |
| if err != nil { |
| t.Fatalf("Bulk mutating rows %q: %v", rowKeys, err) |
| } |
| if status != nil { |
| t.Fatalf("non-nil errors: %v", err) |
| } |
| |
| // Read each row back |
| for rowKey, ss := range bulkData { |
| row, err := table.ReadRow(ctx, rowKey) |
| if err != nil { |
| t.Fatalf("Reading a bulk row: %v", err) |
| } |
| var wantItems []ReadItem |
| for _, val := range ss { |
| wantItems = append(wantItems, ReadItem{Row: rowKey, Column: "bulk:" + val, Timestamp: 1000, Value: []byte("1")}) |
| } |
| wantRow := Row{"bulk": wantItems} |
| if !testutil.Equal(row, wantRow) { |
| t.Fatalf("Read row mismatch.\n got %#v\nwant %#v", row, wantRow) |
| } |
| } |
| |
| // Test bulk write errors. |
| // Note: Setting timestamps as ServerTime makes sure the mutations are not retried on error. |
| badMut := NewMutation() |
| badMut.Set("badfamily", "col", ServerTime, nil) |
| badMut2 := NewMutation() |
| badMut2.Set("badfamily2", "goodcol", ServerTime, []byte("1")) |
| status, err = table.ApplyBulk(ctx, []string{"badrow", "badrow2"}, []*Mutation{badMut, badMut2}) |
| if err != nil { |
| t.Fatalf("Bulk mutating rows %q: %v", rowKeys, err) |
| } |
| if status == nil { |
| t.Fatalf("No errors for bad bulk mutation") |
| } else if status[0] == nil || status[1] == nil { |
| t.Fatalf("No error for bad bulk mutation") |
| } |
| } |
| |
| func TestIntegration_Read(t *testing.T) { |
| ctx := context.Background() |
| _, _, table, _, cleanup, err := setupIntegration(ctx, t) |
| if err != nil { |
| t.Fatal(err) |
| } |
| defer cleanup() |
| |
| // Insert some data. |
| initialData := map[string][]string{ |
| "wmckinley": {"tjefferson"}, |
| "gwashington": {"jadams"}, |
| "tjefferson": {"gwashington", "jadams", "wmckinley"}, |
| "jadams": {"gwashington", "tjefferson"}, |
| } |
| for row, ss := range initialData { |
| mut := NewMutation() |
| for _, name := range ss { |
| mut.Set("follows", name, 1000, []byte("1")) |
| } |
| if err := table.Apply(ctx, row, mut); err != nil { |
| t.Fatalf("Mutating row %q: %v", row, err) |
| } |
| } |
| |
| for _, test := range []struct { |
| desc string |
| rr RowSet |
| filter Filter // may be nil |
| limit ReadOption // may be nil |
| |
| // We do the read, grab all the cells, turn them into "<row>-<col>-<val>", |
| // and join with a comma. |
| want string |
| }{ |
| { |
| desc: "read all, unfiltered", |
| rr: RowRange{}, |
| want: "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,tjefferson-gwashington-1,tjefferson-jadams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1", |
| }, |
| { |
| desc: "read with InfiniteRange, unfiltered", |
| rr: InfiniteRange("tjefferson"), |
| want: "tjefferson-gwashington-1,tjefferson-jadams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1", |
| }, |
| { |
| desc: "read with NewRange, unfiltered", |
| rr: NewRange("gargamel", "hubbard"), |
| want: "gwashington-jadams-1", |
| }, |
| { |
| desc: "read with PrefixRange, unfiltered", |
| rr: PrefixRange("jad"), |
| want: "jadams-gwashington-1,jadams-tjefferson-1", |
| }, |
| { |
| desc: "read with SingleRow, unfiltered", |
| rr: SingleRow("wmckinley"), |
| want: "wmckinley-tjefferson-1", |
| }, |
| { |
| desc: "read all, with ColumnFilter", |
| rr: RowRange{}, |
| filter: ColumnFilter(".*j.*"), // matches "jadams" and "tjefferson" |
| want: "gwashington-jadams-1,jadams-tjefferson-1,tjefferson-jadams-1,wmckinley-tjefferson-1", |
| }, |
| { |
| desc: "read all, with ColumnFilter, prefix", |
| rr: RowRange{}, |
| filter: ColumnFilter("j"), // no matches |
| want: "", |
| }, |
| { |
| desc: "read range, with ColumnRangeFilter", |
| rr: RowRange{}, |
| filter: ColumnRangeFilter("follows", "h", "k"), |
| want: "gwashington-jadams-1,tjefferson-jadams-1", |
| }, |
| { |
| desc: "read range from empty, with ColumnRangeFilter", |
| rr: RowRange{}, |
| filter: ColumnRangeFilter("follows", "", "u"), |
| want: "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,tjefferson-gwashington-1,tjefferson-jadams-1,wmckinley-tjefferson-1", |
| }, |
| { |
| desc: "read range from start to empty, with ColumnRangeFilter", |
| rr: RowRange{}, |
| filter: ColumnRangeFilter("follows", "h", ""), |
| want: "gwashington-jadams-1,jadams-tjefferson-1,tjefferson-jadams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1", |
| }, |
| { |
| desc: "read with RowKeyFilter", |
| rr: RowRange{}, |
| filter: RowKeyFilter(".*wash.*"), |
| want: "gwashington-jadams-1", |
| }, |
| { |
| desc: "read with RowKeyFilter, prefix", |
| rr: RowRange{}, |
| filter: RowKeyFilter("gwash"), |
| want: "", |
| }, |
| { |
| desc: "read with RowKeyFilter, no matches", |
| rr: RowRange{}, |
| filter: RowKeyFilter(".*xxx.*"), |
| want: "", |
| }, |
| { |
| desc: "read with FamilyFilter, no matches", |
| rr: RowRange{}, |
| filter: FamilyFilter(".*xxx.*"), |
| want: "", |
| }, |
| { |
| desc: "read with ColumnFilter + row limit", |
| rr: RowRange{}, |
| filter: ColumnFilter(".*j.*"), // matches "jadams" and "tjefferson" |
| limit: LimitRows(2), |
| want: "gwashington-jadams-1,jadams-tjefferson-1", |
| }, |
| { |
| desc: "read all, strip values", |
| rr: RowRange{}, |
| filter: StripValueFilter(), |
| want: "gwashington-jadams-,jadams-gwashington-,jadams-tjefferson-,tjefferson-gwashington-,tjefferson-jadams-,tjefferson-wmckinley-,wmckinley-tjefferson-", |
| }, |
| { |
| desc: "read with ColumnFilter + row limit + strip values", |
| rr: RowRange{}, |
| filter: ChainFilters(ColumnFilter(".*j.*"), StripValueFilter()), // matches "jadams" and "tjefferson" |
| limit: LimitRows(2), |
| want: "gwashington-jadams-,jadams-tjefferson-", |
| }, |
| { |
| desc: "read with condition, strip values on true", |
| rr: RowRange{}, |
| filter: ConditionFilter(ColumnFilter(".*j.*"), StripValueFilter(), nil), |
| want: "gwashington-jadams-,jadams-gwashington-,jadams-tjefferson-,tjefferson-gwashington-,tjefferson-jadams-,tjefferson-wmckinley-,wmckinley-tjefferson-", |
| }, |
| { |
| desc: "read with condition, strip values on false", |
| rr: RowRange{}, |
| filter: ConditionFilter(ColumnFilter(".*xxx.*"), nil, StripValueFilter()), |
| want: "gwashington-jadams-,jadams-gwashington-,jadams-tjefferson-,tjefferson-gwashington-,tjefferson-jadams-,tjefferson-wmckinley-,wmckinley-tjefferson-", |
| }, |
| { |
| desc: "read with ValueRangeFilter + row limit", |
| rr: RowRange{}, |
| filter: ValueRangeFilter([]byte("1"), []byte("5")), // matches our value of "1" |
| limit: LimitRows(2), |
| want: "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1", |
| }, |
| { |
| desc: "read with ValueRangeFilter, no match on exclusive end", |
| rr: RowRange{}, |
| filter: ValueRangeFilter([]byte("0"), []byte("1")), // no match |
| want: "", |
| }, |
| { |
| desc: "read with ValueRangeFilter, no matches", |
| rr: RowRange{}, |
| filter: ValueRangeFilter([]byte("3"), []byte("5")), // matches nothing |
| want: "", |
| }, |
| { |
| desc: "read with InterleaveFilter, no matches on all filters", |
| rr: RowRange{}, |
| filter: InterleaveFilters(ColumnFilter(".*x.*"), ColumnFilter(".*z.*")), |
| want: "", |
| }, |
| { |
| desc: "read with InterleaveFilter, no duplicate cells", |
| rr: RowRange{}, |
| filter: InterleaveFilters(ColumnFilter(".*g.*"), ColumnFilter(".*j.*")), |
| want: "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,tjefferson-gwashington-1,tjefferson-jadams-1,wmckinley-tjefferson-1", |
| }, |
| { |
| desc: "read with InterleaveFilter, with duplicate cells", |
| rr: RowRange{}, |
| filter: InterleaveFilters(ColumnFilter(".*g.*"), ColumnFilter(".*g.*")), |
| want: "jadams-gwashington-1,jadams-gwashington-1,tjefferson-gwashington-1,tjefferson-gwashington-1", |
| }, |
| { |
| desc: "read with a RowRangeList and no filter", |
| rr: RowRangeList{NewRange("gargamel", "hubbard"), InfiniteRange("wmckinley")}, |
| want: "gwashington-jadams-1,wmckinley-tjefferson-1", |
| }, |
| { |
| desc: "chain that excludes rows and matches nothing, in a condition", |
| rr: RowRange{}, |
| filter: ConditionFilter(ChainFilters(ColumnFilter(".*j.*"), ColumnFilter(".*mckinley.*")), StripValueFilter(), nil), |
| want: "", |
| }, |
| { |
| desc: "chain that ends with an interleave that has no match. covers #804", |
| rr: RowRange{}, |
| filter: ConditionFilter(ChainFilters(ColumnFilter(".*j.*"), InterleaveFilters(ColumnFilter(".*x.*"), ColumnFilter(".*z.*"))), StripValueFilter(), nil), |
| want: "", |
| }, |
| } { |
| t.Run(test.desc, func(t *testing.T) { |
| var opts []ReadOption |
| if test.filter != nil { |
| opts = append(opts, RowFilter(test.filter)) |
| } |
| if test.limit != nil { |
| opts = append(opts, test.limit) |
| } |
| var elt []string |
| err := table.ReadRows(ctx, test.rr, func(r Row) bool { |
| for _, ris := range r { |
| for _, ri := range ris { |
| elt = append(elt, formatReadItem(ri)) |
| } |
| } |
| return true |
| }, opts...) |
| if err != nil { |
| t.Fatal(err) |
| } |
| if got := strings.Join(elt, ","); got != test.want { |
| t.Fatalf("got %q\nwant %q", got, test.want) |
| } |
| }) |
| } |
| } |
| |
| func TestIntegration_SampleRowKeys(t *testing.T) { |
| ctx := context.Background() |
| _, _, table, _, cleanup, err := setupIntegration(ctx, t) |
| if err != nil { |
| t.Fatal(err) |
| } |
| defer cleanup() |
| |
| // Insert some data. |
| initialData := map[string][]string{ |
| "wmckinley11": {"tjefferson11"}, |
| "gwashington77": {"jadams77"}, |
| "tjefferson0": {"gwashington0", "jadams0"}, |
| } |
| |
| for row, ss := range initialData { |
| mut := NewMutation() |
| for _, name := range ss { |
| mut.Set("follows", name, 1000, []byte("1")) |
| } |
| if err := table.Apply(ctx, row, mut); err != nil { |
| t.Fatalf("Mutating row %q: %v", row, err) |
| } |
| } |
| sampleKeys, err := table.SampleRowKeys(context.Background()) |
| if err != nil { |
| t.Fatalf("%s: %v", "SampleRowKeys:", err) |
| } |
| if len(sampleKeys) == 0 { |
| t.Error("SampleRowKeys length 0") |
| } |
| } |
| |
| func TestIntegration_Admin(t *testing.T) { |
| testEnv, err := NewIntegrationEnv() |
| if err != nil { |
| t.Fatalf("IntegrationEnv: %v", err) |
| } |
| defer testEnv.Close() |
| |
| timeout := 2 * time.Second |
| if testEnv.Config().UseProd { |
| timeout = 5 * time.Minute |
| } |
| ctx, _ := context.WithTimeout(context.Background(), timeout) |
| |
| adminClient, err := testEnv.NewAdminClient() |
| if err != nil { |
| t.Fatalf("NewAdminClient: %v", err) |
| } |
| defer adminClient.Close() |
| |
| iAdminClient, err := testEnv.NewInstanceAdminClient() |
| if err != nil { |
| t.Fatalf("NewInstanceAdminClient: %v", err) |
| } |
| if iAdminClient != nil { |
| defer iAdminClient.Close() |
| |
| iInfo, err := iAdminClient.InstanceInfo(ctx, adminClient.instance) |
| if err != nil { |
| t.Errorf("InstanceInfo: %v", err) |
| } |
| if iInfo.Name != adminClient.instance { |
| t.Errorf("InstanceInfo returned name %#v, want %#v", iInfo.Name, adminClient.instance) |
| } |
| } |
| |
| list := func() []string { |
| tbls, err := adminClient.Tables(ctx) |
| if err != nil { |
| t.Fatalf("Fetching list of tables: %v", err) |
| } |
| sort.Strings(tbls) |
| return tbls |
| } |
| containsAll := func(got, want []string) bool { |
| gotSet := make(map[string]bool) |
| |
| for _, s := range got { |
| gotSet[s] = true |
| } |
| for _, s := range want { |
| if !gotSet[s] { |
| return false |
| } |
| } |
| return true |
| } |
| |
| defer adminClient.DeleteTable(ctx, "mytable") |
| |
| if err := adminClient.CreateTable(ctx, "mytable"); err != nil { |
| t.Fatalf("Creating table: %v", err) |
| } |
| |
| defer adminClient.DeleteTable(ctx, "myothertable") |
| |
| if err := adminClient.CreateTable(ctx, "myothertable"); err != nil { |
| t.Fatalf("Creating table: %v", err) |
| } |
| |
| if got, want := list(), []string{"myothertable", "mytable"}; !containsAll(got, want) { |
| t.Errorf("adminClient.Tables returned %#v, want %#v", got, want) |
| } |
| |
| must(adminClient.WaitForReplication(ctx, "mytable")) |
| |
| if err := adminClient.DeleteTable(ctx, "myothertable"); err != nil { |
| t.Fatalf("Deleting table: %v", err) |
| } |
| tables := list() |
| if got, want := tables, []string{"mytable"}; !containsAll(got, want) { |
| t.Errorf("adminClient.Tables returned %#v, want %#v", got, want) |
| } |
| if got, unwanted := tables, []string{"myothertable"}; containsAll(got, unwanted) { |
| t.Errorf("adminClient.Tables return %#v. unwanted %#v", got, unwanted) |
| } |
| |
| tblConf := TableConf{ |
| TableID: "conftable", |
| Families: map[string]GCPolicy{ |
| "fam1": MaxVersionsPolicy(1), |
| "fam2": MaxVersionsPolicy(2), |
| }, |
| } |
| if err := adminClient.CreateTableFromConf(ctx, &tblConf); err != nil { |
| t.Fatalf("Creating table from TableConf: %v", err) |
| } |
| defer adminClient.DeleteTable(ctx, tblConf.TableID) |
| |
| tblInfo, err := adminClient.TableInfo(ctx, tblConf.TableID) |
| if err != nil { |
| t.Fatalf("Getting table info: %v", err) |
| } |
| sort.Strings(tblInfo.Families) |
| wantFams := []string{"fam1", "fam2"} |
| if !testutil.Equal(tblInfo.Families, wantFams) { |
| t.Errorf("Column family mismatch, got %v, want %v", tblInfo.Families, wantFams) |
| } |
| |
| // Populate mytable and drop row ranges |
| if err = adminClient.CreateColumnFamily(ctx, "mytable", "cf"); err != nil { |
| t.Fatalf("Creating column family: %v", err) |
| } |
| |
| client, err := testEnv.NewClient() |
| if err != nil { |
| t.Fatalf("NewClient: %v", err) |
| } |
| defer client.Close() |
| |
| tbl := client.Open("mytable") |
| |
| prefixes := []string{"a", "b", "c"} |
| for _, prefix := range prefixes { |
| for i := 0; i < 5; i++ { |
| mut := NewMutation() |
| mut.Set("cf", "col", 1000, []byte("1")) |
| if err := tbl.Apply(ctx, fmt.Sprintf("%v-%v", prefix, i), mut); err != nil { |
| t.Fatalf("Mutating row: %v", err) |
| } |
| } |
| } |
| |
| if err = adminClient.DropRowRange(ctx, "mytable", "a"); err != nil { |
| t.Errorf("DropRowRange a: %v", err) |
| } |
| if err = adminClient.DropRowRange(ctx, "mytable", "c"); err != nil { |
| t.Errorf("DropRowRange c: %v", err) |
| } |
| if err = adminClient.DropRowRange(ctx, "mytable", "x"); err != nil { |
| t.Errorf("DropRowRange x: %v", err) |
| } |
| |
| var gotRowCount int |
| must(tbl.ReadRows(ctx, RowRange{}, func(row Row) bool { |
| gotRowCount++ |
| if !strings.HasPrefix(row.Key(), "b") { |
| t.Errorf("Invalid row after dropping range: %v", row) |
| } |
| return true |
| })) |
| if gotRowCount != 5 { |
| t.Errorf("Invalid row count after dropping range: got %v, want %v", gotRowCount, 5) |
| } |
| } |
| |
| func TestIntegration_AdminCreateInstance(t *testing.T) { |
| if instanceToCreate == "" { |
| t.Skip("instanceToCreate not set, skipping instance creation testing") |
| } |
| |
| testEnv, err := NewIntegrationEnv() |
| if err != nil { |
| t.Fatalf("IntegrationEnv: %v", err) |
| } |
| defer testEnv.Close() |
| |
| if !testEnv.Config().UseProd { |
| t.Skip("emulator doesn't support instance creation") |
| } |
| |
| timeout := 5 * time.Minute |
| ctx, _ := context.WithTimeout(context.Background(), timeout) |
| |
| iAdminClient, err := testEnv.NewInstanceAdminClient() |
| if err != nil { |
| t.Fatalf("NewInstanceAdminClient: %v", err) |
| } |
| defer iAdminClient.Close() |
| |
| clusterID := instanceToCreate + "-cluster" |
| |
| // Create a development instance |
| conf := &InstanceConf{ |
| InstanceId: instanceToCreate, |
| ClusterId: clusterID, |
| DisplayName: "test instance", |
| Zone: instanceToCreateZone, |
| InstanceType: DEVELOPMENT, |
| } |
| if err := iAdminClient.CreateInstance(ctx, conf); err != nil { |
| t.Fatalf("CreateInstance: %v", err) |
| } |
| defer iAdminClient.DeleteInstance(ctx, instanceToCreate) |
| |
| iInfo, err := iAdminClient.InstanceInfo(ctx, instanceToCreate) |
| if err != nil { |
| t.Fatalf("InstanceInfo: %v", err) |
| } |
| |
| // Basic return values are tested elsewhere, check instance type |
| if iInfo.InstanceType != DEVELOPMENT { |
| t.Fatalf("Instance is not DEVELOPMENT: %v", err) |
| } |
| |
| // Update everything we can about the instance in one call. |
| confWithClusters := &InstanceWithClustersConfig{ |
| InstanceID: instanceToCreate, |
| DisplayName: "new display name", |
| InstanceType: PRODUCTION, |
| Clusters: []ClusterConfig{ |
| {ClusterID: clusterID, NumNodes: 5}}, |
| } |
| |
| if err = iAdminClient.UpdateInstanceWithClusters(ctx, confWithClusters); err != nil { |
| t.Fatalf("UpdateInstanceWithClusters: %v", err) |
| } |
| |
| iInfo, err = iAdminClient.InstanceInfo(ctx, instanceToCreate) |
| if err != nil { |
| t.Fatalf("InstanceInfo: %v", err) |
| } |
| |
| if iInfo.InstanceType != PRODUCTION { |
| t.Fatalf("Instance type is not PRODUCTION: %v", err) |
| } |
| if got, want := iInfo.DisplayName, confWithClusters.DisplayName; got != want { |
| t.Fatalf("Display name: %q, want: %q", got, want) |
| } |
| |
| cInfo, err := iAdminClient.GetCluster(ctx, instanceToCreate, clusterID) |
| if err != nil { |
| t.Fatalf("GetCluster: %v", err) |
| } |
| |
| if cInfo.ServeNodes != 5 { |
| t.Fatalf("NumNodes: %v, want: %v", cInfo.ServeNodes, 5) |
| } |
| } |
| |
| func TestIntegration_AdminSnapshot(t *testing.T) { |
| testEnv, err := NewIntegrationEnv() |
| if err != nil { |
| t.Fatalf("IntegrationEnv: %v", err) |
| } |
| defer testEnv.Close() |
| |
| if !testEnv.Config().UseProd { |
| t.Skip("emulator doesn't support snapshots") |
| } |
| |
| timeout := 2 * time.Second |
| if testEnv.Config().UseProd { |
| timeout = 5 * time.Minute |
| } |
| ctx, _ := context.WithTimeout(context.Background(), timeout) |
| |
| adminClient, err := testEnv.NewAdminClient() |
| if err != nil { |
| t.Fatalf("NewAdminClient: %v", err) |
| } |
| defer adminClient.Close() |
| |
| table := testEnv.Config().Table |
| cluster := testEnv.Config().Cluster |
| |
| list := func(cluster string) ([]*SnapshotInfo, error) { |
| infos := []*SnapshotInfo(nil) |
| |
| it := adminClient.Snapshots(ctx, cluster) |
| for { |
| s, err := it.Next() |
| if err == iterator.Done { |
| break |
| } |
| if err != nil { |
| return nil, err |
| } |
| infos = append(infos, s) |
| } |
| return infos, err |
| } |
| |
| // Delete the table at the end of the test. Schedule ahead of time |
| // in case the client fails |
| defer adminClient.DeleteTable(ctx, table) |
| |
| if err := adminClient.CreateTable(ctx, table); err != nil { |
| t.Fatalf("Creating table: %v", err) |
| } |
| |
| // Precondition: no snapshots |
| snapshots, err := list(cluster) |
| if err != nil { |
| t.Fatalf("Initial snapshot list: %v", err) |
| } |
| if got, want := len(snapshots), 0; got != want { |
| t.Fatalf("Initial snapshot list len: %d, want: %d", got, want) |
| } |
| |
| // Create snapshot |
| defer adminClient.DeleteSnapshot(ctx, cluster, "mysnapshot") |
| |
| if err = adminClient.SnapshotTable(ctx, table, cluster, "mysnapshot", 5*time.Hour); err != nil { |
| t.Fatalf("Creating snaphot: %v", err) |
| } |
| |
| // List snapshot |
| snapshots, err = list(cluster) |
| if err != nil { |
| t.Fatalf("Listing snapshots: %v", err) |
| } |
| if got, want := len(snapshots), 1; got != want { |
| t.Fatalf("Listing snapshot count: %d, want: %d", got, want) |
| } |
| if got, want := snapshots[0].Name, "mysnapshot"; got != want { |
| t.Fatalf("Snapshot name: %s, want: %s", got, want) |
| } |
| if got, want := snapshots[0].SourceTable, table; got != want { |
| t.Fatalf("Snapshot SourceTable: %s, want: %s", got, want) |
| } |
| if got, want := snapshots[0].DeleteTime, snapshots[0].CreateTime.Add(5*time.Hour); math.Abs(got.Sub(want).Minutes()) > 1 { |
| t.Fatalf("Snapshot DeleteTime: %s, want: %s", got, want) |
| } |
| |
| // Get snapshot |
| snapshot, err := adminClient.SnapshotInfo(ctx, cluster, "mysnapshot") |
| if err != nil { |
| t.Fatalf("SnapshotInfo: %v", snapshot) |
| } |
| if got, want := *snapshot, *snapshots[0]; got != want { |
| t.Fatalf("SnapshotInfo: %v, want: %v", got, want) |
| } |
| |
| // Restore |
| restoredTable := table + "-restored" |
| defer adminClient.DeleteTable(ctx, restoredTable) |
| if err = adminClient.CreateTableFromSnapshot(ctx, restoredTable, cluster, "mysnapshot"); err != nil { |
| t.Fatalf("CreateTableFromSnapshot: %v", err) |
| } |
| if _, err := adminClient.TableInfo(ctx, restoredTable); err != nil { |
| t.Fatalf("Restored TableInfo: %v", err) |
| } |
| |
| // Delete snapshot |
| if err = adminClient.DeleteSnapshot(ctx, cluster, "mysnapshot"); err != nil { |
| t.Fatalf("DeleteSnapshot: %v", err) |
| } |
| snapshots, err = list(cluster) |
| if err != nil { |
| t.Fatalf("List after Delete: %v", err) |
| } |
| if got, want := len(snapshots), 0; got != want { |
| t.Fatalf("List after delete len: %d, want: %d", got, want) |
| } |
| } |
| |
| func TestIntegration_Granularity(t *testing.T) { |
| testEnv, err := NewIntegrationEnv() |
| if err != nil { |
| t.Fatalf("IntegrationEnv: %v", err) |
| } |
| defer testEnv.Close() |
| |
| timeout := 2 * time.Second |
| if testEnv.Config().UseProd { |
| timeout = 5 * time.Minute |
| } |
| ctx, _ := context.WithTimeout(context.Background(), timeout) |
| |
| adminClient, err := testEnv.NewAdminClient() |
| if err != nil { |
| t.Fatalf("NewAdminClient: %v", err) |
| } |
| defer adminClient.Close() |
| |
| list := func() []string { |
| tbls, err := adminClient.Tables(ctx) |
| if err != nil { |
| t.Fatalf("Fetching list of tables: %v", err) |
| } |
| sort.Strings(tbls) |
| return tbls |
| } |
| containsAll := func(got, want []string) bool { |
| gotSet := make(map[string]bool) |
| |
| for _, s := range got { |
| gotSet[s] = true |
| } |
| for _, s := range want { |
| if !gotSet[s] { |
| return false |
| } |
| } |
| return true |
| } |
| |
| defer adminClient.DeleteTable(ctx, "mytable") |
| |
| if err := adminClient.CreateTable(ctx, "mytable"); err != nil { |
| t.Fatalf("Creating table: %v", err) |
| } |
| |
| tables := list() |
| if got, want := tables, []string{"mytable"}; !containsAll(got, want) { |
| t.Errorf("adminClient.Tables returned %#v, want %#v", got, want) |
| } |
| |
| // calling ModifyColumnFamilies to check the granularity of table |
| prefix := adminClient.instancePrefix() |
| req := &btapb.ModifyColumnFamiliesRequest{ |
| Name: prefix + "/tables/" + "mytable", |
| Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{ |
| Id: "cf", |
| Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Create{&btapb.ColumnFamily{}}, |
| }}, |
| } |
| table, err := adminClient.tClient.ModifyColumnFamilies(ctx, req) |
| if err != nil { |
| t.Fatalf("Creating column family: %v", err) |
| } |
| if table.Granularity != btapb.Table_TimestampGranularity(btapb.Table_MILLIS) { |
| t.Errorf("ModifyColumnFamilies returned granularity %#v, want %#v", table.Granularity, btapb.Table_TimestampGranularity(btapb.Table_MILLIS)) |
| } |
| } |
| |
| func TestIntegration_InstanceAdminClient_AppProfile(t *testing.T) { |
| testEnv, err := NewIntegrationEnv() |
| if err != nil { |
| t.Fatalf("IntegrationEnv: %v", err) |
| } |
| defer testEnv.Close() |
| |
| timeout := 2 * time.Second |
| if testEnv.Config().UseProd { |
| timeout = 5 * time.Minute |
| } |
| ctx, cancel := context.WithTimeout(context.Background(), timeout) |
| defer cancel() |
| |
| adminClient, err := testEnv.NewAdminClient() |
| if err != nil { |
| t.Fatalf("NewAdminClient: %v", err) |
| } |
| defer adminClient.Close() |
| |
| iAdminClient, err := testEnv.NewInstanceAdminClient() |
| if err != nil { |
| t.Fatalf("NewInstanceAdminClient: %v", err) |
| } |
| |
| if iAdminClient == nil { |
| return |
| } |
| |
| defer iAdminClient.Close() |
| profile := ProfileConf{ |
| ProfileID: "app_profile1", |
| InstanceID: adminClient.instance, |
| ClusterID: testEnv.Config().Cluster, |
| Description: "creating new app profile 1", |
| RoutingPolicy: SingleClusterRouting, |
| } |
| |
| createdProfile, err := iAdminClient.CreateAppProfile(ctx, profile) |
| if err != nil { |
| t.Fatalf("Creating app profile: %v", err) |
| |
| } |
| |
| gotProfile, err := iAdminClient.GetAppProfile(ctx, adminClient.instance, "app_profile1") |
| |
| if err != nil { |
| t.Fatalf("Get app profile: %v", err) |
| } |
| |
| if !proto.Equal(createdProfile, gotProfile) { |
| t.Fatalf("created profile: %s, got profile: %s", createdProfile.Name, gotProfile.Name) |
| |
| } |
| |
| list := func(instanceID string) ([]*btapb.AppProfile, error) { |
| profiles := []*btapb.AppProfile(nil) |
| |
| it := iAdminClient.ListAppProfiles(ctx, instanceID) |
| for { |
| s, err := it.Next() |
| if err == iterator.Done { |
| break |
| } |
| if err != nil { |
| return nil, err |
| } |
| profiles = append(profiles, s) |
| } |
| return profiles, err |
| } |
| |
| profiles, err := list(adminClient.instance) |
| if err != nil { |
| t.Fatalf("List app profile: %v", err) |
| } |
| |
| if got, want := len(profiles), 1; got != want { |
| t.Fatalf("Initial app profile list len: %d, want: %d", got, want) |
| } |
| |
| for _, test := range []struct { |
| desc string |
| uattrs ProfileAttrsToUpdate |
| want *btapb.AppProfile // nil means error |
| }{ |
| { |
| desc: "empty update", |
| uattrs: ProfileAttrsToUpdate{}, |
| want: nil, |
| }, |
| |
| { |
| desc: "empty description update", |
| uattrs: ProfileAttrsToUpdate{Description: ""}, |
| want: &btapb.AppProfile{ |
| Name: gotProfile.Name, |
| Description: "", |
| RoutingPolicy: gotProfile.RoutingPolicy, |
| Etag: gotProfile.Etag}, |
| }, |
| { |
| desc: "routing update", |
| uattrs: ProfileAttrsToUpdate{ |
| RoutingPolicy: SingleClusterRouting, |
| ClusterID: testEnv.Config().Cluster, |
| }, |
| want: &btapb.AppProfile{ |
| Name: gotProfile.Name, |
| Description: "", |
| Etag: gotProfile.Etag, |
| RoutingPolicy: &btapb.AppProfile_SingleClusterRouting_{ |
| SingleClusterRouting: &btapb.AppProfile_SingleClusterRouting{ |
| ClusterId: testEnv.Config().Cluster, |
| }}, |
| }, |
| }, |
| } { |
| err = iAdminClient.UpdateAppProfile(ctx, adminClient.instance, "app_profile1", test.uattrs) |
| if err != nil { |
| if test.want != nil { |
| t.Errorf("%s: %v", test.desc, err) |
| } |
| continue |
| } |
| if err == nil && test.want == nil { |
| t.Errorf("%s: got nil, want error", test.desc) |
| continue |
| } |
| |
| got, _ := iAdminClient.GetAppProfile(ctx, adminClient.instance, "app_profile1") |
| |
| if !proto.Equal(got, test.want) { |
| t.Fatalf("%s : got profile : %v, want profile: %v", test.desc, gotProfile, test.want) |
| } |
| |
| } |
| |
| err = iAdminClient.DeleteAppProfile(ctx, adminClient.instance, "app_profile1") |
| if err != nil { |
| t.Fatalf("Delete app profile: %v", err) |
| } |
| |
| } |
| |
| func TestIntegration_InstanceUpdate(t *testing.T) { |
| testEnv, err := NewIntegrationEnv() |
| if err != nil { |
| t.Fatalf("IntegrationEnv: %v", err) |
| } |
| defer testEnv.Close() |
| |
| timeout := 2 * time.Second |
| if testEnv.Config().UseProd { |
| timeout = 5 * time.Minute |
| } |
| ctx, cancel := context.WithTimeout(context.Background(), timeout) |
| defer cancel() |
| |
| adminClient, err := testEnv.NewAdminClient() |
| if err != nil { |
| t.Fatalf("NewAdminClient: %v", err) |
| } |
| |
| defer adminClient.Close() |
| |
| iAdminClient, err := testEnv.NewInstanceAdminClient() |
| if err != nil { |
| t.Fatalf("NewInstanceAdminClient: %v", err) |
| } |
| |
| if iAdminClient == nil { |
| return |
| } |
| |
| defer iAdminClient.Close() |
| |
| iInfo, err := iAdminClient.InstanceInfo(ctx, adminClient.instance) |
| if err != nil { |
| t.Errorf("InstanceInfo: %v", err) |
| } |
| |
| if iInfo.Name != adminClient.instance { |
| t.Errorf("InstanceInfo returned name %#v, want %#v", iInfo.Name, adminClient.instance) |
| } |
| |
| if iInfo.DisplayName != adminClient.instance { |
| t.Errorf("InstanceInfo returned name %#v, want %#v", iInfo.Name, adminClient.instance) |
| } |
| |
| const numNodes = 4 |
| // update cluster nodes |
| if err := iAdminClient.UpdateCluster(ctx, adminClient.instance, testEnv.Config().Cluster, int32(numNodes)); err != nil { |
| t.Errorf("UpdateCluster: %v", err) |
| } |
| |
| // get cluster after updating |
| cis, err := iAdminClient.GetCluster(ctx, adminClient.instance, testEnv.Config().Cluster) |
| if err != nil { |
| t.Errorf("GetCluster %v", err) |
| } |
| if cis.ServeNodes != int(numNodes) { |
| t.Errorf("ServeNodes returned %d, want %d", cis.ServeNodes, int(numNodes)) |
| } |
| } |
| |
| func setupIntegration(ctx context.Context, t *testing.T) (_ *Client, _ *AdminClient, table *Table, tableName string, cleanup func(), _ error) { |
| testEnv, err := NewIntegrationEnv() |
| if err != nil { |
| return nil, nil, nil, "", nil, err |
| } |
| |
| var timeout time.Duration |
| if testEnv.Config().UseProd { |
| timeout = 10 * time.Minute |
| t.Logf("Running test against production") |
| } else { |
| timeout = 1 * time.Minute |
| t.Logf("bttest.Server running on %s", testEnv.Config().AdminEndpoint) |
| } |
| ctx, cancel := context.WithTimeout(ctx, timeout) |
| defer cancel() |
| |
| client, err := testEnv.NewClient() |
| if err != nil { |
| return nil, nil, nil, "", nil, err |
| } |
| |
| adminClient, err := testEnv.NewAdminClient() |
| if err != nil { |
| return nil, nil, nil, "", nil, err |
| } |
| |
| tableName = testEnv.Config().Table |
| if err := adminClient.CreateTable(ctx, tableName); err != nil { |
| return nil, nil, nil, "", nil, err |
| } |
| if err := adminClient.CreateColumnFamily(ctx, tableName, "follows"); err != nil { |
| return nil, nil, nil, "", nil, err |
| } |
| |
| return client, adminClient, client.Open(tableName), tableName, func() { |
| adminClient.DeleteTable(ctx, tableName) |
| client.Close() |
| adminClient.Close() |
| }, nil |
| } |
| |
| func formatReadItem(ri ReadItem) string { |
| // Use the column qualifier only to make the test data briefer. |
| col := ri.Column[strings.Index(ri.Column, ":")+1:] |
| return fmt.Sprintf("%s-%s-%s", ri.Row, col, ri.Value) |
| } |
| |
// fill overwrites all of b with back-to-back copies of sub; the final copy is
// truncated if len(b) is not a multiple of len(sub). b is left untouched when
// sub is empty.
func fill(b, sub []byte) {
	if len(sub) == 0 {
		// copy would make no progress, so there is nothing to repeat.
		return
	}
	for len(b) > 0 {
		n := copy(b, sub)
		b = b[n:]
	}
}
| |
| func clearTimestamps(r Row) { |
| for _, ris := range r { |
| for i := range ris { |
| ris[i].Timestamp = 0 |
| } |
| } |
| } |