feat(bigtable): Add support for reverse scans (#8755)

* feat: add more expressive range api

* feat(bigtable): Add support for reverse scans

* resolve feature flag conflict after merge

* add a new constructor for chunkReader to avoid line noise

* remove orphaned code

* fix typo

* adding a first test

* extended RowRange to express bound types

* adding unit tests

* correcting backwards compatability behavior

* adding test proxy support for reverse scan

* added todo reminder

* all unit tests pass

* updated naming

* fix retries on reverse scan

* more tests

* exposing client messages to test proxy
formatting

* exposing client messages to test proxy
formatting

* fixing vet errors

* minor style tweaks

* changing error message to be consistent with java and cpp

* cleaning up code and adding tests

* simplify RowRange valid logic

* rolling back test proxy changes

* rolling back test proxy

* changed default bound type for better backwards compatability
rolled back some integration test changes to ensure backwards compatability

---------

Co-authored-by: Igor Berntein <igorbernstein@google.com>
diff --git a/bigtable/bigtable.go b/bigtable/bigtable.go
index 43993fc..0cd7a70 100644
--- a/bigtable/bigtable.go
+++ b/bigtable/bigtable.go
@@ -207,7 +207,14 @@
 		if err != nil {
 			return err
 		}
-		cr := newChunkReader()
+
+		var cr *chunkReader
+		if req.Reversed {
+			cr = newReverseChunkReader()
+		} else {
+			cr = newChunkReader()
+		}
+
 		for {
 			res, err := stream.Recv()
 			if err == io.EOF {
@@ -215,7 +222,11 @@
 			}
 			if err != nil {
 				// Reset arg for next Invoke call.
-				arg = arg.retainRowsAfter(prevRowKey)
+				if req.Reversed {
+					arg = arg.retainRowsBefore(prevRowKey)
+				} else {
+					arg = arg.retainRowsAfter(prevRowKey)
+				}
 				attrMap["rowKey"] = prevRowKey
 				attrMap["error"] = err.Error()
 				attrMap["time_secs"] = time.Since(startTime).Seconds()
@@ -306,6 +317,10 @@
 	// given row key or any row key lexicographically less than it.
 	retainRowsAfter(lastRowKey string) RowSet
 
+	// retainRowsBefore returns a new RowSet that does not include the
+	// given row key or any row key lexicographically greater than it.
+	retainRowsBefore(lastRowKey string) RowSet
+
 	// Valid reports whether this set can cover at least one row.
 	valid() bool
 }
@@ -331,53 +346,184 @@
 	return retryKeys
 }
 
+func (r RowList) retainRowsBefore(lastRowKey string) RowSet {
+	var retryKeys RowList
+	for _, key := range r {
+		if key < lastRowKey {
+			retryKeys = append(retryKeys, key)
+		}
+	}
+	return retryKeys
+}
+
 func (r RowList) valid() bool {
 	return len(r) > 0
 }
 
-// A RowRange is a half-open interval [Start, Limit) encompassing
-// all the rows with keys at least as large as Start, and less than Limit.
-// (Bigtable string comparison is the same as Go's.)
-// A RowRange can be unbounded, encompassing all keys at least as large as Start.
+type rangeBoundType int64
+
+const (
+	rangeUnbounded rangeBoundType = iota
+	rangeOpen
+	rangeClosed
+)
+
+// A RowRange describes a range of rows between the start and end key. Start and
+// end keys may be rangeOpen, rangeClosed or rangeUnbounded.
 type RowRange struct {
-	start string
-	limit string
+	startBound rangeBoundType
+	start      string
+	endBound   rangeBoundType
+	end        string
 }
 
 // NewRange returns the new RowRange [begin, end).
 func NewRange(begin, end string) RowRange {
+	return createRowRange(rangeClosed, begin, rangeOpen, end)
+}
+
+// NewClosedOpenRange returns the RowRange consisting of all greater than or
+// equal to the start and less than the end: [start, end).
+func NewClosedOpenRange(start, end string) RowRange {
+	return createRowRange(rangeClosed, start, rangeOpen, end)
+}
+
+// NewOpenClosedRange returns the RowRange consisting of all keys greater than
+// the start and less than or equal to the end: (start, end].
+func NewOpenClosedRange(start, end string) RowRange {
+	return createRowRange(rangeOpen, start, rangeClosed, end)
+}
+
+// NewOpenRange returns the RowRange consisting of all keys greater than the
+// start and less than the end: (start, end).
+func NewOpenRange(start, end string) RowRange {
+	return createRowRange(rangeOpen, start, rangeOpen, end)
+}
+
+// NewClosedRange returns the RowRange consisting of all keys greater than or
+// equal to the start and less than or equal to the end: [start, end].
+func NewClosedRange(start, end string) RowRange {
+	return createRowRange(rangeClosed, start, rangeClosed, end)
+}
+
+// PrefixRange returns a RowRange consisting of all keys starting with the prefix.
+func PrefixRange(prefix string) RowRange {
+	end := prefixSuccessor(prefix)
+	return createRowRange(rangeClosed, prefix, rangeOpen, end)
+}
+
+// InfiniteRange returns the RowRange consisting of all keys at least as
+// large as start: [start, ∞).
+func InfiniteRange(start string) RowRange {
+	return createRowRange(rangeClosed, start, rangeUnbounded, "")
+}
+
+// InfiniteReverseRange returns the RowRange consisting of all keys less than or
+// equal to the end: (∞, end].
+func InfiniteReverseRange(end string) RowRange {
+	return createRowRange(rangeUnbounded, "", rangeClosed, end)
+}
+
+// createRowRange creates a new RowRange, normalizing start and end
+// rangeBoundType to rangeUnbounded if they're empty strings because empty
+// strings also represent unbounded keys
+func createRowRange(startBound rangeBoundType, start string, endBound rangeBoundType, end string) RowRange {
+	// normalize start bound type
+	if start == "" {
+		startBound = rangeUnbounded
+	}
+	// normalize end bound type
+	if end == "" {
+		endBound = rangeUnbounded
+	}
 	return RowRange{
-		start: begin,
-		limit: end,
+		startBound: startBound,
+		start:      start,
+		endBound:   endBound,
+		end:        end,
 	}
 }
 
 // Unbounded tests whether a RowRange is unbounded.
 func (r RowRange) Unbounded() bool {
-	return r.limit == ""
+	return r.startBound == rangeUnbounded || r.endBound == rangeUnbounded
 }
 
 // Contains says whether the RowRange contains the key.
 func (r RowRange) Contains(row string) bool {
-	return r.start <= row && (r.limit == "" || r.limit > row)
+	switch r.startBound {
+	case rangeOpen:
+		if r.start >= row {
+			return false
+		}
+	case rangeClosed:
+		if r.start > row {
+			return false
+		}
+	case rangeUnbounded:
+	}
+
+	switch r.endBound {
+	case rangeOpen:
+		if r.end <= row {
+			return false
+		}
+	case rangeClosed:
+		if r.end < row {
+			return false
+		}
+	case rangeUnbounded:
+	}
+
+	return true
 }
 
 // String provides a printable description of a RowRange.
 func (r RowRange) String() string {
-	a := strconv.Quote(r.start)
-	if r.Unbounded() {
-		return fmt.Sprintf("[%s,∞)", a)
+	var startStr string
+	switch r.startBound {
+	case rangeOpen:
+		startStr = "(" + strconv.Quote(r.start)
+	case rangeClosed:
+		startStr = "[" + strconv.Quote(r.start)
+	case rangeUnbounded:
+		startStr = "(∞"
 	}
-	return fmt.Sprintf("[%s,%q)", a, r.limit)
+
+	var endStr string
+	switch r.endBound {
+	case rangeOpen:
+		endStr = r.end + ")"
+	case rangeClosed:
+		endStr = r.end + "]"
+	case rangeUnbounded:
+		endStr = "∞)"
+	}
+
+	return fmt.Sprintf("%s,%s", startStr, endStr)
 }
 
 func (r RowRange) proto() *btpb.RowSet {
-	rr := &btpb.RowRange{
-		StartKey: &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte(r.start)},
+	rr := &btpb.RowRange{}
+
+	switch r.startBound {
+	case rangeOpen:
+		rr.StartKey = &btpb.RowRange_StartKeyOpen{StartKeyOpen: []byte(r.start)}
+	case rangeClosed:
+		rr.StartKey = &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte(r.start)}
+	case rangeUnbounded:
+		// leave unbounded
 	}
-	if !r.Unbounded() {
-		rr.EndKey = &btpb.RowRange_EndKeyOpen{EndKeyOpen: []byte(r.limit)}
+
+	switch r.endBound {
+	case rangeOpen:
+		rr.EndKey = &btpb.RowRange_EndKeyOpen{EndKeyOpen: []byte(r.end)}
+	case rangeClosed:
+		rr.EndKey = &btpb.RowRange_EndKeyClosed{EndKeyClosed: []byte(r.end)}
+	case rangeUnbounded:
+		// leave unbounded
 	}
+
 	return &btpb.RowSet{RowRanges: []*btpb.RowRange{rr}}
 }
 
@@ -385,16 +531,45 @@
 	if lastRowKey == "" || lastRowKey < r.start {
 		return r
 	}
-	// Set the beginning of the range to the row after the last scanned.
-	start := lastRowKey + "\x00"
-	if r.Unbounded() {
-		return InfiniteRange(start)
+
+	return RowRange{
+		// Set the beginning of the range to the row after the last scanned.
+		startBound: rangeOpen,
+		start:      lastRowKey,
+		endBound:   r.endBound,
+		end:        r.end,
 	}
-	return NewRange(start, r.limit)
+}
+
+func (r RowRange) retainRowsBefore(lastRowKey string) RowSet {
+	if lastRowKey == "" || (r.endBound != rangeUnbounded && r.end < lastRowKey) {
+		return r
+	}
+
+	return RowRange{
+		startBound: r.startBound,
+		start:      r.start,
+		endBound:   rangeOpen,
+		end:        lastRowKey,
+	}
 }
 
 func (r RowRange) valid() bool {
-	return r.Unbounded() || r.start < r.limit
+	// If either end is unbounded, then the range is always valid.
+	if r.Unbounded() {
+		return true
+	}
+
+	// If either end is an open interval, then the start must be strictly less
+	// than the end and since neither end is unbounded, we don't have to check
+	// for empty strings.
+	if r.startBound == rangeOpen || r.endBound == rangeOpen {
+		return r.start < r.end
+	}
+
+	// At this point both endpoints must be closed, which makes [a,a] a valid
+	// interval
+	return r.start <= r.end
 }
 
 // RowRangeList is a sequence of RowRanges representing the union of the ranges.
@@ -424,6 +599,21 @@
 	return ranges
 }
 
+func (r RowRangeList) retainRowsBefore(lastRowKey string) RowSet {
+	if lastRowKey == "" {
+		return r
+	}
+	// Return a list of any range that has not yet been completely processed
+	var ranges RowRangeList
+	for _, rr := range r {
+		retained := rr.retainRowsBefore(lastRowKey)
+		if retained.valid() {
+			ranges = append(ranges, retained.(RowRange))
+		}
+	}
+	return ranges
+}
+
 func (r RowRangeList) valid() bool {
 	for _, rr := range r {
 		if rr.valid() {
@@ -438,23 +628,6 @@
 	return RowList{row}
 }
 
-// PrefixRange returns a RowRange consisting of all keys starting with the prefix.
-func PrefixRange(prefix string) RowRange {
-	return RowRange{
-		start: prefix,
-		limit: prefixSuccessor(prefix),
-	}
-}
-
-// InfiniteRange returns the RowRange consisting of all keys at least as
-// large as start.
-func InfiniteRange(start string) RowRange {
-	return RowRange{
-		start: start,
-		limit: "",
-	}
-}
-
 // prefixSuccessor returns the lexically smallest string greater than the
 // prefix, if it exists, or "" otherwise.  In either case, it is the string
 // needed for the Limit of a RowRange.
@@ -557,7 +730,7 @@
 
 func (rf rowFilter) set(settings *readSettings) { settings.req.Filter = rf.f.proto() }
 
-// LimitRows returns a ReadOption that will limit the number of rows to be read.
+// LimitRows returns a ReadOption that will end the number of rows to be read.
 func LimitRows(limit int64) ReadOption { return limitRows{limit} }
 
 type limitRows struct{ limit int64 }
@@ -577,6 +750,25 @@
 	settings.fullReadStatsFunc = wrs.f
 }
 
+// ReverseScan returns a RadOption that will reverse the results of a Scan.
+// The rows will be streamed in reverse lexiographic order of the keys. The row key ranges of the RowSet are
+// still expected to be oriented the same way as forwards. ie [a,c] where a <= c. The row content
+// will remain unchanged from the ordering forward scans. This is particularly useful to get the
+// last N records before a key:
+//
+//	table.ReadRows(ctx, NewOpenClosedRange("", "key"), func(row bigtable.Row) bool {
+//	   return true
+//	}, bigtable.ReverseScan(), bigtable.LimitRows(10))
+func ReverseScan() ReadOption {
+	return reverseScan{}
+}
+
+type reverseScan struct{}
+
+func (rs reverseScan) set(settings *readSettings) {
+	settings.req.Reversed = true
+}
+
 // mutationsAreRetryable returns true if all mutations are idempotent
 // and therefore retryable. A mutation is idempotent iff all cell timestamps
 // have an explicit timestamp set and do not rely on the timestamp being set on the server.
diff --git a/bigtable/bigtable_test.go b/bigtable/bigtable_test.go
index 4fc0d13..0bea28e 100644
--- a/bigtable/bigtable_test.go
+++ b/bigtable/bigtable_test.go
@@ -18,6 +18,7 @@
 
 import (
 	"context"
+	"reflect"
 	"testing"
 	"time"
 
@@ -42,11 +43,206 @@
 			continue
 		}
 		r := PrefixRange(test.prefix)
-		if test.succ == "" && r.limit != "" {
-			t.Errorf("PrefixRange(%q) got limit %q", test.prefix, r.limit)
+		if test.succ == "" && r.end != "" {
+			t.Errorf("PrefixRange(%q) got end %q", test.prefix, r.end)
 		}
-		if test.succ != "" && r.limit != test.succ {
-			t.Errorf("PrefixRange(%q) got limit %q, want %q", test.prefix, r.limit, test.succ)
+		if test.succ != "" && r.end != test.succ {
+			t.Errorf("PrefixRange(%q) got end %q, want %q", test.prefix, r.end, test.succ)
+		}
+	}
+}
+
+func TestNewClosedOpenRange(t *testing.T) {
+	start := "b"
+	limit := "b\x01"
+	r := NewClosedOpenRange(start, limit)
+	for _, test := range []struct {
+		k        string
+		contains bool
+	}{
+		{"a", false},
+		{"b", true},
+		{"b\x00", true},
+		{"b\x01", false},
+	} {
+		if want, got := test.contains, r.Contains(test.k); want != got {
+			t.Errorf("%s.Contains(%q) = %t, want %t", r.String(), test.k, got, want)
+		}
+	}
+
+	for _, test := range []struct {
+		start, limit string
+		valid        bool
+	}{
+		{"a", "a", false},
+		{"b", "a", false},
+		{"a", "a\x00", true},
+		{"a", "b", true},
+	} {
+		r := NewClosedOpenRange(test.start, test.limit)
+		if want, got := test.valid, r.valid(); want != got {
+			t.Errorf("%s.valid() = %t, want %t", r.String(), got, want)
+		}
+	}
+}
+func TestNewOpenClosedRange(t *testing.T) {
+	start := "b"
+	limit := "b\x01"
+	r := NewOpenClosedRange(start, limit)
+	for _, test := range []struct {
+		k        string
+		contains bool
+	}{
+		{"a", false},
+		{"b", false},
+		{"b\x00", true},
+		{"b\x01", true},
+		{"b\x01\x00", false},
+	} {
+		if want, got := test.contains, r.Contains(test.k); want != got {
+			t.Errorf("%s.Contains(%q) = %t, want %t", r.String(), test.k, got, want)
+		}
+	}
+
+	for _, test := range []struct {
+		start, limit string
+		valid        bool
+	}{
+		{"a", "a", false},
+		{"b", "a", false},
+		{"a", "a\x00", true},
+		{"a", "b", true},
+	} {
+		r := NewOpenClosedRange(test.start, test.limit)
+		if want, got := test.valid, r.valid(); want != got {
+			t.Errorf("%s.valid() = %t, want %t", r.String(), got, want)
+		}
+	}
+}
+func TestNewClosedRange(t *testing.T) {
+	start := "b"
+	limit := "b"
+
+	r := NewClosedRange(start, limit)
+	for _, test := range []struct {
+		k        string
+		contains bool
+	}{
+		{"a", false},
+		{"b", true},
+		{"b\x01", false},
+	} {
+		if want, got := test.contains, r.Contains(test.k); want != got {
+			t.Errorf("NewClosedRange(%q, %q).Contains(%q) = %t, want %t", "a", "a\x01", test.k, got, test.contains)
+		}
+	}
+
+	for _, test := range []struct {
+		start, limit string
+		valid        bool
+	}{
+		{"a", "b", true},
+		{"b", "b", true},
+		{"b", "b\x00", true},
+		{"b\x00", "b", false},
+	} {
+		r := NewClosedRange(test.start, test.limit)
+		if want, got := test.valid, r.valid(); want != got {
+			t.Errorf("NewClosedRange(%q, %q).valid() = %t, want %t", test.start, test.limit, got, want)
+		}
+	}
+}
+
+func TestNewOpenRange(t *testing.T) {
+	start := "b"
+	limit := "b\x01"
+
+	r := NewOpenRange(start, limit)
+	for _, test := range []struct {
+		k        string
+		contains bool
+	}{
+		{"a", false},
+		{"b", false},
+		{"b\x00", true},
+		{"b\x01", false},
+	} {
+		if want, got := test.contains, r.Contains(test.k); want != got {
+			t.Errorf("NewOpenRange(%q, %q).Contains(%q) = %t, want %t", "a", "a\x01", test.k, got, test.contains)
+		}
+	}
+
+	for _, test := range []struct {
+		start, limit string
+		valid        bool
+	}{
+		{"a", "a", false},
+		{"a", "b", true},
+		{"a", "a\x00", true},
+		{"a", "a\x01", true},
+	} {
+		r := NewOpenRange(test.start, test.limit)
+		if want, got := test.valid, r.valid(); want != got {
+			t.Errorf("NewOpenRange(%q, %q).valid() = %t, want %t", test.start, test.limit, got, want)
+		}
+	}
+}
+
+func TestInfiniteRange(t *testing.T) {
+	r := InfiniteRange("b")
+	for _, test := range []struct {
+		k        string
+		contains bool
+	}{
+		{"a", false},
+		{"b", true},
+		{"b\x00", true},
+		{"z", true},
+	} {
+		if want, got := test.contains, r.Contains(test.k); want != got {
+			t.Errorf("%s.Contains(%q) = %t, want %t", r.String(), test.k, got, want)
+		}
+	}
+
+	for _, test := range []struct {
+		start string
+		valid bool
+	}{
+		{"a", true},
+		{"", true},
+	} {
+		r := InfiniteRange(test.start)
+		if want, got := test.valid, r.valid(); want != got {
+			t.Errorf("%s.valid() = %t, want %t", r.String(), got, want)
+		}
+	}
+}
+
+func TestInfiniteReverseRange(t *testing.T) {
+	r := InfiniteReverseRange("z")
+	for _, test := range []struct {
+		k        string
+		contains bool
+	}{
+		{"a", true},
+		{"z", true},
+		{"z\x00", false},
+	} {
+		if want, got := test.contains, r.Contains(test.k); want != got {
+			t.Errorf("%s.Contains(%q) = %t, want %t", r.String(), test.k, got, want)
+		}
+	}
+
+	for _, test := range []struct {
+		start string
+		valid bool
+	}{
+		{"a", true},
+		{"", true},
+	} {
+		r := InfiniteReverseRange(test.start)
+		if want, got := test.valid, r.valid(); want != got {
+			t.Errorf("%s.valid() = %t, want %t", r.String(), got, want)
 		}
 	}
 }
@@ -176,6 +372,188 @@
 	}
 }
 
+func TestRowRangeProto(t *testing.T) {
+
+	for _, test := range []struct {
+		desc  string
+		rr    RowRange
+		proto *btpb.RowSet
+	}{
+		{
+			desc: "RowRange proto start and end",
+			rr:   NewClosedOpenRange("a", "b"),
+			proto: &btpb.RowSet{RowRanges: []*btpb.RowRange{{
+				StartKey: &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte("a")},
+				EndKey:   &btpb.RowRange_EndKeyOpen{EndKeyOpen: []byte("b")},
+			}}},
+		},
+		{
+			desc: "RowRange proto start but empty end",
+			rr:   NewClosedOpenRange("a", ""),
+			proto: &btpb.RowSet{RowRanges: []*btpb.RowRange{{
+				StartKey: &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte("a")},
+			}}},
+		},
+		{
+			desc:  "RowRange proto unbound",
+			rr:    NewClosedOpenRange("", ""),
+			proto: &btpb.RowSet{RowRanges: []*btpb.RowRange{{}}},
+		},
+		{
+			desc:  "RowRange proto unbound with no start or end",
+			rr:    InfiniteRange(""),
+			proto: &btpb.RowSet{RowRanges: []*btpb.RowRange{{}}},
+		},
+		{
+			desc: "RowRange proto open closed",
+			rr:   NewOpenClosedRange("a", "b"),
+			proto: &btpb.RowSet{RowRanges: []*btpb.RowRange{{
+				StartKey: &btpb.RowRange_StartKeyOpen{StartKeyOpen: []byte("a")},
+				EndKey:   &btpb.RowRange_EndKeyClosed{EndKeyClosed: []byte("b")},
+			}}},
+		},
+		{
+			desc: "RowRange proto open closed and empty start",
+			rr:   NewOpenClosedRange("", "b"),
+			proto: &btpb.RowSet{RowRanges: []*btpb.RowRange{{
+				EndKey: &btpb.RowRange_EndKeyClosed{EndKeyClosed: []byte("b")},
+			}}},
+		},
+		{
+			desc: "RowRange proto open closed and empty start",
+			rr:   NewOpenClosedRange("", "b"),
+			proto: &btpb.RowSet{RowRanges: []*btpb.RowRange{{
+				EndKey: &btpb.RowRange_EndKeyClosed{EndKeyClosed: []byte("b")},
+			}}},
+		},
+		{
+			desc: "RowRange proto closed open",
+			rr:   NewClosedOpenRange("a", "b"),
+			proto: &btpb.RowSet{RowRanges: []*btpb.RowRange{{
+				StartKey: &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte("a")},
+				EndKey:   &btpb.RowRange_EndKeyOpen{EndKeyOpen: []byte("b")},
+			}}},
+		},
+	} {
+		t.Run(test.desc, func(t *testing.T) {
+			got := test.rr.proto()
+			want := test.proto
+			if !reflect.DeepEqual(got, want) {
+				t.Errorf("Bad proto for %s: got %v, want %v", test.rr.String(), got, want)
+			}
+		})
+	}
+}
+
+func TestRowRangeRetainRowsBefore(t *testing.T) {
+	for _, test := range []struct {
+		desc  string
+		rr    RowSet
+		proto *btpb.RowSet
+	}{
+		{
+			desc: "retain rows before",
+			rr:   NewRange("a", "c").retainRowsBefore("b"),
+			proto: &btpb.RowSet{RowRanges: []*btpb.RowRange{{
+				StartKey: &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte("a")},
+				EndKey:   &btpb.RowRange_EndKeyOpen{EndKeyOpen: []byte("b")},
+			}}},
+		},
+		{
+			desc: "retain rows before empty key",
+			rr:   NewRange("a", "c").retainRowsBefore(""),
+			proto: &btpb.RowSet{RowRanges: []*btpb.RowRange{{
+				StartKey: &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte("a")},
+				EndKey:   &btpb.RowRange_EndKeyOpen{EndKeyOpen: []byte("c")},
+			}}},
+		},
+		{
+			desc: "retain rows before key greater than range end",
+			rr:   NewClosedRange("a", "c").retainRowsBefore("d"),
+			proto: &btpb.RowSet{RowRanges: []*btpb.RowRange{{
+				StartKey: &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte("a")},
+				EndKey:   &btpb.RowRange_EndKeyClosed{EndKeyClosed: []byte("c")},
+			}}},
+		},
+		{
+			desc: "retain rows before key same as closed end key",
+			rr:   NewClosedRange("a", "c").retainRowsBefore("c"),
+			proto: &btpb.RowSet{RowRanges: []*btpb.RowRange{{
+				StartKey: &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte("a")},
+				EndKey:   &btpb.RowRange_EndKeyOpen{EndKeyOpen: []byte("c")},
+			}}},
+		},
+		{
+			desc: "retain rows before on unbounded range",
+			rr:   InfiniteRange("").retainRowsBefore("z"),
+			proto: &btpb.RowSet{RowRanges: []*btpb.RowRange{{
+				EndKey: &btpb.RowRange_EndKeyOpen{EndKeyOpen: []byte("z")},
+			}}},
+		},
+	} {
+		t.Run(test.desc, func(t *testing.T) {
+			got := test.rr.proto()
+			want := test.proto
+			if !reflect.DeepEqual(got, want) {
+				t.Errorf("Bad retain rows before proto: got %v, want %v", got, want)
+			}
+		})
+	}
+}
+
+func TestRowRangeString(t *testing.T) {
+
+	for _, test := range []struct {
+		desc string
+		rr   RowRange
+		str  string
+	}{
+		{
+			desc: "RowRange closed open",
+			rr:   NewClosedOpenRange("a", "b"),
+			str:  "[\"a\",b)",
+		},
+		{
+			desc: "RowRange open open",
+			rr:   NewOpenRange("c", "d"),
+			str:  "(\"c\",d)",
+		},
+		{
+			desc: "RowRange closed closed",
+			rr:   NewClosedRange("e", "f"),
+			str:  "[\"e\",f]",
+		},
+		{
+			desc: "RowRange open closed",
+			rr:   NewOpenClosedRange("g", "h"),
+			str:  "(\"g\",h]",
+		},
+		{
+			desc: "RowRange unbound unbound",
+			rr:   InfiniteRange(""),
+			str:  "(∞,∞)",
+		},
+		{
+			desc: "RowRange closed unbound",
+			rr:   InfiniteRange("b"),
+			str:  "[\"b\",∞)",
+		},
+		{
+			desc: "RowRange unbound closed",
+			rr:   InfiniteReverseRange("c"),
+			str:  "(∞,c]",
+		},
+	} {
+		t.Run(test.desc, func(t *testing.T) {
+			got := test.rr.String()
+			want := test.str
+			if !reflect.DeepEqual(got, want) {
+				t.Errorf("Bad String(): got %v, want %v", got, want)
+			}
+		})
+	}
+}
+
 // TestReadRowsInvalidRowSet verifies that the client doesn't send ReadRows() requests with invalid RowSets.
 func TestReadRowsInvalidRowSet(t *testing.T) {
 	testEnv, err := NewEmulatedEnv(IntegrationTestConfig{})
@@ -212,19 +590,19 @@
 		valid bool
 	}{
 		{
-			rr:    RowRange{},
+			rr:    RowRange{startBound: rangeUnbounded, endBound: rangeUnbounded},
 			valid: true,
 		},
 		{
-			rr:    RowRange{start: "b"},
+			rr:    RowRange{startBound: rangeClosed, start: "b", endBound: rangeUnbounded},
 			valid: true,
 		},
 		{
-			rr:    RowRange{start: "b", limit: "c"},
+			rr:    RowRange{startBound: rangeClosed, start: "b", endBound: rangeOpen, end: "c"},
 			valid: true,
 		},
 		{
-			rr:    RowRange{start: "b", limit: "a"},
+			rr:    RowRange{startBound: rangeClosed, start: "b", endBound: rangeOpen, end: "a"},
 			valid: false,
 		},
 		{
@@ -307,7 +685,7 @@
 	statsChannel := make(chan FullReadStats, 1)
 
 	readStart := time.Now()
-	if err := table.ReadRows(ctx, RowRange{}, func(r Row) bool { return true }, WithFullReadStats(func(s *FullReadStats) { statsChannel <- *s }), RowFilter(ColumnFilter("q.*"))); err != nil {
+	if err := table.ReadRows(ctx, InfiniteRange(""), func(r Row) bool { return true }, WithFullReadStats(func(s *FullReadStats) { statsChannel <- *s }), RowFilter(ColumnFilter("q.*"))); err != nil {
 		t.Fatalf("NewClient failed: %v", err)
 	}
 	readElapsed := time.Since(readStart)
diff --git a/bigtable/integration_test.go b/bigtable/integration_test.go
index 5b49072..1722826 100644
--- a/bigtable/integration_test.go
+++ b/bigtable/integration_test.go
@@ -208,7 +208,7 @@
 	// Do a scan and stop part way through.
 	// Verify that the ReadRows callback doesn't keep running.
 	stopped := false
-	err = table.ReadRows(ctx, InfiniteRange(""), func(r Row) bool {
+	err = table.ReadRows(ctx, RowRange{}, func(r Row) bool {
 		if r.Key() < "h" {
 			return true
 		}
@@ -256,6 +256,39 @@
 		t.Fatalf("bulk read: wrong reads.\n got %q\nwant %q", got, want)
 	}
 }
+func TestIntegration_ReadRowListReverse(t *testing.T) {
+	ctx := context.Background()
+	_, _, _, table, _, cleanup, err := setupIntegration(ctx, t)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer cleanup()
+
+	if err := populatePresidentsGraph(table); err != nil {
+		t.Fatal(err)
+	}
+
+	// Read a RowList
+	var elt []string
+	rowRange := NewOpenClosedRange("gwashington", "wmckinley")
+	want := "wmckinley-tjefferson-1,tjefferson-gwashington-1,tjefferson-j§adams-1,j§adams-gwashington-1,j§adams-tjefferson-1"
+	err = table.ReadRows(ctx, rowRange, func(r Row) bool {
+		for _, ris := range r {
+			for _, ri := range ris {
+				elt = append(elt, formatReadItem(ri))
+			}
+		}
+		return true
+	}, ReverseScan())
+
+	if err != nil {
+		t.Fatalf("read RowList: %v", err)
+	}
+
+	if got := strings.Join(elt, ","); got != want {
+		t.Fatalf("bulk read: wrong reads.\n got %q\nwant %q", got, want)
+	}
+}
 
 func TestIntegration_DeleteRow(t *testing.T) {
 	ctx := context.Background()
@@ -424,7 +457,7 @@
 	if !testutil.Equal(r, wantRow) {
 		t.Fatalf("Cell with multiple versions and LatestNFilter(2),\n got %v\nwant %v", r, wantRow)
 	}
-	// Check cell offset / limit
+	// Check cell offset / end
 	r, err = table.ReadRow(ctx, "testrow", RowFilter(CellsPerRowLimitFilter(3)))
 	if err != nil {
 		t.Fatalf("Reading row: %v", err)
@@ -778,7 +811,7 @@
 	}
 	verifyDirectPathRemoteAddress(testEnv, t)
 	if rc != wantRc {
-		t.Fatalf("Scan with row limit returned %d rows, want %d", rc, wantRc)
+		t.Fatalf("Scan with row end returned %d rows, want %d", rc, wantRc)
 	}
 
 	// Test bulk mutations
@@ -975,7 +1008,7 @@
 			want:   "",
 		},
 		{
-			desc:   "read with ColumnFilter + row limit",
+			desc:   "read with ColumnFilter + row end",
 			rr:     RowRange{},
 			filter: ColumnFilter(".*j.*"), // matches "j§adams" and "tjefferson"
 			limit:  LimitRows(2),
@@ -996,7 +1029,7 @@
 			want:   "gwashington-j§adams-,j§adams-gwashington-,j§adams-tjefferson-,tjefferson-gwashington-,tjefferson-j§adams-,tjefferson-wmckinley-,wmckinley-tjefferson-",
 		},
 		{
-			desc:   "read with ColumnFilter + row limit + strip values",
+			desc:   "read with ColumnFilter + row end + strip values",
 			rr:     RowRange{},
 			filter: ChainFilters(ColumnFilter(".*j.*"), StripValueFilter()), // matches "j§adams" and "tjefferson"
 			limit:  LimitRows(2),
@@ -1015,7 +1048,7 @@
 			want:   "gwashington-j§adams-,j§adams-gwashington-,j§adams-tjefferson-,tjefferson-gwashington-,tjefferson-j§adams-,tjefferson-wmckinley-,wmckinley-tjefferson-",
 		},
 		{
-			desc:   "read with ValueRangeFilter + row limit",
+			desc:   "read with ValueRangeFilter + row end",
 			rr:     RowRange{},
 			filter: ValueRangeFilter([]byte("1"), []byte("5")), // matches our value of "1"
 			limit:  LimitRows(2),
@@ -1128,10 +1161,11 @@
 	}
 
 	for _, test := range []struct {
-		desc   string
-		rr     RowSet
-		filter Filter     // may be nil
-		limit  ReadOption // may be nil
+		desc        string
+		rr          RowSet
+		filter      Filter     // may be nil
+		limit       ReadOption // may be nil
+		reverseScan bool
 
 		// We do the read and grab all the stats.
 		cellsReturnedCount int64
@@ -1251,7 +1285,7 @@
 			rowsReturnedCount:  0,
 		},
 		{
-			desc:               "read with ColumnFilter + row limit",
+			desc:               "read with ColumnFilter + row end",
 			rr:                 RowRange{},
 			filter:             ColumnFilter(".*j.*"), // matches "j§adams" and "tjefferson"
 			limit:              LimitRows(2),
@@ -1274,7 +1308,7 @@
 			rowsReturnedCount:  4,
 		},
 		{
-			desc:               "read with ColumnFilter + row limit + strip values",
+			desc:               "read with ColumnFilter + row end + strip values",
 			rr:                 RowRange{},
 			filter:             ChainFilters(ColumnFilter(".*j.*"), StripValueFilter()), // matches "j§adams" and "tjefferson"
 			limit:              LimitRows(2),
@@ -1296,7 +1330,7 @@
 			rowsReturnedCount:  4,
 		},
 		{
-			desc:               "read with ValueRangeFilter + row limit",
+			desc:               "read with ValueRangeFilter + row end",
 			rr:                 RowRange{},
 			filter:             ValueRangeFilter([]byte("1"), []byte("5")), // matches our value of "1"
 			limit:              LimitRows(2),
@@ -1358,6 +1392,20 @@
 			cellsReturnedCount: 0,
 			rowsReturnedCount:  0,
 		},
+		{
+			desc:               "reverse read all, unfiltered",
+			rr:                 RowRange{},
+			reverseScan:        true,
+			cellsReturnedCount: 7,
+			rowsReturnedCount:  4,
+		},
+		{
+			desc:               "reverse read with InfiniteRange, unfiltered",
+			rr:                 InfiniteReverseRange("wmckinley"),
+			reverseScan:        true,
+			cellsReturnedCount: 7,
+			rowsReturnedCount:  4,
+		},
 	} {
 		t.Run(test.desc, func(t *testing.T) {
 			var opts []ReadOption
@@ -1367,6 +1415,9 @@
 			if test.limit != nil {
 				opts = append(opts, test.limit)
 			}
+			if test.reverseScan {
+				opts = append(opts, ReverseScan())
+			}
 			// Define a callback for validating request stats.
 			callbackInvoked := false
 			statsValidator := WithFullReadStats(
@@ -1387,7 +1438,7 @@
 					// We use lenient checks for CellsSeenCount and RowsSeenCount. Exact checks would be brittle.
 					// Note that the emulator and prod sometimes yield different values:
 					// - Sometimes prod scans fewer cells due to optimizations that allow prod to skip cells.
-					// - Sometimes prod scans more cells due to to filters that must rescan cells.
+					// - Sometimes prod scans more cells due to filters that must rescan cells.
 					// Similar issues apply for RowsSeenCount.
 					if got, want := readStats.CellsSeenCount, readStats.CellsReturnedCount; got < want {
 						t.Errorf("CellsSeenCount should be greater than or equal to CellsReturnedCount. got: %d < want: %d",
@@ -2119,7 +2170,7 @@
 		time.Sleep(time.Second * 10)
 	}
 	if encryptionKeyVersion == "" {
-		t.Fatalf("Encryption Key not created within alotted time limit")
+		t.Fatalf("Encryption Key not created within alotted time end")
 	}
 
 	// Validate Encryption Info under getTable
diff --git a/bigtable/reader.go b/bigtable/reader.go
index 64aabc9..8f0a4c1 100644
--- a/bigtable/reader.go
+++ b/bigtable/reader.go
@@ -19,6 +19,7 @@
 import (
 	"bytes"
 	"fmt"
+	"strings"
 
 	btpb "google.golang.org/genproto/googleapis/bigtable/v2"
 )
@@ -58,6 +59,7 @@
 // chunkReader handles cell chunks from the read rows response and combines
 // them into full Rows.
 type chunkReader struct {
+	reversed  bool
 	state     rrState
 	curKey    []byte
 	curLabels []string
@@ -71,7 +73,11 @@
 
 // newChunkReader returns a new chunkReader for handling read rows responses.
 func newChunkReader() *chunkReader {
-	return &chunkReader{state: newRow}
+	return &chunkReader{reversed: false, state: newRow}
+}
+
+func newReverseChunkReader() *chunkReader {
+	return &chunkReader{reversed: true, state: newRow}
 }
 
 // Process takes a cell chunk and returns a new Row if the given chunk
@@ -200,9 +206,19 @@
 	if cc.RowKey == nil || cc.FamilyName == nil || cc.Qualifier == nil {
 		return fmt.Errorf("missing key field for new row %v", cc)
 	}
-	if cr.lastKey != "" && cr.lastKey >= string(cc.RowKey) {
-		return fmt.Errorf("out of order row key: %q, %q", cr.lastKey, string(cc.RowKey))
+
+	if cr.lastKey != "" {
+		r := strings.Compare(string(cc.RowKey), cr.lastKey)
+		direction := "increasing"
+		if cr.reversed {
+			r *= -1
+			direction = "decreasing"
+		}
+		if r <= 0 {
+			return fmt.Errorf("out of order row key, must be strictly %s. new key: %q prev row: %q", direction, cc.RowKey, cr.lastKey)
+		}
 	}
+
 	return nil
 }
 
diff --git a/bigtable/retry_test.go b/bigtable/retry_test.go
index 69e6fda..3d549ae 100644
--- a/bigtable/retry_test.go
+++ b/bigtable/retry_test.go
@@ -345,7 +345,7 @@
 func TestRetainRowsAfter(t *testing.T) {
 	prevRowRange := NewRange("a", "z")
 	prevRowKey := "m"
-	want := NewRange("m\x00", "z")
+	want := NewOpenRange("m", "z")
 	got := prevRowRange.retainRowsAfter(prevRowKey)
 	if !testutil.Equal(want, got, cmp.AllowUnexported(RowRange{})) {
 		t.Errorf("range retry: got %v, want %v", got, want)
@@ -353,7 +353,7 @@
 
 	prevRowRangeList := RowRangeList{NewRange("a", "d"), NewRange("e", "g"), NewRange("h", "l")}
 	prevRowKey = "f"
-	wantRowRangeList := RowRangeList{NewRange("f\x00", "g"), NewRange("h", "l")}
+	wantRowRangeList := RowRangeList{NewOpenRange("f", "g"), NewRange("h", "l")}
 	got = prevRowRangeList.retainRowsAfter(prevRowKey)
 	if !testutil.Equal(wantRowRangeList, got, cmp.AllowUnexported(RowRange{})) {
 		t.Errorf("range list retry: got %v, want %v", got, wantRowRangeList)
@@ -406,7 +406,7 @@
 			err = status.Errorf(codes.Unavailable, "")
 		case 2:
 			// Retryable request failure
-			if want, got := "b\x00", string(req.Rows.RowRanges[0].GetStartKeyClosed()); want != got {
+			if want, got := "b", string(req.Rows.RowRanges[0].GetStartKeyOpen()); want != got {
 				t.Errorf("2 range retries: got %q, want %q", got, want)
 			}
 			err = status.Errorf(codes.Unavailable, "")
@@ -418,7 +418,7 @@
 			must(ss.SendMsg(&btpb.ReadRowsResponse{LastScannedRowKey: []byte("e")}))
 			err = status.Errorf(codes.Unavailable, "")
 		case 5:
-			if want, got := "e\x00", string(req.Rows.RowRanges[0].GetStartKeyClosed()); want != got {
+			if want, got := "e", string(req.Rows.RowRanges[0].GetStartKeyOpen()); want != got {
 				t.Errorf("3 range retries: got %q, want %q", got, want)
 			}
 			must(writeReadRowsResponse(ss, "f", "g"))
@@ -439,6 +439,80 @@
 	}
 }
 
+func TestRetryReverseReadRows(t *testing.T) {
+	ctx := context.Background()
+
+	// Intercept requests and delegate to an interceptor defined by the test case
+	errCount := 0
+	var f func(grpc.ServerStream) error
+	errInjector := func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
+		if strings.HasSuffix(info.FullMethod, "ReadRows") {
+			return f(ss)
+		}
+		return handler(ctx, ss)
+	}
+
+	tbl, cleanup, err := setupFakeServer(grpc.StreamInterceptor(errInjector))
+	defer cleanup()
+	if err != nil {
+		t.Fatalf("fake server setup: %v", err)
+	}
+
+	errCount = 0
+	// Test overall request failure and retries
+	f = func(ss grpc.ServerStream) error {
+		var err error
+		req := new(btpb.ReadRowsRequest)
+		must(ss.RecvMsg(req))
+		switch errCount {
+		case 0:
+			// Retryable request failure
+			err = status.Errorf(codes.Unavailable, "")
+		case 1:
+			// Write two rows then error
+			if want, got := "z", string(req.Rows.RowRanges[0].GetEndKeyClosed()); want != got {
+				t.Errorf("first retry, no data received yet: got %q, want %q", got, want)
+			}
+			must(writeReadRowsResponse(ss, "g", "f"))
+			err = status.Errorf(codes.Unavailable, "")
+		case 2:
+			// Retryable request failure
+			if want, got := "f", string(req.Rows.RowRanges[0].GetEndKeyOpen()); want != got {
+				t.Errorf("2 range retries: got %q, want %q", got, want)
+			}
+			err = status.Errorf(codes.Unavailable, "")
+		case 3:
+			must(ss.SendMsg(&btpb.ReadRowsResponse{LastScannedRowKey: []byte("e")}))
+			err = status.Errorf(codes.Unavailable, "")
+		case 4:
+			if want, got := "e", string(req.Rows.RowRanges[0].GetEndKeyOpen()); want != got {
+				t.Errorf("3 range retries: got %q, want %q", got, want)
+			}
+			// Write two more rows
+			must(writeReadRowsResponse(ss, "d", "c"))
+			err = status.Errorf(codes.Unavailable, "")
+		case 5:
+			if want, got := "c", string(req.Rows.RowRanges[0].GetEndKeyOpen()); want != got {
+				t.Errorf("3 range retries: got %q, want %q", got, want)
+			}
+			must(writeReadRowsResponse(ss, "b", "a"))
+			err = nil
+		}
+		errCount++
+		return err
+	}
+
+	var got []string
+	must(tbl.ReadRows(ctx, NewClosedRange("a", "z"), func(r Row) bool {
+		got = append(got, r.Key())
+		return true
+	}, ReverseScan()))
+	want := []string{"g", "f", "d", "c", "b", "a"}
+	if !testutil.Equal(got, want) {
+		t.Errorf("retry range integration: got %v, want %v", got, want)
+	}
+}
+
 func writeReadRowsResponse(ss grpc.ServerStream, rowKeys ...string) error {
 	var chunks []*btpb.ReadRowsResponse_CellChunk
 	for _, key := range rowKeys {