datastore: add retry

As per https://cloud.google.com/datastore/docs/concepts/errors, we retry
on UNAVAILABLE and DEADLINE_EXCEEDED.

All the RPCs used are either idempotent or harmless to retry.

Fixes #652.

Change-Id: I27bf7c252a1a567ced598d0aec499205bd46322b
Reviewed-on: https://code-review.googlesource.com/15810
Reviewed-by: Michael Darakananda <pongad@google.com>
diff --git a/datastore/client.go b/datastore/client.go
index c55ac20..940bfec 100644
--- a/datastore/client.go
+++ b/datastore/client.go
@@ -17,11 +17,16 @@
 import (
 	"fmt"
 
+	gax "github.com/googleapis/gax-go"
+
+	"cloud.google.com/go/internal"
 	"cloud.google.com/go/internal/version"
 	"golang.org/x/net/context"
 	pb "google.golang.org/genproto/googleapis/datastore/v1"
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/metadata"
+	"google.golang.org/grpc/status"
 )
 
 // datastoreClient is a wrapper for the pb.DatastoreClient that includes gRPC
@@ -44,26 +49,70 @@
 	}
 }
 
-func (dc *datastoreClient) Lookup(ctx context.Context, in *pb.LookupRequest, opts ...grpc.CallOption) (*pb.LookupResponse, error) {
-	return dc.c.Lookup(metadata.NewOutgoingContext(ctx, dc.md), in, opts...)
+func (dc *datastoreClient) Lookup(ctx context.Context, in *pb.LookupRequest, opts ...grpc.CallOption) (res *pb.LookupResponse, err error) {
+	err = dc.invoke(ctx, func(ctx context.Context) error {
+		res, err = dc.c.Lookup(ctx, in, opts...)
+		return err
+	})
+	return res, err
 }
 
-func (dc *datastoreClient) RunQuery(ctx context.Context, in *pb.RunQueryRequest, opts ...grpc.CallOption) (*pb.RunQueryResponse, error) {
-	return dc.c.RunQuery(metadata.NewOutgoingContext(ctx, dc.md), in, opts...)
+func (dc *datastoreClient) RunQuery(ctx context.Context, in *pb.RunQueryRequest, opts ...grpc.CallOption) (res *pb.RunQueryResponse, err error) {
+	err = dc.invoke(ctx, func(ctx context.Context) error {
+		res, err = dc.c.RunQuery(ctx, in, opts...)
+		return err
+	})
+	return res, err
 }
 
-func (dc *datastoreClient) BeginTransaction(ctx context.Context, in *pb.BeginTransactionRequest, opts ...grpc.CallOption) (*pb.BeginTransactionResponse, error) {
-	return dc.c.BeginTransaction(metadata.NewOutgoingContext(ctx, dc.md), in, opts...)
+func (dc *datastoreClient) BeginTransaction(ctx context.Context, in *pb.BeginTransactionRequest, opts ...grpc.CallOption) (res *pb.BeginTransactionResponse, err error) {
+	err = dc.invoke(ctx, func(ctx context.Context) error {
+		res, err = dc.c.BeginTransaction(ctx, in, opts...)
+		return err
+	})
+	return res, err
 }
 
-func (dc *datastoreClient) Commit(ctx context.Context, in *pb.CommitRequest, opts ...grpc.CallOption) (*pb.CommitResponse, error) {
-	return dc.c.Commit(metadata.NewOutgoingContext(ctx, dc.md), in, opts...)
+func (dc *datastoreClient) Commit(ctx context.Context, in *pb.CommitRequest, opts ...grpc.CallOption) (res *pb.CommitResponse, err error) {
+	err = dc.invoke(ctx, func(ctx context.Context) error {
+		res, err = dc.c.Commit(ctx, in, opts...)
+		return err
+	})
+	return res, err
 }
 
-func (dc *datastoreClient) Rollback(ctx context.Context, in *pb.RollbackRequest, opts ...grpc.CallOption) (*pb.RollbackResponse, error) {
-	return dc.c.Rollback(metadata.NewOutgoingContext(ctx, dc.md), in, opts...)
+func (dc *datastoreClient) Rollback(ctx context.Context, in *pb.RollbackRequest, opts ...grpc.CallOption) (res *pb.RollbackResponse, err error) {
+	err = dc.invoke(ctx, func(ctx context.Context) error {
+		res, err = dc.c.Rollback(ctx, in, opts...)
+		return err
+	})
+	return res, err
 }
 
-func (dc *datastoreClient) AllocateIds(ctx context.Context, in *pb.AllocateIdsRequest, opts ...grpc.CallOption) (*pb.AllocateIdsResponse, error) {
-	return dc.c.AllocateIds(metadata.NewOutgoingContext(ctx, dc.md), in, opts...)
+func (dc *datastoreClient) AllocateIds(ctx context.Context, in *pb.AllocateIdsRequest, opts ...grpc.CallOption) (res *pb.AllocateIdsResponse, err error) {
+	err = dc.invoke(ctx, func(ctx context.Context) error {
+		res, err = dc.c.AllocateIds(ctx, in, opts...)
+		return err
+	})
+	return res, err
+}
+
+func (dc *datastoreClient) invoke(ctx context.Context, f func(ctx context.Context) error) error {
+	ctx = metadata.NewOutgoingContext(ctx, dc.md)
+	return internal.Retry(ctx, gax.Backoff{}, func() (stop bool, err error) {
+		err = f(ctx)
+		return !shouldRetry(err), err
+	})
+}
+
+func shouldRetry(err error) bool {
+	if err == nil {
+		return false
+	}
+	s, ok := status.FromError(err)
+	if !ok {
+		return false
+	}
+	// See https://cloud.google.com/datastore/docs/concepts/errors.
+	return s.Code() == codes.Unavailable || s.Code() == codes.DeadlineExceeded
 }