blob: fd6cf27736fb13e22c66a93ecd0f730397db872b [file] [log] [blame]
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package actions
import (
"context"
"log"
"strings"
"sync"
"time"
"cloud.google.com/go/spanner"
"cloud.google.com/go/spanner/executor/apiv1/executorpb"
"cloud.google.com/go/spanner/test/cloudexecutor/executor/internal/utility"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
type currentActiveTransaction int
// Current transaction status
const (
None currentActiveTransaction = iota
Read
ReadWrite
Batch
)
// ExecutionFlowContext represents a context in which SpannerActions are executed. Among other
// things, it includes currently active transactions and table metadata. There is exactly one
// instance of this per stubby call, created when the call is initialized and shared with all
// actionHandlers.
type ExecutionFlowContext struct {
mu sync.Mutex // protects all internal state
Database string // current database path
rwTxn *spanner.ReadWriteStmtBasedTransaction // Current read-write transaction
roTxn *spanner.ReadOnlyTransaction // Current read-only transaction
batchTxn *spanner.BatchReadOnlyTransaction // Current batch read-only transaction
DbClient *spanner.Client // Current database client
tableMetadata *utility.TableMetadataHelper // If in a txn (except batch), this has metadata info about table columns
numPendingReads int64 // Number of pending read/query actions.
readAborted bool // Indicate whether there's a read/query action got aborted and the transaction need to be reset.
transactionSeed string // Log the workid and op pair for tracing the thread.
currentActiveTransaction currentActiveTransaction
TxnContext context.Context
}
// isTransactionActiveLocked returns true if any kind of transaction is currently active. Must hold c.mu
// when calling.
func (c *ExecutionFlowContext) isTransactionActiveLocked() bool {
return c.rwTxn != nil || c.roTxn != nil || c.batchTxn != nil
}
// IsTransactionActive returns true if any kind of transaction is currently active.
func (c *ExecutionFlowContext) IsTransactionActive() bool {
c.mu.Lock()
defer c.mu.Unlock()
return c.isTransactionActiveLocked()
}
// Return current database. Must hold c.mu when calling.
func (c *ExecutionFlowContext) getDatabase() (string, error) {
if c.Database == "" {
return "", status.Error(codes.InvalidArgument, "database doesn't exist")
}
return c.Database, nil
}
// Return current transaction that can be used for performing read/query actions. Must hold c.mu when calling.
func (c *ExecutionFlowContext) getTransactionForRead() (*spanner.ReadOnlyTransaction, error) {
if c.roTxn != nil {
return c.roTxn, nil
}
return nil, status.Error(codes.InvalidArgument, "no currently active transaction for read")
}
// Return current transaction that can be used for performing mutation/update actions. Must hold c.mu when calling.
func (c *ExecutionFlowContext) getTransactionForWrite() (*spanner.ReadWriteStmtBasedTransaction, error) {
if c.rwTxn != nil {
return c.rwTxn, nil
}
return nil, status.Error(codes.InvalidArgument, "no currently active transaction for read-write")
}
// Return current transaction that can be used for performing batch actions. Must hold c.mu when calling.
func (c *ExecutionFlowContext) getBatchTransaction() (*spanner.BatchReadOnlyTransaction, error) {
if c.batchTxn != nil {
return c.batchTxn, nil
}
return nil, status.Error(codes.InvalidArgument, "no currently active batch transaction")
}
// Increase the read count when a read/query is issued.
func (c *ExecutionFlowContext) startRead() {
c.numPendingReads++
}
// Decrease the read count when a read/query is finished, if status is aborted and there's no
// pending read/query, reset the transaction for retry.
func (c *ExecutionFlowContext) finishRead(code codes.Code) {
if code == codes.Aborted {
c.readAborted = true
}
c.numPendingReads--
if c.readAborted && c.numPendingReads <= 0 {
log.Println("Transaction reset due to read/query abort")
c.readAborted = false
}
}
// Initialize the read count and aborted status when transaction started.
func (c *ExecutionFlowContext) initReadState() {
c.readAborted = false
c.numPendingReads = 0
}
// clear clears the transaction related variables.
func (c *ExecutionFlowContext) clear() {
c.roTxn = nil
c.rwTxn = nil
c.tableMetadata = nil
}
// finish attempts to finish the transaction by either committing it or exiting without committing.
// In order to follow the ExecuteActions protocol, we must distinguish Spanner-generated errors
// (e.g. RowNotFound) and internal errors (e.g. a precondition is not met). Errors returned from
// Spanner populate the status of SpannerActionOutcome. Internal errors, in contrast, break the
// stubby call. For this reason, finish() has two return values dedicated to errors. If any of
// these errors is not nil, other return values are undefined.
// Return values in order:
// 1. Whether or not the transaction is restarted. It will be true if commit has been attempted,
// but Spanner returned aborted and restarted instead. When that happens, all reads and writes
// should be replayed, followed by another commit attempt.
// 2. Commit timestamp. It's returned only if commit has succeeded.
// 3. Spanner error -- an error that Spanner client returned.
// 4. Internal error.
func (c *ExecutionFlowContext) finish(ctx context.Context, txnFinishMode executorpb.FinishTransactionAction_Mode) (bool, *time.Time, error, error) {
if txnFinishMode == executorpb.FinishTransactionAction_COMMIT {
var err error
ts, err := c.rwTxn.Commit(ctx)
if err != nil {
log.Printf("Transaction finished with error %v", err)
if status.Code(err) == codes.Aborted {
log.Println("Transaction Aborted. Sending status to outcome sender to restart the transaction.")
return true, nil, nil, nil
}
// Filter expected errors
if status.Code(err) == codes.Unknown && strings.Contains(err.Error(), "Transaction outcome unknown") {
return false, nil, spanner.ToSpannerError(status.Error(codes.DeadlineExceeded, "Transaction outcome unknown")), nil
}
// TODO(harsha): check if this is an internal or spanner error
return false, nil, err, nil
}
return false, &ts, nil, nil
} else if txnFinishMode == executorpb.FinishTransactionAction_ABANDON {
log.Printf("Transaction Abandoned")
c.rwTxn.Rollback(ctx)
return false, nil, nil, nil
} else {
return false, nil, nil, spanner.ToSpannerError(status.Errorf(codes.InvalidArgument, "Unsupported finish mode %s", txnFinishMode.String()))
}
}
// CloseOpenTransactions cleans up all the active transactions if the stubby call is closing.
func (c *ExecutionFlowContext) CloseOpenTransactions() {
c.mu.Lock()
defer c.mu.Unlock()
if c.roTxn != nil {
log.Println("A read only transaction was active when stubby call closed.")
c.roTxn.Close()
}
if c.rwTxn != nil {
log.Println("Abandon a read-write transaction that was active when stubby call closed.")
_, _, _, err := c.finish(context.Background(), executorpb.FinishTransactionAction_ABANDON)
if err != nil {
log.Printf("Failed to abandon a read-write transaction: %v\n", err)
}
}
if c.batchTxn != nil {
log.Println("A batch transaction was active when stubby call closed.")
c.batchTxn.Close()
}
}