Revert "spanner: Retry "Session not found" for read-only transactions"

This reverts commit 5d17c755af508b64841e1aed4afdca7bc72f04ab.

Reason for revert: breakings TestIntegration_SingleUse

Change-Id: Ib7fde8b667603a2aa15aedc96a257f1a018650e0
Reviewed-on: https://code-review.googlesource.com/c/gocloud/+/45190
Reviewed-by: kokoro <noreply+kokoro@google.com>
Reviewed-by: Jean de Klerk <deklerk@google.com>
diff --git a/spanner/client_test.go b/spanner/client_test.go
index 36233f1..73693f1 100644
--- a/spanner/client_test.go
+++ b/spanner/client_test.go
@@ -112,20 +112,14 @@
 }
 
 func testSingleQuery(t *testing.T, serverError error) error {
+	ctx := context.Background()
 	server, client, teardown := setupMockedTestServer(t)
 	defer teardown()
 	if serverError != nil {
 		server.TestSpanner.SetError(serverError)
 	}
-	tx := client.Single()
-	return executeTestQuery(t, tx)
-}
-
-func executeTestQuery(t *testing.T, tx *ReadOnlyTransaction) error {
-	ctx := context.Background()
-	iter := tx.Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums))
+	iter := client.Single().Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums))
 	defer iter.Stop()
-	var rowCount int64
 	for {
 		row, err := iter.Next()
 		if err == iterator.Done {
@@ -139,10 +133,6 @@
 		if err := row.Columns(&singerID, &albumID, &albumTitle); err != nil {
 			return err
 		}
-		rowCount++
-	}
-	if rowCount != SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount {
-		t.Fatalf("Row count mismatch\ngot: %v\nwant: %v", rowCount, SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount)
 	}
 	return nil
 }
@@ -210,16 +200,6 @@
 	}
 }
 
-func TestClient_ReadOnlyTransaction_SessionNotFoundOnBeginTransaction(t *testing.T) {
-	t.Parallel()
-	exec := map[string]SimulatedExecutionTime{
-		MethodBeginTransaction: {Errors: []error{gstatus.Error(codes.NotFound, "Session not found")}},
-	}
-	if err := testReadOnlyTransaction(t, exec); err != nil {
-		t.Fatal(err)
-	}
-}
-
 func testReadOnlyTransaction(t *testing.T, executionTimes map[string]SimulatedExecutionTime) error {
 	server, client, teardown := setupMockedTestServer(t)
 	defer teardown()
@@ -228,7 +208,24 @@
 	}
 	tx := client.ReadOnlyTransaction()
 	defer tx.Close()
-	return executeTestQuery(t, tx)
+	ctx := context.Background()
+	iter := tx.Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums))
+	defer iter.Stop()
+	for {
+		row, err := iter.Next()
+		if err == iterator.Done {
+			break
+		}
+		if err != nil {
+			return err
+		}
+		var singerID, albumID int64
+		var albumTitle string
+		if err := row.Columns(&singerID, &albumID, &albumTitle); err != nil {
+			return err
+		}
+	}
+	return nil
 }
 
 func TestClient_ReadWriteTransaction(t *testing.T) {
diff --git a/spanner/internal/testutil/inmem_spanner_server.go b/spanner/internal/testutil/inmem_spanner_server.go
index 022135b..5c4f546 100644
--- a/spanner/internal/testutil/inmem_spanner_server.go
+++ b/spanner/internal/testutil/inmem_spanner_server.go
@@ -59,7 +59,6 @@
 	MethodGetSession          string = "GET_SESSION"
 	MethodExecuteSql          string = "EXECUTE_SQL"
 	MethodExecuteStreamingSql string = "EXECUTE_STREAMING_SQL"
-	MethodStreamingRead       string = "EXECUTE_STREAMING_READ"
 )
 
 // StatementResult represents a mocked result on the test server. Th result can
@@ -704,9 +703,13 @@
 }
 
 func (s *inMemSpannerServer) StreamingRead(req *spannerpb.ReadRequest, stream spannerpb.Spanner_StreamingReadServer) error {
-	if err := s.simulateExecutionTime(MethodStreamingRead, req); err != nil {
-		return err
+	s.mu.Lock()
+	if s.stopped {
+		s.mu.Unlock()
+		return gstatus.Error(codes.Unavailable, "server has been stopped")
 	}
+	s.receivedRequests <- req
+	s.mu.Unlock()
 	return gstatus.Error(codes.Unimplemented, "Method not yet implemented")
 }
 
diff --git a/spanner/transaction.go b/spanner/transaction.go
index 9f1414b..a53486b 100644
--- a/spanner/transaction.go
+++ b/spanner/transaction.go
@@ -363,41 +363,18 @@
 			sh.recycle()
 		}
 	}()
-
-	// Create transaction options.
-	readOnlyOptions := buildTransactionOptionsReadOnly(t.getTimestampBound(), true)
-	transactionOptions := &sppb.TransactionOptions{
-		Mode: &sppb.TransactionOptions_ReadOnly_{
-			ReadOnly: readOnlyOptions,
+	sh, err = t.sp.take(ctx)
+	if err != nil {
+		return err
+	}
+	res, err := sh.getClient().BeginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.BeginTransactionRequest{
+		Session: sh.getID(),
+		Options: &sppb.TransactionOptions{
+			Mode: &sppb.TransactionOptions_ReadOnly_{
+				ReadOnly: buildTransactionOptionsReadOnly(t.getTimestampBound(), true),
+			},
 		},
-	}
-	// Retry TakeSession and BeginTransaction on Session not found.
-	retryOnNotFound := gax.OnCodes([]codes.Code{codes.NotFound}, gax.Backoff{})
-	beginTxWithRetry := func(ctx context.Context) (*sppb.Transaction, error) {
-		for {
-			sh, err = t.sp.take(ctx)
-			if err != nil {
-				return nil, err
-			}
-			client := sh.getClient()
-			ctx := contextWithOutgoingMetadata(ctx, sh.getMetadata())
-			res, err := client.BeginTransaction(ctx, &sppb.BeginTransactionRequest{
-				Session: sh.getID(),
-				Options: transactionOptions,
-			})
-			if err == nil {
-				return res, nil
-			}
-			// We should not wait before retrying.
-			if _, shouldRetry := retryOnNotFound.Retry(err); !shouldRetry {
-				return nil, err
-			}
-			// Delete session and then retry with a new one.
-			sh.destroy()
-		}
-	}
-
-	res, err := beginTxWithRetry(ctx)
+	})
 	if err == nil {
 		tx = res.Id
 		if res.ReadTimestamp != nil {
diff --git a/spanner/transaction_test.go b/spanner/transaction_test.go
index 677b1b4..2976c96 100644
--- a/spanner/transaction_test.go
+++ b/spanner/transaction_test.go
@@ -195,64 +195,46 @@
 	}
 }
 
-// Tests that NotFound errors cause failures, and aren't retried, except for
-// BeginTransaction.
+// Tests that NotFound errors cause failures, and aren't retried.
 func TestTransaction_NotFound(t *testing.T) {
 	t.Parallel()
 	ctx := context.Background()
 	server, client, teardown := setupMockedTestServer(t)
 	defer teardown()
 
-	errSessionNotFound := spannerErrorf(codes.NotFound, "Session not found")
-	// BeginTransaction should retry automatically.
+	wantErr := spannerErrorf(codes.NotFound, "Session not found")
 	server.TestSpanner.PutExecutionTime(MethodBeginTransaction,
 		SimulatedExecutionTime{
-			Errors: []error{errSessionNotFound},
+			Errors: []error{wantErr, wantErr, wantErr},
 		})
-	txn := client.ReadOnlyTransaction()
-	if _, _, got := txn.acquire(ctx); got != nil {
-		t.Fatalf("Expect acquire to succeed, got %v, want nil.", got)
-	}
-	txn.Close()
-
-	// Query should fail with Session not found.
-	server.TestSpanner.PutExecutionTime(MethodExecuteStreamingSql,
-		SimulatedExecutionTime{
-			Errors: []error{errSessionNotFound},
-		})
-	txn = client.ReadOnlyTransaction()
-	iter := txn.Query(ctx, NewStatement("SELECT 1"))
-	_, got := iter.Next()
-	if !testEqual(errSessionNotFound, got) {
-		t.Fatalf("Expect Query to fail\ngot: %v\nwant: %v", got, errSessionNotFound)
-	}
-	iter.Stop()
-
-	// Read should fail with Session not found.
-	server.TestSpanner.PutExecutionTime(MethodStreamingRead,
-		SimulatedExecutionTime{
-			Errors: []error{errSessionNotFound},
-		})
-	txn = client.ReadOnlyTransaction()
-	iter = txn.Read(ctx, "Users", KeySets(Key{"alice"}, Key{"bob"}), []string{"name", "email"})
-	_, got = iter.Next()
-	if !testEqual(errSessionNotFound, got) {
-		t.Fatalf("Expect Read to fail\ngot: %v\nwant: %v", got, errSessionNotFound)
-	}
-	iter.Stop()
-
-	// Commit should fail with Session not found.
 	server.TestSpanner.PutExecutionTime(MethodCommitTransaction,
 		SimulatedExecutionTime{
-			Errors: []error{errSessionNotFound},
+			Errors: []error{wantErr, wantErr, wantErr},
 		})
 
+	txn := client.ReadOnlyTransaction()
+	defer txn.Close()
+
+	if _, _, got := txn.acquire(ctx); !testEqual(wantErr, got) {
+		t.Fatalf("Expect acquire to fail, got %v, want %v.", got, wantErr)
+	}
+
+	// The failure should recycle the session, we expect it to be used in
+	// following requests.
+	if got := txn.Query(ctx, NewStatement("SELECT 1")); !testEqual(wantErr, got.err) {
+		t.Fatalf("Expect Query to fail, got %v, want %v.", got.err, wantErr)
+	}
+
+	if got := txn.Read(ctx, "Users", KeySets(Key{"alice"}, Key{"bob"}), []string{"name", "email"}); !testEqual(wantErr, got.err) {
+		t.Fatalf("Expect Read to fail, got %v, want %v.", got.err, wantErr)
+	}
+
 	ms := []*Mutation{
 		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}),
 		Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}),
 	}
-	if _, got := client.Apply(ctx, ms, ApplyAtLeastOnce()); !testEqual(errSessionNotFound, got) {
-		t.Fatalf("Expect Apply to fail\ngot: %v\nwant: %v", got, errSessionNotFound)
+	if _, got := client.Apply(ctx, ms, ApplyAtLeastOnce()); !testEqual(wantErr, got) {
+		t.Fatalf("Expect Apply to fail, got %v, want %v.", got, wantErr)
 	}
 }