feat(bigtable): Add support for reverse scans (#8755)
* feat: add more expressive range api
* feat(bigtable): Add support for reverse scans
* resolve feature flag conflict after merge
* add a new constructor for chunkReader to avoid line noise
* remove orphaned code
* fix typo
* adding a first test
* extended RowRange to express bound types
* adding unit tests
* correcting backwards compatibility behavior
* adding test proxy support for reverse scan
* added todo reminder
* all unit tests pass
* updated naming
* fix retries on reverse scan
* more tests
* exposing client messages to test proxy
formatting
* exposing client messages to test proxy
formatting
* fixing vet errors
* minor style tweaks
* changing error message to be consistent with java and cpp
* cleaning up code and adding tests
* simplify RowRange valid logic
* rolling back test proxy changes
* rolling back test proxy
* changed default bound type for better backwards compatibility
rolled back some integration test changes to ensure backwards compatibility
---------
Co-authored-by: Igor Bernstein <igorbernstein@google.com>
diff --git a/bigtable/bigtable.go b/bigtable/bigtable.go
index 43993fc..0cd7a70 100644
--- a/bigtable/bigtable.go
+++ b/bigtable/bigtable.go
@@ -207,7 +207,14 @@
if err != nil {
return err
}
- cr := newChunkReader()
+
+ var cr *chunkReader
+ if req.Reversed {
+ cr = newReverseChunkReader()
+ } else {
+ cr = newChunkReader()
+ }
+
for {
res, err := stream.Recv()
if err == io.EOF {
@@ -215,7 +222,11 @@
}
if err != nil {
// Reset arg for next Invoke call.
- arg = arg.retainRowsAfter(prevRowKey)
+ if req.Reversed {
+ arg = arg.retainRowsBefore(prevRowKey)
+ } else {
+ arg = arg.retainRowsAfter(prevRowKey)
+ }
attrMap["rowKey"] = prevRowKey
attrMap["error"] = err.Error()
attrMap["time_secs"] = time.Since(startTime).Seconds()
@@ -306,6 +317,10 @@
// given row key or any row key lexicographically less than it.
retainRowsAfter(lastRowKey string) RowSet
+ // retainRowsBefore returns a new RowSet that does not include the
+ // given row key or any row key lexicographically greater than it.
+ retainRowsBefore(lastRowKey string) RowSet
+
// Valid reports whether this set can cover at least one row.
valid() bool
}
@@ -331,53 +346,184 @@
return retryKeys
}
+func (r RowList) retainRowsBefore(lastRowKey string) RowSet {
+ var retryKeys RowList
+ for _, key := range r {
+ if key < lastRowKey {
+ retryKeys = append(retryKeys, key)
+ }
+ }
+ return retryKeys
+}
+
func (r RowList) valid() bool {
return len(r) > 0
}
-// A RowRange is a half-open interval [Start, Limit) encompassing
-// all the rows with keys at least as large as Start, and less than Limit.
-// (Bigtable string comparison is the same as Go's.)
-// A RowRange can be unbounded, encompassing all keys at least as large as Start.
+type rangeBoundType int64
+
+const (
+ rangeUnbounded rangeBoundType = iota
+ rangeOpen
+ rangeClosed
+)
+
+// A RowRange describes a range of rows between the start and end key. Start and
+// end keys may be rangeOpen, rangeClosed or rangeUnbounded.
type RowRange struct {
- start string
- limit string
+ startBound rangeBoundType
+ start string
+ endBound rangeBoundType
+ end string
}
// NewRange returns the new RowRange [begin, end).
func NewRange(begin, end string) RowRange {
+ return createRowRange(rangeClosed, begin, rangeOpen, end)
+}
+
+// NewClosedOpenRange returns the RowRange consisting of all keys greater than or
+// equal to the start and less than the end: [start, end).
+func NewClosedOpenRange(start, end string) RowRange {
+ return createRowRange(rangeClosed, start, rangeOpen, end)
+}
+
+// NewOpenClosedRange returns the RowRange consisting of all keys greater than
+// the start and less than or equal to the end: (start, end].
+func NewOpenClosedRange(start, end string) RowRange {
+ return createRowRange(rangeOpen, start, rangeClosed, end)
+}
+
+// NewOpenRange returns the RowRange consisting of all keys greater than the
+// start and less than the end: (start, end).
+func NewOpenRange(start, end string) RowRange {
+ return createRowRange(rangeOpen, start, rangeOpen, end)
+}
+
+// NewClosedRange returns the RowRange consisting of all keys greater than or
+// equal to the start and less than or equal to the end: [start, end].
+func NewClosedRange(start, end string) RowRange {
+ return createRowRange(rangeClosed, start, rangeClosed, end)
+}
+
+// PrefixRange returns a RowRange consisting of all keys starting with the prefix.
+func PrefixRange(prefix string) RowRange {
+ end := prefixSuccessor(prefix)
+ return createRowRange(rangeClosed, prefix, rangeOpen, end)
+}
+
+// InfiniteRange returns the RowRange consisting of all keys at least as
+// large as start: [start, ∞).
+func InfiniteRange(start string) RowRange {
+ return createRowRange(rangeClosed, start, rangeUnbounded, "")
+}
+
+// InfiniteReverseRange returns the RowRange consisting of all keys less than or
+// equal to the end: (-∞, end].
+func InfiniteReverseRange(end string) RowRange {
+ return createRowRange(rangeUnbounded, "", rangeClosed, end)
+}
+
+// createRowRange creates a new RowRange, normalizing start and end
+// rangeBoundType to rangeUnbounded if they're empty strings because empty
+// strings also represent unbounded keys
+func createRowRange(startBound rangeBoundType, start string, endBound rangeBoundType, end string) RowRange {
+ // normalize start bound type
+ if start == "" {
+ startBound = rangeUnbounded
+ }
+ // normalize end bound type
+ if end == "" {
+ endBound = rangeUnbounded
+ }
return RowRange{
- start: begin,
- limit: end,
+ startBound: startBound,
+ start: start,
+ endBound: endBound,
+ end: end,
}
}
// Unbounded tests whether a RowRange is unbounded.
func (r RowRange) Unbounded() bool {
- return r.limit == ""
+ return r.startBound == rangeUnbounded || r.endBound == rangeUnbounded
}
// Contains says whether the RowRange contains the key.
func (r RowRange) Contains(row string) bool {
- return r.start <= row && (r.limit == "" || r.limit > row)
+ switch r.startBound {
+ case rangeOpen:
+ if r.start >= row {
+ return false
+ }
+ case rangeClosed:
+ if r.start > row {
+ return false
+ }
+ case rangeUnbounded:
+ }
+
+ switch r.endBound {
+ case rangeOpen:
+ if r.end <= row {
+ return false
+ }
+ case rangeClosed:
+ if r.end < row {
+ return false
+ }
+ case rangeUnbounded:
+ }
+
+ return true
}
// String provides a printable description of a RowRange.
func (r RowRange) String() string {
- a := strconv.Quote(r.start)
- if r.Unbounded() {
- return fmt.Sprintf("[%s,∞)", a)
+ var startStr string
+ switch r.startBound {
+ case rangeOpen:
+ startStr = "(" + strconv.Quote(r.start)
+ case rangeClosed:
+ startStr = "[" + strconv.Quote(r.start)
+ case rangeUnbounded:
+ startStr = "(∞"
}
- return fmt.Sprintf("[%s,%q)", a, r.limit)
+
+ var endStr string
+ switch r.endBound {
+ case rangeOpen:
+ endStr = r.end + ")"
+ case rangeClosed:
+ endStr = r.end + "]"
+ case rangeUnbounded:
+ endStr = "∞)"
+ }
+
+ return fmt.Sprintf("%s,%s", startStr, endStr)
}
func (r RowRange) proto() *btpb.RowSet {
- rr := &btpb.RowRange{
- StartKey: &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte(r.start)},
+ rr := &btpb.RowRange{}
+
+ switch r.startBound {
+ case rangeOpen:
+ rr.StartKey = &btpb.RowRange_StartKeyOpen{StartKeyOpen: []byte(r.start)}
+ case rangeClosed:
+ rr.StartKey = &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte(r.start)}
+ case rangeUnbounded:
+ // leave unbounded
}
- if !r.Unbounded() {
- rr.EndKey = &btpb.RowRange_EndKeyOpen{EndKeyOpen: []byte(r.limit)}
+
+ switch r.endBound {
+ case rangeOpen:
+ rr.EndKey = &btpb.RowRange_EndKeyOpen{EndKeyOpen: []byte(r.end)}
+ case rangeClosed:
+ rr.EndKey = &btpb.RowRange_EndKeyClosed{EndKeyClosed: []byte(r.end)}
+ case rangeUnbounded:
+ // leave unbounded
}
+
return &btpb.RowSet{RowRanges: []*btpb.RowRange{rr}}
}
@@ -385,16 +531,45 @@
if lastRowKey == "" || lastRowKey < r.start {
return r
}
- // Set the beginning of the range to the row after the last scanned.
- start := lastRowKey + "\x00"
- if r.Unbounded() {
- return InfiniteRange(start)
+
+ return RowRange{
+ // Set the beginning of the range to the row after the last scanned.
+ startBound: rangeOpen,
+ start: lastRowKey,
+ endBound: r.endBound,
+ end: r.end,
}
- return NewRange(start, r.limit)
+}
+
+func (r RowRange) retainRowsBefore(lastRowKey string) RowSet {
+ if lastRowKey == "" || (r.endBound != rangeUnbounded && r.end < lastRowKey) {
+ return r
+ }
+
+ return RowRange{
+ startBound: r.startBound,
+ start: r.start,
+ endBound: rangeOpen,
+ end: lastRowKey,
+ }
}
func (r RowRange) valid() bool {
- return r.Unbounded() || r.start < r.limit
+ // If either end is unbounded, then the range is always valid.
+ if r.Unbounded() {
+ return true
+ }
+
+ // If either end is an open interval, then the start must be strictly less
+ // than the end and since neither end is unbounded, we don't have to check
+ // for empty strings.
+ if r.startBound == rangeOpen || r.endBound == rangeOpen {
+ return r.start < r.end
+ }
+
+ // At this point both endpoints must be closed, which makes [a,a] a valid
+ // interval
+ return r.start <= r.end
}
// RowRangeList is a sequence of RowRanges representing the union of the ranges.
@@ -424,6 +599,21 @@
return ranges
}
+func (r RowRangeList) retainRowsBefore(lastRowKey string) RowSet {
+ if lastRowKey == "" {
+ return r
+ }
+ // Return a list of any range that has not yet been completely processed
+ var ranges RowRangeList
+ for _, rr := range r {
+ retained := rr.retainRowsBefore(lastRowKey)
+ if retained.valid() {
+ ranges = append(ranges, retained.(RowRange))
+ }
+ }
+ return ranges
+}
+
func (r RowRangeList) valid() bool {
for _, rr := range r {
if rr.valid() {
@@ -438,23 +628,6 @@
return RowList{row}
}
-// PrefixRange returns a RowRange consisting of all keys starting with the prefix.
-func PrefixRange(prefix string) RowRange {
- return RowRange{
- start: prefix,
- limit: prefixSuccessor(prefix),
- }
-}
-
-// InfiniteRange returns the RowRange consisting of all keys at least as
-// large as start.
-func InfiniteRange(start string) RowRange {
- return RowRange{
- start: start,
- limit: "",
- }
-}
-
// prefixSuccessor returns the lexically smallest string greater than the
// prefix, if it exists, or "" otherwise. In either case, it is the string
// needed for the Limit of a RowRange.
@@ -557,7 +730,7 @@
func (rf rowFilter) set(settings *readSettings) { settings.req.Filter = rf.f.proto() }
-// LimitRows returns a ReadOption that will limit the number of rows to be read.
+// LimitRows returns a ReadOption that will limit the number of rows to be read.
func LimitRows(limit int64) ReadOption { return limitRows{limit} }
type limitRows struct{ limit int64 }
@@ -577,6 +750,25 @@
settings.fullReadStatsFunc = wrs.f
}
+// ReverseScan returns a ReadOption that will reverse the results of a Scan.
+// The rows will be streamed in reverse lexicographic order of the keys. The row key ranges of the RowSet are
+// still expected to be oriented the same way as for forward scans, i.e. [a,c] where a <= c. The content
+// within each row keeps the same ordering as in forward scans. This is particularly useful to get the
+// last N records before a key:
+//
+// table.ReadRows(ctx, NewOpenClosedRange("", "key"), func(row bigtable.Row) bool {
+// return true
+// }, bigtable.ReverseScan(), bigtable.LimitRows(10))
+func ReverseScan() ReadOption {
+ return reverseScan{}
+}
+
+type reverseScan struct{}
+
+func (rs reverseScan) set(settings *readSettings) {
+ settings.req.Reversed = true
+}
+
// mutationsAreRetryable returns true if all mutations are idempotent
// and therefore retryable. A mutation is idempotent iff all cell timestamps
// have an explicit timestamp set and do not rely on the timestamp being set on the server.
diff --git a/bigtable/bigtable_test.go b/bigtable/bigtable_test.go
index 4fc0d13..0bea28e 100644
--- a/bigtable/bigtable_test.go
+++ b/bigtable/bigtable_test.go
@@ -18,6 +18,7 @@
import (
"context"
+ "reflect"
"testing"
"time"
@@ -42,11 +43,206 @@
continue
}
r := PrefixRange(test.prefix)
- if test.succ == "" && r.limit != "" {
- t.Errorf("PrefixRange(%q) got limit %q", test.prefix, r.limit)
+ if test.succ == "" && r.end != "" {
+ t.Errorf("PrefixRange(%q) got end %q", test.prefix, r.end)
}
- if test.succ != "" && r.limit != test.succ {
- t.Errorf("PrefixRange(%q) got limit %q, want %q", test.prefix, r.limit, test.succ)
+ if test.succ != "" && r.end != test.succ {
+ t.Errorf("PrefixRange(%q) got end %q, want %q", test.prefix, r.end, test.succ)
+ }
+ }
+}
+
+func TestNewClosedOpenRange(t *testing.T) {
+ start := "b"
+ limit := "b\x01"
+ r := NewClosedOpenRange(start, limit)
+ for _, test := range []struct {
+ k string
+ contains bool
+ }{
+ {"a", false},
+ {"b", true},
+ {"b\x00", true},
+ {"b\x01", false},
+ } {
+ if want, got := test.contains, r.Contains(test.k); want != got {
+ t.Errorf("%s.Contains(%q) = %t, want %t", r.String(), test.k, got, want)
+ }
+ }
+
+ for _, test := range []struct {
+ start, limit string
+ valid bool
+ }{
+ {"a", "a", false},
+ {"b", "a", false},
+ {"a", "a\x00", true},
+ {"a", "b", true},
+ } {
+ r := NewClosedOpenRange(test.start, test.limit)
+ if want, got := test.valid, r.valid(); want != got {
+ t.Errorf("%s.valid() = %t, want %t", r.String(), got, want)
+ }
+ }
+}
+func TestNewOpenClosedRange(t *testing.T) {
+ start := "b"
+ limit := "b\x01"
+ r := NewOpenClosedRange(start, limit)
+ for _, test := range []struct {
+ k string
+ contains bool
+ }{
+ {"a", false},
+ {"b", false},
+ {"b\x00", true},
+ {"b\x01", true},
+ {"b\x01\x00", false},
+ } {
+ if want, got := test.contains, r.Contains(test.k); want != got {
+ t.Errorf("%s.Contains(%q) = %t, want %t", r.String(), test.k, got, want)
+ }
+ }
+
+ for _, test := range []struct {
+ start, limit string
+ valid bool
+ }{
+ {"a", "a", false},
+ {"b", "a", false},
+ {"a", "a\x00", true},
+ {"a", "b", true},
+ } {
+ r := NewOpenClosedRange(test.start, test.limit)
+ if want, got := test.valid, r.valid(); want != got {
+ t.Errorf("%s.valid() = %t, want %t", r.String(), got, want)
+ }
+ }
+}
+func TestNewClosedRange(t *testing.T) {
+ start := "b"
+ limit := "b"
+
+ r := NewClosedRange(start, limit)
+ for _, test := range []struct {
+ k string
+ contains bool
+ }{
+ {"a", false},
+ {"b", true},
+ {"b\x01", false},
+ } {
+ if want, got := test.contains, r.Contains(test.k); want != got {
+ t.Errorf("NewClosedRange(%q, %q).Contains(%q) = %t, want %t", "a", "a\x01", test.k, got, test.contains)
+ }
+ }
+
+ for _, test := range []struct {
+ start, limit string
+ valid bool
+ }{
+ {"a", "b", true},
+ {"b", "b", true},
+ {"b", "b\x00", true},
+ {"b\x00", "b", false},
+ } {
+ r := NewClosedRange(test.start, test.limit)
+ if want, got := test.valid, r.valid(); want != got {
+ t.Errorf("NewClosedRange(%q, %q).valid() = %t, want %t", test.start, test.limit, got, want)
+ }
+ }
+}
+
+func TestNewOpenRange(t *testing.T) {
+ start := "b"
+ limit := "b\x01"
+
+ r := NewOpenRange(start, limit)
+ for _, test := range []struct {
+ k string
+ contains bool
+ }{
+ {"a", false},
+ {"b", false},
+ {"b\x00", true},
+ {"b\x01", false},
+ } {
+ if want, got := test.contains, r.Contains(test.k); want != got {
+ t.Errorf("NewOpenRange(%q, %q).Contains(%q) = %t, want %t", "a", "a\x01", test.k, got, test.contains)
+ }
+ }
+
+ for _, test := range []struct {
+ start, limit string
+ valid bool
+ }{
+ {"a", "a", false},
+ {"a", "b", true},
+ {"a", "a\x00", true},
+ {"a", "a\x01", true},
+ } {
+ r := NewOpenRange(test.start, test.limit)
+ if want, got := test.valid, r.valid(); want != got {
+ t.Errorf("NewOpenRange(%q, %q).valid() = %t, want %t", test.start, test.limit, got, want)
+ }
+ }
+}
+
+func TestInfiniteRange(t *testing.T) {
+ r := InfiniteRange("b")
+ for _, test := range []struct {
+ k string
+ contains bool
+ }{
+ {"a", false},
+ {"b", true},
+ {"b\x00", true},
+ {"z", true},
+ } {
+ if want, got := test.contains, r.Contains(test.k); want != got {
+ t.Errorf("%s.Contains(%q) = %t, want %t", r.String(), test.k, got, want)
+ }
+ }
+
+ for _, test := range []struct {
+ start string
+ valid bool
+ }{
+ {"a", true},
+ {"", true},
+ } {
+ r := InfiniteRange(test.start)
+ if want, got := test.valid, r.valid(); want != got {
+ t.Errorf("%s.valid() = %t, want %t", r.String(), got, want)
+ }
+ }
+}
+
+func TestInfiniteReverseRange(t *testing.T) {
+ r := InfiniteReverseRange("z")
+ for _, test := range []struct {
+ k string
+ contains bool
+ }{
+ {"a", true},
+ {"z", true},
+ {"z\x00", false},
+ } {
+ if want, got := test.contains, r.Contains(test.k); want != got {
+ t.Errorf("%s.Contains(%q) = %t, want %t", r.String(), test.k, got, want)
+ }
+ }
+
+ for _, test := range []struct {
+ start string
+ valid bool
+ }{
+ {"a", true},
+ {"", true},
+ } {
+ r := InfiniteReverseRange(test.start)
+ if want, got := test.valid, r.valid(); want != got {
+ t.Errorf("%s.valid() = %t, want %t", r.String(), got, want)
}
}
}
@@ -176,6 +372,188 @@
}
}
+func TestRowRangeProto(t *testing.T) {
+
+ for _, test := range []struct {
+ desc string
+ rr RowRange
+ proto *btpb.RowSet
+ }{
+ {
+ desc: "RowRange proto start and end",
+ rr: NewClosedOpenRange("a", "b"),
+ proto: &btpb.RowSet{RowRanges: []*btpb.RowRange{{
+ StartKey: &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte("a")},
+ EndKey: &btpb.RowRange_EndKeyOpen{EndKeyOpen: []byte("b")},
+ }}},
+ },
+ {
+ desc: "RowRange proto start but empty end",
+ rr: NewClosedOpenRange("a", ""),
+ proto: &btpb.RowSet{RowRanges: []*btpb.RowRange{{
+ StartKey: &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte("a")},
+ }}},
+ },
+ {
+ desc: "RowRange proto unbound",
+ rr: NewClosedOpenRange("", ""),
+ proto: &btpb.RowSet{RowRanges: []*btpb.RowRange{{}}},
+ },
+ {
+ desc: "RowRange proto unbound with no start or end",
+ rr: InfiniteRange(""),
+ proto: &btpb.RowSet{RowRanges: []*btpb.RowRange{{}}},
+ },
+ {
+ desc: "RowRange proto open closed",
+ rr: NewOpenClosedRange("a", "b"),
+ proto: &btpb.RowSet{RowRanges: []*btpb.RowRange{{
+ StartKey: &btpb.RowRange_StartKeyOpen{StartKeyOpen: []byte("a")},
+ EndKey: &btpb.RowRange_EndKeyClosed{EndKeyClosed: []byte("b")},
+ }}},
+ },
+ {
+ desc: "RowRange proto open closed and empty start",
+ rr: NewOpenClosedRange("", "b"),
+ proto: &btpb.RowSet{RowRanges: []*btpb.RowRange{{
+ EndKey: &btpb.RowRange_EndKeyClosed{EndKeyClosed: []byte("b")},
+ }}},
+ },
+ {
+ desc: "RowRange proto open closed and empty start",
+ rr: NewOpenClosedRange("", "b"),
+ proto: &btpb.RowSet{RowRanges: []*btpb.RowRange{{
+ EndKey: &btpb.RowRange_EndKeyClosed{EndKeyClosed: []byte("b")},
+ }}},
+ },
+ {
+ desc: "RowRange proto closed open",
+ rr: NewClosedOpenRange("a", "b"),
+ proto: &btpb.RowSet{RowRanges: []*btpb.RowRange{{
+ StartKey: &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte("a")},
+ EndKey: &btpb.RowRange_EndKeyOpen{EndKeyOpen: []byte("b")},
+ }}},
+ },
+ } {
+ t.Run(test.desc, func(t *testing.T) {
+ got := test.rr.proto()
+ want := test.proto
+ if !reflect.DeepEqual(got, want) {
+ t.Errorf("Bad proto for %s: got %v, want %v", test.rr.String(), got, want)
+ }
+ })
+ }
+}
+
+func TestRowRangeRetainRowsBefore(t *testing.T) {
+ for _, test := range []struct {
+ desc string
+ rr RowSet
+ proto *btpb.RowSet
+ }{
+ {
+ desc: "retain rows before",
+ rr: NewRange("a", "c").retainRowsBefore("b"),
+ proto: &btpb.RowSet{RowRanges: []*btpb.RowRange{{
+ StartKey: &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte("a")},
+ EndKey: &btpb.RowRange_EndKeyOpen{EndKeyOpen: []byte("b")},
+ }}},
+ },
+ {
+ desc: "retain rows before empty key",
+ rr: NewRange("a", "c").retainRowsBefore(""),
+ proto: &btpb.RowSet{RowRanges: []*btpb.RowRange{{
+ StartKey: &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte("a")},
+ EndKey: &btpb.RowRange_EndKeyOpen{EndKeyOpen: []byte("c")},
+ }}},
+ },
+ {
+ desc: "retain rows before key greater than range end",
+ rr: NewClosedRange("a", "c").retainRowsBefore("d"),
+ proto: &btpb.RowSet{RowRanges: []*btpb.RowRange{{
+ StartKey: &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte("a")},
+ EndKey: &btpb.RowRange_EndKeyClosed{EndKeyClosed: []byte("c")},
+ }}},
+ },
+ {
+ desc: "retain rows before key same as closed end key",
+ rr: NewClosedRange("a", "c").retainRowsBefore("c"),
+ proto: &btpb.RowSet{RowRanges: []*btpb.RowRange{{
+ StartKey: &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte("a")},
+ EndKey: &btpb.RowRange_EndKeyOpen{EndKeyOpen: []byte("c")},
+ }}},
+ },
+ {
+ desc: "retain rows before on unbounded range",
+ rr: InfiniteRange("").retainRowsBefore("z"),
+ proto: &btpb.RowSet{RowRanges: []*btpb.RowRange{{
+ EndKey: &btpb.RowRange_EndKeyOpen{EndKeyOpen: []byte("z")},
+ }}},
+ },
+ } {
+ t.Run(test.desc, func(t *testing.T) {
+ got := test.rr.proto()
+ want := test.proto
+ if !reflect.DeepEqual(got, want) {
+ t.Errorf("Bad retain rows before proto: got %v, want %v", got, want)
+ }
+ })
+ }
+}
+
+func TestRowRangeString(t *testing.T) {
+
+ for _, test := range []struct {
+ desc string
+ rr RowRange
+ str string
+ }{
+ {
+ desc: "RowRange closed open",
+ rr: NewClosedOpenRange("a", "b"),
+ str: "[\"a\",b)",
+ },
+ {
+ desc: "RowRange open open",
+ rr: NewOpenRange("c", "d"),
+ str: "(\"c\",d)",
+ },
+ {
+ desc: "RowRange closed closed",
+ rr: NewClosedRange("e", "f"),
+ str: "[\"e\",f]",
+ },
+ {
+ desc: "RowRange open closed",
+ rr: NewOpenClosedRange("g", "h"),
+ str: "(\"g\",h]",
+ },
+ {
+ desc: "RowRange unbound unbound",
+ rr: InfiniteRange(""),
+ str: "(∞,∞)",
+ },
+ {
+ desc: "RowRange closed unbound",
+ rr: InfiniteRange("b"),
+ str: "[\"b\",∞)",
+ },
+ {
+ desc: "RowRange unbound closed",
+ rr: InfiniteReverseRange("c"),
+ str: "(∞,c]",
+ },
+ } {
+ t.Run(test.desc, func(t *testing.T) {
+ got := test.rr.String()
+ want := test.str
+ if !reflect.DeepEqual(got, want) {
+ t.Errorf("Bad String(): got %v, want %v", got, want)
+ }
+ })
+ }
+}
+
// TestReadRowsInvalidRowSet verifies that the client doesn't send ReadRows() requests with invalid RowSets.
func TestReadRowsInvalidRowSet(t *testing.T) {
testEnv, err := NewEmulatedEnv(IntegrationTestConfig{})
@@ -212,19 +590,19 @@
valid bool
}{
{
- rr: RowRange{},
+ rr: RowRange{startBound: rangeUnbounded, endBound: rangeUnbounded},
valid: true,
},
{
- rr: RowRange{start: "b"},
+ rr: RowRange{startBound: rangeClosed, start: "b", endBound: rangeUnbounded},
valid: true,
},
{
- rr: RowRange{start: "b", limit: "c"},
+ rr: RowRange{startBound: rangeClosed, start: "b", endBound: rangeOpen, end: "c"},
valid: true,
},
{
- rr: RowRange{start: "b", limit: "a"},
+ rr: RowRange{startBound: rangeClosed, start: "b", endBound: rangeOpen, end: "a"},
valid: false,
},
{
@@ -307,7 +685,7 @@
statsChannel := make(chan FullReadStats, 1)
readStart := time.Now()
- if err := table.ReadRows(ctx, RowRange{}, func(r Row) bool { return true }, WithFullReadStats(func(s *FullReadStats) { statsChannel <- *s }), RowFilter(ColumnFilter("q.*"))); err != nil {
+ if err := table.ReadRows(ctx, InfiniteRange(""), func(r Row) bool { return true }, WithFullReadStats(func(s *FullReadStats) { statsChannel <- *s }), RowFilter(ColumnFilter("q.*"))); err != nil {
t.Fatalf("NewClient failed: %v", err)
}
readElapsed := time.Since(readStart)
diff --git a/bigtable/integration_test.go b/bigtable/integration_test.go
index 5b49072..1722826 100644
--- a/bigtable/integration_test.go
+++ b/bigtable/integration_test.go
@@ -208,7 +208,7 @@
// Do a scan and stop part way through.
// Verify that the ReadRows callback doesn't keep running.
stopped := false
- err = table.ReadRows(ctx, InfiniteRange(""), func(r Row) bool {
+ err = table.ReadRows(ctx, RowRange{}, func(r Row) bool {
if r.Key() < "h" {
return true
}
@@ -256,6 +256,39 @@
t.Fatalf("bulk read: wrong reads.\n got %q\nwant %q", got, want)
}
}
+func TestIntegration_ReadRowListReverse(t *testing.T) {
+ ctx := context.Background()
+ _, _, _, table, _, cleanup, err := setupIntegration(ctx, t)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer cleanup()
+
+ if err := populatePresidentsGraph(table); err != nil {
+ t.Fatal(err)
+ }
+
+ // Read a RowRange in reverse
+ var elt []string
+ rowRange := NewOpenClosedRange("gwashington", "wmckinley")
+ want := "wmckinley-tjefferson-1,tjefferson-gwashington-1,tjefferson-j§adams-1,j§adams-gwashington-1,j§adams-tjefferson-1"
+ err = table.ReadRows(ctx, rowRange, func(r Row) bool {
+ for _, ris := range r {
+ for _, ri := range ris {
+ elt = append(elt, formatReadItem(ri))
+ }
+ }
+ return true
+ }, ReverseScan())
+
+ if err != nil {
+ t.Fatalf("read RowList: %v", err)
+ }
+
+ if got := strings.Join(elt, ","); got != want {
+ t.Fatalf("bulk read: wrong reads.\n got %q\nwant %q", got, want)
+ }
+}
func TestIntegration_DeleteRow(t *testing.T) {
ctx := context.Background()
@@ -424,7 +457,7 @@
if !testutil.Equal(r, wantRow) {
t.Fatalf("Cell with multiple versions and LatestNFilter(2),\n got %v\nwant %v", r, wantRow)
}
- // Check cell offset / limit
+ // Check cell offset / limit
r, err = table.ReadRow(ctx, "testrow", RowFilter(CellsPerRowLimitFilter(3)))
if err != nil {
t.Fatalf("Reading row: %v", err)
@@ -778,7 +811,7 @@
}
verifyDirectPathRemoteAddress(testEnv, t)
if rc != wantRc {
- t.Fatalf("Scan with row limit returned %d rows, want %d", rc, wantRc)
+ t.Fatalf("Scan with row end returned %d rows, want %d", rc, wantRc)
}
// Test bulk mutations
@@ -975,7 +1008,7 @@
want: "",
},
{
- desc: "read with ColumnFilter + row limit",
+ desc: "read with ColumnFilter + row end",
rr: RowRange{},
filter: ColumnFilter(".*j.*"), // matches "j§adams" and "tjefferson"
limit: LimitRows(2),
@@ -996,7 +1029,7 @@
want: "gwashington-j§adams-,j§adams-gwashington-,j§adams-tjefferson-,tjefferson-gwashington-,tjefferson-j§adams-,tjefferson-wmckinley-,wmckinley-tjefferson-",
},
{
- desc: "read with ColumnFilter + row limit + strip values",
+ desc: "read with ColumnFilter + row end + strip values",
rr: RowRange{},
filter: ChainFilters(ColumnFilter(".*j.*"), StripValueFilter()), // matches "j§adams" and "tjefferson"
limit: LimitRows(2),
@@ -1015,7 +1048,7 @@
want: "gwashington-j§adams-,j§adams-gwashington-,j§adams-tjefferson-,tjefferson-gwashington-,tjefferson-j§adams-,tjefferson-wmckinley-,wmckinley-tjefferson-",
},
{
- desc: "read with ValueRangeFilter + row limit",
+ desc: "read with ValueRangeFilter + row end",
rr: RowRange{},
filter: ValueRangeFilter([]byte("1"), []byte("5")), // matches our value of "1"
limit: LimitRows(2),
@@ -1128,10 +1161,11 @@
}
for _, test := range []struct {
- desc string
- rr RowSet
- filter Filter // may be nil
- limit ReadOption // may be nil
+ desc string
+ rr RowSet
+ filter Filter // may be nil
+ limit ReadOption // may be nil
+ reverseScan bool
// We do the read and grab all the stats.
cellsReturnedCount int64
@@ -1251,7 +1285,7 @@
rowsReturnedCount: 0,
},
{
- desc: "read with ColumnFilter + row limit",
+ desc: "read with ColumnFilter + row end",
rr: RowRange{},
filter: ColumnFilter(".*j.*"), // matches "j§adams" and "tjefferson"
limit: LimitRows(2),
@@ -1274,7 +1308,7 @@
rowsReturnedCount: 4,
},
{
- desc: "read with ColumnFilter + row limit + strip values",
+ desc: "read with ColumnFilter + row end + strip values",
rr: RowRange{},
filter: ChainFilters(ColumnFilter(".*j.*"), StripValueFilter()), // matches "j§adams" and "tjefferson"
limit: LimitRows(2),
@@ -1296,7 +1330,7 @@
rowsReturnedCount: 4,
},
{
- desc: "read with ValueRangeFilter + row limit",
+ desc: "read with ValueRangeFilter + row end",
rr: RowRange{},
filter: ValueRangeFilter([]byte("1"), []byte("5")), // matches our value of "1"
limit: LimitRows(2),
@@ -1358,6 +1392,20 @@
cellsReturnedCount: 0,
rowsReturnedCount: 0,
},
+ {
+ desc: "reverse read all, unfiltered",
+ rr: RowRange{},
+ reverseScan: true,
+ cellsReturnedCount: 7,
+ rowsReturnedCount: 4,
+ },
+ {
+ desc: "reverse read with InfiniteRange, unfiltered",
+ rr: InfiniteReverseRange("wmckinley"),
+ reverseScan: true,
+ cellsReturnedCount: 7,
+ rowsReturnedCount: 4,
+ },
} {
t.Run(test.desc, func(t *testing.T) {
var opts []ReadOption
@@ -1367,6 +1415,9 @@
if test.limit != nil {
opts = append(opts, test.limit)
}
+ if test.reverseScan {
+ opts = append(opts, ReverseScan())
+ }
// Define a callback for validating request stats.
callbackInvoked := false
statsValidator := WithFullReadStats(
@@ -1387,7 +1438,7 @@
// We use lenient checks for CellsSeenCount and RowsSeenCount. Exact checks would be brittle.
// Note that the emulator and prod sometimes yield different values:
// - Sometimes prod scans fewer cells due to optimizations that allow prod to skip cells.
- // - Sometimes prod scans more cells due to to filters that must rescan cells.
+ // - Sometimes prod scans more cells due to filters that must rescan cells.
// Similar issues apply for RowsSeenCount.
if got, want := readStats.CellsSeenCount, readStats.CellsReturnedCount; got < want {
t.Errorf("CellsSeenCount should be greater than or equal to CellsReturnedCount. got: %d < want: %d",
@@ -2119,7 +2170,7 @@
time.Sleep(time.Second * 10)
}
if encryptionKeyVersion == "" {
- t.Fatalf("Encryption Key not created within alotted time limit")
+ t.Fatalf("Encryption Key not created within alotted time end")
}
// Validate Encryption Info under getTable
diff --git a/bigtable/reader.go b/bigtable/reader.go
index 64aabc9..8f0a4c1 100644
--- a/bigtable/reader.go
+++ b/bigtable/reader.go
@@ -19,6 +19,7 @@
import (
"bytes"
"fmt"
+ "strings"
btpb "google.golang.org/genproto/googleapis/bigtable/v2"
)
@@ -58,6 +59,7 @@
// chunkReader handles cell chunks from the read rows response and combines
// them into full Rows.
type chunkReader struct {
+ reversed bool
state rrState
curKey []byte
curLabels []string
@@ -71,7 +73,11 @@
// newChunkReader returns a new chunkReader for handling read rows responses.
func newChunkReader() *chunkReader {
- return &chunkReader{state: newRow}
+ return &chunkReader{reversed: false, state: newRow}
+}
+
+func newReverseChunkReader() *chunkReader {
+ return &chunkReader{reversed: true, state: newRow}
}
// Process takes a cell chunk and returns a new Row if the given chunk
@@ -200,9 +206,19 @@
if cc.RowKey == nil || cc.FamilyName == nil || cc.Qualifier == nil {
return fmt.Errorf("missing key field for new row %v", cc)
}
- if cr.lastKey != "" && cr.lastKey >= string(cc.RowKey) {
- return fmt.Errorf("out of order row key: %q, %q", cr.lastKey, string(cc.RowKey))
+
+ if cr.lastKey != "" {
+ r := strings.Compare(string(cc.RowKey), cr.lastKey)
+ direction := "increasing"
+ if cr.reversed {
+ r *= -1
+ direction = "decreasing"
+ }
+ if r <= 0 {
+ return fmt.Errorf("out of order row key, must be strictly %s. new key: %q prev row: %q", direction, cc.RowKey, cr.lastKey)
+ }
}
+
return nil
}
diff --git a/bigtable/retry_test.go b/bigtable/retry_test.go
index 69e6fda..3d549ae 100644
--- a/bigtable/retry_test.go
+++ b/bigtable/retry_test.go
@@ -345,7 +345,7 @@
func TestRetainRowsAfter(t *testing.T) {
prevRowRange := NewRange("a", "z")
prevRowKey := "m"
- want := NewRange("m\x00", "z")
+ want := NewOpenRange("m", "z")
got := prevRowRange.retainRowsAfter(prevRowKey)
if !testutil.Equal(want, got, cmp.AllowUnexported(RowRange{})) {
t.Errorf("range retry: got %v, want %v", got, want)
@@ -353,7 +353,7 @@
prevRowRangeList := RowRangeList{NewRange("a", "d"), NewRange("e", "g"), NewRange("h", "l")}
prevRowKey = "f"
- wantRowRangeList := RowRangeList{NewRange("f\x00", "g"), NewRange("h", "l")}
+ wantRowRangeList := RowRangeList{NewOpenRange("f", "g"), NewRange("h", "l")}
got = prevRowRangeList.retainRowsAfter(prevRowKey)
if !testutil.Equal(wantRowRangeList, got, cmp.AllowUnexported(RowRange{})) {
t.Errorf("range list retry: got %v, want %v", got, wantRowRangeList)
@@ -406,7 +406,7 @@
err = status.Errorf(codes.Unavailable, "")
case 2:
// Retryable request failure
- if want, got := "b\x00", string(req.Rows.RowRanges[0].GetStartKeyClosed()); want != got {
+ if want, got := "b", string(req.Rows.RowRanges[0].GetStartKeyOpen()); want != got {
t.Errorf("2 range retries: got %q, want %q", got, want)
}
err = status.Errorf(codes.Unavailable, "")
@@ -418,7 +418,7 @@
must(ss.SendMsg(&btpb.ReadRowsResponse{LastScannedRowKey: []byte("e")}))
err = status.Errorf(codes.Unavailable, "")
case 5:
- if want, got := "e\x00", string(req.Rows.RowRanges[0].GetStartKeyClosed()); want != got {
+ if want, got := "e", string(req.Rows.RowRanges[0].GetStartKeyOpen()); want != got {
t.Errorf("3 range retries: got %q, want %q", got, want)
}
must(writeReadRowsResponse(ss, "f", "g"))
@@ -439,6 +439,80 @@
}
}
+func TestRetryReverseReadRows(t *testing.T) {
+ ctx := context.Background()
+
+ // Intercept requests and delegate to an interceptor defined by the test case
+ errCount := 0
+ var f func(grpc.ServerStream) error
+ errInjector := func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
+ if strings.HasSuffix(info.FullMethod, "ReadRows") {
+ return f(ss)
+ }
+ return handler(ctx, ss)
+ }
+
+ tbl, cleanup, err := setupFakeServer(grpc.StreamInterceptor(errInjector))
+ defer cleanup()
+ if err != nil {
+ t.Fatalf("fake server setup: %v", err)
+ }
+
+ errCount = 0
+ // Test overall request failure and retries
+ f = func(ss grpc.ServerStream) error {
+ var err error
+ req := new(btpb.ReadRowsRequest)
+ must(ss.RecvMsg(req))
+ switch errCount {
+ case 0:
+ // Retryable request failure
+ err = status.Errorf(codes.Unavailable, "")
+ case 1:
+ // Write two rows then error
+ if want, got := "z", string(req.Rows.RowRanges[0].GetEndKeyClosed()); want != got {
+ t.Errorf("first retry, no data received yet: got %q, want %q", got, want)
+ }
+ must(writeReadRowsResponse(ss, "g", "f"))
+ err = status.Errorf(codes.Unavailable, "")
+ case 2:
+ // Retryable request failure
+ if want, got := "f", string(req.Rows.RowRanges[0].GetEndKeyOpen()); want != got {
+ t.Errorf("2 range retries: got %q, want %q", got, want)
+ }
+ err = status.Errorf(codes.Unavailable, "")
+ case 3:
+ must(ss.SendMsg(&btpb.ReadRowsResponse{LastScannedRowKey: []byte("e")}))
+ err = status.Errorf(codes.Unavailable, "")
+ case 4:
+ if want, got := "e", string(req.Rows.RowRanges[0].GetEndKeyOpen()); want != got {
+ t.Errorf("3 range retries: got %q, want %q", got, want)
+ }
+ // Write two more rows
+ must(writeReadRowsResponse(ss, "d", "c"))
+ err = status.Errorf(codes.Unavailable, "")
+ case 5:
+ if want, got := "c", string(req.Rows.RowRanges[0].GetEndKeyOpen()); want != got {
+ t.Errorf("3 range retries: got %q, want %q", got, want)
+ }
+ must(writeReadRowsResponse(ss, "b", "a"))
+ err = nil
+ }
+ errCount++
+ return err
+ }
+
+ var got []string
+ must(tbl.ReadRows(ctx, NewClosedRange("a", "z"), func(r Row) bool {
+ got = append(got, r.Key())
+ return true
+ }, ReverseScan()))
+ want := []string{"g", "f", "d", "c", "b", "a"}
+ if !testutil.Equal(got, want) {
+ t.Errorf("retry range integration: got %v, want %v", got, want)
+ }
+}
+
func writeReadRowsResponse(ss grpc.ServerStream, rowKeys ...string) error {
var chunks []*btpb.ReadRowsResponse_CellChunk
for _, key := range rowKeys {