blob: 8f9fc747f56a1dfd1cd2567c1659c94202abb22d [file] [log] [blame]
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"context"
"fmt"
"log"
"net"
"os"
"testing"
"cloud.google.com/go/bigtable"
"cloud.google.com/go/bigtable/bttest"
pb "github.com/googleapis/cloud-bigtable-clients-test/testproxypb"
"google.golang.org/api/option"
btpb "google.golang.org/genproto/googleapis/bigtable/v2"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/test/bufconn"
)
// Shared fixtures for the test proxy tests.
const (
	// buffer is the size passed to bufconn.Listen for the in-memory
	// listener the proxy server is served on (1 MiB).
	buffer = 1 << 20

	// Addresses used by the tests.
	testProxyAddress = "localhost:9990"
	bigtableAddress  = "localhost:9999"

	// testProxyClient is the ID of the reusable client that TestMain
	// registers with the proxy before any test runs.
	testProxyClient = "testProxyClient"

	// Table layout created by populateTable. tableName is the fully
	// qualified resource name sent in proxy requests; tableID is the short
	// ID used with the admin and data clients.
	tableName    = "projects/my-project/instances/my-instance/tables/table"
	tableID      = "table"
	columnFamily = "cf"
	rowKey       = "row"
)
// Package-level state shared by TestMain and the individual tests.
var (
// lis is the in-memory listener the proxy server is served on. It is
// assigned in TestMain and dialed by bufDialer.
lis *bufconn.Listener
// client is the test proxy client every test issues its RPCs through. It
// is assigned in TestMain before m.Run is called.
client pb.CloudBigtableV2TestProxyClient
)
// bufDialer is the context dialer handed to grpc.WithContextDialer in
// TestMain. It ignores both the context and the target address and always
// connects through the package-level bufconn listener lis.
func bufDialer(_ context.Context, _ string) (net.Conn, error) {
	conn, err := lis.Dial()
	return conn, err
}
// populateTable creates and fills the table the tests operate on, using the
// in-memory Bigtable server bts.
//
// It dials bts.Addr, creates table tableID with three column families
// ("cf0".."cf2", built from columnFamily), and then, for every family and for
// each of the columns "coll3".."coll1", applies an append of "test data" to
// row rowKey three times.
//
// It returns nil on success, or the first error encountered wrapped with a
// description of the setup step that failed.
func populateTable(bts *bttest.Server) error {
	ctx := context.Background()

	conn, err := grpc.Dial(bts.Addr, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())
	if err != nil {
		return fmt.Errorf("testproxy setup: can't dial inmem Bigtable server: %w", err)
	}
	defer conn.Close()

	adminClient, err := bigtable.NewAdminClient(ctx, "client", "instance",
		option.WithGRPCConn(conn), option.WithGRPCDialOption(grpc.WithBlock()))
	if err != nil {
		return fmt.Errorf("testproxy setup: can't create AdminClient: %w", err)
	}
	defer adminClient.Close()

	if err := adminClient.CreateTable(ctx, tableID); err != nil {
		return fmt.Errorf("testproxy setup: can't create table: %w", err)
	}

	// Create column families (3 is an arbitrarily sufficient number).
	const count = 3
	for i := 0; i < count; i++ {
		cfName := fmt.Sprintf("%s%d", columnFamily, i)
		if err := adminClient.CreateColumnFamily(ctx, tableID, cfName); err != nil {
			// Include the underlying error; without it the cause of the
			// failure is lost.
			return fmt.Errorf("testproxy setup: can't create column family %s: %w", cfName, err)
		}
	}

	dataClient, err := bigtable.NewClient(ctx, "client", "instance",
		option.WithGRPCConn(conn), option.WithGRPCDialOption(grpc.WithBlock()))
	if err != nil {
		return fmt.Errorf("testproxy setup: can't create Bigtable client: %w", err)
	}
	defer dataClient.Close()

	t := dataClient.Open(tableID)
	for fc := 0; fc < count; fc++ {
		family := fmt.Sprintf("%s%d", columnFamily, fc)
		for cc := count; cc > 0; cc-- {
			column := fmt.Sprintf("coll%d", cc)
			// The same append is applied count times to each column.
			for tc := 0; tc < count; tc++ {
				rmw := bigtable.NewReadModifyWrite()
				rmw.AppendValue(family, column, []byte("test data"))
				if _, err := t.ApplyReadModifyWrite(ctx, rowKey, rmw); err != nil {
					return fmt.Errorf("testproxy setup: failure populating row: %w", err)
				}
			}
		}
	}
	return nil
}
// TestMain sets up the three pieces the tests in this file rely on and then
// runs them:
//
//  1. The mocked Bigtable service (server).
//  2. The CloudBigtableV2TestProxyClient that sends requests to the test
//     proxy server.
//  3. The test proxy server under test.
//
// The communication sequence looks like:
//
//	TestProxyClient <=> test proxy server (what we're testing) <=> mocked BT server
//
// Any setup failure is fatal and terminates the test binary.
func TestMain(m *testing.M) {
	ctx := context.Background()

	// 1) Start the mocked Bigtable service.
	// This requires creating a "table" in memory.
	bts, err := bttest.NewServer(bigtableAddress)
	if err != nil {
		log.Fatalf("testproxy setup: can't create inmem Bigtable server: %v", err)
	}
	if err := populateTable(bts); err != nil {
		log.Fatalf("testproxy setup: can't populate mock table: %v", err)
	}

	// 3) Start the test proxy server on an in-memory listener.
	lis = bufconn.Listen(buffer)
	s := newProxyServer(lis)
	go func() {
		if err := s.Serve(lis); err != nil {
			log.Fatalf("failed to serve: %v", err)
		}
	}()

	// 2) Create the test proxy client.
	conn2, err := grpc.DialContext(ctx, lis.Addr().String(),
		grpc.WithContextDialer(bufDialer),
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("testproxy setup: failed to dial testproxy: %v", err)
	}
	client = pb.NewCloudBigtableV2TestProxyClient(conn2)

	// This could create a little bit of a race condition with the goroutine
	// started above. However, we need to have the test proxy server running
	// in order to create a reusable client stored in its memory.
	req := &pb.CreateClientRequest{
		ClientId:   testProxyClient,
		ProjectId:  "client",
		DataTarget: bigtableAddress,
		InstanceId: "instance",
	}
	if _, err := client.CreateClient(ctx, req); err != nil {
		log.Fatalf("testproxy setup: CreateClient() failed: %v", err)
	}

	// os.Exit terminates the process without running deferred calls, so the
	// client connection and proxy server are shut down explicitly before
	// exiting instead of via defer.
	// NOTE(review): assumes s.Serve returns nil once s.Stop has been called
	// (true for *grpc.Server) — confirm against newProxyServer's return type.
	code := m.Run()
	conn2.Close()
	s.Stop()
	os.Exit(code)
}
// TestCreateAndRemoveClient registers a fresh client with the proxy and then
// removes it again. A failed create aborts the test; a failed remove is
// reported as an error.
func TestCreateAndRemoveClient(t *testing.T) {
	const id = "testCreateClient"
	ctx := context.Background()

	createReq := &pb.CreateClientRequest{
		ClientId:   id,
		ProjectId:  "client",
		DataTarget: bigtableAddress,
		InstanceId: "instance",
	}
	if _, err := client.CreateClient(ctx, createReq); err != nil {
		t.Fatalf("testproxy test: CreateClient() failed: %v", err)
	}
	t.Log("testproxy test: client created successfully in test proxy")

	removeReq := &pb.RemoveClientRequest{ClientId: id}
	if _, err := client.RemoveClient(ctx, removeReq); err != nil {
		t.Errorf("testproxy test: RemoveClient() failed: %v", err)
	}
}
// TestReadRow reads row rowKey through the proxy using the shared
// testProxyClient and checks that the call reports OK and that the returned
// row carries the requested key.
func TestReadRow(t *testing.T) {
	ctx := context.Background()
	req := &pb.ReadRowRequest{
		TableName: tableName,
		ClientId:  testProxyClient,
		RowKey:    rowKey,
	}

	resp, err := client.ReadRow(ctx, req)
	if err != nil {
		t.Fatalf("testproxy test: ReadRow() failed: %v", err)
	}

	// The generated getters are nil-safe, so a response without a status or
	// row yields a test failure below instead of a nil-pointer panic.
	if got := resp.GetStatus().GetCode(); got != int32(codes.OK) {
		t.Errorf("testproxy test: ReadRow() didn't return OK; got %v", got)
	}
	if got := string(resp.GetRow().GetKey()); got != rowKey {
		t.Errorf("testproxy test: ReadRow() returned wrong row; got %q, want %q", got, rowKey)
	}
}
func TestBulkMutateRows(t *testing.T) {
ctx := context.Background()
req := &pb.MutateRowsRequest{
ClientId: testProxyClient,
Request: &btpb.MutateRowsRequest{
TableName: tableName,
Entries: []*btpb.MutateRowsRequest_Entry{
{
RowKey: []byte(rowKey),
Mutations: []*btpb.Mutation{
{
Mutation: &btpb.Mutation_SetCell_{
SetCell: &btpb.Mutation_SetCell{
ColumnQualifier: []byte("coll2"),
FamilyName: "cf0",
Value: []byte("bulked up mutant!"),
},
},
},
},
},
},
},
}
resp, err := client.BulkMutateRows(ctx, req)
if err != nil {
t.Fatalf("testproxy test: BulkMutateRows returned error: %v", err)
}
if resp.Status.Code != int32(codes.OK) {
t.Errorf("testproxy test: BulkMutateRows() didn't return OK; got %v", resp.Status.Code)
}
if len(resp.Entries) != 0 {
t.Errorf("testproxy test: BulkMutateRows() returned individual errors; got %v", resp.Entries)
}
}
func TestMutateRow(t *testing.T) {
ctx := context.Background()
req := &pb.MutateRowRequest{
ClientId: testProxyClient,
Request: &btpb.MutateRowRequest{
TableName: tableName,
RowKey: []byte(rowKey),
Mutations: []*btpb.Mutation{
{
Mutation: &btpb.Mutation_SetCell_{
SetCell: &btpb.Mutation_SetCell{
ColumnQualifier: []byte("coll1"),
FamilyName: "cf0",
Value: []byte("mutant!"),
},
},
},
},
},
}
resp, err := client.MutateRow(ctx, req)
if err != nil {
t.Fatalf("testproxy test: MutateRow() returned error: %v", err)
}
if resp.Status.Code != int32(codes.OK) {
t.Errorf("testproxy test: MutateRow() didn't return OK; got %v", resp.Status.Code)
}
}
// TestReadRows issues an unrestricted ReadRows request for the test table
// through the proxy and expects an OK status and exactly one row, since
// populateTable and the other tests only ever write to row rowKey.
func TestReadRows(t *testing.T) {
	ctx := context.Background()
	req := &pb.ReadRowsRequest{
		ClientId: testProxyClient,
		Request: &btpb.ReadRowsRequest{
			TableName: tableName,
		},
	}

	resp, err := client.ReadRows(ctx, req)
	if err != nil {
		t.Fatalf("testproxy test: ReadRows() returned error: %v", err)
	}

	// The generated getters are nil-safe, so a response without a status is
	// handled without a nil-pointer panic.
	if got := resp.GetStatus().GetCode(); got != int32(codes.OK) {
		t.Errorf("testproxy test: ReadRows() didn't return OK; got %v", got)
	}
	if got := len(resp.GetRows()); got != 1 {
		t.Errorf("testproxy test: ReadRows() returned wrong number of results; got: %d", got)
	}
}
// TestCheckAndMutateRow sends a CheckAndMutateRow request for row rowKey
// through the proxy. The true and false branches both set cf0:coll1, with
// different values. The test expects an OK status and a response reporting
// that the predicate matched.
func TestCheckAndMutateRow(t *testing.T) {
	ctx := context.Background()
	req := &pb.CheckAndMutateRowRequest{
		ClientId: testProxyClient,
		Request: &btpb.CheckAndMutateRowRequest{
			TableName: tableName,
			RowKey:    []byte(rowKey),
			PredicateFilter: &btpb.RowFilter{
				Filter: &btpb.RowFilter_PassAllFilter{},
			},
			TrueMutations: []*btpb.Mutation{
				{
					Mutation: &btpb.Mutation_SetCell_{
						SetCell: &btpb.Mutation_SetCell{
							ColumnQualifier: []byte("coll1"),
							FamilyName:      "cf0",
							Value:           []byte("check success"),
						},
					},
				},
			},
			FalseMutations: []*btpb.Mutation{
				{
					Mutation: &btpb.Mutation_SetCell_{
						SetCell: &btpb.Mutation_SetCell{
							ColumnQualifier: []byte("coll1"),
							FamilyName:      "cf0",
							Value:           []byte("check failed!"),
						},
					},
				},
			},
		},
	}

	resp, err := client.CheckAndMutateRow(ctx, req)
	if err != nil {
		t.Fatalf("testproxy test: CheckAndMutateRow() returned error: %v", err)
	}

	// The generated getters are nil-safe, so a response without a status or
	// result yields a test failure below instead of a nil-pointer panic.
	if got := resp.GetStatus().GetCode(); got != int32(codes.OK) {
		t.Errorf("testproxy test: CheckAndMutateRow() didn't return OK; got %v", got)
	}
	if matched := resp.GetResult().GetPredicateMatched(); !matched {
		t.Errorf("testproxy test: CheckAndMutateRow() returned wrong results; got: %v", matched)
	}
}
// TestSampleRowKeys requests row-key samples for the test table through the
// proxy and expects an OK status with exactly one sample returned.
func TestSampleRowKeys(t *testing.T) {
	in := &pb.SampleRowKeysRequest{
		ClientId: testProxyClient,
		Request:  &btpb.SampleRowKeysRequest{TableName: tableName},
	}

	res, err := client.SampleRowKeys(context.Background(), in)
	if err != nil {
		t.Fatalf("testproxy test: SampleRowKeys() returned error: %v", err)
	}
	if code := res.Status.Code; code != int32(codes.OK) {
		t.Errorf("testproxy test: SampleRowKeys() didn't return OK; got %v", code)
	}
	if n := len(res.Samples); n != 1 {
		t.Errorf("testproxy test: SampleRowKeys() returned wrong number of results; got: %d", n)
	}
}
// TestReadWriteRow sends a ReadModifyWriteRow request through the proxy that
// appends "appended!" to cf0:coll1 of row rowKey. It expects an OK status and
// a returned row whose key is rowKey.
func TestReadWriteRow(t *testing.T) {
	ctx := context.Background()
	req := &pb.ReadModifyWriteRowRequest{
		ClientId: testProxyClient,
		Request: &btpb.ReadModifyWriteRowRequest{
			TableName: tableName,
			RowKey:    []byte(rowKey),
			Rules: []*btpb.ReadModifyWriteRule{
				{
					Rule: &btpb.ReadModifyWriteRule_AppendValue{
						AppendValue: []byte("appended!"),
					},
					FamilyName:      "cf0",
					ColumnQualifier: []byte("coll1"),
				},
			},
		},
	}

	resp, err := client.ReadModifyWriteRow(ctx, req)
	if err != nil {
		t.Fatalf("testproxy test: ReadModifyWriteRow() returned error: %v", err)
	}

	// The generated getters are nil-safe, so a response without a status or
	// row yields a test failure below instead of a nil-pointer panic.
	if got := resp.GetStatus().GetCode(); got != int32(codes.OK) {
		t.Errorf("testproxy test: ReadModifyWriteRow() didn't return OK; got %v", got)
	}
	if got := string(resp.GetRow().GetKey()); got != rowKey {
		t.Errorf("testproxy test: ReadModifyWriteRow() returned wrong results; got: %q, want: %q", got, rowKey)
	}
}