spanner: Add resource-based routing

This adds a step to retrieve the instance-specific endpoint before
creating the session client when creating a new spanner client.
It includes the following changes:

* Added an extra step to get the instance-specific endpoint when
creating a new client.
* Added a mocked instance admin server for testing purpose.
* Added an environment variable
"GOOGLE_CLOUD_SPANNER_ENABLE_RESOURCE_BASED_ROUTING"
to enable this feature. By default, it is disabled.
* If there is a PermissionDenied error when calling GetInstance(),
fallback to use the global endpoint or the user-specified endpoint.
* Included tests to verify the functionality.

Change-Id: Ie447d13b8e414f6299fc56acb678d293531849ae
Reviewed-on: https://code-review.googlesource.com/c/gocloud/+/48895
Reviewed-by: kokoro <noreply+kokoro@google.com>
Reviewed-by: Reid Hironaga <rhiro@google.com>
Reviewed-by: Shanika Kuruppu <skuruppu@google.com>
diff --git a/spanner/client.go b/spanner/client.go
index ae05960..4f0ee8e 100644
--- a/spanner/client.go
+++ b/spanner/client.go
@@ -25,12 +25,16 @@
 	"time"
 
 	"cloud.google.com/go/internal/trace"
+	instance "cloud.google.com/go/spanner/admin/instance/apiv1"
 	vkit "cloud.google.com/go/spanner/apiv1"
 	"google.golang.org/api/option"
+	instancepb "google.golang.org/genproto/googleapis/spanner/admin/instance/v1"
 	sppb "google.golang.org/genproto/googleapis/spanner/v1"
+	field_mask "google.golang.org/genproto/protobuf/field_mask"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/metadata"
+	"google.golang.org/grpc/status"
 )
 
 const (
@@ -50,7 +54,8 @@
 )
 
 var (
-	validDBPattern = regexp.MustCompile("^projects/[^/]+/instances/[^/]+/databases/[^/]+$")
+	validDBPattern       = regexp.MustCompile("^projects/[^/]+/instances/[^/]+/databases/[^/]+$")
+	validInstancePattern = regexp.MustCompile("^projects/[^/]+/instances/[^/]+")
 )
 
 func validDatabaseName(db string) error {
@@ -61,6 +66,15 @@
 	return nil
 }
 
+func getInstanceName(db string) (string, error) {
+	matches := validInstancePattern.FindStringSubmatch(db)
+	if len(matches) == 0 {
+		return "", fmt.Errorf("Failed to retrieve instance name from %q according to pattern %q",
+			db, validInstancePattern.String())
+	}
+	return matches[0], nil
+}
+
 // Client is a client for reading and writing data to a Cloud Spanner database.
 // A client is safe to use concurrently, except for its Close method.
 type Client struct {
@@ -103,6 +117,42 @@
 	return metadata.NewOutgoingContext(ctx, md)
 }
 
+// getInstanceEndpoint returns an instance-specific endpoint if one exists. If
+// multiple endpoints exist, it returns the first one.
+func getInstanceEndpoint(ctx context.Context, database string, opts ...option.ClientOption) (string, error) {
+	instanceName, err := getInstanceName(database)
+	if err != nil {
+		return "", fmt.Errorf("Failed to resolve endpoint: %v", err)
+	}
+
+	c, err := instance.NewInstanceAdminClient(ctx, opts...)
+	if err != nil {
+		return "", err
+	}
+	defer c.Close()
+
+	req := &instancepb.GetInstanceRequest{
+		Name: instanceName,
+		FieldMask: &field_mask.FieldMask{
+			Paths: []string{"endpoint_uris"},
+		},
+	}
+
+	resp, err := c.GetInstance(ctx, req)
+	if err != nil {
+		return "", err
+	}
+
+	endpointURIs := resp.GetEndpointUris()
+
+	if len(endpointURIs) > 0 {
+		return endpointURIs[0], nil
+	}
+
+	// Return empty string when no endpoints exist.
+	return "", nil
+}
+
 // NewClient creates a client to a database. A valid database name has the
 // form projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID. It uses
 // a default configuration.
@@ -142,6 +192,32 @@
 			option.WithoutAuthentication(),
 		}
 		opts = append(opts, emulatorOpts...)
+	} else if os.Getenv("GOOGLE_CLOUD_SPANNER_ENABLE_RESOURCE_BASED_ROUTING") == "true" {
+		// Fetch the instance-specific endpoint.
+		reqOpts := []option.ClientOption{option.WithEndpoint(endpoint)}
+		reqOpts = append(reqOpts, opts...)
+		instanceEndpoint, err := getInstanceEndpoint(ctx, database, reqOpts...)
+
+		if err != nil {
+			// If there is a PermissionDenied error, fall back to use the global endpoint
+			// or the user-specified endpoint.
+			if status.Code(err) == codes.PermissionDenied {
+				logf(config.logger, `
+Warning: The client library attempted to connect to an endpoint closer to your
+Cloud Spanner data but was unable to do so. The client library will fall back
+and route requests to the global Spanner endpoint (spanner.googleapis.com),
+which may result in increased latency. We recommend including the scope
+https://www.googleapis.com/auth/spanner.admin so that the client library can
+get an instance-specific endpoint and efficiently route requests.
+`)
+			} else {
+				return nil, err
+			}
+		}
+
+		if instanceEndpoint != "" {
+			opts = append(opts, option.WithEndpoint(instanceEndpoint))
+		}
 	}
 
 	// gRPC options.
diff --git a/spanner/client_test.go b/spanner/client_test.go
index 8f08f38..40cf693 100644
--- a/spanner/client_test.go
+++ b/spanner/client_test.go
@@ -20,14 +20,20 @@
 	"context"
 	"fmt"
 	"io"
+	"io/ioutil"
+	"log"
+	"os"
 	"strings"
 	"testing"
 	"time"
 
 	itestutil "cloud.google.com/go/internal/testutil"
 	. "cloud.google.com/go/spanner/internal/testutil"
+	"github.com/golang/protobuf/proto"
 	"google.golang.org/api/iterator"
 	"google.golang.org/api/option"
+	instancepb "google.golang.org/genproto/googleapis/spanner/admin/instance/v1"
+	sppb "google.golang.org/genproto/googleapis/spanner/v1"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 )
@@ -94,6 +100,34 @@
 	}
 }
 
+// Test getInstanceName()
+func TestGetInstanceName(t *testing.T) {
+	validDbURI := "projects/spanner-cloud-test/instances/foo/databases/foodb"
+	invalidDbUris := []string{
+		// Completely wrong DB URI.
+		"foobarDB",
+		// Project ID contains "/".
+		"projects/spanner-cloud/test/instances/foo/databases/foodb",
+		// No instance ID.
+		"projects/spanner-cloud-test/instances//databases/foodb",
+	}
+	want := "projects/spanner-cloud-test/instances/foo"
+	got, err := getInstanceName(validDbURI)
+	if err != nil {
+		t.Errorf("getInstanceName(%q) has an error: %q, want nil", validDbURI, err)
+	}
+	if got != want {
+		t.Errorf("getInstanceName(%q) = %q, want %q", validDbURI, got, want)
+	}
+	for _, d := range invalidDbUris {
+		wantErr := "Failed to retrieve instance name"
+		_, err = getInstanceName(d)
+		if !strings.Contains(err.Error(), wantErr) {
+			t.Errorf("getInstanceName(%q) has an error: %q, want error pattern %q", validDbURI, err, wantErr)
+		}
+	}
+}
+
 func TestReadOnlyTransactionClose(t *testing.T) {
 	// Closing a ReadOnlyTransaction shouldn't panic.
 	c := &Client{}
@@ -121,7 +155,7 @@
 	t.Parallel()
 	err := testSingleQuery(t, status.Error(codes.InvalidArgument, "Invalid argument"))
 	if status.Code(err) != codes.InvalidArgument {
-		t.Fatalf("got unexpected exception %v, expected InvalidArgument", err)
+		t.Fatalf("got: %v, want: %v", err, codes.InvalidArgument)
 	}
 }
 
@@ -289,6 +323,157 @@
 	}
 }
 
+func TestClient_ResourceBasedRouting_WithEndpointsReturned(t *testing.T) {
+	os.Setenv("GOOGLE_CLOUD_SPANNER_ENABLE_RESOURCE_BASED_ROUTING", "true")
+	defer os.Setenv("GOOGLE_CLOUD_SPANNER_ENABLE_RESOURCE_BASED_ROUTING", "")
+
+	// Create two servers. The base server receives the GetInstance request and
+	// returns the instance endpoint of the target server. The client should contact
+	// the target server after getting the instance endpoint.
+	serverBase, optsBase, serverTeardownBase := NewMockedSpannerInMemTestServerWithAddr(t, "localhost:8081")
+	defer serverTeardownBase()
+	serverTarget, optsTarget, serverTeardownTarget := NewMockedSpannerInMemTestServerWithAddr(t, "localhost:8082")
+	defer serverTeardownTarget()
+
+	// Return the instance endpoint.
+	instanceEndpoint := fmt.Sprintf("%s", optsTarget[0])
+	resps := []proto.Message{&instancepb.Instance{
+		EndpointUris: []string{instanceEndpoint},
+	}}
+	serverBase.TestInstanceAdmin.SetResps(resps)
+
+	ctx := context.Background()
+	formattedDatabase := fmt.Sprintf("projects/%s/instances/%s/databases/%s", "some-project", "some-instance", "some-database")
+	client, err := NewClientWithConfig(ctx, formattedDatabase, ClientConfig{}, optsBase...)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if err := executeSingerQuery(ctx, client.Single()); err != nil {
+		t.Fatal(err)
+	}
+
+	// The base server should not receive any requests.
+	if _, err := shouldHaveReceived(serverBase.TestSpanner, []interface{}{}); err != nil {
+		t.Fatal(err)
+	}
+
+	// The target server should receive requests.
+	if _, err = shouldHaveReceived(serverTarget.TestSpanner, []interface{}{
+		&sppb.CreateSessionRequest{},
+		&sppb.ExecuteSqlRequest{},
+	}); err != nil {
+		t.Fatal(err)
+	}
+}
+
+func TestClient_ResourceBasedRouting_WithoutEndpointsReturned(t *testing.T) {
+	os.Setenv("GOOGLE_CLOUD_SPANNER_ENABLE_RESOURCE_BASED_ROUTING", "true")
+	defer os.Setenv("GOOGLE_CLOUD_SPANNER_ENABLE_RESOURCE_BASED_ROUTING", "")
+
+	server, opts, serverTeardown := NewMockedSpannerInMemTestServer(t)
+	defer serverTeardown()
+
+	// Return an empty list of endpoints.
+	resps := []proto.Message{&instancepb.Instance{
+		EndpointUris: []string{},
+	}}
+	server.TestInstanceAdmin.SetResps(resps)
+
+	ctx := context.Background()
+	formattedDatabase := fmt.Sprintf("projects/%s/instances/%s/databases/%s", "some-project", "some-instance", "some-database")
+	client, err := NewClientWithConfig(ctx, formattedDatabase, ClientConfig{}, opts...)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if err := executeSingerQuery(ctx, client.Single()); err != nil {
+		t.Fatal(err)
+	}
+
+	// Check if the request goes to the default endpoint.
+	if _, err := shouldHaveReceived(server.TestSpanner, []interface{}{
+		&sppb.CreateSessionRequest{},
+		&sppb.ExecuteSqlRequest{},
+	}); err != nil {
+		t.Fatal(err)
+	}
+}
+
+func TestClient_ResourceBasedRouting_WithPermissionDeniedError(t *testing.T) {
+	os.Setenv("GOOGLE_CLOUD_SPANNER_ENABLE_RESOURCE_BASED_ROUTING", "true")
+	defer os.Setenv("GOOGLE_CLOUD_SPANNER_ENABLE_RESOURCE_BASED_ROUTING", "")
+
+	server, opts, serverTeardown := NewMockedSpannerInMemTestServer(t)
+	defer serverTeardown()
+
+	server.TestInstanceAdmin.SetErr(status.Error(codes.PermissionDenied, "Permission Denied"))
+
+	ctx := context.Background()
+	formattedDatabase := fmt.Sprintf("projects/%s/instances/%s/databases/%s", "some-project", "some-instance", "some-database")
+	// `PermissionDeniedError` causes a warning message to be logged, which is expected.
+	// We set the output to be discarded to avoid spamming the log.
+	logger := log.New(ioutil.Discard, "", log.LstdFlags)
+	client, err := NewClientWithConfig(ctx, formattedDatabase, ClientConfig{logger: logger}, opts...)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if err := executeSingerQuery(ctx, client.Single()); err != nil {
+		t.Fatal(err)
+	}
+
+	// Fallback to use the default endpoint when calling GetInstance() returns
+	// a PermissionDenied error.
+	if _, err := shouldHaveReceived(server.TestSpanner, []interface{}{
+		&sppb.CreateSessionRequest{},
+		&sppb.ExecuteSqlRequest{},
+	}); err != nil {
+		t.Fatal(err)
+	}
+}
+
+func TestClient_ResourceBasedRouting_WithUnavailableError(t *testing.T) {
+	os.Setenv("GOOGLE_CLOUD_SPANNER_ENABLE_RESOURCE_BASED_ROUTING", "true")
+	defer os.Setenv("GOOGLE_CLOUD_SPANNER_ENABLE_RESOURCE_BASED_ROUTING", "")
+
+	server, opts, serverTeardown := NewMockedSpannerInMemTestServer(t)
+	defer serverTeardown()
+
+	resps := []proto.Message{&instancepb.Instance{
+		EndpointUris: []string{},
+	}}
+	server.TestInstanceAdmin.SetResps(resps)
+	server.TestInstanceAdmin.SetErr(status.Error(codes.Unavailable, "Temporary unavailable"))
+
+	ctx := context.Background()
+	formattedDatabase := fmt.Sprintf("projects/%s/instances/%s/databases/%s", "some-project", "some-instance", "some-database")
+	_, err := NewClientWithConfig(ctx, formattedDatabase, ClientConfig{}, opts...)
+	// The first request will get an error and the server resets the error to nil,
+	// so the next request will be fine. Due to retrying, there is no errors.
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
+func TestClient_ResourceBasedRouting_WithInvalidArgumentError(t *testing.T) {
+	os.Setenv("GOOGLE_CLOUD_SPANNER_ENABLE_RESOURCE_BASED_ROUTING", "true")
+	defer os.Setenv("GOOGLE_CLOUD_SPANNER_ENABLE_RESOURCE_BASED_ROUTING", "")
+
+	server, opts, serverTeardown := NewMockedSpannerInMemTestServer(t)
+	defer serverTeardown()
+
+	server.TestInstanceAdmin.SetErr(status.Error(codes.InvalidArgument, "Invalid argument"))
+
+	ctx := context.Background()
+	formattedDatabase := fmt.Sprintf("projects/%s/instances/%s/databases/%s", "some-project", "some-instance", "some-database")
+	_, err := NewClientWithConfig(ctx, formattedDatabase, ClientConfig{}, opts...)
+
+	if status.Code(err) != codes.InvalidArgument {
+		t.Fatalf("got unexpected exception %v, expected InvalidArgument", err)
+	}
+}
+
 func testSingleQuery(t *testing.T, serverError error) error {
 	ctx := context.Background()
 	server, client, teardown := setupMockedTestServer(t)
diff --git a/spanner/integration_test.go b/spanner/integration_test.go
index 70c19c9..92324d3 100644
--- a/spanner/integration_test.go
+++ b/spanner/integration_test.go
@@ -440,6 +440,50 @@
 	}
 }
 
+// Test resource-based routing enabled.
+func TestIntegration_SingleUse_WithResourceBasedRouting(t *testing.T) {
+	os.Setenv("GOOGLE_CLOUD_SPANNER_ENABLE_RESOURCE_BASED_ROUTING", "true")
+	defer os.Setenv("GOOGLE_CLOUD_SPANNER_ENABLE_RESOURCE_BASED_ROUTING", "")
+
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
+	defer cancel()
+	// Set up testing environment.
+	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
+	defer cleanup()
+
+	writes := []struct {
+		row []interface{}
+		ts  time.Time
+	}{
+		{row: []interface{}{1, "Marc", "Foo"}},
+		{row: []interface{}{2, "Tars", "Bar"}},
+		{row: []interface{}{3, "Alpha", "Beta"}},
+		{row: []interface{}{4, "Last", "End"}},
+	}
+	// Try to write four rows through the Apply API.
+	for i, w := range writes {
+		var err error
+		m := InsertOrUpdate("Singers",
+			[]string{"SingerId", "FirstName", "LastName"},
+			w.row)
+		if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	row, err := client.Single().ReadRow(ctx, "Singers", Key{3}, []string{"FirstName"})
+	if err != nil {
+		t.Errorf("SingleUse.ReadRow returns error %v, want nil", err)
+	}
+	var got string
+	if err := row.Column(0, &got); err != nil {
+		t.Errorf("row.Column returns error %v, want nil", err)
+	}
+	if want := "Alpha"; got != want {
+		t.Errorf("got %q, want %q", got, want)
+	}
+}
+
 func TestIntegration_SingleUse_ReadingWithLimit(t *testing.T) {
 	t.Parallel()
 
@@ -2594,7 +2638,7 @@
 func createClient(ctx context.Context, dbPath string, spc SessionPoolConfig) (client *Client, err error) {
 	client, err = NewClientWithConfig(ctx, dbPath, ClientConfig{
 		SessionPoolConfig: spc,
-	}, option.WithTokenSource(testutil.TokenSource(ctx, Scope)), option.WithEndpoint(endpoint))
+	}, option.WithTokenSource(testutil.TokenSource(ctx, Scope, AdminScope)), option.WithEndpoint(endpoint))
 	if err != nil {
 		return nil, fmt.Errorf("cannot create data client on DB %v: %v", dbPath, err)
 	}
diff --git a/spanner/internal/testutil/inmem_instance_admin_server.go b/spanner/internal/testutil/inmem_instance_admin_server.go
new file mode 100644
index 0000000..f02dce0
--- /dev/null
+++ b/spanner/internal/testutil/inmem_instance_admin_server.go
@@ -0,0 +1,86 @@
+// Copyright 2020 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package testutil
+
+import (
+	"context"
+
+	"github.com/golang/protobuf/proto"
+	instancepb "google.golang.org/genproto/googleapis/spanner/admin/instance/v1"
+)
+
+// InMemInstanceAdminServer contains the InstanceAdminServer interface plus a couple
+// of specific methods for setting mocked results.
+type InMemInstanceAdminServer interface {
+	instancepb.InstanceAdminServer
+	Stop()
+	Resps() []proto.Message
+	SetResps([]proto.Message)
+	Reqs() []proto.Message
+	SetReqs([]proto.Message)
+	SetErr(error)
+}
+
+// inMemInstanceAdminServer implements InMemInstanceAdminServer interface. Note that
+// there is no mutex protecting the data structures, so it is not safe for
+// concurrent use.
+type inMemInstanceAdminServer struct {
+	instancepb.InstanceAdminServer
+	reqs []proto.Message
+	// If set, all calls return this error
+	err error
+	// responses to return if err == nil
+	resps []proto.Message
+}
+
+// NewInMemInstanceAdminServer creates a new in-mem test server.
+func NewInMemInstanceAdminServer() InMemInstanceAdminServer {
+	res := &inMemInstanceAdminServer{}
+	return res
+}
+
+// GetInstance returns the metadata of a spanner instance.
+func (s *inMemInstanceAdminServer) GetInstance(ctx context.Context, req *instancepb.GetInstanceRequest) (*instancepb.Instance, error) {
+	s.reqs = append(s.reqs, req)
+	if s.err != nil {
+		defer func() { s.err = nil }()
+		return nil, s.err
+	}
+	return s.resps[0].(*instancepb.Instance), nil
+}
+
+func (s *inMemInstanceAdminServer) Stop() {
+	// do nothing
+}
+
+func (s *inMemInstanceAdminServer) Resps() []proto.Message {
+	return s.resps
+}
+
+func (s *inMemInstanceAdminServer) SetResps(resps []proto.Message) {
+	s.resps = resps
+}
+
+func (s *inMemInstanceAdminServer) Reqs() []proto.Message {
+	return s.reqs
+}
+
+func (s *inMemInstanceAdminServer) SetReqs(reqs []proto.Message) {
+	s.reqs = reqs
+}
+
+func (s *inMemInstanceAdminServer) SetErr(err error) {
+	s.err = err
+}
diff --git a/spanner/internal/testutil/inmem_instance_admin_server_test.go b/spanner/internal/testutil/inmem_instance_admin_server_test.go
new file mode 100644
index 0000000..f4e3b22
--- /dev/null
+++ b/spanner/internal/testutil/inmem_instance_admin_server_test.go
@@ -0,0 +1,95 @@
+// Copyright 2020 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package testutil_test
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"log"
+	"net"
+	"testing"
+
+	instance "cloud.google.com/go/spanner/admin/instance/apiv1"
+	"cloud.google.com/go/spanner/internal/testutil"
+	"github.com/golang/protobuf/proto"
+	"google.golang.org/api/option"
+	instancepb "google.golang.org/genproto/googleapis/spanner/admin/instance/v1"
+	"google.golang.org/grpc"
+)
+
+var instanceClientOpt option.ClientOption
+
+var (
+	mockInstanceAdmin = testutil.NewInMemInstanceAdminServer()
+)
+
+func setupInstanceAdminServer() {
+	flag.Parse()
+
+	serv := grpc.NewServer()
+	instancepb.RegisterInstanceAdminServer(serv, mockInstanceAdmin)
+
+	lis, err := net.Listen("tcp", "localhost:0")
+	if err != nil {
+		log.Fatal(err)
+	}
+	go serv.Serve(lis)
+
+	conn, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure())
+	if err != nil {
+		log.Fatal(err)
+	}
+	instanceClientOpt = option.WithGRPCConn(conn)
+}
+
+func TestInstanceAdminGetInstance(t *testing.T) {
+	setupInstanceAdminServer()
+	var expectedResponse = &instancepb.Instance{
+		Name:        "name2-1052831874",
+		Config:      "config-1354792126",
+		DisplayName: "displayName1615086568",
+		NodeCount:   1539922066,
+	}
+
+	mockInstanceAdmin.SetErr(nil)
+	mockInstanceAdmin.SetReqs(nil)
+
+	mockInstanceAdmin.SetResps(append(mockInstanceAdmin.Resps()[:0], expectedResponse))
+
+	var formattedName string = fmt.Sprintf("projects/%s/instances/%s", "[PROJECT]", "[INSTANCE]")
+	var request = &instancepb.GetInstanceRequest{
+		Name: formattedName,
+	}
+
+	c, err := instance.NewInstanceAdminClient(context.Background(), instanceClientOpt)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	resp, err := c.GetInstance(context.Background(), request)
+
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if want, got := request, mockInstanceAdmin.Reqs()[0]; !proto.Equal(want, got) {
+		t.Errorf("wrong request %q, want %q", got, want)
+	}
+
+	if want, got := expectedResponse, resp; !proto.Equal(want, got) {
+		t.Errorf("wrong response %q, want %q)", got, want)
+	}
+}
diff --git a/spanner/internal/testutil/mocked_inmem_server.go b/spanner/internal/testutil/mocked_inmem_server.go
index 9ac328f..028f038 100644
--- a/spanner/internal/testutil/mocked_inmem_server.go
+++ b/spanner/internal/testutil/mocked_inmem_server.go
@@ -22,6 +22,7 @@
 
 	structpb "github.com/golang/protobuf/ptypes/struct"
 	"google.golang.org/api/option"
+	instancepb "google.golang.org/genproto/googleapis/spanner/admin/instance/v1"
 	spannerpb "google.golang.org/genproto/googleapis/spanner/v1"
 	"google.golang.org/grpc"
 )
@@ -58,29 +59,41 @@
 // MockedSpannerInMemTestServer is an InMemSpannerServer with results for a
 // number of SQL statements readily mocked.
 type MockedSpannerInMemTestServer struct {
-	TestSpanner InMemSpannerServer
-	server      *grpc.Server
+	TestSpanner       InMemSpannerServer
+	TestInstanceAdmin InMemInstanceAdminServer
+	server            *grpc.Server
 }
 
-// NewMockedSpannerInMemTestServer creates a MockedSpannerInMemTestServer and
-// returns client options that can be used to connect to it.
+// NewMockedSpannerInMemTestServer creates a MockedSpannerInMemTestServer at
+// localhost with a random port and returns client options that can be used
+// to connect to it.
 func NewMockedSpannerInMemTestServer(t *testing.T) (mockedServer *MockedSpannerInMemTestServer, opts []option.ClientOption, teardown func()) {
+	return NewMockedSpannerInMemTestServerWithAddr(t, "localhost:0")
+}
+
+// NewMockedSpannerInMemTestServerWithAddr creates a MockedSpannerInMemTestServer
+// at a given listening address and returns client options that can be used
+// to connect to it.
+func NewMockedSpannerInMemTestServerWithAddr(t *testing.T, addr string) (mockedServer *MockedSpannerInMemTestServer, opts []option.ClientOption, teardown func()) {
 	mockedServer = &MockedSpannerInMemTestServer{}
-	opts = mockedServer.setupMockedServer(t)
+	opts = mockedServer.setupMockedServerWithAddr(t, addr)
 	return mockedServer, opts, func() {
 		mockedServer.TestSpanner.Stop()
+		mockedServer.TestInstanceAdmin.Stop()
 		mockedServer.server.Stop()
 	}
 }
 
-func (s *MockedSpannerInMemTestServer) setupMockedServer(t *testing.T) []option.ClientOption {
+func (s *MockedSpannerInMemTestServer) setupMockedServerWithAddr(t *testing.T, addr string) []option.ClientOption {
 	s.TestSpanner = NewInMemSpannerServer()
+	s.TestInstanceAdmin = NewInMemInstanceAdminServer()
 	s.setupFooResults()
 	s.setupSingersResults()
 	s.server = grpc.NewServer()
 	spannerpb.RegisterSpannerServer(s.server, s.TestSpanner)
+	instancepb.RegisterInstanceAdminServer(s.server, s.TestInstanceAdmin)
 
-	lis, err := net.Listen("tcp", "localhost:0")
+	lis, err := net.Listen("tcp", addr)
 	if err != nil {
 		t.Fatal(err)
 	}