bigtable/bttest: fix the cell ordering problem in the emulator. This change: 1) sorts columns within a family in lexicographically ascending order; 2) sorts families by creation order when a row is output. Change-Id: I31634789e8afd6b0f69a1a635893cc0d21e455df
diff --git a/bigtable/bttest/inmem.go b/bigtable/bttest/inmem.go index 06c036f..943da93 100644 --- a/bigtable/bttest/inmem.go +++ b/bigtable/bttest/inmem.go
@@ -187,8 +187,10 @@ } newcf := &columnFamily{ name: req.Name + "/columnFamilies/" + mod.Id, + order: tbl.counter, gcRule: create.GcRule, } + tbl.counter++ tbl.families[mod.Id] = newcf } else if mod.GetDrop() { if _, ok := tbl.families[mod.Id]; !ok { @@ -353,21 +355,23 @@ } rrr := &btpb.ReadRowsResponse{} - for col, cells := range r.cells { - i := strings.Index(col, ":") // guaranteed to exist - fam, col := col[:i], col[i+1:] - if len(cells) == 0 { - continue - } - // TODO(dsymonds): Apply transformers. - for _, cell := range cells { - rrr.Chunks = append(rrr.Chunks, &btpb.ReadRowsResponse_CellChunk{ - RowKey: []byte(r.key), - FamilyName: &wrappers.StringValue{Value: fam}, - Qualifier: &wrappers.BytesValue{Value: []byte(col)}, - TimestampMicros: cell.ts, - Value: cell.value, - }) + families := r.sortedFamilies() + for _, fam := range families { + for _, colName := range fam.colNames { + cells := fam.cells[colName] + if len(cells) == 0 { + continue + } + // TODO(dsymonds): Apply transformers. + for _, cell := range cells { + rrr.Chunks = append(rrr.Chunks, &btpb.ReadRowsResponse_CellChunk{ + RowKey: []byte(r.key), + FamilyName: &wrappers.StringValue{Value: fam.name}, + Qualifier: &wrappers.BytesValue{Value: []byte(colName)}, + TimestampMicros: cell.ts, + Value: cell.value, + }) + } } } // We can't have a cell with just COMMIT set, which would imply a new empty cell. @@ -401,21 +405,39 @@ } // merge // TODO(dsymonds): is this correct? - r.cells = make(map[string][]cell) + r.families = make(map[string]*family) for _, sr := range srs { - for col, cs := range sr.cells { - r.cells[col] = append(r.cells[col], cs...) 
+ for _, fam := range sr.families { + if _, ok := r.families[fam.name]; !ok { + r.families[fam.name] = &family{ + name: fam.name, + order: fam.order, + cells: make(map[string][]cell), + } + } + f := r.families[fam.name] + for colName, cs := range fam.cells { + if _, ok := f.cells[colName]; !ok { + f.colNames = append(f.colNames, colName) + } + f.cells[colName] = append(f.cells[colName], cs...) + } } } - for _, cs := range r.cells { - sort.Sort(byDescTS(cs)) + for _, fam := range r.families { + sort.Strings(fam.colNames) + for _, cs := range fam.cells { + sort.Sort(byDescTS(cs)) + } } return true case *btpb.RowFilter_CellsPerColumnLimitFilter: lim := int(f.CellsPerColumnLimitFilter) - for col, cs := range r.cells { - if len(cs) > lim { - r.cells[col] = cs[:lim] + for _, fam := range r.families { + for col, cs := range fam.cells { + if len(cs) > lim { + fam.cells[col] = cs[:lim] + } } } return true @@ -444,11 +466,11 @@ // Any other case, operate on a per-cell basis. cellCount := 0 - for key, cs := range r.cells { - i := strings.Index(key, ":") // guaranteed to exist - fam, col := key[:i], key[i+1:] - r.cells[key] = filterCells(f, fam, col, cs) - cellCount += len(r.cells[key]) + for _, fam := range r.families { + for colName, cs := range fam.cells { + fam.cells[colName] = filterCells(f, fam.name, colName, cs) + cellCount += len(fam.cells[colName]) + } } return cellCount > 0 } @@ -574,7 +596,7 @@ return nil, grpc.Errorf(codes.NotFound, "table %q not found", req.TableName) } - fs := tbl.columnFamiliesSet() + fs := tbl.columnFamilies() r := tbl.mutableRow(string(req.RowKey)) r.mu.Lock() defer r.mu.Unlock() @@ -595,7 +617,7 @@ res := &btpb.MutateRowsResponse{Entries: make([]*btpb.MutateRowsResponse_Entry, len(req.Entries))} - fs := tbl.columnFamiliesSet() + fs := tbl.columnFamilies() for i, entry := range req.Entries { r := tbl.mutableRow(string(entry.RowKey)) @@ -625,7 +647,7 @@ res := &btpb.CheckAndMutateRowResponse{} - fs := tbl.columnFamiliesSet() + fs := 
tbl.columnFamilies() r := tbl.mutableRow(string(req.RowKey)) r.mu.Lock() @@ -635,18 +657,13 @@ whichMut := false if req.PredicateFilter == nil { // Use true_mutations iff row contains any cells. - whichMut = len(r.cells) > 0 + whichMut = !r.isEmpty() } else { // Use true_mutations iff any cells in the row match the filter. // TODO(dsymonds): This could be cheaper. nr := r.copy() filterRow(req.PredicateFilter, nr) - for _, cs := range nr.cells { - if len(cs) > 0 { - whichMut = true - break - } - } + whichMut = !nr.isEmpty() // TODO(dsymonds): Figure out if this is supposed to be set // even when there's no predicate filter. res.PredicateMatched = whichMut @@ -665,14 +682,14 @@ // applyMutations applies a sequence of mutations to a row. - // fam should be a snapshot of the keys of tbl.families. + // fs should be a snapshot of tbl.families, keyed by family name. // It assumes r.mu is locked. -func applyMutations(tbl *table, r *row, muts []*btpb.Mutation, fs map[string]bool) error { +func applyMutations(tbl *table, r *row, muts []*btpb.Mutation, fs map[string]*columnFamily) error { for _, mut := range muts { switch mut := mut.Mutation.(type) { default: return fmt.Errorf("can't handle mutation type %T", mut) case *btpb.Mutation_SetCell_: set := mut.SetCell - if !fs[set.FamilyName] { + if _, ok := fs[set.FamilyName]; !ok { return fmt.Errorf("unknown family %q", set.FamilyName) } ts := set.TimestampMicros @@ -682,54 +699,73 @@ if !tbl.validTimestamp(ts) { return fmt.Errorf("invalid timestamp %d", ts) } - col := fmt.Sprintf("%s:%s", set.FamilyName, set.ColumnQualifier) + fam := set.FamilyName + col := string(set.ColumnQualifier) newCell := cell{ts: ts, value: set.Value} - r.cells[col] = appendOrReplaceCell(r.cells[col], newCell) + if _, ok := r.families[fam]; !ok { + r.families[fam] = &family{ + name: fam, + order: fs[fam].order, + cells: make(map[string][]cell), + } + } + f := r.families[fam] + if _, ok := f.cells[col]; !ok { + f.colNames = append(f.colNames, col) + sort.Strings(f.colNames) + } + f.cells[col] =
appendOrReplaceCell(f.cells[col], newCell) case *btpb.Mutation_DeleteFromColumn_: del := mut.DeleteFromColumn - col := fmt.Sprintf("%s:%s", del.FamilyName, del.ColumnQualifier) - - cs := r.cells[col] - if del.TimeRange != nil { - tsr := del.TimeRange - if !tbl.validTimestamp(tsr.StartTimestampMicros) { - return fmt.Errorf("invalid timestamp %d", tsr.StartTimestampMicros) + fam := del.FamilyName + col := string(del.ColumnQualifier) + if _, ok := r.families[fam]; ok { + cs := r.families[fam].cells[col] + if del.TimeRange != nil { + tsr := del.TimeRange + if !tbl.validTimestamp(tsr.StartTimestampMicros) { + return fmt.Errorf("invalid timestamp %d", tsr.StartTimestampMicros) + } + if !tbl.validTimestamp(tsr.EndTimestampMicros) { + return fmt.Errorf("invalid timestamp %d", tsr.EndTimestampMicros) + } + // Find half-open interval to remove. + // Cells are in descending timestamp order, + // so the predicates to sort.Search are inverted. + si, ei := 0, len(cs) + if tsr.StartTimestampMicros > 0 { + ei = sort.Search(len(cs), func(i int) bool { return cs[i].ts < tsr.StartTimestampMicros }) + } + if tsr.EndTimestampMicros > 0 { + si = sort.Search(len(cs), func(i int) bool { return cs[i].ts < tsr.EndTimestampMicros }) + } + if si < ei { + copy(cs[si:], cs[ei:]) + cs = cs[:len(cs)-(ei-si)] + } + } else { + cs = nil } - if !tbl.validTimestamp(tsr.EndTimestampMicros) { - return fmt.Errorf("invalid timestamp %d", tsr.EndTimestampMicros) + if len(cs) == 0 { + delete(r.families[fam].cells, col) + colNames := r.families[fam].colNames + i := sort.Search(len(colNames), func(i int) bool { return colNames[i] >= col }) + if i < len(colNames) && colNames[i] == col { + r.families[fam].colNames = append(colNames[:i], colNames[i+1:]...) + } + if len(r.families[fam].cells) == 0 { + delete(r.families, fam) + } + } else { + r.families[fam].cells[col] = cs } - // Find half-open interval to remove. - // Cells are in descending timestamp order, - // so the predicates to sort.Search are inverted. 
- si, ei := 0, len(cs) - if tsr.StartTimestampMicros > 0 { - ei = sort.Search(len(cs), func(i int) bool { return cs[i].ts < tsr.StartTimestampMicros }) - } - if tsr.EndTimestampMicros > 0 { - si = sort.Search(len(cs), func(i int) bool { return cs[i].ts < tsr.EndTimestampMicros }) - } - if si < ei { - copy(cs[si:], cs[ei:]) - cs = cs[:len(cs)-(ei-si)] - } - } else { - cs = nil - } - if len(cs) == 0 { - delete(r.cells, col) - } else { - r.cells[col] = cs } case *btpb.Mutation_DeleteFromRow_: - r.cells = make(map[string][]cell) + r.families = make(map[string]*family) case *btpb.Mutation_DeleteFromFamily_: - fampre := mut.DeleteFromFamily.FamilyName + ":" - for col, _ := range r.cells { - if strings.HasPrefix(col, fampre) { - delete(r.cells, col) - } - } + fampre := mut.DeleteFromFamily.FamilyName + delete(r.families, fampre) } } return nil @@ -774,7 +810,7 @@ updates := make(map[string]cell) // copy of updated cells; keyed by full column name - fs := tbl.columnFamiliesSet() + fs := tbl.columnFamilies() r := tbl.mutableRow(string(req.RowKey)) r.mu.Lock() @@ -782,17 +818,31 @@ // Assume all mutations apply to the most recent version of the cell. // TODO(dsymonds): Verify this assumption and document it in the proto. 
for _, rule := range req.Rules { - if !fs[rule.FamilyName] { + if _, ok := fs[rule.FamilyName]; !ok { return nil, fmt.Errorf("unknown family %q", rule.FamilyName) } - key := fmt.Sprintf("%s:%s", rule.FamilyName, rule.ColumnQualifier) + fam := rule.FamilyName + col := string(rule.ColumnQualifier) + isEmpty := true + if _, ok := r.families[fam]; !ok { + r.families[fam] = &family{ + name: fam, + order: fs[fam].order, + cells: make(map[string][]cell), + } + } + if cs, ok := r.families[fam].cells[col]; !ok { + r.families[fam].colNames = append(r.families[fam].colNames, col) + sort.Strings(r.families[fam].colNames) // keep sorted, as SetCell does; ReadRows and DeleteFromColumn rely on it + } else { + isEmpty = len(cs) == 0 + } - cells := r.cells[key] ts := newTimestamp() var newCell, prevCell cell - isEmpty := len(cells) == 0 if !isEmpty { + cells := r.families[fam].cells[col] prevCell = cells[0] // ts is the max of now or the prev cell's timestamp in case the @@ -819,8 +869,9 @@ binary.BigEndian.PutUint64(val[:], uint64(v)) newCell = cell{ts: ts, value: val[:]} } + key := strings.Join([]string{fam, col}, ":") updates[key] = newCell - r.cells[key] = appendOrReplaceCell(r.cells[key], newCell) + r.families[fam].cells[col] = appendOrReplaceCell(r.families[fam].cells[col], newCell) } res := &btpb.Row{ @@ -920,6 +971,7 @@ type table struct { mu sync.RWMutex + counter uint64 // incremented by 1 each time a family is created families map[string]*columnFamily // keyed by plain family name rows []*row // sorted by row key rowIndex map[string]*row // indexed by row key @@ -927,16 +979,20 @@ func newTable(ctr *btapb.CreateTableRequest) *table { fams := make(map[string]*columnFamily) + c := uint64(0) // NOTE: families created in one request are ordered by map iteration, i.e. unspecified if ctr.Table != nil { for id, cf := range ctr.Table.ColumnFamilies { fams[id] = &columnFamily{ name: ctr.Parent + "/columnFamilies/" + id, + order: c, gcRule: cf.GcRule, } + c++ } } return &table{ families: fams, + counter: c, rowIndex: make(map[string]*row), } } @@ -956,14 +1012,6 @@ return cp } -func (t *table) columnFamiliesSet() map[string]bool { - fs := make(map[string]bool) -
for fam := range t.columnFamilies() { - fs[fam] = true - } - return fs -} - func (t *table) mutableRow(row string) *row { // Try fast path first. t.mu.RLock() @@ -1018,14 +1066,14 @@ type row struct { key string - mu sync.Mutex - cells map[string][]cell // keyed by full column name; cells are in descending timestamp order + mu sync.Mutex + families map[string]*family // keyed by family name } func newRow(key string) *row { return &row{ - key: key, - cells: make(map[string][]cell), + key: key, + families: make(map[string]*family), } } @@ -1033,36 +1081,66 @@ // Cell values are aliased. // r.mu should be held. func (r *row) copy() *row { - nr := &row{ - key: r.key, - cells: make(map[string][]cell, len(r.cells)), - } - for col, cs := range r.cells { - // Copy the []cell slice, but not the []byte inside each cell. - nr.cells[col] = append([]cell(nil), cs...) + nr := newRow(r.key) + for _, fam := range r.families { + nr.families[fam.name] = &family{ + name: fam.name, + order: fam.order, + colNames: append([]string(nil), fam.colNames...), // copied: r's colNames is sorted/spliced in place by later mutations + cells: make(map[string][]cell), + } + for col, cs := range fam.cells { + // Copy the []cell slice, but not the []byte inside each cell. + nr.families[fam.name].cells[col] = append([]cell(nil), cs...) + } } return nr } +// isEmpty reports whether the row contains no cells. +func (r *row) isEmpty() bool { + for _, fam := range r.families { + for _, cs := range fam.cells { + if len(cs) > 0 { + return false + } + } + } + return true +} + +// sortedFamilies returns the row's families sorted in ascending creation order. +func (r *row) sortedFamilies() []*family { + families := make([]*family, 0, len(r.families)) + for _, fam := range r.families { + families = append(families, fam) + } + sort.Sort(byCreationOrder(families)) + return families +} + // gc applies the given GC rules to the row. // r.mu should be held.
func (r *row) gc(rules map[string]*btapb.GcRule) { - for col, cs := range r.cells { - fam := col[:strings.Index(col, ":")] - rule, ok := rules[fam] + for _, fam := range r.families { + rule, ok := rules[fam.name] if !ok { continue } - r.cells[col] = applyGC(cs, rule) + for col, cs := range fam.cells { + fam.cells[col] = applyGC(cs, rule) + } } } // size returns the total size of all cell values in the row. func (r *row) size() int { size := 0 - for _, cells := range r.cells { - for _, cell := range cells { - size += len(cell.value) + for _, fam := range r.families { + for _, cells := range fam.cells { + for _, cell := range cells { + size += len(cell.value) + } } } return size @@ -1109,6 +1187,21 @@ return cells } +// family holds the cells of one column family within a row. +type family struct { + name string // Column family name + order uint64 // Creation order of column family + colNames []string // Column names, sorted in lexicographically ascending order + cells map[string][]cell // Keyed by column name; cells are in descending timestamp order +} + +// byCreationOrder implements sort.Interface, ordering families by ascending creation order. +type byCreationOrder []*family + +func (b byCreationOrder) Len() int { return len(b) } +func (b byCreationOrder) Swap(i, j int) { b[i], b[j] = b[j], b[i] } +func (b byCreationOrder) Less(i, j int) bool { return b[i].order < b[j].order } + type cell struct { ts int64 value []byte @@ -1122,6 +1215,7 @@ type columnFamily struct { name string + order uint64 // Creation order of column family gcRule *btapb.GcRule }
diff --git a/bigtable/bttest/inmem_test.go b/bigtable/bttest/inmem_test.go index 005c4ee..8a6b61e 100644 --- a/bigtable/bttest/inmem_test.go +++ b/bigtable/bttest/inmem_test.go
@@ -271,50 +271,152 @@ tblSize := len(tbl.rows) req := &btapb.DropRowRangeRequest{ - Name:tblInfo.Name, + Name: tblInfo.Name, Target: &btapb.DropRowRangeRequest_RowKeyPrefix{[]byte("AAA")}, } if _, err = s.DropRowRange(ctx, req); err != nil { t.Fatalf("Dropping first range: %v", err) } - got, want := len(tbl.rows), tblSize - count + got, want := len(tbl.rows), tblSize-count if got != want { t.Errorf("Row count after first drop: got %d (%v), want %d", got, tbl.rows, want) } req = &btapb.DropRowRangeRequest{ - Name:tblInfo.Name, + Name: tblInfo.Name, Target: &btapb.DropRowRangeRequest_RowKeyPrefix{[]byte("DDD")}, } if _, err = s.DropRowRange(ctx, req); err != nil { t.Fatalf("Dropping second range: %v", err) } - got, want = len(tbl.rows), tblSize - (2 * count) - if got != want { + got, want = len(tbl.rows), tblSize-(2*count) + if got != want { t.Errorf("Row count after second drop: got %d (%v), want %d", got, tbl.rows, want) } req = &btapb.DropRowRangeRequest{ - Name:tblInfo.Name, + Name: tblInfo.Name, Target: &btapb.DropRowRangeRequest_RowKeyPrefix{[]byte("XXX")}, } if _, err = s.DropRowRange(ctx, req); err != nil { t.Fatalf("Dropping invalid range: %v", err) } - got, want = len(tbl.rows), tblSize - (2 * count) - if got != want { + got, want = len(tbl.rows), tblSize-(2*count) + if got != want { t.Errorf("Row count after invalid drop: got %d (%v), want %d", got, tbl.rows, want) } req = &btapb.DropRowRangeRequest{ - Name:tblInfo.Name, + Name: tblInfo.Name, Target: &btapb.DropRowRangeRequest_DeleteAllDataFromTable{true}, } if _, err = s.DropRowRange(ctx, req); err != nil { t.Fatalf("Dropping all data: %v", err) } got, want = len(tbl.rows), 0 - if got != want { + if got != want { t.Errorf("Row count after drop all: got %d, want %d", got, want) } -} \ No newline at end of file +} + +type MockReadRowsServer struct { + responses []*btpb.ReadRowsResponse + grpc.ServerStream +} + +func (s *MockReadRowsServer) Send(resp *btpb.ReadRowsResponse) error { + s.responses = 
append(s.responses, resp) + return nil + } + +func TestReadRowsOrder(t *testing.T) { + s := &server{ + tables: make(map[string]*table), + } + ctx := context.Background() + newTbl := btapb.Table{ + ColumnFamilies: map[string]*btapb.ColumnFamily{ + "cf0": {GcRule: &btapb.GcRule{Rule: &btapb.GcRule_MaxNumVersions{MaxNumVersions: 1}}}, + }, + } + tblInfo, err := s.CreateTable(ctx, &btapb.CreateTableRequest{Parent: "cluster", TableId: "t", Table: &newTbl}) + if err != nil { + t.Fatalf("Creating table: %v", err) + } + + // Add cf1 and cf2 one at a time so their creation (and output) order is deterministic. + for _, id := range []string{"cf1", "cf2"} { + req := &btapb.ModifyColumnFamiliesRequest{ + Name: tblInfo.Name, + Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{ + Id: id, + Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Create{Create: &btapb.ColumnFamily{}}, + }}, + } + if _, err := s.ModifyColumnFamilies(ctx, req); err != nil { + t.Fatalf("Creating column family %s: %v", id, err) + } + } + + // Populate the table + count := 3 + for fc := 0; fc < count; fc++ { + for cc := 0; cc < count; cc++ { + for tc := 0; tc < count; tc++ { + req := &btpb.MutateRowRequest{ + TableName: tblInfo.Name, + RowKey: []byte("row"), + Mutations: []*btpb.Mutation{{ + Mutation: &btpb.Mutation_SetCell_{SetCell: &btpb.Mutation_SetCell{ + FamilyName: "cf" + strconv.Itoa(fc), + ColumnQualifier: []byte("col" + strconv.Itoa(cc)), + TimestampMicros: int64((tc + 1) * 1000), + Value: []byte{}, + }}, + }}, + } + if _, err := s.MutateRow(ctx, req); err != nil { + t.Fatalf("Populating table: %v", err) + } + } + } + } + rreq := &btpb.ReadRowsRequest{ + TableName: tblInfo.Name, + Rows: &btpb.RowSet{RowKeys: [][]byte{[]byte("row")}}, + } + mock := &MockReadRowsServer{} + if err = s.ReadRows(rreq, mock); err != nil { + t.Fatalf("ReadRows error: %v", err) + } + if len(mock.responses) == 0 { + t.Fatal("Response count: got 0, want > 0") + } + if len(mock.responses[0].Chunks) != 27 { + t.Fatalf("Chunk count: got %d, want 27", len(mock.responses[0].Chunks)) + } + var prevFam, prevCol
string + var prevTime int64 + for _, cc := range mock.responses[0].Chunks { + if prevFam == "" { + prevFam = cc.FamilyName.Value + prevCol = string(cc.Qualifier.Value) + prevTime = cc.TimestampMicros + continue + } + if cc.FamilyName.Value < prevFam { + t.Errorf("Family order is not correct: got %s < %s", cc.FamilyName.Value, prevFam) + } else if cc.FamilyName.Value == prevFam { + if string(cc.Qualifier.Value) < prevCol { + t.Errorf("Column order is not correct: got %s < %s", string(cc.Qualifier.Value), prevCol) + } else if string(cc.Qualifier.Value) == prevCol { + if cc.TimestampMicros > prevTime { + t.Errorf("cell order is not correct: got %d > %d", cc.TimestampMicros, prevTime) + } + } + } + prevFam = cc.FamilyName.Value + prevCol = string(cc.Qualifier.Value) + prevTime = cc.TimestampMicros + } +}