datastore: add retry
As per https://cloud.google.com/datastore/docs/concepts/errors, we retry
on UNAVAILABLE and DEADLINE_EXCEEDED.
All the RPCs used are either idempotent or harmless to retry.
Fixes #652.
Change-Id: I27bf7c252a1a567ced598d0aec499205bd46322b
Reviewed-on: https://code-review.googlesource.com/15810
Reviewed-by: Michael Darakananda <pongad@google.com>
diff --git a/datastore/client.go b/datastore/client.go
index c55ac20..940bfec 100644
--- a/datastore/client.go
+++ b/datastore/client.go
@@ -17,11 +17,16 @@
import (
"fmt"
+ gax "github.com/googleapis/gax-go"
+
+ "cloud.google.com/go/internal"
"cloud.google.com/go/internal/version"
"golang.org/x/net/context"
pb "google.golang.org/genproto/googleapis/datastore/v1"
"google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
+ "google.golang.org/grpc/status"
)
// datastoreClient is a wrapper for the pb.DatastoreClient that includes gRPC
@@ -44,26 +49,70 @@
}
}
-func (dc *datastoreClient) Lookup(ctx context.Context, in *pb.LookupRequest, opts ...grpc.CallOption) (*pb.LookupResponse, error) {
- return dc.c.Lookup(metadata.NewOutgoingContext(ctx, dc.md), in, opts...)
+func (dc *datastoreClient) Lookup(ctx context.Context, in *pb.LookupRequest, opts ...grpc.CallOption) (res *pb.LookupResponse, err error) {
+ err = dc.invoke(ctx, func(ctx context.Context) error {
+ res, err = dc.c.Lookup(ctx, in, opts...)
+ return err
+ })
+ return res, err
}
-func (dc *datastoreClient) RunQuery(ctx context.Context, in *pb.RunQueryRequest, opts ...grpc.CallOption) (*pb.RunQueryResponse, error) {
- return dc.c.RunQuery(metadata.NewOutgoingContext(ctx, dc.md), in, opts...)
+func (dc *datastoreClient) RunQuery(ctx context.Context, in *pb.RunQueryRequest, opts ...grpc.CallOption) (res *pb.RunQueryResponse, err error) {
+ err = dc.invoke(ctx, func(ctx context.Context) error {
+ res, err = dc.c.RunQuery(ctx, in, opts...)
+ return err
+ })
+ return res, err
}
-func (dc *datastoreClient) BeginTransaction(ctx context.Context, in *pb.BeginTransactionRequest, opts ...grpc.CallOption) (*pb.BeginTransactionResponse, error) {
- return dc.c.BeginTransaction(metadata.NewOutgoingContext(ctx, dc.md), in, opts...)
+func (dc *datastoreClient) BeginTransaction(ctx context.Context, in *pb.BeginTransactionRequest, opts ...grpc.CallOption) (res *pb.BeginTransactionResponse, err error) {
+ err = dc.invoke(ctx, func(ctx context.Context) error {
+ res, err = dc.c.BeginTransaction(ctx, in, opts...)
+ return err
+ })
+ return res, err
}
-func (dc *datastoreClient) Commit(ctx context.Context, in *pb.CommitRequest, opts ...grpc.CallOption) (*pb.CommitResponse, error) {
- return dc.c.Commit(metadata.NewOutgoingContext(ctx, dc.md), in, opts...)
+func (dc *datastoreClient) Commit(ctx context.Context, in *pb.CommitRequest, opts ...grpc.CallOption) (res *pb.CommitResponse, err error) {
+ err = dc.invoke(ctx, func(ctx context.Context) error {
+ res, err = dc.c.Commit(ctx, in, opts...)
+ return err
+ })
+ return res, err
}
-func (dc *datastoreClient) Rollback(ctx context.Context, in *pb.RollbackRequest, opts ...grpc.CallOption) (*pb.RollbackResponse, error) {
- return dc.c.Rollback(metadata.NewOutgoingContext(ctx, dc.md), in, opts...)
+func (dc *datastoreClient) Rollback(ctx context.Context, in *pb.RollbackRequest, opts ...grpc.CallOption) (res *pb.RollbackResponse, err error) {
+ err = dc.invoke(ctx, func(ctx context.Context) error {
+ res, err = dc.c.Rollback(ctx, in, opts...)
+ return err
+ })
+ return res, err
}
-func (dc *datastoreClient) AllocateIds(ctx context.Context, in *pb.AllocateIdsRequest, opts ...grpc.CallOption) (*pb.AllocateIdsResponse, error) {
- return dc.c.AllocateIds(metadata.NewOutgoingContext(ctx, dc.md), in, opts...)
+func (dc *datastoreClient) AllocateIds(ctx context.Context, in *pb.AllocateIdsRequest, opts ...grpc.CallOption) (res *pb.AllocateIdsResponse, err error) {
+ err = dc.invoke(ctx, func(ctx context.Context) error {
+ res, err = dc.c.AllocateIds(ctx, in, opts...)
+ return err
+ })
+ return res, err
+}
+
+func (dc *datastoreClient) invoke(ctx context.Context, f func(ctx context.Context) error) error {
+ ctx = metadata.NewOutgoingContext(ctx, dc.md)
+ return internal.Retry(ctx, gax.Backoff{}, func() (stop bool, err error) {
+ err = f(ctx)
+ return !shouldRetry(err), err
+ })
+}
+
+func shouldRetry(err error) bool {
+ if err == nil {
+ return false
+ }
+ s, ok := status.FromError(err)
+ if !ok {
+ return false
+ }
+ // See https://cloud.google.com/datastore/docs/concepts/errors.
+ return s.Code() == codes.Unavailable || s.Code() == codes.DeadlineExceeded
}