blob: 441a9d2b8f63be0a0d86c48bf364066af4c3429c [file] [log] [blame]
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"context"
"errors"
"flag"
"fmt"
"log"
"net"
"strings"
"sync"
"time"
"cloud.google.com/go/bigtable"
"github.com/golang/protobuf/ptypes/duration"
pb "github.com/googleapis/cloud-bigtable-clients-test/testproxypb"
"google.golang.org/api/option"
btpb "google.golang.org/genproto/googleapis/bigtable/v2"
statpb "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
stat "google.golang.org/grpc/status"
)
var (
// port is the TCP port the proxy's gRPC server listens on; it is set with
// the -port flag and defaults to 9999.
port = flag.Int("port", 9999, "The server port")
// logLabel is the prefix included in error messages built by the
// client-management RPC handlers in this file.
logLabel = "cbt-go-proxy"
)
// rowToProto converts a Bigtable Go client Row into a Bigtable protobuf Row.
// It walks every column family (map key) and its ReadItem slice (map value)
// in the client Row and emits one protobuf Column, holding a single Cell,
// for each ReadItem.
//
// The returned error is always nil; it is part of the signature so callers
// can treat conversion like any other fallible step.
func rowToProto(btRow bigtable.Row) (*btpb.Row, error) {
	pbRow := &btpb.Row{
		Key: []byte(btRow.Key()),
	}
	for fam, ris := range btRow {
		pbFam := &btpb.Family{
			Name: fam,
		}
		for _, col := range ris {
			// The column name has the form `family:columnQualifier`. Only
			// the first ':' is the separator: the qualifier may itself
			// contain ':' and must be kept whole. A name without any ':'
			// is used as the qualifier unchanged instead of panicking.
			colQualifier := col.Column
			if i := strings.IndexByte(col.Column, ':'); i >= 0 {
				colQualifier = col.Column[i+1:]
			}
			pbCol := &btpb.Column{
				Qualifier: []byte(colQualifier),
				Cells: []*btpb.Cell{
					{
						Value:           col.Value,
						TimestampMicros: col.Timestamp.Time().UnixMicro(),
						Labels:          col.Labels,
					},
				},
			}
			pbFam.Columns = append(pbFam.Columns, pbCol)
		}
		pbRow.Families = append(pbRow.Families, pbFam)
	}
	return pbRow, nil
}
// rowSetFromProto translates a Bigtable v2.RowSet object to a Bigtable.RowSet
// object.
//
// It returns nil when rs contains neither row keys nor row ranges (including
// when rs itself is nil), a RowList when rs contains only row keys, and a
// RowRangeList otherwise. When rs contains both keys and ranges, each key is
// expressed as the single-row range [key, key+"\x00") so that neither part of
// the set is dropped.
//
// NOTE(review): every range is built with bigtable.NewRange or
// bigtable.InfiniteRange, so the open/closed distinction of the incoming
// start and end keys is not carried over — confirm whether callers rely on it.
func rowSetFromProto(rs *btpb.RowSet) bigtable.RowSet {
	rowKeys := rs.GetRowKeys()
	rowRanges := rs.GetRowRanges()
	if len(rowKeys) == 0 && len(rowRanges) == 0 {
		return nil
	}
	// Keys only: pass them through as a plain list of row keys.
	if len(rowRanges) == 0 {
		rowList := make(bigtable.RowList, 0, len(rowKeys))
		for _, b := range rowKeys {
			rowList = append(rowList, string(b))
		}
		return rowList
	}
	rowRangeList := make(bigtable.RowRangeList, 0, len(rowKeys)+len(rowRanges))
	// Keys mixed with ranges: "\x00" is the smallest possible suffix, so
	// [key, key+"\x00") contains exactly the one row named key.
	for _, b := range rowKeys {
		key := string(b)
		rowRangeList = append(rowRangeList, bigtable.NewRange(key, key+"\x00"))
	}
	for _, rrs := range rowRanges {
		var start string
		switch rrs.StartKey.(type) {
		case *btpb.RowRange_StartKeyClosed:
			start = string(rrs.GetStartKeyClosed())
		case *btpb.RowRange_StartKeyOpen:
			start = string(rrs.GetStartKeyOpen())
		}
		var rr bigtable.RowRange
		switch rrs.EndKey.(type) {
		case *btpb.RowRange_EndKeyClosed:
			rr = bigtable.NewRange(start, string(rrs.GetEndKeyClosed()))
		case *btpb.RowRange_EndKeyOpen:
			rr = bigtable.NewRange(start, string(rrs.GetEndKeyOpen()))
		default:
			// If not set, get the infinite row range
			rr = bigtable.InfiniteRange(start)
		}
		rowRangeList = append(rowRangeList, rr)
	}
	return rowRangeList
}
// mutationFromProto folds a slice of Bigtable v2.Mutation protos into one
// bigtable.Mutation, applying the entries in slice order. SetCell,
// DeleteFromColumn (with or without a time range), DeleteFromFamily and
// DeleteFromRow are handled; any other entry is skipped.
func mutationFromProto(mPbs []*btpb.Mutation) *bigtable.Mutation {
	out := bigtable.NewMutation()
	for i := range mPbs {
		switch op := mPbs[i].Mutation.(type) {
		case *btpb.Mutation_SetCell_:
			cell := op.SetCell
			family, qualifier := cell.FamilyName, string(cell.ColumnQualifier)
			stamp := bigtable.Time(time.UnixMicro(cell.TimestampMicros))
			out.Set(family, qualifier, stamp, cell.Value)
		case *btpb.Mutation_DeleteFromColumn_:
			target := op.DeleteFromColumn
			family, qualifier := target.FamilyName, string(target.ColumnQualifier)
			window := target.TimeRange
			if window == nil {
				// No time range: drop every cell in the column.
				out.DeleteCellsInColumn(family, qualifier)
			} else {
				from := bigtable.Time(time.UnixMicro(window.StartTimestampMicros))
				to := bigtable.Time(time.UnixMicro(window.EndTimestampMicros))
				out.DeleteTimestampRange(family, qualifier, from, to)
			}
		case *btpb.Mutation_DeleteFromFamily_:
			out.DeleteCellsInFamily(op.DeleteFromFamily.FamilyName)
		case *btpb.Mutation_DeleteFromRow_:
			out.DeleteRow()
		}
	}
	return out
}
// filterFromProto translates a Bigtable v2.RowFilter object into a Bigtable
// Filter object.
//
// It returns nil when rfPb is nil, when its filter oneof is unset or of a
// type not handled here, when it is a Sink filter (not currently supported),
// or when it is a Condition filter whose predicate cannot be translated.
// Callers must check for nil before dereferencing the result.
func filterFromProto(rfPb *btpb.RowFilter) *bigtable.Filter {
	if rfPb == nil {
		return nil
	}
	var f bigtable.Filter
	switch fpb := rfPb.Filter.(type) {
	case *btpb.RowFilter_Chain_:
		f = bigtable.ChainFilters(filtersFromProto(fpb.Chain.GetFilters())...)
	case *btpb.RowFilter_Interleave_:
		f = bigtable.InterleaveFilters(filtersFromProto(fpb.Interleave.GetFilters())...)
	case *btpb.RowFilter_Condition_:
		cond := fpb.Condition
		pf := filterFromProto(cond.GetPredicateFilter())
		if pf == nil {
			// A condition without a usable predicate cannot be expressed.
			return nil
		}
		// The true and false branches are optional and translated
		// independently; an absent branch stays a nil Filter.
		var tf, ff bigtable.Filter
		if p := filterFromProto(cond.GetTrueFilter()); p != nil {
			tf = *p
		}
		if p := filterFromProto(cond.GetFalseFilter()); p != nil {
			ff = *p
		}
		f = bigtable.ConditionFilter(*pf, tf, ff)
	case *btpb.RowFilter_Sink:
		// Not currently supported.
		return nil
	case *btpb.RowFilter_PassAllFilter:
		f = bigtable.PassAllFilter()
	case *btpb.RowFilter_BlockAllFilter:
		f = bigtable.BlockAllFilter()
	case *btpb.RowFilter_RowKeyRegexFilter:
		f = bigtable.RowKeyFilter(string(fpb.RowKeyRegexFilter))
	case *btpb.RowFilter_RowSampleFilter:
		f = bigtable.RowSampleFilter(fpb.RowSampleFilter)
	case *btpb.RowFilter_FamilyNameRegexFilter:
		f = bigtable.FamilyFilter(fpb.FamilyNameRegexFilter)
	case *btpb.RowFilter_ColumnQualifierRegexFilter:
		f = bigtable.ColumnFilter(string(fpb.ColumnQualifierRegexFilter))
	case *btpb.RowFilter_ColumnRangeFilter:
		cr := fpb.ColumnRangeFilter
		// An unset bound leaves the corresponding string empty.
		var start, end string
		switch sq := cr.StartQualifier.(type) {
		case *btpb.ColumnRange_StartQualifierOpen:
			start = string(sq.StartQualifierOpen)
		case *btpb.ColumnRange_StartQualifierClosed:
			start = string(sq.StartQualifierClosed)
		}
		switch eq := cr.EndQualifier.(type) {
		case *btpb.ColumnRange_EndQualifierClosed:
			end = string(eq.EndQualifierClosed)
		case *btpb.ColumnRange_EndQualifierOpen:
			end = string(eq.EndQualifierOpen)
		}
		f = bigtable.ColumnRangeFilter(cr.FamilyName, start, end)
	case *btpb.RowFilter_TimestampRangeFilter:
		tsr := fpb.TimestampRangeFilter
		f = bigtable.TimestampRangeFilter(time.UnixMicro(tsr.StartTimestampMicros), time.UnixMicro(tsr.EndTimestampMicros))
	case *btpb.RowFilter_ValueRegexFilter:
		f = bigtable.ValueFilter(string(fpb.ValueRegexFilter))
	case *btpb.RowFilter_ValueRangeFilter:
		vr := fpb.ValueRangeFilter
		// An unset bound leaves the corresponding slice nil.
		var start, end []byte
		switch sv := vr.StartValue.(type) {
		case *btpb.ValueRange_StartValueOpen:
			start = sv.StartValueOpen
		case *btpb.ValueRange_StartValueClosed:
			start = sv.StartValueClosed
		}
		switch ev := vr.EndValue.(type) {
		case *btpb.ValueRange_EndValueOpen:
			end = ev.EndValueOpen
		case *btpb.ValueRange_EndValueClosed:
			end = ev.EndValueClosed
		}
		f = bigtable.ValueRangeFilter(start, end)
	case *btpb.RowFilter_CellsPerRowOffsetFilter:
		f = bigtable.CellsPerRowOffsetFilter(int(fpb.CellsPerRowOffsetFilter))
	case *btpb.RowFilter_CellsPerRowLimitFilter:
		f = bigtable.CellsPerRowLimitFilter(int(fpb.CellsPerRowLimitFilter))
	case *btpb.RowFilter_CellsPerColumnLimitFilter:
		f = bigtable.LatestNFilter(int(fpb.CellsPerColumnLimitFilter))
	case *btpb.RowFilter_StripValueTransformer:
		f = bigtable.StripValueFilter()
	case *btpb.RowFilter_ApplyLabelTransformer:
		f = bigtable.LabelFilter(fpb.ApplyLabelTransformer)
	default:
		// Unset oneof or a filter type this proxy does not know about.
		return nil
	}
	return &f
}

// filtersFromProto translates each sub-filter with filterFromProto, skipping
// any that cannot be translated so callers never dereference a nil result.
func filtersFromProto(pbs []*btpb.RowFilter) []bigtable.Filter {
	fs := make([]bigtable.Filter, 0, len(pbs))
	for _, p := range pbs {
		if sub := filterFromProto(p); sub != nil {
			fs = append(fs, *sub)
		}
	}
	return fs
}
// statusFromError logs err and turns it into a protobuf Status. When the
// gRPC status package can extract a status from err, its code and message
// are used; otherwise the result carries codes.Unknown and err's formatted
// text.
func statusFromError(err error) *statpb.Status {
	log.Printf("error: %v\n", err)
	if grpcStatus, ok := stat.FromError(err); ok {
		return &statpb.Status{
			Code:    int32(grpcStatus.Code()),
			Message: grpcStatus.Message(),
		}
	}
	return &statpb.Status{
		Code:    int32(codes.Unknown),
		Message: fmt.Sprintf("%v", err),
	}
}
// parseTableID returns the final path segment of a table resource name.
// A table name has the format
// projects/<project>/instances/<instance>/tables/<tableID>.
//
// Only two things are validated: the name must split into at least six
// "/"-separated segments, and the last segment must be non-empty. The test
// suite driving this proxy is assumed to send *generally* correct requests,
// so other variants and edge cases are not checked.
func parseTableID(tableName string) (tableID string, _ error) {
	segments := strings.Split(tableName, "/")
	count := len(segments)
	if count < 6 {
		return "", errors.New("table resource name does not have the correct format")
	}
	if last := segments[count-1]; last != "" {
		return last, nil
	}
	return "", errors.New("cannot read tableID from table name")
}
// testClient bundles a bigtable.Client under test with the per-client
// settings recorded when it was created: an appProfileID (optionally) and a
// perOperationTimeout (optionally). Cancel functions are not stored here;
// the timeout method returns one to each caller.
type testClient struct {
c *bigtable.Client // c stores the Bigtable client under test
appProfileID string // appProfileID is recorded by CreateClient but is currently unused
perOperationTimeout *duration.Duration // perOperationTimeout, when non-nil, sets a custom timeout for method calls on this client
}
// timeout derives a per-operation context from ctx. If the testClient has a
// perOperationTimeout, the returned context expires after that duration;
// otherwise it is only cancellable. The caller must invoke the returned
// CancelFunc in both cases.
func (tc *testClient) timeout(ctx context.Context) (context.Context, context.CancelFunc) {
	limit := tc.perOperationTimeout
	if limit == nil {
		return context.WithCancel(ctx)
	}
	return context.WithTimeout(ctx, limit.AsDuration())
}
// getCredentialsOptions builds the gRPC dial options that carry credentials
// for a Bigtable client.
//
// Note: the proxy always uses insecure transport credentials and does not
// inspect req. This function may need to grow to support other credential
// types. The returned error is always nil.
func getCredentialsOptions(req *pb.CreateClientRequest) (opts []grpc.DialOption, _ error) {
	creds := insecure.NewCredentials()
	return []grpc.DialOption{grpc.WithTransportCredentials(creds)}, nil
}
// goTestProxyServer represents an instance of the test proxy server. It keeps
// a reference to each individual client instance (stored in a testClient
// object), keyed by the client ID supplied in CreateClient.
type goTestProxyServer struct {
pb.UnimplementedCloudBigtableV2TestProxyServer
clientsLock sync.RWMutex // clientsLock guards clientIDs; handlers that add, close or remove clients take the write lock, lookups take the read lock
clientIDs map[string]*testClient // clientIDs has all of the bigtable.Client objects under test
}
// client looks up the testClient registered under clientID and returns an
// error if there is none. The caller must hold clientsLock (read or write)
// for the duration of the call.
func (s *goTestProxyServer) client(clientID string) (*testClient, error) {
	if tc, found := s.clientIDs[clientID]; found {
		return tc, nil
	}
	return nil, fmt.Errorf("client ID %s does not exist", clientID)
}
// CreateClient responds to the CreateClient RPC. This method adds a new client
// instance to the goTestProxyServer.
//
// It returns InvalidArgument when ClientId, DataTarget, ProjectId or
// InstanceId is empty, AlreadyExists when the client ID is already
// registered, Unauthenticated when credentials cannot be obtained, Unknown
// when the connection to DataTarget cannot be created, and Internal when the
// Bigtable client cannot be constructed.
func (s *goTestProxyServer) CreateClient(ctx context.Context, req *pb.CreateClientRequest) (*pb.CreateClientResponse, error) {
	if req.ClientId == "" ||
		req.DataTarget == "" ||
		req.ProjectId == "" ||
		req.InstanceId == "" {
		return nil, stat.Error(codes.InvalidArgument,
			fmt.Sprintf("%s must provide ClientId, DataTarget, ProjectId, and InstanceId", logLabel))
	}
	// Hold the write lock for the whole creation so two requests with the
	// same ID cannot both pass the existence check.
	s.clientsLock.Lock()
	defer s.clientsLock.Unlock()
	if _, exists := s.clientIDs[req.ClientId]; exists {
		return nil, stat.Error(codes.AlreadyExists,
			fmt.Sprintf("%s: ClientID already exists", logLabel))
	}
	opts, err := getCredentialsOptions(req)
	if err != nil {
		return nil, stat.Error(codes.Unauthenticated,
			fmt.Sprintf("%s: failed to get credentials: %v", logLabel, err))
	}
	conn, err := grpc.Dial(req.DataTarget, opts...)
	if err != nil {
		return nil, stat.Error(codes.Unknown, fmt.Sprintf("%s: failed to create connection: %v", logLabel, err))
	}
	c, err := bigtable.NewClient(ctx, req.ProjectId, req.InstanceId, option.WithGRPCConn(conn))
	if err != nil {
		// The connection was dialed only for this client; close it so a
		// failed CreateClient does not leak it.
		if cerr := conn.Close(); cerr != nil {
			log.Printf("%s: failed to close connection after client creation error: %v", logLabel, cerr)
		}
		return nil, stat.Error(codes.Internal,
			fmt.Sprintf("%s: failed to create client: %v", logLabel, err))
	}
	s.clientIDs[req.ClientId] = &testClient{
		c:                   c,
		appProfileID:        req.AppProfileId,
		perOperationTimeout: req.PerOperationTimeout,
	}
	return &pb.CreateClientResponse{}, nil
}
// CloseClient responds to the CloseClient RPC. This method closes the
// Bigtable client registered under the request's client ID. The entry stays
// in the server's client map until RemoveClient is called.
//
// It returns an error when the client ID is not registered. A failure while
// closing the underlying client is logged and does not fail the RPC.
func (s *goTestProxyServer) CloseClient(ctx context.Context, req *pb.CloseClientRequest) (*pb.CloseClientResponse, error) {
	clientID := req.ClientId
	s.clientsLock.Lock()
	defer s.clientsLock.Unlock()
	btc, err := s.client(clientID)
	if err != nil {
		return nil, err
	}
	// Best effort: the RPC outcome is unchanged, but the error is no longer
	// silently discarded.
	if err := btc.c.Close(); err != nil {
		log.Printf("error closing client %s: %v", clientID, err)
	}
	return &pb.CloseClientResponse{}, nil
}
// RemoveClient responds to the RemoveClient RPC by deleting the entry for
// the request's client ID from the goTestProxyServer. It returns
// InvalidArgument when no client is registered under that ID.
func (s *goTestProxyServer) RemoveClient(ctx context.Context, req *pb.RemoveClientRequest) (*pb.RemoveClientResponse, error) {
	id := req.ClientId
	s.clientsLock.Lock()
	defer s.clientsLock.Unlock()
	// Only existence matters here; whether the client still accepts new
	// requests is irrelevant for removal.
	if _, lookupErr := s.client(id); lookupErr != nil {
		return nil, stat.Errorf(codes.InvalidArgument, "%s: ClientID does not exist", logLabel)
	}
	delete(s.clientIDs, id)
	return &pb.RemoveClientResponse{}, nil
}
// ReadRow responds to the ReadRow RPC. This method gets all of the column
// data for a single row in the Table.
//
// An unknown client ID or an unparsable table name is returned as an RPC
// error. An error from the Bigtable client is reported in the result's
// Status, with an empty Row. A row that does not exist yields an OK status
// and an empty Row.
func (s *goTestProxyServer) ReadRow(ctx context.Context, req *pb.ReadRowRequest) (*pb.RowResult, error) {
	s.clientsLock.RLock()
	btc, err := s.client(req.ClientId)
	// Release the read lock before checking err so the early return cannot
	// leave it held and block every later CreateClient/CloseClient/RemoveClient.
	s.clientsLock.RUnlock()
	if err != nil {
		return nil, err
	}
	tid, err := parseTableID(req.TableName)
	if err != nil {
		return nil, err
	}
	t := btc.c.Open(tid)
	res := &pb.RowResult{
		Status: &statpb.Status{
			Code: int32(codes.OK),
		},
		Row: &btpb.Row{},
	}
	ctx, cancel := btc.timeout(ctx)
	defer cancel()
	r, err := t.ReadRow(ctx, req.RowKey)
	if err != nil {
		res.Status = statusFromError(err)
		return res, nil
	}
	if r == nil {
		return res, nil
	}
	pbRow, err := rowToProto(r)
	if err != nil {
		return nil, err
	}
	res.Row = pbRow
	return res, nil
}
// ReadRows responds to the ReadRows RPC. This method gets all of the column
// data for a set of rows, a range of rows, or the entire table.
//
// An unknown client ID, a missing inner request or an unparsable table name
// is returned as an RPC error. Any error from the read itself, or from
// converting a row, is reported in the result's Status with an empty Rows
// slice.
func (s *goTestProxyServer) ReadRows(ctx context.Context, req *pb.ReadRowsRequest) (*pb.RowsResult, error) {
	s.clientsLock.RLock()
	btc, err := s.client(req.ClientId)
	s.clientsLock.RUnlock()
	if err != nil {
		return nil, err
	}
	rrq := req.GetRequest()
	if rrq == nil {
		log.Printf("missing inner request to ReadRows: %v\n", req)
		return nil, stat.Error(codes.InvalidArgument, "request to ReadRows() is missing inner request")
	}
	tid, err := parseTableID(rrq.TableName)
	if err != nil {
		return nil, err
	}
	t := btc.c.Open(tid)
	rs := rowSetFromProto(rrq.Rows)
	// Bigtable client doesn't have a Table.GetAll() function--RowSet must be
	// provided for ReadRows. Use InfiniteRange() to get the full table.
	if rs == nil {
		// Should be lowest possible key value, an empty byte array
		rs = bigtable.InfiniteRange("")
	}
	ctx, cancel := btc.timeout(ctx)
	defer cancel()
	var (
		c       int32
		rowsPb  []*btpb.Row
		convErr error // first row-conversion failure seen inside the callback
	)
	lim := req.GetCancelAfterRows()
	err = t.ReadRows(ctx, rs, func(r bigtable.Row) bool {
		// Stop on the lim-th callback; the row that triggers the stop is
		// not added to the result. A lim of 0 never matches.
		c++
		if c == lim {
			return false
		}
		rpb, err := rowToProto(r)
		if err != nil {
			convErr = err
			return false
		}
		rowsPb = append(rowsPb, rpb)
		return true
	})
	if err == nil {
		err = convErr
	}
	res := &pb.RowsResult{
		Status: &statpb.Status{
			Code: int32(codes.OK),
		},
		Rows: []*btpb.Row{},
	}
	if err != nil {
		// statusFromError logs the error and falls back to codes.Unknown
		// for errors that carry no gRPC status, so a failed read is never
		// reported as OK.
		res.Status = statusFromError(err)
		return res, nil
	}
	res.Rows = rowsPb
	return res, nil
}
// MutateRow responds to the MutateRow RPC by applying the request's
// mutations (changes or deletions) to a single row of a table. An unknown
// client ID, a missing inner request or an unparsable table name is
// returned as an RPC error; an error from the Bigtable client is reported in
// the result's Status.
func (s *goTestProxyServer) MutateRow(ctx context.Context, req *pb.MutateRowRequest) (*pb.MutateRowResult, error) {
	s.clientsLock.RLock()
	btc, err := s.client(req.ClientId)
	s.clientsLock.RUnlock()
	if err != nil {
		return nil, err
	}
	inner := req.GetRequest()
	if inner == nil {
		return nil, stat.Error(codes.InvalidArgument, "request to MutateRow() is missing inner request")
	}
	mut := mutationFromProto(inner.Mutations)
	tableID, err := parseTableID(inner.TableName)
	if err != nil {
		return nil, err
	}
	table := btc.c.Open(tableID)
	rowKey := string(inner.RowKey)
	// Start from an OK status and overwrite it only if the apply fails.
	result := &pb.MutateRowResult{
		Status: &statpb.Status{Code: int32(codes.OK)},
	}
	ctx, cancel := btc.timeout(ctx)
	defer cancel()
	if applyErr := table.Apply(ctx, rowKey, mut); applyErr != nil {
		result.Status = statusFromError(applyErr)
	}
	return result, nil
}
// BulkMutateRows responds to the BulkMutateRows RPC. This method applies a
// series of changes or deletions to multiple rows in a single call.
//
// An unknown client ID, a missing inner request or an unparsable table name
// is returned as an RPC error. An overall failure of the bulk call is
// reported in the result's Status; failures of individual rows are reported
// as Entries, each carrying the index of the failed row and its own status.
func (s *goTestProxyServer) BulkMutateRows(ctx context.Context, req *pb.MutateRowsRequest) (*pb.MutateRowsResult, error) {
	s.clientsLock.RLock()
	btc, err := s.client(req.ClientId)
	s.clientsLock.RUnlock()
	if err != nil {
		return nil, err
	}
	rrq := req.GetRequest()
	if rrq == nil {
		log.Printf("missing inner request to BulkMutateRows: %v\n", req)
		return nil, stat.Error(codes.InvalidArgument, "request to BulkMutateRows() is missing inner request")
	}
	mrs := rrq.Entries
	tid, err := parseTableID(rrq.TableName)
	if err != nil {
		return nil, err
	}
	t := btc.c.Open(tid)
	// keys[i] and muts[i] describe the same row, so the two slices must stay
	// index-aligned.
	keys := make([]string, len(mrs))
	muts := make([]*bigtable.Mutation, len(mrs))
	for i, mr := range mrs {
		keys[i] = string(mr.RowKey)
		muts[i] = mutationFromProto(mr.Mutations)
	}
	res := &pb.MutateRowsResult{
		Status: &statpb.Status{
			Code: int32(codes.OK),
		},
	}
	ctx, cancel := btc.timeout(ctx)
	defer cancel()
	errs, err := t.ApplyBulk(ctx, keys, muts)
	if err != nil {
		log.Printf("received error from Table.ApplyBulk(): %v", err)
		res.Status = statusFromError(err)
	}
	var entries []*btpb.MutateRowsResponse_Entry
	// Iterate over any errors returned, matching indices with errors. If
	// errs is nil, this block is skipped.
	for i, e := range errs {
		if e == nil {
			continue
		}
		// Report the per-row error e, not the outer err.
		entries = append(entries, &btpb.MutateRowsResponse_Entry{
			Index:  int64(i),
			Status: statusFromError(e),
		})
	}
	res.Entries = entries
	return res, nil
}
// CheckAndMutateRow responds to the CheckAndMutateRow RPC. This method applies
// one mutation if a condition is true and another mutation if it is false.
//
// When the request has no predicate filter, a pass-all filter is used as the
// condition. An unknown client ID, a missing inner request, a predicate
// filter that cannot be translated, or an unparsable table name is returned
// as an RPC error. An error from the Bigtable client is reported in the
// result's Status.
func (s *goTestProxyServer) CheckAndMutateRow(ctx context.Context, req *pb.CheckAndMutateRowRequest) (*pb.CheckAndMutateRowResult, error) {
	s.clientsLock.RLock()
	btc, err := s.client(req.ClientId)
	s.clientsLock.RUnlock()
	if err != nil {
		return nil, err
	}
	rrq := req.GetRequest()
	if rrq == nil {
		log.Printf("request to CheckAndMutateRow is missing inner request: received: %v", req)
		return nil, stat.Error(codes.InvalidArgument, "request to CheckAndMutateRow() is missing inner request")
	}
	trueMuts := mutationFromProto(rrq.TrueMutations)
	falseMuts := mutationFromProto(rrq.FalseMutations)
	f := bigtable.PassAllFilter()
	if rfPb := rrq.PredicateFilter; rfPb != nil {
		// filterFromProto returns nil for filters it cannot translate;
		// reject the request rather than dereference a nil pointer.
		pf := filterFromProto(rfPb)
		if pf == nil {
			return nil, stat.Error(codes.InvalidArgument, "request to CheckAndMutateRow() has an unsupported or empty predicate filter")
		}
		f = *pf
	}
	c := bigtable.NewCondMutation(f, trueMuts, falseMuts)
	res := &pb.CheckAndMutateRowResult{
		Status: &statpb.Status{
			Code: int32(codes.OK),
		},
	}
	tid, err := parseTableID(rrq.TableName)
	if err != nil {
		return nil, err
	}
	t := btc.c.Open(tid)
	rowKey := string(rrq.RowKey)
	// matched is filled in by Apply through the GetCondMutationResult option.
	var matched bool
	ao := bigtable.GetCondMutationResult(&matched)
	ctx, cancel := btc.timeout(ctx)
	defer cancel()
	err = t.Apply(ctx, rowKey, c, ao)
	if err != nil {
		log.Printf("received error from Table.Apply: %v", err)
		res.Status = statusFromError(err)
		return res, nil
	}
	res.Result = &btpb.CheckAndMutateRowResponse{
		PredicateMatched: matched,
	}
	return res, nil
}
// SampleRowKeys responds to the SampleRowKeys RPC by returning the sample of
// row keys the Bigtable client reports for a table. An unknown client ID, a
// missing inner request or an unparsable table name is returned as an RPC
// error; an error from the Bigtable client is reported in the result's
// Status.
func (s *goTestProxyServer) SampleRowKeys(ctx context.Context, req *pb.SampleRowKeysRequest) (*pb.SampleRowKeysResult, error) {
	s.clientsLock.RLock()
	btc, err := s.client(req.ClientId)
	s.clientsLock.RUnlock()
	if err != nil {
		return nil, err
	}
	inner := req.GetRequest()
	if inner == nil {
		log.Printf("missing inner request to SampleRowKeys: %v\n", req)
		return nil, stat.Error(codes.InvalidArgument, "request to SampleRowKeys() is missing inner request")
	}
	result := &pb.SampleRowKeysResult{
		Status: &statpb.Status{Code: int32(codes.OK)},
	}
	ctx, cancel := btc.timeout(ctx)
	defer cancel()
	tableID, err := parseTableID(inner.TableName)
	if err != nil {
		return nil, err
	}
	keys, err := btc.c.Open(tableID).SampleRowKeys(ctx)
	if err != nil {
		log.Printf("received error from Table.SampleRowKeys(): %v\n", err)
		result.Status = statusFromError(err)
		return result, nil
	}
	// Wrap each sampled key in a response message; only RowKey is set.
	samples := make([]*btpb.SampleRowKeysResponse, 0)
	for i := range keys {
		samples = append(samples, &btpb.SampleRowKeysResponse{RowKey: []byte(keys[i])})
	}
	result.Samples = samples
	return result, nil
}
// ReadModifyWriteRow responds to the ReadModifyWriteRow RPC. This method
// applies a non-idempotent change to a row.
//
// AppendValue and IncrementAmount rules are applied in request order; rules
// of any other kind are skipped. An unknown client ID, a missing inner
// request or an unparsable table name is returned as an RPC error. An error
// from the Bigtable client, or from converting the resulting row, is
// reported in the result's Status.
func (s *goTestProxyServer) ReadModifyWriteRow(ctx context.Context, req *pb.ReadModifyWriteRowRequest) (*pb.RowResult, error) {
	s.clientsLock.RLock()
	btc, err := s.client(req.ClientId)
	s.clientsLock.RUnlock()
	if err != nil {
		return nil, err
	}
	rrq := req.GetRequest()
	if rrq == nil {
		log.Printf("missing inner request to ReadModifyWriteRow: %v\n", req)
		return nil, stat.Error(codes.InvalidArgument, "request to ReadModifyWriteRow() is missing inner request")
	}
	rmw := bigtable.NewReadModifyWrite()
	for _, rp := range rrq.Rules {
		switch r := rp.Rule.(type) {
		case *btpb.ReadModifyWriteRule_AppendValue:
			rmw.AppendValue(rp.FamilyName, string(rp.ColumnQualifier), r.AppendValue)
		case *btpb.ReadModifyWriteRule_IncrementAmount:
			rmw.Increment(rp.FamilyName, string(rp.ColumnQualifier), r.IncrementAmount)
		}
	}
	res := &pb.RowResult{
		Status: &statpb.Status{
			Code: int32(codes.OK),
		},
	}
	tid, err := parseTableID(rrq.TableName)
	if err != nil {
		return nil, err
	}
	t := btc.c.Open(tid)
	k := string(rrq.RowKey)
	ctx, cancel := btc.timeout(ctx)
	defer cancel()
	r, err := t.ApplyReadModifyWrite(ctx, k, rmw)
	if err != nil {
		// Report client failures in the result's Status, as the other data
		// handlers in this file do, rather than failing the proxy RPC.
		res.Status = statusFromError(err)
		return res, nil
	}
	rp, err := rowToProto(r)
	if err != nil {
		res.Status = statusFromError(err)
		return res, nil
	}
	res.Row = rp
	return res, nil
}
// mustEmbedUnimplementedCloudBigtableV2TestProxyServer is an empty marker
// method.
// NOTE(review): presumably it exists to satisfy the generated
// CloudBigtableV2TestProxyServer interface; the embedded
// pb.UnimplementedCloudBigtableV2TestProxyServer may already provide it —
// confirm before removing.
func (s *goTestProxyServer) mustEmbedUnimplementedCloudBigtableV2TestProxyServer() {}
// newProxyServer creates a gRPC server with a fresh goTestProxyServer (empty
// client map) registered on it. The listener is used only to log its
// address; the caller is responsible for calling Serve.
func newProxyServer(lis net.Listener) *grpc.Server {
	srv := grpc.NewServer()
	proxy := &goTestProxyServer{clientIDs: map[string]*testClient{}}
	pb.RegisterCloudBigtableV2TestProxyServer(srv, proxy)
	log.Printf("server listening at %v", lis.Addr())
	return srv
}
// main parses flags, listens on the configured TCP port and serves the test
// proxy until the server stops. Failure to listen or to serve terminates the
// process through log.Fatalf.
func main() {
	flag.Parse()
	log.Printf("attempting to listen on port %d", *port)
	lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	defer lis.Close()
	s := newProxyServer(lis)
	// Register Stop before the blocking Serve call; a defer placed after
	// Serve would only be registered once the server had already returned.
	// Note that log.Fatalf exits the process without running deferred calls.
	defer s.Stop()
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}