spanner: add basic benchserver, enable emulator usage

- Allow emulator usage via SPANNER_EMULATOR_HOST.
- Add basic benchserver that just returns OK responses with zero processing.

Change-Id: I52398fc75b35cd25a97a65c8d528a7fc5081fe99
Reviewed-on: https://code-review.googlesource.com/c/gocloud/+/42950
Reviewed-by: Emmanuel Odeke <emm.odeke@gmail.com>
diff --git a/spanner/client.go b/spanner/client.go
index 38c3f49..21e5045 100644
--- a/spanner/client.go
+++ b/spanner/client.go
@@ -19,6 +19,7 @@
 import (
 	"context"
 	"fmt"
+	"os"
 	"regexp"
 	"sync/atomic"
 	"time"
@@ -122,13 +123,6 @@
 // NewClientWithConfig creates a client to a database. A valid database name has
 // the form projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID.
 func NewClientWithConfig(ctx context.Context, database string, config ClientConfig, opts ...option.ClientOption) (c *Client, err error) {
-	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.NewClient")
-	defer func() { trace.EndSpan(ctx, err) }()
-
-	// Validate database path.
-	if err := validDatabaseName(database); err != nil {
-		return nil, err
-	}
 	c = &Client{
 		database: database,
 		md: metadata.Pairs(
@@ -142,19 +136,6 @@
 		c.sessionLabels[k] = v
 	}
 
-	// gRPC options.
-	allOpts := []option.ClientOption{
-		option.WithEndpoint(endpoint),
-		option.WithScopes(Scope),
-		option.WithGRPCDialOption(
-			grpc.WithDefaultCallOptions(
-				grpc.MaxCallSendMsgSize(100<<20),
-				grpc.MaxCallRecvMsgSize(100<<20),
-			),
-		),
-	}
-	allOpts = append(allOpts, opts...)
-
 	// Prepare gRPC channels.
 	if config.NumChannels == 0 {
 		config.NumChannels = numChannels
@@ -168,13 +149,46 @@
 		config.MaxBurst = 10
 	}
 
-	// TODO(deklerk): This should be replaced with a balancer with
-	// config.NumChannels connections, instead of config.NumChannels
-	// clients.
-	for i := 0; i < config.NumChannels; i++ {
-		client, err := vkit.NewClient(ctx, allOpts...)
+	if emulatorAddr := os.Getenv("SPANNER_EMULATOR_HOST"); emulatorAddr == "" {
+		// Validate database path.
+		if err := validDatabaseName(database); err != nil {
+			return nil, err
+		}
+
+		ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.NewClient")
+		defer func() { trace.EndSpan(ctx, err) }()
+
+		// gRPC options.
+		allOpts := []option.ClientOption{
+			option.WithEndpoint(endpoint),
+			option.WithScopes(Scope),
+			option.WithGRPCDialOption(
+				grpc.WithDefaultCallOptions(
+					grpc.MaxCallSendMsgSize(100<<20),
+					grpc.MaxCallRecvMsgSize(100<<20),
+				),
+			),
+		}
+		allOpts = append(allOpts, opts...)
+
+		// TODO(deklerk): This should be replaced with a balancer with
+		// config.NumChannels connections, instead of config.NumChannels
+		// clients.
+		for i := 0; i < config.NumChannels; i++ {
+			client, err := vkit.NewClient(ctx, allOpts...)
+			if err != nil {
+				return nil, errDial(i, err)
+			}
+			c.clients = append(c.clients, client)
+		}
+	} else {
+		conn, err := grpc.Dial(emulatorAddr, grpc.WithInsecure())
 		if err != nil {
-			return nil, errDial(i, err)
+			return nil, errDial(0, err)
+		}
+		client, err := vkit.NewClient(ctx, option.WithGRPCConn(conn))
+		if err != nil {
+			return nil, errDial(0, err)
 		}
 		c.clients = append(c.clients, client)
 	}
diff --git a/spanner/client_test.go b/spanner/client_test.go
index 3df5366..49b5485 100644
--- a/spanner/client_test.go
+++ b/spanner/client_test.go
@@ -19,9 +19,11 @@
 import (
 	"context"
 	"io"
+	"os"
 	"strings"
 	"testing"
 
+	"cloud.google.com/go/spanner/internal/benchserver"
 	"cloud.google.com/go/spanner/internal/testutil"
 	"google.golang.org/api/iterator"
 	"google.golang.org/grpc/codes"
@@ -407,3 +409,29 @@
 		t.Fatalf("unexpected number of attempts: %d, expected %d", attempts, 1)
 	}
 }
+
+func TestNewClient_ConnectToEmulator(t *testing.T) {
+	ctx := context.Background()
+
+	s, err := benchserver.NewMockCloudSpanner()
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer s.Stop()
+	go s.Serve()
+
+	oldEnv := os.Getenv("SPANNER_EMULATOR_HOST")
+	os.Setenv("SPANNER_EMULATOR_HOST", s.Addr())
+	defer os.Setenv("SPANNER_EMULATOR_HOST", oldEnv)
+
+	c, err := NewClient(ctx, "some-db")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer c.Close()
+
+	_, err = c.Single().ReadRow(ctx, "Accounts", Key{"alice"}, []string{"balance"})
+	if err != nil {
+		t.Fatal(err)
+	}
+}
diff --git a/spanner/internal/benchserver/benchserver.go b/spanner/internal/benchserver/benchserver.go
new file mode 100644
index 0000000..32c0d2c
--- /dev/null
+++ b/spanner/internal/benchserver/benchserver.go
@@ -0,0 +1,145 @@
+/*
+Copyright 2019 Google LLC
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package benchserver
+
+import (
+	"context"
+	"encoding/binary"
+	"net"
+	"time"
+
+	"github.com/golang/protobuf/ptypes/empty"
+	proto3 "github.com/golang/protobuf/ptypes/struct"
+	pbt "github.com/golang/protobuf/ptypes/timestamp"
+	sppb "google.golang.org/genproto/googleapis/spanner/v1"
+	"google.golang.org/grpc"
+)
+
+var (
+	// KvMeta is the Metadata for mocked KV table.
+	KvMeta = sppb.ResultSetMetadata{
+		RowType: &sppb.StructType{
+			Fields: []*sppb.StructType_Field{
+				{
+					Name: "Key",
+					Type: &sppb.Type{Code: sppb.TypeCode_STRING},
+				},
+				{
+					Name: "Value",
+					Type: &sppb.Type{Code: sppb.TypeCode_STRING},
+				},
+			},
+		},
+	}
+)
+
+// MockCloudSpanner is a mock implementation of SpannerServer interface.
+type MockCloudSpanner struct {
+	sppb.SpannerServer
+
+	gsrv *grpc.Server
+	lis  net.Listener
+	addr string
+}
+
+// NewMockCloudSpanner creates a new MockCloudSpanner instance.
+func NewMockCloudSpanner() (*MockCloudSpanner, error) {
+	gsrv := grpc.NewServer()
+	lis, err := net.Listen("tcp", "localhost:0")
+	if err != nil {
+		return nil, err
+	}
+	mcs := &MockCloudSpanner{
+		gsrv: gsrv,
+		lis:  lis,
+		addr: lis.Addr().String(),
+	}
+	sppb.RegisterSpannerServer(gsrv, mcs)
+	return mcs, nil
+}
+
+// Serve starts the server and blocks.
+func (m *MockCloudSpanner) Serve() error {
+	return m.gsrv.Serve(m.lis)
+}
+
+// Addr returns the listening address of mock server.
+func (m *MockCloudSpanner) Addr() string {
+	return m.addr
+}
+
+// Stop terminates MockCloudSpanner and closes the serving port.
+func (m *MockCloudSpanner) Stop() {
+	m.gsrv.Stop()
+}
+
+// CreateSession is a placeholder for SpannerServer.CreateSession.
+func (m *MockCloudSpanner) CreateSession(c context.Context, r *sppb.CreateSessionRequest) (*sppb.Session, error) {
+	return &sppb.Session{Name: "some-session"}, nil
+}
+
+// DeleteSession is a placeholder for SpannerServer.DeleteSession.
+func (m *MockCloudSpanner) DeleteSession(c context.Context, r *sppb.DeleteSessionRequest) (*empty.Empty, error) {
+	return &empty.Empty{}, nil
+}
+
+// ExecuteStreamingSql is a mock implementation of SpannerServer.ExecuteStreamingSql.
+func (m *MockCloudSpanner) ExecuteStreamingSql(r *sppb.ExecuteSqlRequest, s sppb.Spanner_ExecuteStreamingSqlServer) error {
+	rt := EncodeResumeToken(uint64(1))
+	meta := KvMeta
+	meta.Transaction = &sppb.Transaction{
+		ReadTimestamp: &pbt.Timestamp{
+			Seconds: time.Now().Unix(),
+			Nanos:   int32(time.Now().Nanosecond()),
+		},
+	}
+	return s.Send(&sppb.PartialResultSet{
+		Metadata: &meta,
+		Values: []*proto3.Value{
+			{Kind: &proto3.Value_StringValue{StringValue: "foo"}},
+			{Kind: &proto3.Value_StringValue{StringValue: "bar"}},
+		},
+		ResumeToken: rt,
+	})
+}
+
+// StreamingRead is a placeholder for SpannerServer.StreamingRead.
+func (m *MockCloudSpanner) StreamingRead(r *sppb.ReadRequest, s sppb.Spanner_StreamingReadServer) error {
+	rt := EncodeResumeToken(uint64(1))
+	meta := KvMeta
+	meta.Transaction = &sppb.Transaction{
+		ReadTimestamp: &pbt.Timestamp{
+			Seconds: -1,
+			Nanos:   -1,
+		},
+	}
+	return s.Send(&sppb.PartialResultSet{
+		Metadata: &meta,
+		Values: []*proto3.Value{
+			{Kind: &proto3.Value_StringValue{StringValue: "foo"}},
+			{Kind: &proto3.Value_StringValue{StringValue: "bar"}},
+		},
+		ResumeToken: rt,
+	})
+}
+
+// EncodeResumeToken return mock resume token encoding for an uint64 integer.
+func EncodeResumeToken(t uint64) []byte {
+	rt := make([]byte, 16)
+	binary.PutUvarint(rt, t)
+	return rt
+}
diff --git a/spanner/internal/cmd/benchserver/main.go b/spanner/internal/cmd/benchserver/main.go
new file mode 100644
index 0000000..275afbc
--- /dev/null
+++ b/spanner/internal/cmd/benchserver/main.go
@@ -0,0 +1,30 @@
+// Copyright 2019 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+	"fmt"
+
+	"cloud.google.com/go/spanner/internal/benchserver"
+)
+
+func main() {
+	ms, err := benchserver.NewMockCloudSpanner()
+	if err != nil {
+		panic(err)
+	}
+	fmt.Println("Running on", ms.Addr())
+	panic(ms.Serve())
+}