spanner: use ResourceInfo to extract error

Detecting a 'Session not found' error should be extracted from the
ResourceInfo of the underlying statusErr instead of depending on the
error message.

Updates #1814.

Change-Id: I99e448ef4242573b8fb00e0020de839c4bd39660
Reviewed-on: https://code-review.googlesource.com/c/gocloud/+/52791
Reviewed-by: kokoro <noreply+kokoro@google.com>
Reviewed-by: Hengfeng Li <hengfeng@google.com>
diff --git a/spanner/client_test.go b/spanner/client_test.go
index 13e239f..fd81d6b 100644
--- a/spanner/client_test.go
+++ b/spanner/client_test.go
@@ -168,7 +168,7 @@
 	defer teardown()
 	server.TestSpanner.PutExecutionTime(
 		MethodExecuteStreamingSql,
-		SimulatedExecutionTime{Errors: []error{status.Errorf(codes.NotFound, "Session not found")}},
+		SimulatedExecutionTime{Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}},
 	)
 	ctx := context.Background()
 	iter := client.Single().Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums))
@@ -250,7 +250,7 @@
 		SelectSingerIDAlbumIDAlbumTitleFromAlbums,
 		PartialResultSetExecutionTime{
 			ResumeToken: EncodeResumeToken(3),
-			Err:         status.Errorf(codes.NotFound, "Session not found"),
+			Err:         newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s"),
 		},
 	)
 	ctx := context.Background()
@@ -593,9 +593,9 @@
 	// transaction, as we would need to start a new transaction on a new
 	// session.
 	err := testReadOnlyTransaction(t, map[string]SimulatedExecutionTime{
-		MethodExecuteStreamingSql: {Errors: []error{status.Errorf(codes.NotFound, "Session not found")}},
+		MethodExecuteStreamingSql: {Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}},
 	})
-	want := spannerErrorf(codes.NotFound, "Session not found")
+	want := toSpannerError(newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s"))
 	if err == nil {
 		t.Fatalf("missing expected error\nGot: nil\nWant: %v", want)
 	}
@@ -633,7 +633,7 @@
 	if err := testReadOnlyTransaction(
 		t,
 		map[string]SimulatedExecutionTime{
-			MethodBeginTransaction: {Errors: []error{status.Error(codes.NotFound, "Session not found")}},
+			MethodBeginTransaction: {Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}},
 		},
 	); err != nil {
 		t.Fatal(err)
@@ -653,7 +653,7 @@
 	defer teardown()
 	server.TestSpanner.PutExecutionTime(
 		MethodBeginTransaction,
-		SimulatedExecutionTime{Errors: []error{status.Error(codes.NotFound, "Session not found")}},
+		SimulatedExecutionTime{Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}},
 	)
 	tx := client.ReadOnlyTransaction()
 	defer tx.Close()
@@ -694,7 +694,7 @@
 func TestClient_ReadWriteTransaction_SessionNotFoundOnCommit(t *testing.T) {
 	t.Parallel()
 	if err := testReadWriteTransaction(t, map[string]SimulatedExecutionTime{
-		MethodCommitTransaction: {Errors: []error{status.Error(codes.NotFound, "Session not found")}},
+		MethodCommitTransaction: {Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}},
 	}, 2); err != nil {
 		t.Fatal(err)
 	}
@@ -705,7 +705,7 @@
 	// We expect only 1 attempt, as the 'Session not found' error is already
 	//handled in the session pool where the session is prepared.
 	if err := testReadWriteTransaction(t, map[string]SimulatedExecutionTime{
-		MethodBeginTransaction: {Errors: []error{status.Error(codes.NotFound, "Session not found")}},
+		MethodBeginTransaction: {Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}},
 	}, 1); err != nil {
 		t.Fatal(err)
 	}
@@ -720,7 +720,7 @@
 	if err := testReadWriteTransactionWithConfig(t, ClientConfig{
 		SessionPoolConfig: SessionPoolConfig{WriteSessions: 0.0},
 	}, map[string]SimulatedExecutionTime{
-		MethodBeginTransaction: {Errors: []error{status.Error(codes.NotFound, "Session not found")}},
+		MethodBeginTransaction: {Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}},
 	}, 1); err != nil {
 		t.Fatal(err)
 	}
@@ -729,7 +729,7 @@
 func TestClient_ReadWriteTransaction_SessionNotFoundOnExecuteStreamingSql(t *testing.T) {
 	t.Parallel()
 	if err := testReadWriteTransaction(t, map[string]SimulatedExecutionTime{
-		MethodExecuteStreamingSql: {Errors: []error{status.Error(codes.NotFound, "Session not found")}},
+		MethodExecuteStreamingSql: {Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}},
 	}, 2); err != nil {
 		t.Fatal(err)
 	}
@@ -742,7 +742,7 @@
 	defer teardown()
 	server.TestSpanner.PutExecutionTime(
 		MethodExecuteSql,
-		SimulatedExecutionTime{Errors: []error{status.Error(codes.NotFound, "Session not found")}},
+		SimulatedExecutionTime{Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}},
 	)
 	ctx := context.Background()
 	var attempts int
@@ -772,7 +772,7 @@
 	defer teardown()
 	server.TestSpanner.PutExecutionTime(
 		MethodExecuteBatchDml,
-		SimulatedExecutionTime{Errors: []error{status.Error(codes.NotFound, "Session not found")}},
+		SimulatedExecutionTime{Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}},
 	)
 	ctx := context.Background()
 	var attempts int
@@ -1292,15 +1292,15 @@
 	defer teardown()
 	server.TestSpanner.PutExecutionTime(MethodBeginTransaction,
 		SimulatedExecutionTime{
-			Errors: []error{status.Error(codes.NotFound, "Session not found")},
+			Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")},
 		})
 	server.TestSpanner.PutExecutionTime(MethodExecuteStreamingSql,
 		SimulatedExecutionTime{
-			Errors: []error{status.Error(codes.NotFound, "Session not found")},
+			Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")},
 		})
 	server.TestSpanner.PutExecutionTime(MethodCommitTransaction,
 		SimulatedExecutionTime{
-			Errors: []error{status.Error(codes.NotFound, "Session not found")},
+			Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")},
 		})
 	msg := "query failed"
 	numAttempts := 0
diff --git a/spanner/errors.go b/spanner/errors.go
index 2c6c7ef..d4d159b 100644
--- a/spanner/errors.go
+++ b/spanner/errors.go
@@ -20,6 +20,7 @@
 	"context"
 	"fmt"
 
+	"google.golang.org/genproto/googleapis/rpc/errdetails"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 )
@@ -171,3 +172,25 @@
 	}
 	return se.Desc
 }
+
+// extractResourceType extracts the resource type from any ResourceInfo detail
+// included in the error.
+func extractResourceType(err error) (string, bool) {
+	var s *status.Status
+	var se *Error
+	if errorAs(err, &se) {
+		// Unwrap statusError.
+		s = status.Convert(se.Unwrap())
+	} else {
+		s = status.Convert(err)
+	}
+	if s == nil {
+		return "", false
+	}
+	for _, detail := range s.Details() {
+		if resourceInfo, ok := detail.(*errdetails.ResourceInfo); ok {
+			return resourceInfo.ResourceType, true
+		}
+	}
+	return "", false
+}
diff --git a/spanner/integration_test.go b/spanner/integration_test.go
index 5fe62d7..7437f45 100644
--- a/spanner/integration_test.go
+++ b/spanner/integration_test.go
@@ -231,14 +231,15 @@
 func TestIntegration_InitSessionPool(t *testing.T) {
 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
 	defer cancel()
-	// Set up testing environment.
-	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
+	// Set up an empty testing environment.
+	client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, []string{})
 	defer cleanup()
 	sp := client.idleSessions
 	sp.mu.Lock()
 	want := sp.MinOpened
 	sp.mu.Unlock()
 	var numOpened int
+loop:
 	for {
 		select {
 		case <-ctx.Done():
@@ -248,10 +249,42 @@
 			numOpened = sp.idleList.Len() + sp.idleWriteList.Len()
 			sp.mu.Unlock()
 			if uint64(numOpened) == want {
-				return
+				break loop
 			}
 		}
 	}
+	// Delete all sessions in the pool on the backend and then try to execute a
+	// simple query. The 'Session not found' error should cause an automatic
+	// retry of the read-only transaction.
+	sp.mu.Lock()
+	s := sp.idleList.Front()
+	for {
+		if s == nil {
+			break
+		}
+		// This will delete the session on the backend without removing it
+		// from the pool.
+		s.Value.(*session).delete(context.Background())
+		s = s.Next()
+	}
+	sp.mu.Unlock()
+	sql := "SELECT 1, 'FOO', 'BAR'"
+	tx := client.ReadOnlyTransaction()
+	defer tx.Close()
+	iter := tx.Query(context.Background(), NewStatement(sql))
+	rows, err := readAll(iter)
+	if err != nil {
+		t.Fatalf("Unexpected error for query %q: %v", sql, err)
+	}
+	if got, want := len(rows), 1; got != want {
+		t.Fatalf("Row count mismatch for query %q\nGot: %v\nWant: %v", sql, got, want)
+	}
+	if got, want := len(rows[0]), 3; got != want {
+		t.Fatalf("Column count mismatch for query %q\nGot: %v\nWant: %v", sql, got, want)
+	}
+	if got, want := rows[0][0].(int64), int64(1); got != want {
+		t.Fatalf("Column value mismatch for query %q\nGot: %v\nWant: %v", sql, got, want)
+	}
 }
 
 // Test SingleUse transaction.
diff --git a/spanner/internal/testutil/inmem_spanner_server.go b/spanner/internal/testutil/inmem_spanner_server.go
index f9c35fb..e11af9b 100644
--- a/spanner/internal/testutil/inmem_spanner_server.go
+++ b/spanner/internal/testutil/inmem_spanner_server.go
@@ -433,11 +433,20 @@
 	defer s.mu.Unlock()
 	session := s.sessions[name]
 	if session == nil {
-		return nil, gstatus.Error(codes.NotFound, fmt.Sprintf("Session not found: %s", name))
+		return nil, newSessionNotFoundError(name)
 	}
 	return session, nil
 }
 
+// sessionResourceType is the type name of Spanner sessions.
+const sessionResourceType = "type.googleapis.com/google.spanner.v1.Session"
+
+func newSessionNotFoundError(name string) error {
+	s := gstatus.Newf(codes.NotFound, "Session not found: Session with id %s not found", name)
+	s, _ = s.WithDetails(&errdetails.ResourceInfo{ResourceType: sessionResourceType, ResourceName: name})
+	return s.Err()
+}
+
 func (s *inMemSpannerServer) updateSessionLastUseTime(session string) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
diff --git a/spanner/internal/testutil/mockclient.go b/spanner/internal/testutil/mockclient.go
index 2a3b918..ed912b7 100644
--- a/spanner/internal/testutil/mockclient.go
+++ b/spanner/internal/testutil/mockclient.go
@@ -114,7 +114,7 @@
 	defer m.mu.Unlock()
 	m.pings = append(m.pings, r.Name)
 	if _, ok := m.sessions[r.Name]; !ok {
-		return nil, status.Errorf(codes.NotFound, fmt.Sprintf("Session not found: %v", r.Name))
+		return nil, newSessionNotFoundError(r.Name)
 	}
 	return &sppb.Session{Name: r.Name}, nil
 }
@@ -128,7 +128,7 @@
 	defer m.mu.Unlock()
 	if _, ok := m.sessions[r.Name]; !ok {
 		// Session not found.
-		return &empty.Empty{}, status.Errorf(codes.NotFound, fmt.Sprintf("Session not found: %v", r.Name))
+		return &empty.Empty{}, newSessionNotFoundError(r.Name)
 	}
 	// Delete session from in-memory table.
 	delete(m.sessions, r.Name)
diff --git a/spanner/session.go b/spanner/session.go
index 9964af6..3d861c9 100644
--- a/spanner/session.go
+++ b/spanner/session.go
@@ -25,7 +25,6 @@
 	"math"
 	"math/rand"
 	"runtime/debug"
-	"strings"
 	"sync"
 	"time"
 
@@ -1522,16 +1521,19 @@
 	return a
 }
 
+// sessionResourceType is the type name of Spanner sessions.
+const sessionResourceType = "type.googleapis.com/google.spanner.v1.Session"
+
 // isSessionNotFoundError returns true if the given error is a
 // `Session not found` error.
 func isSessionNotFoundError(err error) bool {
 	if err == nil {
 		return false
 	}
-	// We are checking specifically for the error message `Session not found`,
-	// as the error could also be a `Database not found`. The latter should
-	// cause the session pool to stop preparing sessions for read/write
-	// transactions, while the former should not.
-	// TODO: once gRPC can return auxiliary error information, stop parsing the error message.
-	return ErrCode(err) == codes.NotFound && strings.Contains(err.Error(), "Session not found")
+	if ErrCode(err) == codes.NotFound {
+		if rt, ok := extractResourceType(err); ok {
+			return rt == sessionResourceType
+		}
+	}
+	return false
 }
diff --git a/spanner/session_test.go b/spanner/session_test.go
index f4521d4..0aee9fe 100644
--- a/spanner/session_test.go
+++ b/spanner/session_test.go
@@ -31,11 +31,18 @@
 
 	. "cloud.google.com/go/spanner/internal/testutil"
 	"google.golang.org/api/iterator"
+	"google.golang.org/genproto/googleapis/rpc/errdetails"
 	sppb "google.golang.org/genproto/googleapis/spanner/v1"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 )
 
+func newSessionNotFoundError(name string) error {
+	s := status.Newf(codes.NotFound, "Session not found: Session with id %s not found", name)
+	s, _ = s.WithDetails(&errdetails.ResourceInfo{ResourceType: sessionResourceType, ResourceName: name})
+	return s.Err()
+}
+
 // TestSessionPoolConfigValidation tests session pool config validation.
 func TestSessionPoolConfigValidation(t *testing.T) {
 	t.Parallel()
@@ -366,7 +373,7 @@
 	// will create a new session.
 	server.TestSpanner.PutExecutionTime(MethodGetSession,
 		SimulatedExecutionTime{
-			Errors: []error{status.Errorf(codes.NotFound, "Session not found")},
+			Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")},
 		})
 
 	// Force ping by setting check time in the past.
@@ -446,7 +453,7 @@
 	// will create a new session.
 	server.TestSpanner.PutExecutionTime(MethodGetSession,
 		SimulatedExecutionTime{
-			Errors: []error{status.Errorf(codes.NotFound, "Session not found")},
+			Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")},
 		})
 
 	// Force ping by setting check time in the past.
@@ -1131,7 +1138,7 @@
 
 	// The server will return 'Session not found' for the first 8
 	// BeginTransaction calls.
-	sessionNotFoundErr := status.Errorf(codes.NotFound, `Session not found: projects/<project>/instances/<instance>/databases/<database>/sessions/<session> resource_type: "Session" resource_name: "projects/<project>/instances/<instance>/databases/<database>/sessions/<session>" description: "Session does not exist."`)
+	sessionNotFoundErr := newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")
 	serverErrors := make([]error, 8)
 	for i := range serverErrors {
 		serverErrors[i] = sessionNotFoundErr
@@ -1239,7 +1246,7 @@
 	server.TestSpanner.Freeze()
 	server.TestSpanner.PutExecutionTime(MethodGetSession,
 		SimulatedExecutionTime{
-			Errors:    []error{status.Errorf(codes.NotFound, "Session not found")},
+			Errors:    []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")},
 			KeepError: true,
 		})
 	server.TestSpanner.Unfreeze()
diff --git a/spanner/transaction_test.go b/spanner/transaction_test.go
index 88448fd..8a09d29 100644
--- a/spanner/transaction_test.go
+++ b/spanner/transaction_test.go
@@ -21,12 +21,14 @@
 	"errors"
 	"fmt"
 	"reflect"
+	"strings"
 	"sync"
 	"testing"
 	"time"
 
 	. "cloud.google.com/go/spanner/internal/testutil"
 	"github.com/golang/protobuf/ptypes"
+	"github.com/google/go-cmp/cmp"
 	"google.golang.org/genproto/googleapis/rpc/errdetails"
 	sppb "google.golang.org/genproto/googleapis/spanner/v1"
 	"google.golang.org/grpc/codes"
@@ -203,7 +205,7 @@
 	server, client, teardown := setupMockedTestServer(t)
 	defer teardown()
 
-	serverErr := gstatus.Errorf(codes.NotFound, "Session not found")
+	serverErr := newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")
 	server.TestSpanner.PutExecutionTime(MethodBeginTransaction,
 		SimulatedExecutionTime{
 			Errors: []error{serverErr, serverErr, serverErr},
@@ -235,14 +237,24 @@
 		t.Fatalf("Expect Read to succeed, got %v, want %v.", got.err, wantErr)
 	}
 
-	wantErr = toSpannerError(serverErr)
+	wantErr = toSpannerError(newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s"))
 	ms := []*Mutation{
 		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}),
 		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}),
 	}
 	_, got := client.Apply(ctx, ms, ApplyAtLeastOnce())
-	if !testEqual(wantErr, got) {
-		t.Fatalf("Expect Apply to fail, got %v, want %v.", got, wantErr)
+	if !cmp.Equal(wantErr, got,
+		cmp.AllowUnexported(Error{}), cmp.FilterPath(func(path cmp.Path) bool {
+			// Ignore statusError Details and Error.trailers.
+			if strings.Contains(path.GoString(), "{*spanner.Error}.err.(*status.statusError).Details") {
+				return true
+			}
+			if strings.Contains(path.GoString(), "{*spanner.Error}.trailers") {
+				return true
+			}
+			return false
+		}, cmp.Ignore())) {
+		t.Fatalf("Expect Apply to fail\nGot:  %v\nWant: %v\n", got, wantErr)
 	}
 }