bigtable/bttest: fix the cells ordering problem in emulator.
The changes include:
1) sort columns within a family in ascending lexicographical order.
2) sort families by creation order when outputting the row.
Change-Id: I31634789e8afd6b0f69a1a635893cc0d21e455df
diff --git a/bigtable/bttest/inmem.go b/bigtable/bttest/inmem.go
index 06c036f..943da93 100644
--- a/bigtable/bttest/inmem.go
+++ b/bigtable/bttest/inmem.go
@@ -187,8 +187,10 @@
}
newcf := &columnFamily{
name: req.Name + "/columnFamilies/" + mod.Id,
+ order: tbl.counter,
gcRule: create.GcRule,
}
+ tbl.counter++
tbl.families[mod.Id] = newcf
} else if mod.GetDrop() {
if _, ok := tbl.families[mod.Id]; !ok {
@@ -353,21 +355,23 @@
}
rrr := &btpb.ReadRowsResponse{}
- for col, cells := range r.cells {
- i := strings.Index(col, ":") // guaranteed to exist
- fam, col := col[:i], col[i+1:]
- if len(cells) == 0 {
- continue
- }
- // TODO(dsymonds): Apply transformers.
- for _, cell := range cells {
- rrr.Chunks = append(rrr.Chunks, &btpb.ReadRowsResponse_CellChunk{
- RowKey: []byte(r.key),
- FamilyName: &wrappers.StringValue{Value: fam},
- Qualifier: &wrappers.BytesValue{Value: []byte(col)},
- TimestampMicros: cell.ts,
- Value: cell.value,
- })
+ families := r.sortedFamilies()
+ for _, fam := range families {
+ for _, colName := range fam.colNames {
+ cells := fam.cells[colName]
+ if len(cells) == 0 {
+ continue
+ }
+ // TODO(dsymonds): Apply transformers.
+ for _, cell := range cells {
+ rrr.Chunks = append(rrr.Chunks, &btpb.ReadRowsResponse_CellChunk{
+ RowKey: []byte(r.key),
+ FamilyName: &wrappers.StringValue{Value: fam.name},
+ Qualifier: &wrappers.BytesValue{Value: []byte(colName)},
+ TimestampMicros: cell.ts,
+ Value: cell.value,
+ })
+ }
}
}
// We can't have a cell with just COMMIT set, which would imply a new empty cell.
@@ -401,21 +405,39 @@
}
// merge
// TODO(dsymonds): is this correct?
- r.cells = make(map[string][]cell)
+ r.families = make(map[string]*family)
for _, sr := range srs {
- for col, cs := range sr.cells {
- r.cells[col] = append(r.cells[col], cs...)
+ for _, fam := range sr.families {
+ if _, ok := r.families[fam.name]; !ok {
+ r.families[fam.name] = &family{
+ name: fam.name,
+ order: fam.order,
+ cells: make(map[string][]cell),
+ }
+ }
+ f := r.families[fam.name]
+ for colName, cs := range fam.cells {
+ if _, ok := f.cells[colName]; !ok {
+ f.colNames = append(f.colNames, colName)
+ }
+ f.cells[colName] = append(f.cells[colName], cs...)
+ }
}
}
- for _, cs := range r.cells {
- sort.Sort(byDescTS(cs))
+ for _, fam := range r.families {
+ sort.Strings(fam.colNames)
+ for _, cs := range fam.cells {
+ sort.Sort(byDescTS(cs))
+ }
}
return true
case *btpb.RowFilter_CellsPerColumnLimitFilter:
lim := int(f.CellsPerColumnLimitFilter)
- for col, cs := range r.cells {
- if len(cs) > lim {
- r.cells[col] = cs[:lim]
+ for _, fam := range r.families {
+ for col, cs := range fam.cells {
+ if len(cs) > lim {
+ fam.cells[col] = cs[:lim]
+ }
}
}
return true
@@ -444,11 +466,11 @@
// Any other case, operate on a per-cell basis.
cellCount := 0
- for key, cs := range r.cells {
- i := strings.Index(key, ":") // guaranteed to exist
- fam, col := key[:i], key[i+1:]
- r.cells[key] = filterCells(f, fam, col, cs)
- cellCount += len(r.cells[key])
+ for _, fam := range r.families {
+ for colName, cs := range fam.cells {
+ fam.cells[colName] = filterCells(f, fam.name, colName, cs)
+ cellCount += len(fam.cells[colName])
+ }
}
return cellCount > 0
}
@@ -574,7 +596,7 @@
return nil, grpc.Errorf(codes.NotFound, "table %q not found", req.TableName)
}
- fs := tbl.columnFamiliesSet()
+ fs := tbl.columnFamilies()
r := tbl.mutableRow(string(req.RowKey))
r.mu.Lock()
defer r.mu.Unlock()
@@ -595,7 +617,7 @@
res := &btpb.MutateRowsResponse{Entries: make([]*btpb.MutateRowsResponse_Entry, len(req.Entries))}
- fs := tbl.columnFamiliesSet()
+ fs := tbl.columnFamilies()
for i, entry := range req.Entries {
r := tbl.mutableRow(string(entry.RowKey))
@@ -625,7 +647,7 @@
res := &btpb.CheckAndMutateRowResponse{}
- fs := tbl.columnFamiliesSet()
+ fs := tbl.columnFamilies()
r := tbl.mutableRow(string(req.RowKey))
r.mu.Lock()
@@ -635,18 +657,13 @@
whichMut := false
if req.PredicateFilter == nil {
// Use true_mutations iff row contains any cells.
- whichMut = len(r.cells) > 0
+ whichMut = !r.isEmpty()
} else {
// Use true_mutations iff any cells in the row match the filter.
// TODO(dsymonds): This could be cheaper.
nr := r.copy()
filterRow(req.PredicateFilter, nr)
- for _, cs := range nr.cells {
- if len(cs) > 0 {
- whichMut = true
- break
- }
- }
+ whichMut = !nr.isEmpty()
// TODO(dsymonds): Figure out if this is supposed to be set
// even when there's no predicate filter.
res.PredicateMatched = whichMut
@@ -665,14 +682,14 @@
// applyMutations applies a sequence of mutations to a row.
// fam should be a snapshot of the keys of tbl.families.
// It assumes r.mu is locked.
-func applyMutations(tbl *table, r *row, muts []*btpb.Mutation, fs map[string]bool) error {
+func applyMutations(tbl *table, r *row, muts []*btpb.Mutation, fs map[string]*columnFamily) error {
for _, mut := range muts {
switch mut := mut.Mutation.(type) {
default:
return fmt.Errorf("can't handle mutation type %T", mut)
case *btpb.Mutation_SetCell_:
set := mut.SetCell
- if !fs[set.FamilyName] {
+ if _, ok := fs[set.FamilyName]; !ok {
return fmt.Errorf("unknown family %q", set.FamilyName)
}
ts := set.TimestampMicros
@@ -682,54 +699,73 @@
if !tbl.validTimestamp(ts) {
return fmt.Errorf("invalid timestamp %d", ts)
}
- col := fmt.Sprintf("%s:%s", set.FamilyName, set.ColumnQualifier)
+ fam := set.FamilyName
+ col := string(set.ColumnQualifier)
newCell := cell{ts: ts, value: set.Value}
- r.cells[col] = appendOrReplaceCell(r.cells[col], newCell)
+ if _, ok := r.families[fam]; !ok {
+ r.families[fam] = &family{
+ name: fam,
+ order: fs[fam].order,
+ cells: make(map[string][]cell),
+ }
+ }
+ f := r.families[fam]
+ if _, ok := f.cells[col]; !ok {
+ f.colNames = append(f.colNames, col)
+ sort.Strings(f.colNames)
+ }
+ f.cells[col] = appendOrReplaceCell(f.cells[col], newCell)
case *btpb.Mutation_DeleteFromColumn_:
del := mut.DeleteFromColumn
- col := fmt.Sprintf("%s:%s", del.FamilyName, del.ColumnQualifier)
-
- cs := r.cells[col]
- if del.TimeRange != nil {
- tsr := del.TimeRange
- if !tbl.validTimestamp(tsr.StartTimestampMicros) {
- return fmt.Errorf("invalid timestamp %d", tsr.StartTimestampMicros)
+ fam := del.FamilyName
+ col := string(del.ColumnQualifier)
+ if _, ok := r.families[fam]; ok {
+ cs := r.families[fam].cells[col]
+ if del.TimeRange != nil {
+ tsr := del.TimeRange
+ if !tbl.validTimestamp(tsr.StartTimestampMicros) {
+ return fmt.Errorf("invalid timestamp %d", tsr.StartTimestampMicros)
+ }
+ if !tbl.validTimestamp(tsr.EndTimestampMicros) {
+ return fmt.Errorf("invalid timestamp %d", tsr.EndTimestampMicros)
+ }
+ // Find half-open interval to remove.
+ // Cells are in descending timestamp order,
+ // so the predicates to sort.Search are inverted.
+ si, ei := 0, len(cs)
+ if tsr.StartTimestampMicros > 0 {
+ ei = sort.Search(len(cs), func(i int) bool { return cs[i].ts < tsr.StartTimestampMicros })
+ }
+ if tsr.EndTimestampMicros > 0 {
+ si = sort.Search(len(cs), func(i int) bool { return cs[i].ts < tsr.EndTimestampMicros })
+ }
+ if si < ei {
+ copy(cs[si:], cs[ei:])
+ cs = cs[:len(cs)-(ei-si)]
+ }
+ } else {
+ cs = nil
}
- if !tbl.validTimestamp(tsr.EndTimestampMicros) {
- return fmt.Errorf("invalid timestamp %d", tsr.EndTimestampMicros)
+ if len(cs) == 0 {
+ delete(r.families[fam].cells, col)
+ colNames := r.families[fam].colNames
+ i := sort.Search(len(colNames), func(i int) bool { return colNames[i] >= col })
+ if i < len(colNames) && colNames[i] == col {
+ r.families[fam].colNames = append(colNames[:i], colNames[i+1:]...)
+ }
+ if len(r.families[fam].cells) == 0 {
+ delete(r.families, fam)
+ }
+ } else {
+ r.families[fam].cells[col] = cs
}
- // Find half-open interval to remove.
- // Cells are in descending timestamp order,
- // so the predicates to sort.Search are inverted.
- si, ei := 0, len(cs)
- if tsr.StartTimestampMicros > 0 {
- ei = sort.Search(len(cs), func(i int) bool { return cs[i].ts < tsr.StartTimestampMicros })
- }
- if tsr.EndTimestampMicros > 0 {
- si = sort.Search(len(cs), func(i int) bool { return cs[i].ts < tsr.EndTimestampMicros })
- }
- if si < ei {
- copy(cs[si:], cs[ei:])
- cs = cs[:len(cs)-(ei-si)]
- }
- } else {
- cs = nil
- }
- if len(cs) == 0 {
- delete(r.cells, col)
- } else {
- r.cells[col] = cs
}
case *btpb.Mutation_DeleteFromRow_:
- r.cells = make(map[string][]cell)
+ r.families = make(map[string]*family)
case *btpb.Mutation_DeleteFromFamily_:
- fampre := mut.DeleteFromFamily.FamilyName + ":"
- for col, _ := range r.cells {
- if strings.HasPrefix(col, fampre) {
- delete(r.cells, col)
- }
- }
+ fampre := mut.DeleteFromFamily.FamilyName
+ delete(r.families, fampre)
}
}
return nil
@@ -774,7 +810,7 @@
updates := make(map[string]cell) // copy of updated cells; keyed by full column name
- fs := tbl.columnFamiliesSet()
+ fs := tbl.columnFamilies()
r := tbl.mutableRow(string(req.RowKey))
r.mu.Lock()
@@ -782,17 +818,31 @@
// Assume all mutations apply to the most recent version of the cell.
// TODO(dsymonds): Verify this assumption and document it in the proto.
for _, rule := range req.Rules {
- if !fs[rule.FamilyName] {
+ if _, ok := fs[rule.FamilyName]; !ok {
return nil, fmt.Errorf("unknown family %q", rule.FamilyName)
}
- key := fmt.Sprintf("%s:%s", rule.FamilyName, rule.ColumnQualifier)
+ fam := rule.FamilyName
+ col := string(rule.ColumnQualifier)
+ isEmpty := false
+ if _, ok := r.families[fam]; !ok {
+ r.families[fam] = &family{
+ name: fam,
+ order: fs[fam].order,
+ cells: make(map[string][]cell),
+ }
+ }
+ if cs, ok := r.families[fam].cells[col]; !ok {
+ r.families[fam].colNames = append(r.families[fam].colNames, col)
+ isEmpty = true
+ } else {
+ isEmpty = len(cs) == 0
+ }
- cells := r.cells[key]
ts := newTimestamp()
var newCell, prevCell cell
- isEmpty := len(cells) == 0
if !isEmpty {
+ cells := r.families[fam].cells[col]
prevCell = cells[0]
// ts is the max of now or the prev cell's timestamp in case the
@@ -819,8 +869,9 @@
binary.BigEndian.PutUint64(val[:], uint64(v))
newCell = cell{ts: ts, value: val[:]}
}
+ key := strings.Join([]string{fam, col}, ":")
updates[key] = newCell
- r.cells[key] = appendOrReplaceCell(r.cells[key], newCell)
+ r.families[fam].cells[col] = appendOrReplaceCell(r.families[fam].cells[col], newCell)
}
res := &btpb.Row{
@@ -920,6 +971,7 @@
type table struct {
mu sync.RWMutex
+ counter uint64 // increment by 1 when a new family is created
families map[string]*columnFamily // keyed by plain family name
rows []*row // sorted by row key
rowIndex map[string]*row // indexed by row key
@@ -927,16 +979,20 @@
func newTable(ctr *btapb.CreateTableRequest) *table {
fams := make(map[string]*columnFamily)
+ c := uint64(0) // creation-order counter for the families supplied at table creation
if ctr.Table != nil {
for id, cf := range ctr.Table.ColumnFamilies {
fams[id] = &columnFamily{
name: ctr.Parent + "/columnFamilies/" + id,
+ order: c, // NOTE(review): ranging over a map, so the relative order of these initial families is unspecified
gcRule: cf.GcRule,
}
+ c++
}
}
return &table{
families: fams,
+ counter: c, // families created later continue numbering from here
rowIndex: make(map[string]*row),
}
}
@@ -956,14 +1012,6 @@
return cp
}
-func (t *table) columnFamiliesSet() map[string]bool {
- fs := make(map[string]bool)
- for fam := range t.columnFamilies() {
- fs[fam] = true
- }
- return fs
-}
-
func (t *table) mutableRow(row string) *row {
// Try fast path first.
t.mu.RLock()
@@ -1018,14 +1066,14 @@
type row struct {
key string
- mu sync.Mutex
- cells map[string][]cell // keyed by full column name; cells are in descending timestamp order
+ mu sync.Mutex // should be held while reading or mutating families
+ families map[string]*family // keyed by family name; each family owns its column names and cells
}
func newRow(key string) *row {
return &row{
- key: key,
- cells: make(map[string][]cell),
+ key: key,
+ families: make(map[string]*family), // non-nil so mutations can insert families directly
}
}
@@ -1033,36 +1081,66 @@
// Cell values are aliased.
// r.mu should be held.
func (r *row) copy() *row {
- nr := &row{
- key: r.key,
- cells: make(map[string][]cell, len(r.cells)),
- }
- for col, cs := range r.cells {
- // Copy the []cell slice, but not the []byte inside each cell.
- nr.cells[col] = append([]cell(nil), cs...)
+ nr := newRow(r.key) // deep-copies the row structure; only cell values ([]byte) stay shared
+ for _, fam := range r.families {
+ nr.families[fam.name] = &family{
+ name: fam.name,
+ order: fam.order,
+ colNames: append([]string(nil), fam.colNames...), // own slice: applyMutations appends, sorts and splices colNames in place
+ cells: make(map[string][]cell, len(fam.cells)),
+ }
+ for col, cs := range fam.cells {
+ // Copy the []cell slice, but not the []byte inside each cell.
+ nr.families[fam.name].cells[col] = append([]cell(nil), cs...)
+ }
}
return nr
}
+// isEmpty reports whether the row holds no cells; families or columns whose cell slices are empty (e.g. after filtering) do not count.
+func (r *row) isEmpty() bool {
+ for _, fam := range r.families {
+ for _, cs := range fam.cells {
+ if len(cs) > 0 {
+ return false
+ }
+ }
+ }
+ return true
+}
+
+// sortedFamilies returns the row's families ordered by ascending creation order (family.order).
+func (r *row) sortedFamilies() []*family {
+ families := make([]*family, 0, len(r.families))
+ for _, fam := range r.families {
+ families = append(families, fam)
+ }
+ sort.Sort(byCreationOrder(families))
+ return families
+}
+
// gc applies the given GC rules to the row.
// r.mu should be held.
func (r *row) gc(rules map[string]*btapb.GcRule) {
- for col, cs := range r.cells {
- fam := col[:strings.Index(col, ":")]
- rule, ok := rules[fam]
+ for _, fam := range r.families {
+ rule, ok := rules[fam.name]
if !ok {
continue
}
- r.cells[col] = applyGC(cs, rule)
+ for col, cs := range fam.cells {
+ r.families[fam.name].cells[col] = applyGC(cs, rule) // overwriting existing keys while ranging is allowed; empty results are not pruned from cells or colNames
+ }
}
}
// size returns the total size of all cell values in the row.
func (r *row) size() int {
size := 0
- for _, cells := range r.cells {
- for _, cell := range cells {
- size += len(cell.value)
+ for _, fam := range r.families {
+ for _, cells := range fam.cells {
+ for _, cell := range cells {
+ size += len(cell.value)
+ }
}
}
return size
@@ -1109,6 +1187,19 @@
return cells
}
+type family struct { // the cells one row holds for a single column family
+ name string // Column family name (plain name, not the full resource path)
+ order uint64 // Creation order of the column family, copied from columnFamily.order
+ colNames []string // Column names in ascending lexicographical order; a listed column may have no cells left. NOTE(review): the ReadModifyWrite rule path appends here without re-sorting
+ cells map[string][]cell // Keyed by column name; cells are in descending timestamp order
+}
+
+type byCreationOrder []*family // sort.Interface ordering families by ascending creation order
+
+func (b byCreationOrder) Len() int { return len(b) }
+func (b byCreationOrder) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
+func (b byCreationOrder) Less(i, j int) bool { return b[i].order < b[j].order }
+
type cell struct {
ts int64
value []byte
@@ -1122,6 +1213,7 @@
type columnFamily struct {
name string
+ order uint64 // Creation order of the column family within its table; used to order families when a row is output
gcRule *btapb.GcRule
}
diff --git a/bigtable/bttest/inmem_test.go b/bigtable/bttest/inmem_test.go
index 005c4ee..8a6b61e 100644
--- a/bigtable/bttest/inmem_test.go
+++ b/bigtable/bttest/inmem_test.go
@@ -271,50 +271,152 @@
tblSize := len(tbl.rows)
req := &btapb.DropRowRangeRequest{
- Name:tblInfo.Name,
+ Name: tblInfo.Name,
Target: &btapb.DropRowRangeRequest_RowKeyPrefix{[]byte("AAA")},
}
if _, err = s.DropRowRange(ctx, req); err != nil {
t.Fatalf("Dropping first range: %v", err)
}
- got, want := len(tbl.rows), tblSize - count
+ got, want := len(tbl.rows), tblSize-count
if got != want {
t.Errorf("Row count after first drop: got %d (%v), want %d", got, tbl.rows, want)
}
req = &btapb.DropRowRangeRequest{
- Name:tblInfo.Name,
+ Name: tblInfo.Name,
Target: &btapb.DropRowRangeRequest_RowKeyPrefix{[]byte("DDD")},
}
if _, err = s.DropRowRange(ctx, req); err != nil {
t.Fatalf("Dropping second range: %v", err)
}
- got, want = len(tbl.rows), tblSize - (2 * count)
- if got != want {
+ got, want = len(tbl.rows), tblSize-(2*count)
+ if got != want {
t.Errorf("Row count after second drop: got %d (%v), want %d", got, tbl.rows, want)
}
req = &btapb.DropRowRangeRequest{
- Name:tblInfo.Name,
+ Name: tblInfo.Name,
Target: &btapb.DropRowRangeRequest_RowKeyPrefix{[]byte("XXX")},
}
if _, err = s.DropRowRange(ctx, req); err != nil {
t.Fatalf("Dropping invalid range: %v", err)
}
- got, want = len(tbl.rows), tblSize - (2 * count)
- if got != want {
+ got, want = len(tbl.rows), tblSize-(2*count)
+ if got != want {
t.Errorf("Row count after invalid drop: got %d (%v), want %d", got, tbl.rows, want)
}
req = &btapb.DropRowRangeRequest{
- Name:tblInfo.Name,
+ Name: tblInfo.Name,
Target: &btapb.DropRowRangeRequest_DeleteAllDataFromTable{true},
}
if _, err = s.DropRowRange(ctx, req); err != nil {
t.Fatalf("Dropping all data: %v", err)
}
got, want = len(tbl.rows), 0
- if got != want {
+ if got != want {
t.Errorf("Row count after drop all: got %d, want %d", got, want)
}
-}
\ No newline at end of file
+}
+
+type MockReadRowsServer struct { // captures ReadRows responses in memory for inspection
+ responses []*btpb.ReadRowsResponse // every response passed to Send, in order
+ grpc.ServerStream // left nil: only Send is implemented, so any other stream method call would panic
+}
+
+func (s *MockReadRowsServer) Send(resp *btpb.ReadRowsResponse) error { // records resp; never fails
+ s.responses = append(s.responses, resp)
+ return nil
+}
+
+func TestReadRowsOrder(t *testing.T) {
+ s := &server{
+ tables: make(map[string]*table),
+ }
+ ctx := context.Background()
+ newTbl := btapb.Table{
+ ColumnFamilies: map[string]*btapb.ColumnFamily{
+ "cf0": {GcRule: &btapb.GcRule{Rule: &btapb.GcRule_MaxNumVersions{1}}},
+ "cf1": {GcRule: &btapb.GcRule{Rule: &btapb.GcRule_MaxNumVersions{2}}},
+ },
+ }
+ tblInfo, err := s.CreateTable(ctx, &btapb.CreateTableRequest{Parent: "cluster", TableId: "t", Table: &newTbl})
+ if err != nil {
+ t.Fatalf("Creating table: %v", err)
+ }
+ // cf0/cf1 come from a map, so their creation order varies; check against the recorded order.
+ tbl := s.tables[tblInfo.Name]
+ req := &btapb.ModifyColumnFamiliesRequest{
+ Name: tblInfo.Name,
+ Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{
+ Id: "cf2",
+ Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Create{&btapb.ColumnFamily{}},
+ }},
+ }
+ _, err = s.ModifyColumnFamilies(ctx, req)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // Populate the table; columns are written in descending order so the emulator must sort them.
+ count := 3
+ for fc := 0; fc < count; fc++ {
+ for cc := count - 1; cc >= 0; cc-- {
+ for tc := 0; tc < count; tc++ {
+ req := &btpb.MutateRowRequest{
+ TableName: tblInfo.Name,
+ RowKey: []byte("row"),
+ Mutations: []*btpb.Mutation{{
+ Mutation: &btpb.Mutation_SetCell_{&btpb.Mutation_SetCell{
+ FamilyName: "cf" + strconv.Itoa(fc),
+ ColumnQualifier: []byte("col" + strconv.Itoa(cc)),
+ TimestampMicros: int64((tc + 1) * 1000),
+ Value: []byte{},
+ }},
+ }},
+ }
+ if _, err := s.MutateRow(ctx, req); err != nil {
+ t.Fatalf("Populating table: %v", err)
+ }
+ }
+ }
+ }
+ rreq := &btpb.ReadRowsRequest{
+ TableName: tblInfo.Name,
+ Rows: &btpb.RowSet{RowKeys: [][]byte{[]byte("row")}},
+ }
+ mock := &MockReadRowsServer{}
+ if err = s.ReadRows(rreq, mock); err != nil {
+ t.Fatalf("ReadRows error: %v", err)
+ }
+ if len(mock.responses) == 0 {
+ t.Fatal("Response count: got 0, want > 0")
+ }
+ if got, want := len(mock.responses[0].Chunks), count*count*count; got != want { // one chunk per cell written above
+ t.Fatalf("Chunk count: got %d, want %d", got, want)
+ }
+ var prevFam, prevCol string
+ var prevTime int64
+ for _, cc := range mock.responses[0].Chunks {
+ if prevFam == "" {
+ prevFam = cc.FamilyName.Value
+ prevCol = string(cc.Qualifier.Value)
+ prevTime = cc.TimestampMicros
+ continue
+ }
+ if curOrder, prevOrder := tbl.families[cc.FamilyName.Value].order, tbl.families[prevFam].order; curOrder < prevOrder {
+ t.Errorf("Family order is not correct: got %s (created %d) after %s (created %d)", cc.FamilyName.Value, curOrder, prevFam, prevOrder)
+ } else if cc.FamilyName.Value == prevFam {
+ if string(cc.Qualifier.Value) < prevCol {
+ t.Errorf("Column order is not correct: got %s < %s", string(cc.Qualifier.Value), prevCol)
+ } else if string(cc.Qualifier.Value) == prevCol {
+ if cc.TimestampMicros > prevTime {
+ t.Errorf("cell order is not correct: got %d > %d", cc.TimestampMicros, prevTime)
+ }
+ }
+ }
+ prevFam = cc.FamilyName.Value
+ prevCol = string(cc.Qualifier.Value)
+ prevTime = cc.TimestampMicros
+ }
+}