spanner: add basic benchserver, enable emulator usage
- Allow emulator usage via SPANNER_EMULATOR_HOST.
- Add basic benchserver that just returns OK responses with zero processing.
Change-Id: I52398fc75b35cd25a97a65c8d528a7fc5081fe99
Reviewed-on: https://code-review.googlesource.com/c/gocloud/+/42950
Reviewed-by: Emmanuel Odeke <emm.odeke@gmail.com>
diff --git a/spanner/client.go b/spanner/client.go
index 38c3f49..21e5045 100644
--- a/spanner/client.go
+++ b/spanner/client.go
@@ -19,6 +19,7 @@
import (
"context"
"fmt"
+ "os"
"regexp"
"sync/atomic"
"time"
@@ -122,13 +123,6 @@
// NewClientWithConfig creates a client to a database. A valid database name has
// the form projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID.
func NewClientWithConfig(ctx context.Context, database string, config ClientConfig, opts ...option.ClientOption) (c *Client, err error) {
- ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.NewClient")
- defer func() { trace.EndSpan(ctx, err) }()
-
- // Validate database path.
- if err := validDatabaseName(database); err != nil {
- return nil, err
- }
c = &Client{
database: database,
md: metadata.Pairs(
@@ -142,19 +136,6 @@
c.sessionLabels[k] = v
}
- // gRPC options.
- allOpts := []option.ClientOption{
- option.WithEndpoint(endpoint),
- option.WithScopes(Scope),
- option.WithGRPCDialOption(
- grpc.WithDefaultCallOptions(
- grpc.MaxCallSendMsgSize(100<<20),
- grpc.MaxCallRecvMsgSize(100<<20),
- ),
- ),
- }
- allOpts = append(allOpts, opts...)
-
// Prepare gRPC channels.
if config.NumChannels == 0 {
config.NumChannels = numChannels
@@ -168,13 +149,46 @@
config.MaxBurst = 10
}
- // TODO(deklerk): This should be replaced with a balancer with
- // config.NumChannels connections, instead of config.NumChannels
- // clients.
- for i := 0; i < config.NumChannels; i++ {
- client, err := vkit.NewClient(ctx, allOpts...)
+ if emulatorAddr := os.Getenv("SPANNER_EMULATOR_HOST"); emulatorAddr == "" {
+ // Validate database path.
+ if err := validDatabaseName(database); err != nil {
+ return nil, err
+ }
+
+ ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.NewClient")
+ defer func() { trace.EndSpan(ctx, err) }()
+
+ // gRPC options.
+ allOpts := []option.ClientOption{
+ option.WithEndpoint(endpoint),
+ option.WithScopes(Scope),
+ option.WithGRPCDialOption(
+ grpc.WithDefaultCallOptions(
+ grpc.MaxCallSendMsgSize(100<<20),
+ grpc.MaxCallRecvMsgSize(100<<20),
+ ),
+ ),
+ }
+ allOpts = append(allOpts, opts...)
+
+ // TODO(deklerk): This should be replaced with a balancer with
+ // config.NumChannels connections, instead of config.NumChannels
+ // clients.
+ for i := 0; i < config.NumChannels; i++ {
+ client, err := vkit.NewClient(ctx, allOpts...)
+ if err != nil {
+ return nil, errDial(i, err)
+ }
+ c.clients = append(c.clients, client)
+ }
+ } else {
+ conn, err := grpc.Dial(emulatorAddr, grpc.WithInsecure())
if err != nil {
- return nil, errDial(i, err)
+ return nil, errDial(0, err)
+ }
+ client, err := vkit.NewClient(ctx, option.WithGRPCConn(conn))
+ if err != nil {
+ return nil, errDial(0, err)
}
c.clients = append(c.clients, client)
}
diff --git a/spanner/client_test.go b/spanner/client_test.go
index 3df5366..49b5485 100644
--- a/spanner/client_test.go
+++ b/spanner/client_test.go
@@ -19,9 +19,11 @@
import (
"context"
"io"
+ "os"
"strings"
"testing"
+ "cloud.google.com/go/spanner/internal/benchserver"
"cloud.google.com/go/spanner/internal/testutil"
"google.golang.org/api/iterator"
"google.golang.org/grpc/codes"
@@ -407,3 +409,29 @@
t.Fatalf("unexpected number of attempts: %d, expected %d", attempts, 1)
}
}
+
+func TestNewClient_ConnectToEmulator(t *testing.T) {
+ ctx := context.Background()
+
+ s, err := benchserver.NewMockCloudSpanner()
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer s.Stop()
+ go s.Serve()
+
+ oldEnv := os.Getenv("SPANNER_EMULATOR_HOST")
+ os.Setenv("SPANNER_EMULATOR_HOST", s.Addr())
+ defer os.Setenv("SPANNER_EMULATOR_HOST", oldEnv)
+
+ c, err := NewClient(ctx, "some-db")
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer c.Close()
+
+ _, err = c.Single().ReadRow(ctx, "Accounts", Key{"alice"}, []string{"balance"})
+ if err != nil {
+ t.Fatal(err)
+ }
+}
diff --git a/spanner/internal/benchserver/benchserver.go b/spanner/internal/benchserver/benchserver.go
new file mode 100644
index 0000000..32c0d2c
--- /dev/null
+++ b/spanner/internal/benchserver/benchserver.go
@@ -0,0 +1,145 @@
+/*
+Copyright 2019 Google LLC
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package benchserver
+
+import (
+ "context"
+ "encoding/binary"
+ "net"
+ "time"
+
+ "github.com/golang/protobuf/ptypes/empty"
+ proto3 "github.com/golang/protobuf/ptypes/struct"
+ pbt "github.com/golang/protobuf/ptypes/timestamp"
+ sppb "google.golang.org/genproto/googleapis/spanner/v1"
+ "google.golang.org/grpc"
+)
+
+var (
+ // KvMeta is the Metadata for mocked KV table.
+ KvMeta = sppb.ResultSetMetadata{
+ RowType: &sppb.StructType{
+ Fields: []*sppb.StructType_Field{
+ {
+ Name: "Key",
+ Type: &sppb.Type{Code: sppb.TypeCode_STRING},
+ },
+ {
+ Name: "Value",
+ Type: &sppb.Type{Code: sppb.TypeCode_STRING},
+ },
+ },
+ },
+ }
+)
+
+// MockCloudSpanner is a mock implementation of SpannerServer interface.
+type MockCloudSpanner struct {
+ sppb.SpannerServer
+
+ gsrv *grpc.Server
+ lis net.Listener
+ addr string
+}
+
+// NewMockCloudSpanner creates a new MockCloudSpanner instance.
+func NewMockCloudSpanner() (*MockCloudSpanner, error) {
+ gsrv := grpc.NewServer()
+ lis, err := net.Listen("tcp", "localhost:0")
+ if err != nil {
+ return nil, err
+ }
+ mcs := &MockCloudSpanner{
+ gsrv: gsrv,
+ lis: lis,
+ addr: lis.Addr().String(),
+ }
+ sppb.RegisterSpannerServer(gsrv, mcs)
+ return mcs, nil
+}
+
+// Serve starts the server and blocks.
+func (m *MockCloudSpanner) Serve() error {
+ return m.gsrv.Serve(m.lis)
+}
+
+// Addr returns the listening address of mock server.
+func (m *MockCloudSpanner) Addr() string {
+ return m.addr
+}
+
+// Stop terminates MockCloudSpanner and closes the serving port.
+func (m *MockCloudSpanner) Stop() {
+ m.gsrv.Stop()
+}
+
+// CreateSession is a placeholder for SpannerServer.CreateSession.
+func (m *MockCloudSpanner) CreateSession(c context.Context, r *sppb.CreateSessionRequest) (*sppb.Session, error) {
+ return &sppb.Session{Name: "some-session"}, nil
+}
+
+// DeleteSession is a placeholder for SpannerServer.DeleteSession.
+func (m *MockCloudSpanner) DeleteSession(c context.Context, r *sppb.DeleteSessionRequest) (*empty.Empty, error) {
+ return &empty.Empty{}, nil
+}
+
+// ExecuteStreamingSql is a mock implementation of SpannerServer.ExecuteStreamingSql.
+func (m *MockCloudSpanner) ExecuteStreamingSql(r *sppb.ExecuteSqlRequest, s sppb.Spanner_ExecuteStreamingSqlServer) error {
+ rt := EncodeResumeToken(uint64(1))
+ meta := KvMeta
+ meta.Transaction = &sppb.Transaction{
+ ReadTimestamp: &pbt.Timestamp{
+ Seconds: time.Now().Unix(),
+ Nanos: int32(time.Now().Nanosecond()),
+ },
+ }
+ return s.Send(&sppb.PartialResultSet{
+ Metadata: &meta,
+ Values: []*proto3.Value{
+ {Kind: &proto3.Value_StringValue{StringValue: "foo"}},
+ {Kind: &proto3.Value_StringValue{StringValue: "bar"}},
+ },
+ ResumeToken: rt,
+ })
+}
+
+// StreamingRead is a placeholder for SpannerServer.StreamingRead.
+func (m *MockCloudSpanner) StreamingRead(r *sppb.ReadRequest, s sppb.Spanner_StreamingReadServer) error {
+ rt := EncodeResumeToken(uint64(1))
+ meta := KvMeta
+ meta.Transaction = &sppb.Transaction{
+ ReadTimestamp: &pbt.Timestamp{
+ Seconds: -1,
+ Nanos: -1,
+ },
+ }
+ return s.Send(&sppb.PartialResultSet{
+ Metadata: &meta,
+ Values: []*proto3.Value{
+ {Kind: &proto3.Value_StringValue{StringValue: "foo"}},
+ {Kind: &proto3.Value_StringValue{StringValue: "bar"}},
+ },
+ ResumeToken: rt,
+ })
+}
+
+// EncodeResumeToken return mock resume token encoding for an uint64 integer.
+func EncodeResumeToken(t uint64) []byte {
+ rt := make([]byte, 16)
+ binary.PutUvarint(rt, t)
+ return rt
+}
diff --git a/spanner/internal/cmd/benchserver/main.go b/spanner/internal/cmd/benchserver/main.go
new file mode 100644
index 0000000..275afbc
--- /dev/null
+++ b/spanner/internal/cmd/benchserver/main.go
@@ -0,0 +1,30 @@
+// Copyright 2019 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+ "fmt"
+
+ "cloud.google.com/go/spanner/internal/benchserver"
+)
+
+func main() {
+ ms, err := benchserver.NewMockCloudSpanner()
+ if err != nil {
+ panic(err)
+ }
+ fmt.Println("Running on", ms.Addr())
+ panic(ms.Serve())
+}