| // Copyright 2016 Google LLC |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package bttest |
| |
| import ( |
| "bytes" |
| "context" |
| "fmt" |
| "math/rand" |
| "sort" |
| "strconv" |
| "sync" |
| "sync/atomic" |
| "testing" |
| "time" |
| |
| "cloud.google.com/go/internal/testutil" |
| "github.com/golang/protobuf/proto" |
| "github.com/golang/protobuf/ptypes/wrappers" |
| "github.com/google/go-cmp/cmp" |
| btapb "google.golang.org/genproto/googleapis/bigtable/admin/v2" |
| btpb "google.golang.org/genproto/googleapis/bigtable/v2" |
| "google.golang.org/grpc" |
| ) |
| |
| func TestConcurrentMutationsReadModifyAndGC(t *testing.T) { |
| s := &server{ |
| tables: make(map[string]*table), |
| } |
| ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) |
| defer cancel() |
| if _, err := s.CreateTable( |
| ctx, |
| &btapb.CreateTableRequest{Parent: "cluster", TableId: "t"}); err != nil { |
| t.Fatal(err) |
| } |
| const name = `cluster/tables/t` |
| tbl := s.tables[name] |
| req := &btapb.ModifyColumnFamiliesRequest{ |
| Name: name, |
| Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{ |
| Id: "cf", |
| Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Create{Create: &btapb.ColumnFamily{}}, |
| }}, |
| } |
| _, err := s.ModifyColumnFamilies(ctx, req) |
| if err != nil { |
| t.Fatal(err) |
| } |
| req = &btapb.ModifyColumnFamiliesRequest{ |
| Name: name, |
| Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{ |
| Id: "cf", |
| Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Update{Update: &btapb.ColumnFamily{ |
| GcRule: &btapb.GcRule{Rule: &btapb.GcRule_MaxNumVersions{MaxNumVersions: 1}}, |
| }}, |
| }}, |
| } |
| if _, err := s.ModifyColumnFamilies(ctx, req); err != nil { |
| t.Fatal(err) |
| } |
| |
| var wg sync.WaitGroup |
| var ts int64 |
| ms := func() []*btpb.Mutation { |
| return []*btpb.Mutation{{ |
| Mutation: &btpb.Mutation_SetCell_{SetCell: &btpb.Mutation_SetCell{ |
| FamilyName: "cf", |
| ColumnQualifier: []byte(`col`), |
| TimestampMicros: atomic.AddInt64(&ts, 1000), |
| }}, |
| }} |
| } |
| |
| rmw := func() *btpb.ReadModifyWriteRowRequest { |
| return &btpb.ReadModifyWriteRowRequest{ |
| TableName: name, |
| RowKey: []byte(fmt.Sprint(rand.Intn(100))), |
| Rules: []*btpb.ReadModifyWriteRule{{ |
| FamilyName: "cf", |
| ColumnQualifier: []byte("col"), |
| Rule: &btpb.ReadModifyWriteRule_IncrementAmount{IncrementAmount: 1}, |
| }}, |
| } |
| } |
| for i := 0; i < 100; i++ { |
| wg.Add(1) |
| go func() { |
| defer wg.Done() |
| for ctx.Err() == nil { |
| req := &btpb.MutateRowRequest{ |
| TableName: name, |
| RowKey: []byte(fmt.Sprint(rand.Intn(100))), |
| Mutations: ms(), |
| } |
| if _, err := s.MutateRow(ctx, req); err != nil { |
| panic(err) // can't use t.Fatal in goroutine |
| } |
| } |
| }() |
| wg.Add(1) |
| go func() { |
| defer wg.Done() |
| for ctx.Err() == nil { |
| _, _ = s.ReadModifyWriteRow(ctx, rmw()) |
| } |
| }() |
| |
| wg.Add(1) |
| go func() { |
| defer wg.Done() |
| tbl.gc() |
| }() |
| } |
| done := make(chan struct{}) |
| go func() { |
| wg.Wait() |
| close(done) |
| }() |
| select { |
| case <-done: |
| case <-time.After(1 * time.Second): |
| t.Error("Concurrent mutations and GCs haven't completed after 1s") |
| } |
| } |
| |
| func TestCreateTableResponse(t *testing.T) { |
| // We need to ensure that invoking CreateTable returns |
| // the ColumnFamilies as well as Granularity. |
| // See issue https://github.com/googleapis/google-cloud-go/issues/1512. |
| s := &server{ |
| tables: make(map[string]*table), |
| } |
| ctx := context.Background() |
| got, err := s.CreateTable(ctx, &btapb.CreateTableRequest{ |
| Parent: "projects/issue-1512/instances/instance", |
| TableId: "table", |
| Table: &btapb.Table{ |
| ColumnFamilies: map[string]*btapb.ColumnFamily{ |
| "cf1": {GcRule: &btapb.GcRule{Rule: &btapb.GcRule_MaxNumVersions{MaxNumVersions: 123}}}, |
| "cf2": {GcRule: &btapb.GcRule{Rule: &btapb.GcRule_MaxNumVersions{MaxNumVersions: 456}}}, |
| }, |
| }, |
| }) |
| if err != nil { |
| t.Fatalf("Creating table: %v", err) |
| } |
| |
| want := &btapb.Table{ |
| Name: "projects/issue-1512/instances/instance/tables/table", |
| // If no Granularity was specified, we should get back "MILLIS". |
| Granularity: btapb.Table_MILLIS, |
| ColumnFamilies: map[string]*btapb.ColumnFamily{ |
| "cf1": {GcRule: &btapb.GcRule{Rule: &btapb.GcRule_MaxNumVersions{MaxNumVersions: 123}}}, |
| "cf2": {GcRule: &btapb.GcRule{Rule: &btapb.GcRule_MaxNumVersions{MaxNumVersions: 456}}}, |
| }, |
| } |
| if diff := testutil.Diff(got, want); diff != "" { |
| t.Fatalf("Response mismatch: got - want +\n%s", diff) |
| } |
| } |
| |
| func TestCreateTableWithFamily(t *testing.T) { |
| // The Go client currently doesn't support creating a table with column families |
| // in one operation but it is allowed by the API. This must still be supported by the |
| // fake server so this test lives here instead of in the main bigtable |
| // integration test. |
| s := &server{ |
| tables: make(map[string]*table), |
| } |
| ctx := context.Background() |
| newTbl := btapb.Table{ |
| ColumnFamilies: map[string]*btapb.ColumnFamily{ |
| "cf1": {GcRule: &btapb.GcRule{Rule: &btapb.GcRule_MaxNumVersions{MaxNumVersions: 123}}}, |
| "cf2": {GcRule: &btapb.GcRule{Rule: &btapb.GcRule_MaxNumVersions{MaxNumVersions: 456}}}, |
| }, |
| } |
| cTbl, err := s.CreateTable(ctx, &btapb.CreateTableRequest{Parent: "cluster", TableId: "t", Table: &newTbl}) |
| if err != nil { |
| t.Fatalf("Creating table: %v", err) |
| } |
| tbl, err := s.GetTable(ctx, &btapb.GetTableRequest{Name: cTbl.Name}) |
| if err != nil { |
| t.Fatalf("Getting table: %v", err) |
| } |
| cf := tbl.ColumnFamilies["cf1"] |
| if cf == nil { |
| t.Fatalf("Missing col family cf1") |
| } |
| if got, want := cf.GcRule.GetMaxNumVersions(), int32(123); got != want { |
| t.Errorf("Invalid MaxNumVersions: wanted:%d, got:%d", want, got) |
| } |
| cf = tbl.ColumnFamilies["cf2"] |
| if cf == nil { |
| t.Fatalf("Missing col family cf2") |
| } |
| if got, want := cf.GcRule.GetMaxNumVersions(), int32(456); got != want { |
| t.Errorf("Invalid MaxNumVersions: wanted:%d, got:%d", want, got) |
| } |
| } |
| |
| type MockSampleRowKeysServer struct { |
| responses []*btpb.SampleRowKeysResponse |
| grpc.ServerStream |
| } |
| |
| func (s *MockSampleRowKeysServer) Send(resp *btpb.SampleRowKeysResponse) error { |
| s.responses = append(s.responses, resp) |
| return nil |
| } |
| |
| func TestSampleRowKeys(t *testing.T) { |
| s := &server{ |
| tables: make(map[string]*table), |
| } |
| ctx := context.Background() |
| newTbl := btapb.Table{ |
| ColumnFamilies: map[string]*btapb.ColumnFamily{ |
| "cf": {GcRule: &btapb.GcRule{Rule: &btapb.GcRule_MaxNumVersions{MaxNumVersions: 1}}}, |
| }, |
| } |
| tbl, err := s.CreateTable(ctx, &btapb.CreateTableRequest{Parent: "cluster", TableId: "t", Table: &newTbl}) |
| if err != nil { |
| t.Fatalf("Creating table: %v", err) |
| } |
| |
| // Populate the table |
| val := []byte("value") |
| rowCount := 1000 |
| for i := 0; i < rowCount; i++ { |
| req := &btpb.MutateRowRequest{ |
| TableName: tbl.Name, |
| RowKey: []byte("row-" + strconv.Itoa(i)), |
| Mutations: []*btpb.Mutation{{ |
| Mutation: &btpb.Mutation_SetCell_{SetCell: &btpb.Mutation_SetCell{ |
| FamilyName: "cf", |
| ColumnQualifier: []byte("col"), |
| TimestampMicros: 1000, |
| Value: val, |
| }}, |
| }}, |
| } |
| if _, err := s.MutateRow(ctx, req); err != nil { |
| t.Fatalf("Populating table: %v", err) |
| } |
| } |
| |
| mock := &MockSampleRowKeysServer{} |
| if err := s.SampleRowKeys(&btpb.SampleRowKeysRequest{TableName: tbl.Name}, mock); err != nil { |
| t.Errorf("SampleRowKeys error: %v", err) |
| } |
| if len(mock.responses) == 0 { |
| t.Fatal("Response count: got 0, want > 0") |
| } |
| // Make sure the offset of the final response is the offset of the final row |
| got := mock.responses[len(mock.responses)-1].OffsetBytes |
| want := int64((rowCount - 1) * len(val)) |
| if got != want { |
| t.Errorf("Invalid offset: got %d, want %d", got, want) |
| } |
| } |
| |
| func TestDropRowRange(t *testing.T) { |
| s := &server{ |
| tables: make(map[string]*table), |
| } |
| ctx := context.Background() |
| newTbl := btapb.Table{ |
| ColumnFamilies: map[string]*btapb.ColumnFamily{ |
| "cf": {GcRule: &btapb.GcRule{Rule: &btapb.GcRule_MaxNumVersions{MaxNumVersions: 1}}}, |
| }, |
| } |
| tblInfo, err := s.CreateTable(ctx, &btapb.CreateTableRequest{Parent: "cluster", TableId: "t", Table: &newTbl}) |
| if err != nil { |
| t.Fatalf("Creating table: %v", err) |
| } |
| |
| tbl := s.tables[tblInfo.Name] |
| |
| // Populate the table |
| prefixes := []string{"AAA", "BBB", "CCC", "DDD"} |
| count := 3 |
| doWrite := func() { |
| for _, prefix := range prefixes { |
| for i := 0; i < count; i++ { |
| req := &btpb.MutateRowRequest{ |
| TableName: tblInfo.Name, |
| RowKey: []byte(prefix + strconv.Itoa(i)), |
| Mutations: []*btpb.Mutation{{ |
| Mutation: &btpb.Mutation_SetCell_{SetCell: &btpb.Mutation_SetCell{ |
| FamilyName: "cf", |
| ColumnQualifier: []byte("col"), |
| TimestampMicros: 1000, |
| Value: []byte{}, |
| }}, |
| }}, |
| } |
| if _, err := s.MutateRow(ctx, req); err != nil { |
| t.Fatalf("Populating table: %v", err) |
| } |
| } |
| } |
| } |
| |
| doWrite() |
| tblSize := tbl.rows.Len() |
| req := &btapb.DropRowRangeRequest{ |
| Name: tblInfo.Name, |
| Target: &btapb.DropRowRangeRequest_RowKeyPrefix{RowKeyPrefix: []byte("AAA")}, |
| } |
| if _, err = s.DropRowRange(ctx, req); err != nil { |
| t.Fatalf("Dropping first range: %v", err) |
| } |
| got, want := tbl.rows.Len(), tblSize-count |
| if got != want { |
| t.Errorf("Row count after first drop: got %d (%v), want %d", got, tbl.rows, want) |
| } |
| |
| req = &btapb.DropRowRangeRequest{ |
| Name: tblInfo.Name, |
| Target: &btapb.DropRowRangeRequest_RowKeyPrefix{RowKeyPrefix: []byte("DDD")}, |
| } |
| if _, err = s.DropRowRange(ctx, req); err != nil { |
| t.Fatalf("Dropping second range: %v", err) |
| } |
| got, want = tbl.rows.Len(), tblSize-(2*count) |
| if got != want { |
| t.Errorf("Row count after second drop: got %d (%v), want %d", got, tbl.rows, want) |
| } |
| |
| req = &btapb.DropRowRangeRequest{ |
| Name: tblInfo.Name, |
| Target: &btapb.DropRowRangeRequest_RowKeyPrefix{RowKeyPrefix: []byte("XXX")}, |
| } |
| if _, err = s.DropRowRange(ctx, req); err != nil { |
| t.Fatalf("Dropping invalid range: %v", err) |
| } |
| got, want = tbl.rows.Len(), tblSize-(2*count) |
| if got != want { |
| t.Errorf("Row count after invalid drop: got %d (%v), want %d", got, tbl.rows, want) |
| } |
| |
| req = &btapb.DropRowRangeRequest{ |
| Name: tblInfo.Name, |
| Target: &btapb.DropRowRangeRequest_DeleteAllDataFromTable{DeleteAllDataFromTable: true}, |
| } |
| if _, err = s.DropRowRange(ctx, req); err != nil { |
| t.Fatalf("Dropping all data: %v", err) |
| } |
| got, want = tbl.rows.Len(), 0 |
| if got != want { |
| t.Errorf("Row count after drop all: got %d, want %d", got, want) |
| } |
| |
| // Test that we can write rows, delete some and then write them again. |
| count = 1 |
| doWrite() |
| |
| req = &btapb.DropRowRangeRequest{ |
| Name: tblInfo.Name, |
| Target: &btapb.DropRowRangeRequest_DeleteAllDataFromTable{DeleteAllDataFromTable: true}, |
| } |
| if _, err = s.DropRowRange(ctx, req); err != nil { |
| t.Fatalf("Dropping all data: %v", err) |
| } |
| got, want = tbl.rows.Len(), 0 |
| if got != want { |
| t.Errorf("Row count after drop all: got %d, want %d", got, want) |
| } |
| |
| doWrite() |
| got, want = tbl.rows.Len(), len(prefixes) |
| if got != want { |
| t.Errorf("Row count after rewrite: got %d, want %d", got, want) |
| } |
| |
| req = &btapb.DropRowRangeRequest{ |
| Name: tblInfo.Name, |
| Target: &btapb.DropRowRangeRequest_RowKeyPrefix{RowKeyPrefix: []byte("BBB")}, |
| } |
| if _, err = s.DropRowRange(ctx, req); err != nil { |
| t.Fatalf("Dropping range: %v", err) |
| } |
| doWrite() |
| got, want = tbl.rows.Len(), len(prefixes) |
| if got != want { |
| t.Errorf("Row count after drop range: got %d, want %d", got, want) |
| } |
| } |
| |
| type MockReadRowsServer struct { |
| responses []*btpb.ReadRowsResponse |
| grpc.ServerStream |
| } |
| |
| func (s *MockReadRowsServer) Send(resp *btpb.ReadRowsResponse) error { |
| s.responses = append(s.responses, resp) |
| return nil |
| } |
| |
| func TestReadRows(t *testing.T) { |
| ctx := context.Background() |
| s := &server{ |
| tables: make(map[string]*table), |
| } |
| newTbl := btapb.Table{ |
| ColumnFamilies: map[string]*btapb.ColumnFamily{ |
| "cf0": {GcRule: &btapb.GcRule{Rule: &btapb.GcRule_MaxNumVersions{MaxNumVersions: 1}}}, |
| }, |
| } |
| tblInfo, err := s.CreateTable(ctx, &btapb.CreateTableRequest{Parent: "cluster", TableId: "t", Table: &newTbl}) |
| if err != nil { |
| t.Fatalf("Creating table: %v", err) |
| } |
| mreq := &btpb.MutateRowRequest{ |
| TableName: tblInfo.Name, |
| RowKey: []byte("row"), |
| Mutations: []*btpb.Mutation{{ |
| Mutation: &btpb.Mutation_SetCell_{SetCell: &btpb.Mutation_SetCell{ |
| FamilyName: "cf0", |
| ColumnQualifier: []byte("col"), |
| TimestampMicros: 1000, |
| Value: []byte{}, |
| }}, |
| }}, |
| } |
| if _, err := s.MutateRow(ctx, mreq); err != nil { |
| t.Fatalf("Populating table: %v", err) |
| } |
| |
| for _, rowset := range []*btpb.RowSet{ |
| {RowKeys: [][]byte{[]byte("row")}}, |
| {RowRanges: []*btpb.RowRange{{StartKey: &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte("")}}}}, |
| {RowRanges: []*btpb.RowRange{{StartKey: &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte("r")}}}}, |
| {RowRanges: []*btpb.RowRange{{ |
| StartKey: &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte("")}, |
| EndKey: &btpb.RowRange_EndKeyOpen{EndKeyOpen: []byte("s")}, |
| }}}, |
| } { |
| mock := &MockReadRowsServer{} |
| req := &btpb.ReadRowsRequest{TableName: tblInfo.Name, Rows: rowset} |
| if err = s.ReadRows(req, mock); err != nil { |
| t.Fatalf("ReadRows error: %v", err) |
| } |
| if got, want := len(mock.responses), 1; got != want { |
| t.Errorf("%+v: response count: got %d, want %d", rowset, got, want) |
| } |
| } |
| } |
| |
| func TestReadRowsError(t *testing.T) { |
| ctx := context.Background() |
| s := &server{ |
| tables: make(map[string]*table), |
| } |
| newTbl := btapb.Table{ |
| ColumnFamilies: map[string]*btapb.ColumnFamily{ |
| "cf0": {GcRule: &btapb.GcRule{Rule: &btapb.GcRule_MaxNumVersions{MaxNumVersions: 1}}}, |
| }, |
| } |
| tblInfo, err := s.CreateTable(ctx, &btapb.CreateTableRequest{Parent: "cluster", TableId: "t", Table: &newTbl}) |
| if err != nil { |
| t.Fatalf("Creating table: %v", err) |
| } |
| mreq := &btpb.MutateRowRequest{ |
| TableName: tblInfo.Name, |
| RowKey: []byte("row"), |
| Mutations: []*btpb.Mutation{{ |
| Mutation: &btpb.Mutation_SetCell_{SetCell: &btpb.Mutation_SetCell{ |
| FamilyName: "cf0", |
| ColumnQualifier: []byte("col"), |
| TimestampMicros: 1000, |
| Value: []byte{}, |
| }}, |
| }}, |
| } |
| if _, err := s.MutateRow(ctx, mreq); err != nil { |
| t.Fatalf("Populating table: %v", err) |
| } |
| |
| mock := &MockReadRowsServer{} |
| req := &btpb.ReadRowsRequest{TableName: tblInfo.Name, Filter: &btpb.RowFilter{ |
| Filter: &btpb.RowFilter_RowKeyRegexFilter{RowKeyRegexFilter: []byte("[")}}, // Invalid regex. |
| } |
| if err = s.ReadRows(req, mock); err == nil { |
| t.Fatal("ReadRows got no error, want error") |
| } |
| } |
| |
| func TestReadRowsAfterDeletion(t *testing.T) { |
| ctx := context.Background() |
| s := &server{ |
| tables: make(map[string]*table), |
| } |
| newTbl := btapb.Table{ |
| ColumnFamilies: map[string]*btapb.ColumnFamily{ |
| "cf0": {}, |
| }, |
| } |
| tblInfo, err := s.CreateTable(ctx, &btapb.CreateTableRequest{ |
| Parent: "cluster", TableId: "t", Table: &newTbl, |
| }) |
| if err != nil { |
| t.Fatalf("Creating table: %v", err) |
| } |
| populateTable(ctx, s) |
| dreq := &btpb.MutateRowRequest{ |
| TableName: tblInfo.Name, |
| RowKey: []byte("row"), |
| Mutations: []*btpb.Mutation{{ |
| Mutation: &btpb.Mutation_DeleteFromRow_{ |
| DeleteFromRow: &btpb.Mutation_DeleteFromRow{}, |
| }, |
| }}, |
| } |
| if _, err := s.MutateRow(ctx, dreq); err != nil { |
| t.Fatalf("Deleting from table: %v", err) |
| } |
| |
| mock := &MockReadRowsServer{} |
| req := &btpb.ReadRowsRequest{TableName: tblInfo.Name} |
| if err = s.ReadRows(req, mock); err != nil { |
| t.Fatalf("ReadRows error: %v", err) |
| } |
| if got, want := len(mock.responses), 0; got != want { |
| t.Errorf("response count: got %d, want %d", got, want) |
| } |
| } |
| |
| func TestReadRowsOrder(t *testing.T) { |
| s := &server{ |
| tables: make(map[string]*table), |
| } |
| ctx := context.Background() |
| newTbl := btapb.Table{ |
| ColumnFamilies: map[string]*btapb.ColumnFamily{ |
| "cf0": {GcRule: &btapb.GcRule{Rule: &btapb.GcRule_MaxNumVersions{MaxNumVersions: 1}}}, |
| }, |
| } |
| tblInfo, err := s.CreateTable(ctx, &btapb.CreateTableRequest{Parent: "cluster", TableId: "t", Table: &newTbl}) |
| if err != nil { |
| t.Fatalf("Creating table: %v", err) |
| } |
| count := 3 |
| mcf := func(i int) *btapb.ModifyColumnFamiliesRequest { |
| return &btapb.ModifyColumnFamiliesRequest{ |
| Name: tblInfo.Name, |
| Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{ |
| Id: "cf" + strconv.Itoa(i), |
| Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Create{Create: &btapb.ColumnFamily{}}, |
| }}, |
| } |
| } |
| for i := 1; i <= count; i++ { |
| _, err = s.ModifyColumnFamilies(ctx, mcf(i)) |
| if err != nil { |
| t.Fatal(err) |
| } |
| } |
| // Populate the table |
| for fc := 0; fc < count; fc++ { |
| for cc := count; cc > 0; cc-- { |
| for tc := 0; tc < count; tc++ { |
| req := &btpb.MutateRowRequest{ |
| TableName: tblInfo.Name, |
| RowKey: []byte("row"), |
| Mutations: []*btpb.Mutation{{ |
| Mutation: &btpb.Mutation_SetCell_{SetCell: &btpb.Mutation_SetCell{ |
| FamilyName: "cf" + strconv.Itoa(fc), |
| ColumnQualifier: []byte("col" + strconv.Itoa(cc)), |
| TimestampMicros: int64((tc + 1) * 1000), |
| Value: []byte{}, |
| }}, |
| }}, |
| } |
| if _, err := s.MutateRow(ctx, req); err != nil { |
| t.Fatalf("Populating table: %v", err) |
| } |
| } |
| } |
| } |
| req := &btpb.ReadRowsRequest{ |
| TableName: tblInfo.Name, |
| Rows: &btpb.RowSet{RowKeys: [][]byte{[]byte("row")}}, |
| } |
| mock := &MockReadRowsServer{} |
| if err = s.ReadRows(req, mock); err != nil { |
| t.Errorf("ReadRows error: %v", err) |
| } |
| if len(mock.responses) == 0 { |
| t.Fatal("Response count: got 0, want > 0") |
| } |
| if len(mock.responses[0].Chunks) != 27 { |
| t.Fatalf("Chunk count: got %d, want 27", len(mock.responses[0].Chunks)) |
| } |
| testOrder := func(ms *MockReadRowsServer) { |
| var prevFam, prevCol string |
| var prevTime int64 |
| for _, cc := range ms.responses[0].Chunks { |
| if prevFam == "" { |
| prevFam = cc.FamilyName.Value |
| prevCol = string(cc.Qualifier.Value) |
| prevTime = cc.TimestampMicros |
| continue |
| } |
| if cc.FamilyName.Value < prevFam { |
| t.Errorf("Family order is not correct: got %s < %s", cc.FamilyName.Value, prevFam) |
| } else if cc.FamilyName.Value == prevFam { |
| if string(cc.Qualifier.Value) < prevCol { |
| t.Errorf("Column order is not correct: got %s < %s", string(cc.Qualifier.Value), prevCol) |
| } else if string(cc.Qualifier.Value) == prevCol { |
| if cc.TimestampMicros > prevTime { |
| t.Errorf("cell order is not correct: got %d > %d", cc.TimestampMicros, prevTime) |
| } |
| } |
| } |
| prevFam = cc.FamilyName.Value |
| prevCol = string(cc.Qualifier.Value) |
| prevTime = cc.TimestampMicros |
| } |
| } |
| testOrder(mock) |
| |
| // Read with interleave filter |
| inter := &btpb.RowFilter_Interleave{} |
| fnr := &btpb.RowFilter{Filter: &btpb.RowFilter_FamilyNameRegexFilter{FamilyNameRegexFilter: "cf1"}} |
| cqr := &btpb.RowFilter{Filter: &btpb.RowFilter_ColumnQualifierRegexFilter{ColumnQualifierRegexFilter: []byte("col2")}} |
| inter.Filters = append(inter.Filters, fnr, cqr) |
| req = &btpb.ReadRowsRequest{ |
| TableName: tblInfo.Name, |
| Rows: &btpb.RowSet{RowKeys: [][]byte{[]byte("row")}}, |
| Filter: &btpb.RowFilter{ |
| Filter: &btpb.RowFilter_Interleave_{Interleave: inter}, |
| }, |
| } |
| |
| mock = &MockReadRowsServer{} |
| if err = s.ReadRows(req, mock); err != nil { |
| t.Errorf("ReadRows error: %v", err) |
| } |
| if len(mock.responses) == 0 { |
| t.Fatal("Response count: got 0, want > 0") |
| } |
| if len(mock.responses[0].Chunks) != 18 { |
| t.Fatalf("Chunk count: got %d, want 18", len(mock.responses[0].Chunks)) |
| } |
| testOrder(mock) |
| |
| // Check order after ReadModifyWriteRow |
| rmw := func(i int) *btpb.ReadModifyWriteRowRequest { |
| return &btpb.ReadModifyWriteRowRequest{ |
| TableName: tblInfo.Name, |
| RowKey: []byte("row"), |
| Rules: []*btpb.ReadModifyWriteRule{{ |
| FamilyName: "cf3", |
| ColumnQualifier: []byte("col" + strconv.Itoa(i)), |
| Rule: &btpb.ReadModifyWriteRule_IncrementAmount{IncrementAmount: 1}, |
| }}, |
| } |
| } |
| for i := count; i > 0; i-- { |
| if _, err := s.ReadModifyWriteRow(ctx, rmw(i)); err != nil { |
| t.Fatal(err) |
| } |
| } |
| req = &btpb.ReadRowsRequest{ |
| TableName: tblInfo.Name, |
| Rows: &btpb.RowSet{RowKeys: [][]byte{[]byte("row")}}, |
| } |
| mock = &MockReadRowsServer{} |
| if err = s.ReadRows(req, mock); err != nil { |
| t.Errorf("ReadRows error: %v", err) |
| } |
| if len(mock.responses) == 0 { |
| t.Fatal("Response count: got 0, want > 0") |
| } |
| if len(mock.responses[0].Chunks) != 30 { |
| t.Fatalf("Chunk count: got %d, want 30", len(mock.responses[0].Chunks)) |
| } |
| testOrder(mock) |
| } |
| |
| func TestReadRowsWithlabelTransformer(t *testing.T) { |
| ctx := context.Background() |
| s := &server{ |
| tables: make(map[string]*table), |
| } |
| newTbl := btapb.Table{ |
| ColumnFamilies: map[string]*btapb.ColumnFamily{ |
| "cf0": {GcRule: &btapb.GcRule{Rule: &btapb.GcRule_MaxNumVersions{MaxNumVersions: 1}}}, |
| }, |
| } |
| tblInfo, err := s.CreateTable(ctx, &btapb.CreateTableRequest{Parent: "cluster", TableId: "t", Table: &newTbl}) |
| if err != nil { |
| t.Fatalf("Creating table: %v", err) |
| } |
| mreq := &btpb.MutateRowRequest{ |
| TableName: tblInfo.Name, |
| RowKey: []byte("row"), |
| Mutations: []*btpb.Mutation{{ |
| Mutation: &btpb.Mutation_SetCell_{SetCell: &btpb.Mutation_SetCell{ |
| FamilyName: "cf0", |
| ColumnQualifier: []byte("col"), |
| TimestampMicros: 1000, |
| Value: []byte{}, |
| }}, |
| }}, |
| } |
| if _, err := s.MutateRow(ctx, mreq); err != nil { |
| t.Fatalf("Populating table: %v", err) |
| } |
| |
| mock := &MockReadRowsServer{} |
| req := &btpb.ReadRowsRequest{ |
| TableName: tblInfo.Name, |
| Filter: &btpb.RowFilter{ |
| Filter: &btpb.RowFilter_ApplyLabelTransformer{ |
| ApplyLabelTransformer: "label", |
| }, |
| }, |
| } |
| if err = s.ReadRows(req, mock); err != nil { |
| t.Fatalf("ReadRows error: %v", err) |
| } |
| |
| if got, want := len(mock.responses), 1; got != want { |
| t.Fatalf("response count: got %d, want %d", got, want) |
| } |
| resp := mock.responses[0] |
| if got, want := len(resp.Chunks), 1; got != want { |
| t.Fatalf("chunks count: got %d, want %d", got, want) |
| } |
| chunk := resp.Chunks[0] |
| if got, want := len(chunk.Labels), 1; got != want { |
| t.Fatalf("labels count: got %d, want %d", got, want) |
| } |
| if got, want := chunk.Labels[0], "label"; got != want { |
| t.Fatalf("label: got %s, want %s", got, want) |
| } |
| |
| mock = &MockReadRowsServer{} |
| req = &btpb.ReadRowsRequest{ |
| TableName: tblInfo.Name, |
| Filter: &btpb.RowFilter{ |
| Filter: &btpb.RowFilter_ApplyLabelTransformer{ |
| ApplyLabelTransformer: "", // invalid label |
| }, |
| }, |
| } |
| if err = s.ReadRows(req, mock); err == nil { |
| t.Fatal("ReadRows want invalid label error, got none") |
| } |
| } |
| |
| func TestCheckAndMutateRowWithoutPredicate(t *testing.T) { |
| s := &server{ |
| tables: make(map[string]*table), |
| } |
| ctx := context.Background() |
| newTbl := btapb.Table{ |
| ColumnFamilies: map[string]*btapb.ColumnFamily{ |
| "cf": {GcRule: &btapb.GcRule{Rule: &btapb.GcRule_MaxNumVersions{MaxNumVersions: 1}}}, |
| }, |
| } |
| tbl, err := s.CreateTable(ctx, &btapb.CreateTableRequest{Parent: "cluster", TableId: "t", Table: &newTbl}) |
| if err != nil { |
| t.Fatalf("Creating table: %v", err) |
| } |
| |
| // Populate the table |
| val := []byte("value") |
| mrreq := &btpb.MutateRowRequest{ |
| TableName: tbl.Name, |
| RowKey: []byte("row-present"), |
| Mutations: []*btpb.Mutation{{ |
| Mutation: &btpb.Mutation_SetCell_{SetCell: &btpb.Mutation_SetCell{ |
| FamilyName: "cf", |
| ColumnQualifier: []byte("col"), |
| TimestampMicros: 1000, |
| Value: val, |
| }}, |
| }}, |
| } |
| if _, err := s.MutateRow(ctx, mrreq); err != nil { |
| t.Fatalf("Populating table: %v", err) |
| } |
| |
| req := &btpb.CheckAndMutateRowRequest{ |
| TableName: tbl.Name, |
| RowKey: []byte("row-not-present"), |
| } |
| if res, err := s.CheckAndMutateRow(ctx, req); err != nil { |
| t.Errorf("CheckAndMutateRow error: %v", err) |
| } else if got, want := res.PredicateMatched, false; got != want { |
| t.Errorf("Invalid PredicateMatched value: got %t, want %t", got, want) |
| } |
| |
| req = &btpb.CheckAndMutateRowRequest{ |
| TableName: tbl.Name, |
| RowKey: []byte("row-present"), |
| } |
| if res, err := s.CheckAndMutateRow(ctx, req); err != nil { |
| t.Errorf("CheckAndMutateRow error: %v", err) |
| } else if got, want := res.PredicateMatched, true; got != want { |
| t.Errorf("Invalid PredicateMatched value: got %t, want %t", got, want) |
| } |
| } |
| |
| func TestCheckAndMutateRowWithPredicate(t *testing.T) { |
| ctx := context.Background() |
| srv := &server{tables: make(map[string]*table)} |
| |
| tblReq := &btapb.CreateTableRequest{ |
| Parent: "issue-1435", |
| TableId: "table_id", |
| Table: &btapb.Table{ |
| ColumnFamilies: map[string]*btapb.ColumnFamily{ |
| "cf": {}, |
| "df": {}, |
| "ef": {}, |
| "ff": {}, |
| "zf": {}, |
| }, |
| }, |
| } |
| tbl, err := srv.CreateTable(ctx, tblReq) |
| if err != nil { |
| t.Fatalf("Failed to create the table: %v", err) |
| } |
| |
| entries := []struct { |
| row string |
| value []byte |
| familyName, columnQualifier string |
| }{ |
| {"row1", []byte{0x11}, "cf", "cq"}, |
| {"row2", []byte{0x1a}, "df", "dq"}, |
| {"row3", []byte{'a'}, "ef", "eq"}, |
| {"row4", []byte{'b'}, "ff", "fq"}, |
| } |
| |
| for _, entry := range entries { |
| req := &btpb.MutateRowRequest{ |
| TableName: tbl.Name, |
| RowKey: []byte(entry.row), |
| Mutations: []*btpb.Mutation{{ |
| Mutation: &btpb.Mutation_SetCell_{SetCell: &btpb.Mutation_SetCell{ |
| FamilyName: entry.familyName, |
| ColumnQualifier: []byte(entry.columnQualifier), |
| TimestampMicros: 1000, |
| Value: entry.value, |
| }}, |
| }}, |
| } |
| if _, err := srv.MutateRow(ctx, req); err != nil { |
| t.Fatalf("Failed to insert entry %v into server: %v", entry, err) |
| } |
| } |
| |
| tests := []struct { |
| req *btpb.CheckAndMutateRowRequest |
| wantMatch bool |
| name string |
| |
| // if wantState is nil, that means we don't care to check |
| // what the state of the world is. |
| wantState []*btpb.ReadRowsResponse_CellChunk |
| }{ |
| { |
| req: &btpb.CheckAndMutateRowRequest{ |
| TableName: tbl.Name, |
| PredicateFilter: &btpb.RowFilter{ |
| Filter: &btpb.RowFilter_RowKeyRegexFilter{ |
| RowKeyRegexFilter: []byte("not-one"), |
| }, |
| }, |
| }, |
| name: "no match", |
| }, |
| { |
| req: &btpb.CheckAndMutateRowRequest{ |
| TableName: tbl.Name, |
| RowKey: []byte("row1"), |
| PredicateFilter: &btpb.RowFilter{ |
| Filter: &btpb.RowFilter_RowKeyRegexFilter{ |
| RowKeyRegexFilter: []byte("ro.+"), |
| }, |
| }, |
| }, |
| wantMatch: true, |
| name: "rowkey regex", |
| }, |
| { |
| req: &btpb.CheckAndMutateRowRequest{ |
| TableName: tbl.Name, |
| RowKey: []byte("row1"), |
| PredicateFilter: &btpb.RowFilter{ |
| Filter: &btpb.RowFilter_PassAllFilter{ |
| PassAllFilter: true, |
| }, |
| }, |
| }, |
| wantMatch: true, |
| name: "pass all", |
| }, |
| { |
| req: &btpb.CheckAndMutateRowRequest{ |
| TableName: tbl.Name, |
| RowKey: []byte("row1"), |
| PredicateFilter: &btpb.RowFilter{ |
| Filter: &btpb.RowFilter_BlockAllFilter{ |
| BlockAllFilter: true, |
| }, |
| }, |
| FalseMutations: []*btpb.Mutation{ |
| { |
| Mutation: &btpb.Mutation_SetCell_{ |
| SetCell: &btpb.Mutation_SetCell{ |
| FamilyName: "zf", |
| Value: []byte("foo"), |
| TimestampMicros: 2000, |
| ColumnQualifier: []byte("et"), |
| }, |
| }, |
| }, |
| }, |
| }, |
| name: "BlockAll for row1", |
| wantMatch: false, |
| wantState: []*btpb.ReadRowsResponse_CellChunk{ |
| { |
| RowKey: []byte("row1"), |
| FamilyName: &wrappers.StringValue{ |
| Value: "cf", |
| }, |
| Qualifier: &wrappers.BytesValue{ |
| Value: []byte("cq"), |
| }, |
| TimestampMicros: 1000, |
| Value: []byte{0x11}, |
| }, |
| { |
| RowKey: []byte("row1"), |
| FamilyName: &wrappers.StringValue{ |
| Value: "zf", |
| }, |
| Qualifier: &wrappers.BytesValue{ |
| Value: []byte("et"), |
| }, |
| TimestampMicros: 2000, |
| Value: []byte("foo"), |
| RowStatus: &btpb.ReadRowsResponse_CellChunk_CommitRow{ |
| CommitRow: true, |
| }, |
| }, |
| { |
| RowKey: []byte("row2"), |
| FamilyName: &wrappers.StringValue{ |
| Value: "df", |
| }, |
| Qualifier: &wrappers.BytesValue{ |
| Value: []byte("dq"), |
| }, |
| TimestampMicros: 1000, |
| Value: []byte{0x1a}, |
| RowStatus: &btpb.ReadRowsResponse_CellChunk_CommitRow{ |
| CommitRow: true, |
| }, |
| }, |
| { |
| RowKey: []byte("row3"), |
| FamilyName: &wrappers.StringValue{ |
| Value: "ef", |
| }, |
| Qualifier: &wrappers.BytesValue{ |
| Value: []byte("eq"), |
| }, |
| TimestampMicros: 1000, |
| Value: []byte("a"), |
| RowStatus: &btpb.ReadRowsResponse_CellChunk_CommitRow{ |
| CommitRow: true, |
| }, |
| }, |
| { |
| RowKey: []byte("row4"), |
| FamilyName: &wrappers.StringValue{ |
| Value: "ff", |
| }, |
| Qualifier: &wrappers.BytesValue{ |
| Value: []byte("fq"), |
| }, |
| TimestampMicros: 1000, |
| Value: []byte("b"), |
| RowStatus: &btpb.ReadRowsResponse_CellChunk_CommitRow{ |
| CommitRow: true, |
| }, |
| }, |
| }, |
| }, |
| } |
| |
| for _, tt := range tests { |
| t.Run(tt.name, func(t *testing.T) { |
| res, err := srv.CheckAndMutateRow(ctx, tt.req) |
| if err != nil { |
| t.Fatalf("CheckAndMutateRow error: %v", err) |
| } |
| got, want := res.PredicateMatched, tt.wantMatch |
| if got != want { |
| t.Fatalf("Invalid PredicateMatched value: got %t, want %t\nRequest: %+v", got, want, tt.req) |
| } |
| |
| if tt.wantState == nil { |
| return |
| } |
| |
| rreq := &btpb.ReadRowsRequest{TableName: tbl.Name} |
| mock := &MockReadRowsServer{} |
| if err = srv.ReadRows(rreq, mock); err != nil { |
| t.Fatalf("ReadRows error: %v", err) |
| } |
| |
| // Collect all the cellChunks |
| var gotCellChunks []*btpb.ReadRowsResponse_CellChunk |
| for _, res := range mock.responses { |
| gotCellChunks = append(gotCellChunks, res.Chunks...) |
| } |
| sort.Slice(gotCellChunks, func(i, j int) bool { |
| ci, cj := gotCellChunks[i], gotCellChunks[j] |
| return compareCellChunks(ci, cj) |
| }) |
| wantCellChunks := tt.wantState[0:] |
| sort.Slice(wantCellChunks, func(i, j int) bool { |
| return compareCellChunks(wantCellChunks[i], wantCellChunks[j]) |
| }) |
| |
| // bttest for some reason undeterministically returns: |
| // RowStatus: &bigtable.ReadRowsResponse_CellChunk_CommitRow{CommitRow: true}, |
| // so we'll ignore that field during comparison. |
| scrubRowStatus := func(cs []*btpb.ReadRowsResponse_CellChunk) []*btpb.ReadRowsResponse_CellChunk { |
| for _, c := range cs { |
| c.RowStatus = nil |
| } |
| return cs |
| } |
| diff := cmp.Diff(scrubRowStatus(gotCellChunks), scrubRowStatus(wantCellChunks), cmp.Comparer(proto.Equal)) |
| if diff != "" { |
| t.Fatalf("unexpected response: %s", diff) |
| } |
| }) |
| } |
| } |
| |
| // compareCellChunks is a comparator that is passed |
| // into sort.Slice to stably sort cell chunks. |
| func compareCellChunks(ci, cj *btpb.ReadRowsResponse_CellChunk) bool { |
| if bytes.Compare(ci.RowKey, cj.RowKey) > 0 { |
| return false |
| } |
| if bytes.Compare(ci.Value, cj.Value) > 0 { |
| return false |
| } |
| return ci.FamilyName.GetValue() < cj.FamilyName.GetValue() |
| } |
| |
| func TestServer_ReadModifyWriteRow(t *testing.T) { |
| s := &server{ |
| tables: make(map[string]*table), |
| } |
| |
| ctx := context.Background() |
| newTbl := btapb.Table{ |
| ColumnFamilies: map[string]*btapb.ColumnFamily{ |
| "cf": {GcRule: &btapb.GcRule{Rule: &btapb.GcRule_MaxNumVersions{MaxNumVersions: 1}}}, |
| }, |
| } |
| tbl, err := s.CreateTable(ctx, &btapb.CreateTableRequest{Parent: "cluster", TableId: "t", Table: &newTbl}) |
| if err != nil { |
| t.Fatalf("Creating table: %v", err) |
| } |
| |
| req := &btpb.ReadModifyWriteRowRequest{ |
| TableName: tbl.Name, |
| RowKey: []byte("row-key"), |
| Rules: []*btpb.ReadModifyWriteRule{ |
| { |
| FamilyName: "cf", |
| ColumnQualifier: []byte("q1"), |
| Rule: &btpb.ReadModifyWriteRule_AppendValue{ |
| AppendValue: []byte("a"), |
| }, |
| }, |
| // multiple ops for same cell |
| { |
| FamilyName: "cf", |
| ColumnQualifier: []byte("q1"), |
| Rule: &btpb.ReadModifyWriteRule_AppendValue{ |
| AppendValue: []byte("b"), |
| }, |
| }, |
| // different cell whose qualifier should sort before the prior rules |
| { |
| FamilyName: "cf", |
| ColumnQualifier: []byte("q0"), |
| Rule: &btpb.ReadModifyWriteRule_IncrementAmount{ |
| IncrementAmount: 1, |
| }, |
| }, |
| }, |
| } |
| |
| got, err := s.ReadModifyWriteRow(ctx, req) |
| |
| if err != nil { |
| t.Fatalf("ReadModifyWriteRow error: %v", err) |
| } |
| |
| want := &btpb.ReadModifyWriteRowResponse{ |
| Row: &btpb.Row{ |
| Key: []byte("row-key"), |
| Families: []*btpb.Family{{ |
| Name: "cf", |
| Columns: []*btpb.Column{ |
| { |
| Qualifier: []byte("q0"), |
| Cells: []*btpb.Cell{{ |
| Value: []byte{0, 0, 0, 0, 0, 0, 0, 1}, |
| }}, |
| }, |
| { |
| Qualifier: []byte("q1"), |
| Cells: []*btpb.Cell{{ |
| Value: []byte("ab"), |
| }}, |
| }, |
| }, |
| }}, |
| }, |
| } |
| |
| scrubTimestamp := func(resp *btpb.ReadModifyWriteRowResponse) *btpb.ReadModifyWriteRowResponse { |
| for _, fam := range resp.GetRow().GetFamilies() { |
| for _, col := range fam.GetColumns() { |
| for _, cell := range col.GetCells() { |
| cell.TimestampMicros = 0 |
| } |
| } |
| } |
| return resp |
| } |
| diff := cmp.Diff(scrubTimestamp(got), scrubTimestamp(want), cmp.Comparer(proto.Equal)) |
| if diff != "" { |
| t.Errorf("unexpected response: %s", diff) |
| } |
| } |
| |
| // helper function to populate table data |
| func populateTable(ctx context.Context, s *server) (*btapb.Table, error) { |
| newTbl := btapb.Table{ |
| ColumnFamilies: map[string]*btapb.ColumnFamily{ |
| "cf0": {GcRule: &btapb.GcRule{Rule: &btapb.GcRule_MaxNumVersions{1}}}, |
| }, |
| } |
| tblInfo, err := s.CreateTable(ctx, &btapb.CreateTableRequest{Parent: "cluster", TableId: "t", Table: &newTbl}) |
| if err != nil { |
| return nil, err |
| } |
| count := 3 |
| mcf := func(i int) *btapb.ModifyColumnFamiliesRequest { |
| return &btapb.ModifyColumnFamiliesRequest{ |
| Name: tblInfo.Name, |
| Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{ |
| Id: "cf" + strconv.Itoa(i), |
| Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Create{&btapb.ColumnFamily{}}, |
| }}, |
| } |
| } |
| for i := 1; i <= count; i++ { |
| _, err = s.ModifyColumnFamilies(ctx, mcf(i)) |
| if err != nil { |
| return nil, err |
| } |
| } |
| // Populate the table |
| for fc := 0; fc < count; fc++ { |
| for cc := count; cc > 0; cc-- { |
| for tc := 0; tc < count; tc++ { |
| req := &btpb.MutateRowRequest{ |
| TableName: tblInfo.Name, |
| RowKey: []byte("row"), |
| Mutations: []*btpb.Mutation{{ |
| Mutation: &btpb.Mutation_SetCell_{&btpb.Mutation_SetCell{ |
| FamilyName: "cf" + strconv.Itoa(fc), |
| ColumnQualifier: []byte("col" + strconv.Itoa(cc)), |
| TimestampMicros: int64((tc + 1) * 1000), |
| Value: []byte{}, |
| }}, |
| }}, |
| } |
| if _, err := s.MutateRow(ctx, req); err != nil { |
| return nil, err |
| } |
| } |
| } |
| } |
| |
| return tblInfo, nil |
| } |
| |
| func TestFilters(t *testing.T) { |
| tests := []struct { |
| in *btpb.RowFilter |
| out int |
| }{ |
| {in: &btpb.RowFilter{Filter: &btpb.RowFilter_BlockAllFilter{true}}, out: 0}, |
| {in: &btpb.RowFilter{Filter: &btpb.RowFilter_BlockAllFilter{false}}, out: 1}, |
| {in: &btpb.RowFilter{Filter: &btpb.RowFilter_PassAllFilter{true}}, out: 1}, |
| {in: &btpb.RowFilter{Filter: &btpb.RowFilter_PassAllFilter{false}}, out: 0}, |
| } |
| |
| ctx := context.Background() |
| |
| s := &server{ |
| tables: make(map[string]*table), |
| } |
| |
| tblInfo, err := populateTable(ctx, s) |
| if err != nil { |
| t.Fatal(err) |
| } |
| |
| req := &btpb.ReadRowsRequest{ |
| TableName: tblInfo.Name, |
| Rows: &btpb.RowSet{RowKeys: [][]byte{[]byte("row")}}, |
| } |
| |
| for _, tc := range tests { |
| req.Filter = tc.in |
| |
| mock := &MockReadRowsServer{} |
| if err = s.ReadRows(req, mock); err != nil { |
| t.Errorf("ReadRows error: %v", err) |
| continue |
| } |
| |
| if len(mock.responses) != tc.out { |
| t.Errorf("Response count: got %d, want %d", len(mock.responses), tc.out) |
| continue |
| } |
| } |
| } |
| |
| func Test_Mutation_DeleteFromColumn(t *testing.T) { |
| ctx := context.Background() |
| |
| s := &server{ |
| tables: make(map[string]*table), |
| } |
| |
| tblInfo, err := populateTable(ctx, s) |
| if err != nil { |
| t.Fatal(err) |
| } |
| |
| tests := []struct { |
| in *btpb.MutateRowRequest |
| fail bool |
| }{ |
| { |
| in: &btpb.MutateRowRequest{ |
| TableName: tblInfo.Name, |
| RowKey: []byte("row"), |
| Mutations: []*btpb.Mutation{{ |
| Mutation: &btpb.Mutation_DeleteFromColumn_{DeleteFromColumn: &btpb.Mutation_DeleteFromColumn{ |
| FamilyName: "cf1", |
| ColumnQualifier: []byte("col1"), |
| TimeRange: &btpb.TimestampRange{ |
| StartTimestampMicros: 2000, |
| EndTimestampMicros: 1000, |
| }, |
| }}, |
| }}, |
| }, |
| fail: true, |
| }, |
| { |
| in: &btpb.MutateRowRequest{ |
| TableName: tblInfo.Name, |
| RowKey: []byte("row"), |
| Mutations: []*btpb.Mutation{{ |
| Mutation: &btpb.Mutation_DeleteFromColumn_{DeleteFromColumn: &btpb.Mutation_DeleteFromColumn{ |
| FamilyName: "cf2", |
| ColumnQualifier: []byte("col2"), |
| TimeRange: &btpb.TimestampRange{ |
| StartTimestampMicros: 1000, |
| EndTimestampMicros: 2000, |
| }, |
| }}, |
| }}, |
| }, |
| fail: false, |
| }, |
| { |
| in: &btpb.MutateRowRequest{ |
| TableName: tblInfo.Name, |
| RowKey: []byte("row"), |
| Mutations: []*btpb.Mutation{{ |
| Mutation: &btpb.Mutation_DeleteFromColumn_{DeleteFromColumn: &btpb.Mutation_DeleteFromColumn{ |
| FamilyName: "cf3", |
| ColumnQualifier: []byte("col3"), |
| TimeRange: &btpb.TimestampRange{ |
| StartTimestampMicros: 1000, |
| EndTimestampMicros: 0, |
| }, |
| }}, |
| }}, |
| }, |
| fail: false, |
| }, |
| { |
| in: &btpb.MutateRowRequest{ |
| TableName: tblInfo.Name, |
| RowKey: []byte("row"), |
| Mutations: []*btpb.Mutation{{ |
| Mutation: &btpb.Mutation_DeleteFromColumn_{DeleteFromColumn: &btpb.Mutation_DeleteFromColumn{ |
| FamilyName: "cf4", |
| ColumnQualifier: []byte("col4"), |
| TimeRange: &btpb.TimestampRange{ |
| StartTimestampMicros: 0, |
| EndTimestampMicros: 1000, |
| }, |
| }}, |
| }}, |
| }, |
| fail: true, |
| }, |
| } |
| for _, test := range tests { |
| _, err = s.MutateRow(ctx, test.in) |
| |
| if err != nil && !test.fail { |
| t.Errorf("expected passed got failure for : %v \n with err: %v", test.in, err) |
| } |
| |
| if err == nil && test.fail { |
| t.Errorf("expected failure got passed for : %v", test) |
| } |
| } |
| } |
| |
| func TestFilterRow(t *testing.T) { |
| row := &row{ |
| key: "row", |
| families: map[string]*family{ |
| "fam": { |
| name: "fam", |
| cells: map[string][]cell{ |
| "col": {{ts: 1000, value: []byte("val")}}, |
| }, |
| }, |
| }, |
| } |
| for _, test := range []struct { |
| filter *btpb.RowFilter |
| want bool |
| }{ |
| // The regexp-based filters perform whole-string, case-sensitive matches. |
| {&btpb.RowFilter{Filter: &btpb.RowFilter_RowKeyRegexFilter{[]byte("row")}}, true}, |
| {&btpb.RowFilter{Filter: &btpb.RowFilter_RowKeyRegexFilter{[]byte("ro")}}, false}, |
| {&btpb.RowFilter{Filter: &btpb.RowFilter_RowKeyRegexFilter{[]byte("ROW")}}, false}, |
| {&btpb.RowFilter{Filter: &btpb.RowFilter_RowKeyRegexFilter{[]byte("moo")}}, false}, |
| |
| {&btpb.RowFilter{Filter: &btpb.RowFilter_FamilyNameRegexFilter{"fam"}}, true}, |
| {&btpb.RowFilter{Filter: &btpb.RowFilter_FamilyNameRegexFilter{"f.*"}}, true}, |
| {&btpb.RowFilter{Filter: &btpb.RowFilter_FamilyNameRegexFilter{"[fam]+"}}, true}, |
| {&btpb.RowFilter{Filter: &btpb.RowFilter_FamilyNameRegexFilter{"fa"}}, false}, |
| {&btpb.RowFilter{Filter: &btpb.RowFilter_FamilyNameRegexFilter{"FAM"}}, false}, |
| {&btpb.RowFilter{Filter: &btpb.RowFilter_FamilyNameRegexFilter{"moo"}}, false}, |
| |
| {&btpb.RowFilter{Filter: &btpb.RowFilter_ColumnQualifierRegexFilter{[]byte("col")}}, true}, |
| {&btpb.RowFilter{Filter: &btpb.RowFilter_ColumnQualifierRegexFilter{[]byte("co")}}, false}, |
| {&btpb.RowFilter{Filter: &btpb.RowFilter_ColumnQualifierRegexFilter{[]byte("COL")}}, false}, |
| {&btpb.RowFilter{Filter: &btpb.RowFilter_ColumnQualifierRegexFilter{[]byte("moo")}}, false}, |
| |
| {&btpb.RowFilter{Filter: &btpb.RowFilter_ValueRegexFilter{[]byte("val")}}, true}, |
| {&btpb.RowFilter{Filter: &btpb.RowFilter_ValueRegexFilter{[]byte("va")}}, false}, |
| {&btpb.RowFilter{Filter: &btpb.RowFilter_ValueRegexFilter{[]byte("VAL")}}, false}, |
| {&btpb.RowFilter{Filter: &btpb.RowFilter_ValueRegexFilter{[]byte("moo")}}, false}, |
| |
| {&btpb.RowFilter{Filter: &btpb.RowFilter_TimestampRangeFilter{&btpb.TimestampRange{StartTimestampMicros: int64(0), EndTimestampMicros: int64(1000)}}}, false}, |
| {&btpb.RowFilter{Filter: &btpb.RowFilter_TimestampRangeFilter{&btpb.TimestampRange{StartTimestampMicros: int64(1000), EndTimestampMicros: int64(2000)}}}, true}, |
| } { |
| got, err := filterRow(test.filter, row.copy()) |
| if err != nil { |
| t.Errorf("%s: got unexpected error: %v", proto.CompactTextString(test.filter), err) |
| } |
| if got != test.want { |
| t.Errorf("%s: got %t, want %t", proto.CompactTextString(test.filter), got, test.want) |
| } |
| } |
| } |
| |
| func TestFilterRowWithErrors(t *testing.T) { |
| row := &row{ |
| key: "row", |
| families: map[string]*family{ |
| "fam": { |
| name: "fam", |
| cells: map[string][]cell{ |
| "col": {{ts: 1000, value: []byte("val")}}, |
| }, |
| }, |
| }, |
| } |
| for _, test := range []struct { |
| badRegex *btpb.RowFilter |
| }{ |
| {&btpb.RowFilter{Filter: &btpb.RowFilter_RowKeyRegexFilter{[]byte("[")}}}, |
| {&btpb.RowFilter{Filter: &btpb.RowFilter_FamilyNameRegexFilter{"["}}}, |
| {&btpb.RowFilter{Filter: &btpb.RowFilter_ColumnQualifierRegexFilter{[]byte("[")}}}, |
| {&btpb.RowFilter{Filter: &btpb.RowFilter_ValueRegexFilter{[]byte("[")}}}, |
| {&btpb.RowFilter{Filter: &btpb.RowFilter_Chain_{ |
| Chain: &btpb.RowFilter_Chain{Filters: []*btpb.RowFilter{ |
| {Filter: &btpb.RowFilter_ValueRegexFilter{[]byte("[")}}}, |
| }, |
| }}}, |
| {&btpb.RowFilter{Filter: &btpb.RowFilter_Condition_{ |
| Condition: &btpb.RowFilter_Condition{ |
| PredicateFilter: &btpb.RowFilter{Filter: &btpb.RowFilter_ValueRegexFilter{[]byte("[")}}, |
| }, |
| }}}, |
| |
| {&btpb.RowFilter{Filter: &btpb.RowFilter_RowSampleFilter{0.0}}}, // 0.0 is invalid. |
| {&btpb.RowFilter{Filter: &btpb.RowFilter_RowSampleFilter{1.0}}}, // 1.0 is invalid. |
| {&btpb.RowFilter{Filter: &btpb.RowFilter_TimestampRangeFilter{&btpb.TimestampRange{StartTimestampMicros: int64(1), EndTimestampMicros: int64(1000)}}}}, // Server only supports millisecond precision. |
| {&btpb.RowFilter{Filter: &btpb.RowFilter_TimestampRangeFilter{&btpb.TimestampRange{StartTimestampMicros: int64(1000), EndTimestampMicros: int64(1)}}}}, // Server only supports millisecond precision. |
| } { |
| got, err := filterRow(test.badRegex, row.copy()) |
| if got != false { |
| t.Errorf("%s: got true, want false", proto.CompactTextString(test.badRegex)) |
| } |
| if err == nil { |
| t.Errorf("%s: got no error, want error", proto.CompactTextString(test.badRegex)) |
| } |
| } |
| } |
| |
| func TestFilterRowWithRowSampleFilter(t *testing.T) { |
| prev := randFloat |
| randFloat = func() float64 { return 0.5 } |
| defer func() { randFloat = prev }() |
| for _, test := range []struct { |
| p float64 |
| want bool |
| }{ |
| {0.1, false}, // Less than random float. Return no rows. |
| {0.5, false}, // Equal to random float. Return no rows. |
| {0.9, true}, // Greater than random float. Return all rows. |
| } { |
| got, err := filterRow(&btpb.RowFilter{Filter: &btpb.RowFilter_RowSampleFilter{test.p}}, &row{}) |
| if err != nil { |
| t.Fatalf("%f: %v", test.p, err) |
| } |
| if got != test.want { |
| t.Errorf("%v: got %t, want %t", test.p, got, test.want) |
| } |
| } |
| } |
| |
| func TestFilterRowWithBinaryColumnQualifier(t *testing.T) { |
| rs := []byte{128, 128} |
| row := &row{ |
| key: string(rs), |
| families: map[string]*family{ |
| "fam": { |
| name: "fam", |
| cells: map[string][]cell{ |
| string(rs): {{ts: 1000, value: []byte("val")}}, |
| }, |
| }, |
| }, |
| } |
| for _, test := range []struct { |
| filter string |
| want bool |
| }{ |
| {`\x80\x80`, true}, // succeeds, exact match |
| {`\x80\x81`, false}, // fails |
| {`\x80`, false}, // fails, because the regexp must match the entire input |
| {`\x80*`, true}, // succeeds: 0 or more 128s |
| {`[\x7f\x80]{2}`, true}, // succeeds: exactly two of either 127 or 128 |
| {`\C{2}`, true}, // succeeds: two bytes |
| } { |
| got, _ := filterRow(&btpb.RowFilter{Filter: &btpb.RowFilter_ColumnQualifierRegexFilter{[]byte(test.filter)}}, row.copy()) |
| if got != test.want { |
| t.Errorf("%v: got %t, want %t", test.filter, got, test.want) |
| } |
| } |
| } |
| |
| // Test that a single column qualifier with the interleave filter returns |
| // the correct result and not return every single row. |
| // See Issue https://github.com/googleapis/google-cloud-go/issues/1399 |
| func TestFilterRowWithSingleColumnQualifier(t *testing.T) { |
| ctx := context.Background() |
| srv := &server{tables: make(map[string]*table)} |
| |
| tblReq := &btapb.CreateTableRequest{ |
| Parent: "issue-1399", |
| TableId: "table_id", |
| Table: &btapb.Table{ |
| ColumnFamilies: map[string]*btapb.ColumnFamily{ |
| "cf": {}, |
| }, |
| }, |
| } |
| tbl, err := srv.CreateTable(ctx, tblReq) |
| if err != nil { |
| t.Fatalf("Failed to create the table: %v", err) |
| } |
| |
| entries := []struct { |
| row string |
| value []byte |
| }{ |
| {"row1", []byte{0x11}}, |
| {"row2", []byte{0x1a}}, |
| {"row3", []byte{'a'}}, |
| {"row4", []byte{'b'}}, |
| } |
| |
| for _, entry := range entries { |
| req := &btpb.MutateRowRequest{ |
| TableName: tbl.Name, |
| RowKey: []byte(entry.row), |
| Mutations: []*btpb.Mutation{{ |
| Mutation: &btpb.Mutation_SetCell_{SetCell: &btpb.Mutation_SetCell{ |
| FamilyName: "cf", |
| ColumnQualifier: []byte("cq"), |
| TimestampMicros: 1000, |
| Value: entry.value, |
| }}, |
| }}, |
| } |
| if _, err := srv.MutateRow(ctx, req); err != nil { |
| t.Fatalf("Failed to insert entry %v into server: %v", entry, err) |
| } |
| } |
| |
| // After insertion now it is time for querying. |
| req := &btpb.ReadRowsRequest{ |
| TableName: tbl.Name, |
| Filter: &btpb.RowFilter{Filter: &btpb.RowFilter_Chain_{ |
| Chain: &btpb.RowFilter_Chain{Filters: []*btpb.RowFilter{{ |
| Filter: &btpb.RowFilter_Interleave_{ |
| Interleave: &btpb.RowFilter_Interleave{ |
| Filters: []*btpb.RowFilter{{Filter: &btpb.RowFilter_Condition_{ |
| Condition: &btpb.RowFilter_Condition{ |
| PredicateFilter: &btpb.RowFilter{Filter: &btpb.RowFilter_Chain_{ |
| Chain: &btpb.RowFilter_Chain{Filters: []*btpb.RowFilter{ |
| { |
| Filter: &btpb.RowFilter_ValueRangeFilter{ValueRangeFilter: &btpb.ValueRange{ |
| StartValue: &btpb.ValueRange_StartValueClosed{ |
| StartValueClosed: []byte("a"), |
| }, |
| EndValue: &btpb.ValueRange_EndValueClosed{EndValueClosed: []byte("a")}, |
| }}, |
| }, |
| }}, |
| }}, |
| TrueFilter: &btpb.RowFilter{Filter: &btpb.RowFilter_PassAllFilter{PassAllFilter: true}}, |
| }, |
| }}}, |
| }, |
| }, |
| }}}, |
| }}, |
| } |
| |
| rrss := new(MockReadRowsServer) |
| if err := srv.ReadRows(req, rrss); err != nil { |
| t.Fatalf("Failed to read rows: %v", err) |
| } |
| |
| if g, w := len(rrss.responses), 1; g != w { |
| t.Fatalf("Results/Streamed chunks mismatch:: got %d want %d", g, w) |
| } |
| |
| got := rrss.responses[0] |
| // Only row3 should be matched. |
| want := &btpb.ReadRowsResponse{ |
| Chunks: []*btpb.ReadRowsResponse_CellChunk{ |
| { |
| RowKey: []byte("row3"), |
| FamilyName: &wrappers.StringValue{Value: "cf"}, |
| Qualifier: &wrappers.BytesValue{Value: []byte("cq")}, |
| TimestampMicros: 1000, |
| Value: []byte("a"), |
| RowStatus: &btpb.ReadRowsResponse_CellChunk_CommitRow{ |
| CommitRow: true, |
| }, |
| }, |
| }, |
| } |
| if diff := cmp.Diff(got, want, cmp.Comparer(proto.Equal)); diff != "" { |
| t.Fatalf("Response mismatch: got: + want -\n%s", diff) |
| } |
| } |
| |
| func TestValueFilterRowWithAlternationInRegex(t *testing.T) { |
| // Test that regex alternation is applied properly. |
| // See Issue https://github.com/googleapis/google-cloud-go/issues/1499 |
| ctx := context.Background() |
| srv := &server{tables: make(map[string]*table)} |
| |
| tblReq := &btapb.CreateTableRequest{ |
| Parent: "issue-1499", |
| TableId: "table_id", |
| Table: &btapb.Table{ |
| ColumnFamilies: map[string]*btapb.ColumnFamily{ |
| "cf": {}, |
| }, |
| }, |
| } |
| tbl, err := srv.CreateTable(ctx, tblReq) |
| if err != nil { |
| t.Fatalf("Failed to create the table: %v", err) |
| } |
| |
| entries := []struct { |
| row string |
| value []byte |
| }{ |
| {"row1", []byte("")}, |
| {"row2", []byte{'x'}}, |
| {"row3", []byte{'a'}}, |
| {"row4", []byte{'m'}}, |
| } |
| |
| for _, entry := range entries { |
| req := &btpb.MutateRowRequest{ |
| TableName: tbl.Name, |
| RowKey: []byte(entry.row), |
| Mutations: []*btpb.Mutation{{ |
| Mutation: &btpb.Mutation_SetCell_{SetCell: &btpb.Mutation_SetCell{ |
| FamilyName: "cf", |
| ColumnQualifier: []byte("cq"), |
| TimestampMicros: 1000, |
| Value: entry.value, |
| }}, |
| }}, |
| } |
| if _, err := srv.MutateRow(ctx, req); err != nil { |
| t.Fatalf("Failed to insert entry %v into server: %v", entry, err) |
| } |
| } |
| |
| // After insertion now it is time for querying. |
| req := &btpb.ReadRowsRequest{ |
| TableName: tbl.Name, |
| Rows: &btpb.RowSet{}, |
| Filter: &btpb.RowFilter{ |
| Filter: &btpb.RowFilter_ValueRegexFilter{ |
| ValueRegexFilter: []byte("|a"), |
| }, |
| }, |
| } |
| |
| rrss := new(MockReadRowsServer) |
| if err := srv.ReadRows(req, rrss); err != nil { |
| t.Fatalf("Failed to read rows: %v", err) |
| } |
| |
| var gotChunks []*btpb.ReadRowsResponse_CellChunk |
| for _, res := range rrss.responses { |
| gotChunks = append(gotChunks, res.Chunks...) |
| } |
| |
| // Only row1 "" and row3 "a" should be matched. |
| wantChunks := []*btpb.ReadRowsResponse_CellChunk{ |
| { |
| RowKey: []byte("row1"), |
| FamilyName: &wrappers.StringValue{Value: "cf"}, |
| Qualifier: &wrappers.BytesValue{Value: []byte("cq")}, |
| TimestampMicros: 1000, |
| Value: []byte(""), |
| RowStatus: &btpb.ReadRowsResponse_CellChunk_CommitRow{ |
| CommitRow: true, |
| }, |
| }, |
| { |
| RowKey: []byte("row3"), |
| FamilyName: &wrappers.StringValue{Value: "cf"}, |
| Qualifier: &wrappers.BytesValue{Value: []byte("cq")}, |
| TimestampMicros: 1000, |
| Value: []byte("a"), |
| RowStatus: &btpb.ReadRowsResponse_CellChunk_CommitRow{ |
| CommitRow: true, |
| }, |
| }, |
| } |
| if diff := cmp.Diff(gotChunks, wantChunks, cmp.Comparer(proto.Equal)); diff != "" { |
| t.Fatalf("Response chunks mismatch: got: + want -\n%s", diff) |
| } |
| } |