spanner: use ResourceInfo to extract error
Detecting a 'Session not found' error should be extracted from the
ResourceInfo of the underlying statusErr instead of depending on the
error message.
Updates #1814.
Change-Id: I99e448ef4242573b8fb00e0020de839c4bd39660
Reviewed-on: https://code-review.googlesource.com/c/gocloud/+/52791
Reviewed-by: kokoro <noreply+kokoro@google.com>
Reviewed-by: Hengfeng Li <hengfeng@google.com>
diff --git a/spanner/client_test.go b/spanner/client_test.go
index 13e239f..fd81d6b 100644
--- a/spanner/client_test.go
+++ b/spanner/client_test.go
@@ -168,7 +168,7 @@
defer teardown()
server.TestSpanner.PutExecutionTime(
MethodExecuteStreamingSql,
- SimulatedExecutionTime{Errors: []error{status.Errorf(codes.NotFound, "Session not found")}},
+ SimulatedExecutionTime{Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}},
)
ctx := context.Background()
iter := client.Single().Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums))
@@ -250,7 +250,7 @@
SelectSingerIDAlbumIDAlbumTitleFromAlbums,
PartialResultSetExecutionTime{
ResumeToken: EncodeResumeToken(3),
- Err: status.Errorf(codes.NotFound, "Session not found"),
+ Err: newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s"),
},
)
ctx := context.Background()
@@ -593,9 +593,9 @@
// transaction, as we would need to start a new transaction on a new
// session.
err := testReadOnlyTransaction(t, map[string]SimulatedExecutionTime{
- MethodExecuteStreamingSql: {Errors: []error{status.Errorf(codes.NotFound, "Session not found")}},
+ MethodExecuteStreamingSql: {Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}},
})
- want := spannerErrorf(codes.NotFound, "Session not found")
+ want := toSpannerError(newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s"))
if err == nil {
t.Fatalf("missing expected error\nGot: nil\nWant: %v", want)
}
@@ -633,7 +633,7 @@
if err := testReadOnlyTransaction(
t,
map[string]SimulatedExecutionTime{
- MethodBeginTransaction: {Errors: []error{status.Error(codes.NotFound, "Session not found")}},
+ MethodBeginTransaction: {Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}},
},
); err != nil {
t.Fatal(err)
@@ -653,7 +653,7 @@
defer teardown()
server.TestSpanner.PutExecutionTime(
MethodBeginTransaction,
- SimulatedExecutionTime{Errors: []error{status.Error(codes.NotFound, "Session not found")}},
+ SimulatedExecutionTime{Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}},
)
tx := client.ReadOnlyTransaction()
defer tx.Close()
@@ -694,7 +694,7 @@
func TestClient_ReadWriteTransaction_SessionNotFoundOnCommit(t *testing.T) {
t.Parallel()
if err := testReadWriteTransaction(t, map[string]SimulatedExecutionTime{
- MethodCommitTransaction: {Errors: []error{status.Error(codes.NotFound, "Session not found")}},
+ MethodCommitTransaction: {Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}},
}, 2); err != nil {
t.Fatal(err)
}
@@ -705,7 +705,7 @@
// We expect only 1 attempt, as the 'Session not found' error is already
//handled in the session pool where the session is prepared.
if err := testReadWriteTransaction(t, map[string]SimulatedExecutionTime{
- MethodBeginTransaction: {Errors: []error{status.Error(codes.NotFound, "Session not found")}},
+ MethodBeginTransaction: {Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}},
}, 1); err != nil {
t.Fatal(err)
}
@@ -720,7 +720,7 @@
if err := testReadWriteTransactionWithConfig(t, ClientConfig{
SessionPoolConfig: SessionPoolConfig{WriteSessions: 0.0},
}, map[string]SimulatedExecutionTime{
- MethodBeginTransaction: {Errors: []error{status.Error(codes.NotFound, "Session not found")}},
+ MethodBeginTransaction: {Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}},
}, 1); err != nil {
t.Fatal(err)
}
@@ -729,7 +729,7 @@
func TestClient_ReadWriteTransaction_SessionNotFoundOnExecuteStreamingSql(t *testing.T) {
t.Parallel()
if err := testReadWriteTransaction(t, map[string]SimulatedExecutionTime{
- MethodExecuteStreamingSql: {Errors: []error{status.Error(codes.NotFound, "Session not found")}},
+ MethodExecuteStreamingSql: {Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}},
}, 2); err != nil {
t.Fatal(err)
}
@@ -742,7 +742,7 @@
defer teardown()
server.TestSpanner.PutExecutionTime(
MethodExecuteSql,
- SimulatedExecutionTime{Errors: []error{status.Error(codes.NotFound, "Session not found")}},
+ SimulatedExecutionTime{Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}},
)
ctx := context.Background()
var attempts int
@@ -772,7 +772,7 @@
defer teardown()
server.TestSpanner.PutExecutionTime(
MethodExecuteBatchDml,
- SimulatedExecutionTime{Errors: []error{status.Error(codes.NotFound, "Session not found")}},
+ SimulatedExecutionTime{Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")}},
)
ctx := context.Background()
var attempts int
@@ -1292,15 +1292,15 @@
defer teardown()
server.TestSpanner.PutExecutionTime(MethodBeginTransaction,
SimulatedExecutionTime{
- Errors: []error{status.Error(codes.NotFound, "Session not found")},
+ Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")},
})
server.TestSpanner.PutExecutionTime(MethodExecuteStreamingSql,
SimulatedExecutionTime{
- Errors: []error{status.Error(codes.NotFound, "Session not found")},
+ Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")},
})
server.TestSpanner.PutExecutionTime(MethodCommitTransaction,
SimulatedExecutionTime{
- Errors: []error{status.Error(codes.NotFound, "Session not found")},
+ Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")},
})
msg := "query failed"
numAttempts := 0
diff --git a/spanner/errors.go b/spanner/errors.go
index 2c6c7ef..d4d159b 100644
--- a/spanner/errors.go
+++ b/spanner/errors.go
@@ -20,6 +20,7 @@
"context"
"fmt"
+ "google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
@@ -171,3 +172,25 @@
}
return se.Desc
}
+
+// extractResourceType extracts the resource type from any ResourceInfo detail
+// included in the error.
+func extractResourceType(err error) (string, bool) {
+ var s *status.Status
+ var se *Error
+ if errorAs(err, &se) {
+ // Unwrap statusError.
+ s = status.Convert(se.Unwrap())
+ } else {
+ s = status.Convert(err)
+ }
+ if s == nil {
+ return "", false
+ }
+ for _, detail := range s.Details() {
+ if resourceInfo, ok := detail.(*errdetails.ResourceInfo); ok {
+ return resourceInfo.ResourceType, true
+ }
+ }
+ return "", false
+}
diff --git a/spanner/integration_test.go b/spanner/integration_test.go
index 5fe62d7..7437f45 100644
--- a/spanner/integration_test.go
+++ b/spanner/integration_test.go
@@ -231,14 +231,15 @@
func TestIntegration_InitSessionPool(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
- // Set up testing environment.
- client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
+ // Set up an empty testing environment.
+ client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, []string{})
defer cleanup()
sp := client.idleSessions
sp.mu.Lock()
want := sp.MinOpened
sp.mu.Unlock()
var numOpened int
+loop:
for {
select {
case <-ctx.Done():
@@ -248,10 +249,42 @@
numOpened = sp.idleList.Len() + sp.idleWriteList.Len()
sp.mu.Unlock()
if uint64(numOpened) == want {
- return
+ break loop
}
}
}
+ // Delete all sessions in the pool on the backend and then try to execute a
+ // simple query. The 'Session not found' error should cause an automatic
+ // retry of the read-only transaction.
+ sp.mu.Lock()
+ s := sp.idleList.Front()
+ for {
+ if s == nil {
+ break
+ }
+ // This will delete the session on the backend without removing it
+ // from the pool.
+ s.Value.(*session).delete(context.Background())
+ s = s.Next()
+ }
+ sp.mu.Unlock()
+ sql := "SELECT 1, 'FOO', 'BAR'"
+ tx := client.ReadOnlyTransaction()
+ defer tx.Close()
+ iter := tx.Query(context.Background(), NewStatement(sql))
+ rows, err := readAll(iter)
+ if err != nil {
+ t.Fatalf("Unexpected error for query %q: %v", sql, err)
+ }
+ if got, want := len(rows), 1; got != want {
+ t.Fatalf("Row count mismatch for query %q\nGot: %v\nWant: %v", sql, got, want)
+ }
+ if got, want := len(rows[0]), 3; got != want {
+ t.Fatalf("Column count mismatch for query %q\nGot: %v\nWant: %v", sql, got, want)
+ }
+ if got, want := rows[0][0].(int64), int64(1); got != want {
+ t.Fatalf("Column value mismatch for query %q\nGot: %v\nWant: %v", sql, got, want)
+ }
}
// Test SingleUse transaction.
diff --git a/spanner/internal/testutil/inmem_spanner_server.go b/spanner/internal/testutil/inmem_spanner_server.go
index f9c35fb..e11af9b 100644
--- a/spanner/internal/testutil/inmem_spanner_server.go
+++ b/spanner/internal/testutil/inmem_spanner_server.go
@@ -433,11 +433,20 @@
defer s.mu.Unlock()
session := s.sessions[name]
if session == nil {
- return nil, gstatus.Error(codes.NotFound, fmt.Sprintf("Session not found: %s", name))
+ return nil, newSessionNotFoundError(name)
}
return session, nil
}
+// sessionResourceType is the type name of Spanner sessions.
+const sessionResourceType = "type.googleapis.com/google.spanner.v1.Session"
+
+func newSessionNotFoundError(name string) error {
+ s := gstatus.Newf(codes.NotFound, "Session not found: Session with id %s not found", name)
+ s, _ = s.WithDetails(&errdetails.ResourceInfo{ResourceType: sessionResourceType, ResourceName: name})
+ return s.Err()
+}
+
func (s *inMemSpannerServer) updateSessionLastUseTime(session string) {
s.mu.Lock()
defer s.mu.Unlock()
diff --git a/spanner/internal/testutil/mockclient.go b/spanner/internal/testutil/mockclient.go
index 2a3b918..ed912b7 100644
--- a/spanner/internal/testutil/mockclient.go
+++ b/spanner/internal/testutil/mockclient.go
@@ -114,7 +114,7 @@
defer m.mu.Unlock()
m.pings = append(m.pings, r.Name)
if _, ok := m.sessions[r.Name]; !ok {
- return nil, status.Errorf(codes.NotFound, fmt.Sprintf("Session not found: %v", r.Name))
+ return nil, newSessionNotFoundError(r.Name)
}
return &sppb.Session{Name: r.Name}, nil
}
@@ -128,7 +128,7 @@
defer m.mu.Unlock()
if _, ok := m.sessions[r.Name]; !ok {
// Session not found.
- return &empty.Empty{}, status.Errorf(codes.NotFound, fmt.Sprintf("Session not found: %v", r.Name))
+ return &empty.Empty{}, newSessionNotFoundError(r.Name)
}
// Delete session from in-memory table.
delete(m.sessions, r.Name)
diff --git a/spanner/session.go b/spanner/session.go
index 9964af6..3d861c9 100644
--- a/spanner/session.go
+++ b/spanner/session.go
@@ -25,7 +25,6 @@
"math"
"math/rand"
"runtime/debug"
- "strings"
"sync"
"time"
@@ -1522,16 +1521,19 @@
return a
}
+// sessionResourceType is the type name of Spanner sessions.
+const sessionResourceType = "type.googleapis.com/google.spanner.v1.Session"
+
// isSessionNotFoundError returns true if the given error is a
// `Session not found` error.
func isSessionNotFoundError(err error) bool {
if err == nil {
return false
}
- // We are checking specifically for the error message `Session not found`,
- // as the error could also be a `Database not found`. The latter should
- // cause the session pool to stop preparing sessions for read/write
- // transactions, while the former should not.
- // TODO: once gRPC can return auxiliary error information, stop parsing the error message.
- return ErrCode(err) == codes.NotFound && strings.Contains(err.Error(), "Session not found")
+ if ErrCode(err) == codes.NotFound {
+ if rt, ok := extractResourceType(err); ok {
+ return rt == sessionResourceType
+ }
+ }
+ return false
}
diff --git a/spanner/session_test.go b/spanner/session_test.go
index f4521d4..0aee9fe 100644
--- a/spanner/session_test.go
+++ b/spanner/session_test.go
@@ -31,11 +31,18 @@
. "cloud.google.com/go/spanner/internal/testutil"
"google.golang.org/api/iterator"
+ "google.golang.org/genproto/googleapis/rpc/errdetails"
sppb "google.golang.org/genproto/googleapis/spanner/v1"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
+func newSessionNotFoundError(name string) error {
+ s := status.Newf(codes.NotFound, "Session not found: Session with id %s not found", name)
+ s, _ = s.WithDetails(&errdetails.ResourceInfo{ResourceType: sessionResourceType, ResourceName: name})
+ return s.Err()
+}
+
// TestSessionPoolConfigValidation tests session pool config validation.
func TestSessionPoolConfigValidation(t *testing.T) {
t.Parallel()
@@ -366,7 +373,7 @@
// will create a new session.
server.TestSpanner.PutExecutionTime(MethodGetSession,
SimulatedExecutionTime{
- Errors: []error{status.Errorf(codes.NotFound, "Session not found")},
+ Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")},
})
// Force ping by setting check time in the past.
@@ -446,7 +453,7 @@
// will create a new session.
server.TestSpanner.PutExecutionTime(MethodGetSession,
SimulatedExecutionTime{
- Errors: []error{status.Errorf(codes.NotFound, "Session not found")},
+ Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")},
})
// Force ping by setting check time in the past.
@@ -1131,7 +1138,7 @@
// The server will return 'Session not found' for the first 8
// BeginTransaction calls.
- sessionNotFoundErr := status.Errorf(codes.NotFound, `Session not found: projects/<project>/instances/<instance>/databases/<database>/sessions/<session> resource_type: "Session" resource_name: "projects/<project>/instances/<instance>/databases/<database>/sessions/<session>" description: "Session does not exist."`)
+ sessionNotFoundErr := newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")
serverErrors := make([]error, 8)
for i := range serverErrors {
serverErrors[i] = sessionNotFoundErr
@@ -1239,7 +1246,7 @@
server.TestSpanner.Freeze()
server.TestSpanner.PutExecutionTime(MethodGetSession,
SimulatedExecutionTime{
- Errors: []error{status.Errorf(codes.NotFound, "Session not found")},
+ Errors: []error{newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")},
KeepError: true,
})
server.TestSpanner.Unfreeze()
diff --git a/spanner/transaction_test.go b/spanner/transaction_test.go
index 88448fd..8a09d29 100644
--- a/spanner/transaction_test.go
+++ b/spanner/transaction_test.go
@@ -21,12 +21,14 @@
"errors"
"fmt"
"reflect"
+ "strings"
"sync"
"testing"
"time"
. "cloud.google.com/go/spanner/internal/testutil"
"github.com/golang/protobuf/ptypes"
+ "github.com/google/go-cmp/cmp"
"google.golang.org/genproto/googleapis/rpc/errdetails"
sppb "google.golang.org/genproto/googleapis/spanner/v1"
"google.golang.org/grpc/codes"
@@ -203,7 +205,7 @@
server, client, teardown := setupMockedTestServer(t)
defer teardown()
- serverErr := gstatus.Errorf(codes.NotFound, "Session not found")
+ serverErr := newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s")
server.TestSpanner.PutExecutionTime(MethodBeginTransaction,
SimulatedExecutionTime{
Errors: []error{serverErr, serverErr, serverErr},
@@ -235,14 +237,24 @@
t.Fatalf("Expect Read to succeed, got %v, want %v.", got.err, wantErr)
}
- wantErr = toSpannerError(serverErr)
+ wantErr = toSpannerError(newSessionNotFoundError("projects/p/instances/i/databases/d/sessions/s"))
ms := []*Mutation{
Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}),
Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}),
}
_, got := client.Apply(ctx, ms, ApplyAtLeastOnce())
- if !testEqual(wantErr, got) {
- t.Fatalf("Expect Apply to fail, got %v, want %v.", got, wantErr)
+ if !cmp.Equal(wantErr, got,
+ cmp.AllowUnexported(Error{}), cmp.FilterPath(func(path cmp.Path) bool {
+ // Ignore statusError Details and Error.trailers.
+ if strings.Contains(path.GoString(), "{*spanner.Error}.err.(*status.statusError).Details") {
+ return true
+ }
+ if strings.Contains(path.GoString(), "{*spanner.Error}.trailers") {
+ return true
+ }
+ return false
+ }, cmp.Ignore())) {
+ t.Fatalf("Expect Apply to fail\nGot: %v\nWant: %v\n", got, wantErr)
}
}