Revert "spanner: Retry "Session not found" for read-only transactions"
This reverts commit 5d17c755af508b64841e1aed4afdca7bc72f04ab.
Reason for revert: breakings TestIntegration_SingleUse
Change-Id: Ib7fde8b667603a2aa15aedc96a257f1a018650e0
Reviewed-on: https://code-review.googlesource.com/c/gocloud/+/45190
Reviewed-by: kokoro <noreply+kokoro@google.com>
Reviewed-by: Jean de Klerk <deklerk@google.com>
diff --git a/spanner/client_test.go b/spanner/client_test.go
index 36233f1..73693f1 100644
--- a/spanner/client_test.go
+++ b/spanner/client_test.go
@@ -112,20 +112,14 @@
}
func testSingleQuery(t *testing.T, serverError error) error {
+ ctx := context.Background()
server, client, teardown := setupMockedTestServer(t)
defer teardown()
if serverError != nil {
server.TestSpanner.SetError(serverError)
}
- tx := client.Single()
- return executeTestQuery(t, tx)
-}
-
-func executeTestQuery(t *testing.T, tx *ReadOnlyTransaction) error {
- ctx := context.Background()
- iter := tx.Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums))
+ iter := client.Single().Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums))
defer iter.Stop()
- var rowCount int64
for {
row, err := iter.Next()
if err == iterator.Done {
@@ -139,10 +133,6 @@
if err := row.Columns(&singerID, &albumID, &albumTitle); err != nil {
return err
}
- rowCount++
- }
- if rowCount != SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount {
- t.Fatalf("Row count mismatch\ngot: %v\nwant: %v", rowCount, SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount)
}
return nil
}
@@ -210,16 +200,6 @@
}
}
-func TestClient_ReadOnlyTransaction_SessionNotFoundOnBeginTransaction(t *testing.T) {
- t.Parallel()
- exec := map[string]SimulatedExecutionTime{
- MethodBeginTransaction: {Errors: []error{gstatus.Error(codes.NotFound, "Session not found")}},
- }
- if err := testReadOnlyTransaction(t, exec); err != nil {
- t.Fatal(err)
- }
-}
-
func testReadOnlyTransaction(t *testing.T, executionTimes map[string]SimulatedExecutionTime) error {
server, client, teardown := setupMockedTestServer(t)
defer teardown()
@@ -228,7 +208,24 @@
}
tx := client.ReadOnlyTransaction()
defer tx.Close()
- return executeTestQuery(t, tx)
+ ctx := context.Background()
+ iter := tx.Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums))
+ defer iter.Stop()
+ for {
+ row, err := iter.Next()
+ if err == iterator.Done {
+ break
+ }
+ if err != nil {
+ return err
+ }
+ var singerID, albumID int64
+ var albumTitle string
+ if err := row.Columns(&singerID, &albumID, &albumTitle); err != nil {
+ return err
+ }
+ }
+ return nil
}
func TestClient_ReadWriteTransaction(t *testing.T) {
diff --git a/spanner/internal/testutil/inmem_spanner_server.go b/spanner/internal/testutil/inmem_spanner_server.go
index 022135b..5c4f546 100644
--- a/spanner/internal/testutil/inmem_spanner_server.go
+++ b/spanner/internal/testutil/inmem_spanner_server.go
@@ -59,7 +59,6 @@
MethodGetSession string = "GET_SESSION"
MethodExecuteSql string = "EXECUTE_SQL"
MethodExecuteStreamingSql string = "EXECUTE_STREAMING_SQL"
- MethodStreamingRead string = "EXECUTE_STREAMING_READ"
)
// StatementResult represents a mocked result on the test server. Th result can
@@ -704,9 +703,13 @@
}
func (s *inMemSpannerServer) StreamingRead(req *spannerpb.ReadRequest, stream spannerpb.Spanner_StreamingReadServer) error {
- if err := s.simulateExecutionTime(MethodStreamingRead, req); err != nil {
- return err
+ s.mu.Lock()
+ if s.stopped {
+ s.mu.Unlock()
+ return gstatus.Error(codes.Unavailable, "server has been stopped")
}
+ s.receivedRequests <- req
+ s.mu.Unlock()
return gstatus.Error(codes.Unimplemented, "Method not yet implemented")
}
diff --git a/spanner/transaction.go b/spanner/transaction.go
index 9f1414b..a53486b 100644
--- a/spanner/transaction.go
+++ b/spanner/transaction.go
@@ -363,41 +363,18 @@
sh.recycle()
}
}()
-
- // Create transaction options.
- readOnlyOptions := buildTransactionOptionsReadOnly(t.getTimestampBound(), true)
- transactionOptions := &sppb.TransactionOptions{
- Mode: &sppb.TransactionOptions_ReadOnly_{
- ReadOnly: readOnlyOptions,
+ sh, err = t.sp.take(ctx)
+ if err != nil {
+ return err
+ }
+ res, err := sh.getClient().BeginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.BeginTransactionRequest{
+ Session: sh.getID(),
+ Options: &sppb.TransactionOptions{
+ Mode: &sppb.TransactionOptions_ReadOnly_{
+ ReadOnly: buildTransactionOptionsReadOnly(t.getTimestampBound(), true),
+ },
},
- }
- // Retry TakeSession and BeginTransaction on Session not found.
- retryOnNotFound := gax.OnCodes([]codes.Code{codes.NotFound}, gax.Backoff{})
- beginTxWithRetry := func(ctx context.Context) (*sppb.Transaction, error) {
- for {
- sh, err = t.sp.take(ctx)
- if err != nil {
- return nil, err
- }
- client := sh.getClient()
- ctx := contextWithOutgoingMetadata(ctx, sh.getMetadata())
- res, err := client.BeginTransaction(ctx, &sppb.BeginTransactionRequest{
- Session: sh.getID(),
- Options: transactionOptions,
- })
- if err == nil {
- return res, nil
- }
- // We should not wait before retrying.
- if _, shouldRetry := retryOnNotFound.Retry(err); !shouldRetry {
- return nil, err
- }
- // Delete session and then retry with a new one.
- sh.destroy()
- }
- }
-
- res, err := beginTxWithRetry(ctx)
+ })
if err == nil {
tx = res.Id
if res.ReadTimestamp != nil {
diff --git a/spanner/transaction_test.go b/spanner/transaction_test.go
index 677b1b4..2976c96 100644
--- a/spanner/transaction_test.go
+++ b/spanner/transaction_test.go
@@ -195,64 +195,46 @@
}
}
-// Tests that NotFound errors cause failures, and aren't retried, except for
-// BeginTransaction.
+// Tests that NotFound errors cause failures, and aren't retried.
func TestTransaction_NotFound(t *testing.T) {
t.Parallel()
ctx := context.Background()
server, client, teardown := setupMockedTestServer(t)
defer teardown()
- errSessionNotFound := spannerErrorf(codes.NotFound, "Session not found")
- // BeginTransaction should retry automatically.
+ wantErr := spannerErrorf(codes.NotFound, "Session not found")
server.TestSpanner.PutExecutionTime(MethodBeginTransaction,
SimulatedExecutionTime{
- Errors: []error{errSessionNotFound},
+ Errors: []error{wantErr, wantErr, wantErr},
})
- txn := client.ReadOnlyTransaction()
- if _, _, got := txn.acquire(ctx); got != nil {
- t.Fatalf("Expect acquire to succeed, got %v, want nil.", got)
- }
- txn.Close()
-
- // Query should fail with Session not found.
- server.TestSpanner.PutExecutionTime(MethodExecuteStreamingSql,
- SimulatedExecutionTime{
- Errors: []error{errSessionNotFound},
- })
- txn = client.ReadOnlyTransaction()
- iter := txn.Query(ctx, NewStatement("SELECT 1"))
- _, got := iter.Next()
- if !testEqual(errSessionNotFound, got) {
- t.Fatalf("Expect Query to fail\ngot: %v\nwant: %v", got, errSessionNotFound)
- }
- iter.Stop()
-
- // Read should fail with Session not found.
- server.TestSpanner.PutExecutionTime(MethodStreamingRead,
- SimulatedExecutionTime{
- Errors: []error{errSessionNotFound},
- })
- txn = client.ReadOnlyTransaction()
- iter = txn.Read(ctx, "Users", KeySets(Key{"alice"}, Key{"bob"}), []string{"name", "email"})
- _, got = iter.Next()
- if !testEqual(errSessionNotFound, got) {
- t.Fatalf("Expect Read to fail\ngot: %v\nwant: %v", got, errSessionNotFound)
- }
- iter.Stop()
-
- // Commit should fail with Session not found.
server.TestSpanner.PutExecutionTime(MethodCommitTransaction,
SimulatedExecutionTime{
- Errors: []error{errSessionNotFound},
+ Errors: []error{wantErr, wantErr, wantErr},
})
+ txn := client.ReadOnlyTransaction()
+ defer txn.Close()
+
+ if _, _, got := txn.acquire(ctx); !testEqual(wantErr, got) {
+ t.Fatalf("Expect acquire to fail, got %v, want %v.", got, wantErr)
+ }
+
+ // The failure should recycle the session, we expect it to be used in
+ // following requests.
+ if got := txn.Query(ctx, NewStatement("SELECT 1")); !testEqual(wantErr, got.err) {
+ t.Fatalf("Expect Query to fail, got %v, want %v.", got.err, wantErr)
+ }
+
+ if got := txn.Read(ctx, "Users", KeySets(Key{"alice"}, Key{"bob"}), []string{"name", "email"}); !testEqual(wantErr, got.err) {
+ t.Fatalf("Expect Read to fail, got %v, want %v.", got.err, wantErr)
+ }
+
ms := []*Mutation{
Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}),
Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}),
}
- if _, got := client.Apply(ctx, ms, ApplyAtLeastOnce()); !testEqual(errSessionNotFound, got) {
- t.Fatalf("Expect Apply to fail\ngot: %v\nwant: %v", got, errSessionNotFound)
+ if _, got := client.Apply(ctx, ms, ApplyAtLeastOnce()); !testEqual(wantErr, got) {
+ t.Fatalf("Expect Apply to fail, got %v, want %v.", got, wantErr)
}
}