blob: af5c427e7f5a661b519ee4ed0e6a26363ffd0d2b [file] [log] [blame]
// Copyright 2014 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package datastore
import (
"errors"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
pb "google.golang.org/genproto/googleapis/datastore/v1"
)
// Sentinel errors reported by transaction operations.
var (
	// ErrConcurrentTransaction is returned when a transaction is rolled back due
	// to a conflict with a concurrent transaction.
	ErrConcurrentTransaction = errors.New("datastore: concurrent transaction")

	// errExpiredTransaction is returned by Transaction methods that are called
	// after the transaction has already been committed or rolled back.
	errExpiredTransaction = errors.New("datastore: transaction expired")
)
// transactionSettings holds the tunable parameters of a transaction run. It is
// populated by newTransactionSettings from a list of TransactionOptions.
type transactionSettings struct {
// attempts is the maximum number of times RunInTransaction tries the
// transaction before giving up.
attempts int
}
// newTransactionSettings builds the settings for one transaction run. It
// starts from the defaults (three attempts) and then hands the settings to
// each option in opts, in order, so later options win over earlier ones.
func newTransactionSettings(opts []TransactionOption) *transactionSettings {
	const defaultAttempts = 3

	settings := new(transactionSettings)
	settings.attempts = defaultAttempts
	for i := 0; i < len(opts); i++ {
		opts[i].apply(settings)
	}
	return settings
}
// TransactionOption configures the way a transaction is executed.
//
// Options are obtained from constructors such as MaxAttempts. Because apply is
// unexported, option types can only be defined inside this package.
type TransactionOption interface {
// apply writes the option's value into the given settings.
apply(*transactionSettings)
}
// MaxAttempts returns a TransactionOption that sets the maximum number of
// times a transaction is attempted, replacing the default of 3. A value of
// zero or less is ignored when the option is applied.
func MaxAttempts(attempts int) TransactionOption {
	var opt TransactionOption = maxAttempts(attempts)
	return opt
}
// maxAttempts is the TransactionOption produced by MaxAttempts; its value is
// the requested attempt limit.
type maxAttempts int

// apply stores the limit in s. A non-positive limit is ignored, so s keeps
// whatever value it already had.
func (w maxAttempts) apply(s *transactionSettings) {
	if w <= 0 {
		return
	}
	s.attempts = int(w)
}
// Transaction represents a set of datastore operations to be committed atomically.
//
// Operations are enqueued by calling the Put and Delete methods on Transaction
// (or their Multi-equivalents). These operations are only committed when the
// Commit method is invoked. To ensure consistency, reads must be performed by
// using Transaction's Get method or by using the Transaction method when
// building a query.
//
// A Transaction must be committed or rolled back exactly once.
type Transaction struct {
// id is the transaction identifier returned by BeginTransaction. Commit and
// Rollback reset it to nil, which marks the transaction as expired.
id []byte
// client is the Client that started the transaction; all of the
// transaction's RPCs go through it.
client *Client
// ctx is the context that was passed to NewTransaction. It is reused for
// every RPC issued through the transaction.
ctx context.Context
mutations []*pb.Mutation // The mutations to apply, in the order they were enqueued.
pending map[int]*PendingKey // Map from mutation index to incomplete keys pending transaction completion.
}
// NewTransaction starts a new transaction by issuing a BeginTransaction RPC
// for the client's project. The returned Transaction keeps ctx and uses it for
// all of its later RPCs.
//
// The MaxAttempts option is rejected with an error: a transaction created here
// is driven by the caller and is never retried automatically.
func (c *Client) NewTransaction(ctx context.Context, opts ...TransactionOption) (*Transaction, error) {
	for i := range opts {
		if _, isMaxAttempts := opts[i].(maxAttempts); isMaxAttempts {
			return nil, errors.New("datastore: NewTransaction does not accept MaxAttempts option")
		}
	}

	resp, err := c.client.BeginTransaction(ctx, &pb.BeginTransactionRequest{ProjectId: c.dataset})
	if err != nil {
		return nil, err
	}

	// mutations is left nil on purpose; it grows as Put and Delete enqueue work.
	tx := &Transaction{
		id:      resp.Transaction,
		client:  c,
		ctx:     ctx,
		pending: make(map[int]*PendingKey),
	}
	return tx, nil
}
// RunInTransaction runs f in a transaction. f is invoked with a Transaction
// that f should use for all the transaction's datastore operations.
//
// f must not call Commit or Rollback on the provided Transaction.
//
// If f returns nil, RunInTransaction commits the transaction,
// returning the Commit and a nil error if it succeeds. If the commit fails due
// to a conflicting transaction, RunInTransaction retries f with a new
// Transaction. It gives up and returns ErrConcurrentTransaction after three
// failed attempts (or as configured with MaxAttempts). Any other commit error
// is returned immediately without a retry.
//
// If f returns non-nil, then the transaction will be rolled back and
// RunInTransaction will return the same error. The function f is not retried.
//
// Note that when f returns, the transaction is not committed. Calling code
// must not assume that any of f's changes have been committed until
// RunInTransaction returns nil.
//
// Since f may be called multiple times, f should usually be idempotent – that
// is, it should have the same result when called multiple times. Note that
// Transaction.Get will append when unmarshalling slice fields, so it is not
// necessarily idempotent.
func (c *Client) RunInTransaction(ctx context.Context, f func(tx *Transaction) error, opts ...TransactionOption) (*Commit, error) {
	limit := newTransactionSettings(opts).attempts
	for attempt := 0; attempt < limit; attempt++ {
		tx, err := c.NewTransaction(ctx)
		if err != nil {
			return nil, err
		}
		if ferr := f(tx); ferr != nil {
			// The rollback is best effort: f's error is the one reported to
			// the caller, so a rollback failure is deliberately dropped.
			_ = tx.Rollback()
			return nil, ferr
		}
		cmt, err := tx.Commit()
		if err == ErrConcurrentTransaction {
			// Lost a race with another transaction; start over.
			continue
		}
		return cmt, err
	}
	return nil, ErrConcurrentTransaction
}
// Commit applies the enqueued operations atomically.
//
// It returns errExpiredTransaction if the transaction was already committed or
// rolled back, and ErrConcurrentTransaction if the server aborted the commit.
// On success, every PendingKey handed out for an incomplete key is filled in
// with the key reported by the server and tied to the returned Commit.
func (t *Transaction) Commit() (*Commit, error) {
	if t.id == nil {
		return nil, errExpiredTransaction
	}
	selector := &pb.CommitRequest_Transaction{Transaction: t.id}
	// The transaction is marked as spent before the RPC is sent, so it
	// cannot be reused whatever the outcome of the call.
	t.id = nil

	resp, err := t.client.client.Commit(t.ctx, &pb.CommitRequest{
		ProjectId:           t.client.dataset,
		TransactionSelector: selector,
		Mutations:           t.mutations,
		Mode:                pb.CommitRequest_TRANSACTIONAL,
	})
	if err != nil && grpc.Code(err) == codes.Aborted {
		return nil, ErrConcurrentTransaction
	}
	if err != nil {
		return nil, err
	}

	// Copy any newly minted keys into the handles returned by Put. Mutation
	// results are matched to handles by mutation index.
	result := new(Commit)
	for idx, handle := range t.pending {
		results := resp.MutationResults
		if idx >= len(results) || results[idx].Key == nil {
			return nil, errors.New("datastore: internal error: server returned the wrong mutation results")
		}
		resolved, convErr := protoToKey(results[idx].Key)
		if convErr != nil {
			return nil, errors.New("datastore: internal error: server returned an invalid key")
		}
		handle.key, handle.commit = resolved, result
	}
	return result, nil
}
// Rollback abandons a pending transaction.
//
// It returns errExpiredTransaction if the transaction was already committed or
// rolled back. Otherwise the transaction is marked as expired and a Rollback
// RPC is sent; the RPC's error, if any, is returned.
func (t *Transaction) Rollback() error {
	txID := t.id
	if txID == nil {
		return errExpiredTransaction
	}
	// Expire the transaction first so it cannot be reused even if the RPC fails.
	t.id = nil

	req := &pb.RollbackRequest{
		ProjectId:   t.client.dataset,
		Transaction: txID,
	}
	if _, err := t.client.client.Rollback(t.ctx, req); err != nil {
		return err
	}
	return nil
}
// Get is the transaction-specific version of the package function Get.
// All reads performed during the transaction will come from a single consistent
// snapshot. Furthermore, if the transaction is set to a serializable isolation
// level, another transaction cannot concurrently modify the data that is read
// or modified by this transaction.
//
// Get returns errExpiredTransaction if the transaction has already been
// committed or rolled back. If the underlying lookup reports a MultiError, its
// first element is returned.
func (t *Transaction) Get(key *Key, dst interface{}) error {
	// A spent transaction has a nil id. Refuse it here, as GetMulti does,
	// instead of sending a read with an empty transaction selector.
	if t.id == nil {
		return errExpiredTransaction
	}
	opts := &pb.ReadOptions{
		ConsistencyType: &pb.ReadOptions_Transaction{Transaction: t.id},
	}
	err := t.client.get(t.ctx, []*Key{key}, []interface{}{dst}, opts)
	// Exactly one key was requested, so a MultiError carries one element.
	if me, ok := err.(MultiError); ok {
		return me[0]
	}
	return err
}
// GetMulti is a batch version of Get. The lookup is performed with the
// transaction's id as its read consistency, using the context the transaction
// was created with.
//
// It returns errExpiredTransaction if the transaction has already been
// committed or rolled back.
func (t *Transaction) GetMulti(keys []*Key, dst interface{}) error {
	txID := t.id
	if txID == nil {
		return errExpiredTransaction
	}
	return t.client.get(t.ctx, keys, dst, &pb.ReadOptions{
		ConsistencyType: &pb.ReadOptions_Transaction{Transaction: txID},
	})
}
// Put is the transaction-specific version of the package function Put.
//
// Put returns a PendingKey which can be resolved into a Key using the
// return value from a successful Commit. If key is an incomplete key, the
// returned pending key will resolve to a unique key generated by the
// datastore.
//
// Put is a single-element call to PutMulti; if PutMulti reports a MultiError,
// its first element is returned.
func (t *Transaction) Put(key *Key, src interface{}) (*PendingKey, error) {
	pending, err := t.PutMulti([]*Key{key}, []interface{}{src})
	switch e := err.(type) {
	case nil:
		return pending[0], nil
	case MultiError:
		return nil, e[0]
	default:
		return nil, err
	}
}
// PutMulti is a batch version of Put. One PendingKey is returned for each
// element of src in the same order.
//
// The writes are only enqueued; nothing is sent until Commit. PutMulti returns
// errExpiredTransaction if the transaction has already been committed or
// rolled back.
func (t *Transaction) PutMulti(keys []*Key, src interface{}) ([]*PendingKey, error) {
	if t.id == nil {
		return nil, errExpiredTransaction
	}
	muts, err := putMutations(keys, src)
	if err != nil {
		return nil, err
	}
	// base is the index the first new mutation gets in the transaction-wide
	// list; Commit uses these indices to find the matching mutation results.
	base := len(t.mutations)
	t.mutations = append(t.mutations, muts...)

	handles := make([]*PendingKey, 0, len(keys))
	for offset, k := range keys {
		handle := new(PendingKey)
		if !k.Incomplete() {
			// The key is already final; expose it right away.
			handle.key = k
		} else {
			// The server assigns this key; register the handle so Commit
			// can fill it in.
			t.pending[base+offset] = handle
		}
		handles = append(handles, handle)
	}
	return handles, nil
}
// Delete is the transaction-specific version of the package function Delete.
// Delete enqueues the deletion of the entity for the given key, to be
// committed atomically upon calling Commit.
//
// Delete is a single-element call to DeleteMulti; if DeleteMulti reports a
// MultiError, its first element is returned.
func (t *Transaction) Delete(key *Key) error {
	switch err := t.DeleteMulti([]*Key{key}).(type) {
	case MultiError:
		return err[0]
	default:
		return err
	}
}
// DeleteMulti is a batch version of Delete. The deletions are only enqueued;
// nothing is sent until Commit.
//
// It returns errExpiredTransaction if the transaction has already been
// committed or rolled back, and enqueues nothing if the delete mutations
// cannot be built.
func (t *Transaction) DeleteMulti(keys []*Key) error {
	if t.id == nil {
		return errExpiredTransaction
	}
	muts, err := deleteMutations(keys)
	if err == nil {
		t.mutations = append(t.mutations, muts...)
	}
	return err
}
// Commit represents the result of a committed transaction.
type Commit struct {
	// The blank byte gives Commit a non-zero size. Pointers to distinct
	// zero-size values may compare equal, which would leave the ownership
	// check in Key unable to tell two commits apart.
	_ byte
}

// Key resolves a pending key handle into a final key.
//
// A handle created for a key that was already complete carries no Commit and
// resolves to that key. A handle stamped by a commit may only be resolved by
// that same Commit; presenting it to a different one is a programming error
// and panics. A nil handle resolves to nil.
func (c *Commit) Key(p *PendingKey) *Key {
	if p == nil {
		return nil
	}
	// p.commit is only set when a commit fills in a generated key, so a nil
	// value means there is no ownership to verify.
	if p.commit != nil && p.commit != c {
		panic("PendingKey was not created by corresponding transaction")
	}
	return p.key
}
// PendingKey represents the key for a newly-inserted entity. It can be
// resolved into a Key by calling the Key method of Commit.
type PendingKey struct {
// key is set by PutMulti when the supplied key was already complete;
// for an incomplete key it stays nil until Transaction.Commit fills it in.
key *Key
// commit is the Commit that filled in key. Transaction.Commit sets it only
// on handles that were created for incomplete keys.
commit *Commit
}