pubsub: add dead lettering support

This change adds the fields necessary to create subscriptions
with a dead letter policy. This allows messages to be redelivered
to a separate dead letter topic after a configured number of
delivery attempts have failed (nack/ack_deadline expired).

Change-Id: I78da592caa68d84a24c696708129be3cc912a7e1
Reviewed-on: https://code-review.googlesource.com/c/gocloud/+/48790
Reviewed-by: kokoro <noreply+kokoro@google.com>
Reviewed-by: Cody Oss <codyoss@google.com>
diff --git a/pubsub/go.sum b/pubsub/go.sum
index 6a45c7c..14603e3 100644
--- a/pubsub/go.sum
+++ b/pubsub/go.sum
@@ -81,6 +81,7 @@
 go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
 golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5 h1:58fnuSXlxZmFdJyvtTFVmVhcMLU6v5fEb/ok4wyqtNU=
 golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
 golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
 golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
diff --git a/pubsub/integration_test.go b/pubsub/integration_test.go
index a6254a3..d8e940a 100644
--- a/pubsub/integration_test.go
+++ b/pubsub/integration_test.go
@@ -1078,3 +1078,81 @@
 		t.Fatalf("\ngot: - want: +\n%s", diff)
 	}
 }
+
+func TestIntegration_CreateSubscription_DeadLetterPolicy(t *testing.T) {
+	ctx := context.Background()
+	client := integrationTestClient(ctx, t)
+	defer client.Close()
+
+	topic, err := client.CreateTopic(ctx, topicIDs.New())
+	if err != nil {
+		t.Fatalf("CreateTopic error: %v", err)
+	}
+	defer topic.Delete(ctx)
+	defer topic.Stop()
+
+	deadLetterTopic, err := client.CreateTopic(ctx, topicIDs.New())
+	if err != nil {
+		t.Fatalf("CreateTopic error: %v", err)
+	}
+	defer deadLetterTopic.Delete(ctx)
+	defer deadLetterTopic.Stop()
+
+	// We don't set MaxDeliveryAttempts in DeadLetterPolicy so that we can test
+	// that MaxDeliveryAttempts defaults properly to 5 if not set.
+	cfg := SubscriptionConfig{
+		Topic: topic,
+		DeadLetterPolicy: &DeadLetterPolicy{
+			DeadLetterTopic: deadLetterTopic.String(),
+		},
+	}
+	var sub *Subscription
+	if sub, err = client.CreateSubscription(ctx, subIDs.New(), cfg); err != nil {
+		t.Fatalf("CreateSub error: %v", err)
+	}
+	defer sub.Delete(ctx)
+
+	got, err := sub.Config(ctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+	want := SubscriptionConfig{
+		Topic:               topic,
+		AckDeadline:         10 * time.Second,
+		RetainAckedMessages: false,
+		RetentionDuration:   defaultRetentionDuration,
+		ExpirationPolicy:    defaultExpirationPolicy,
+		DeadLetterPolicy: &DeadLetterPolicy{
+			DeadLetterTopic:     deadLetterTopic.String(),
+			MaxDeliveryAttempts: 5,
+		},
+	}
+	if diff := testutil.Diff(got, want); diff != "" {
+		t.Fatalf("\ngot: - want: +\n%s", diff)
+	}
+
+	res := topic.Publish(ctx, &Message{
+		Data: []byte("failed message"),
+	})
+	if _, err := res.Get(ctx); err != nil {
+		t.Fatalf("Publish message error: %v", err)
+	}
+
+	ctx2, cancel := context.WithCancel(ctx)
+	numAttempts := 1
+	err = sub.Receive(ctx2, func(_ context.Context, m *Message) {
+		if numAttempts >= 5 {
+			cancel()
+			m.Ack()
+			return
+		}
+		if m.DeliveryAttempt != numAttempts {
+			t.Fatalf("Message delivery attempt: %d does not match numAttempts: %d\n", m.DeliveryAttempt, numAttempts)
+		}
+		numAttempts++
+		m.Nack()
+	})
+	if err != nil {
+		t.Fatalf("Streaming pull error: %v\n", err)
+	}
+}
diff --git a/pubsub/message.go b/pubsub/message.go
index c4b16e9..65003fb 100644
--- a/pubsub/message.go
+++ b/pubsub/message.go
@@ -46,6 +46,18 @@
 	// receiveTime is the time the message was received by the client.
 	receiveTime time.Time
 
+	// DeliveryAttempt is the number of times a message has been delivered.
+	// This is part of the dead lettering feature that forwards messages that
+	// fail to be processed (from nack/ack deadline timeout) to a dead letter topic.
+	// If dead lettering is enabled, this will be set on all attempts starting
+	// value 1. Otherwise, the value will be 0.
+	// This field is read-only.
+	//
+	// It is EXPERIMENTAL and a part of a closed alpha that may not be
+	// accessible to all users. This field is subject to change or removal
+	// without notice.
+	DeliveryAttempt int
+
 	// size is the approximate size of the message's data and attributes.
 	size int
 
@@ -64,12 +76,14 @@
 	if err != nil {
 		return nil, err
 	}
+
 	return &Message{
-		ackID:       resp.AckId,
-		Data:        resp.Message.Data,
-		Attributes:  resp.Message.Attributes,
-		ID:          resp.Message.MessageId,
-		PublishTime: pubTime,
+		ackID:           resp.AckId,
+		Data:            resp.Message.Data,
+		Attributes:      resp.Message.Attributes,
+		ID:              resp.Message.MessageId,
+		PublishTime:     pubTime,
+		DeliveryAttempt: int(resp.DeliveryAttempt),
 	}, nil
 }
 
diff --git a/pubsub/subscription.go b/pubsub/subscription.go
index c7239e7..3d8229d 100644
--- a/pubsub/subscription.go
+++ b/pubsub/subscription.go
@@ -221,6 +221,14 @@
 
 	// The set of labels for the subscription.
 	Labels map[string]string
+
+	// DeadLetterPolicy specifies the conditions for dead lettering messages in
+	// a subscription. If not set, dead lettering is disabled.
+	//
+	// It is EXPERIMENTAL and a part of a closed alpha that may not be
+	// accessible to all users. This field is subject to change or removal
+	// without notice.
+	DeadLetterPolicy *DeadLetterPolicy
 }
 
 func (cfg *SubscriptionConfig) toProto(name string) *pb.Subscription {
@@ -232,6 +240,10 @@
 	if cfg.RetentionDuration != 0 {
 		retentionDuration = ptypes.DurationProto(cfg.RetentionDuration)
 	}
+	var pbDeadLetter *pb.DeadLetterPolicy
+	if cfg.DeadLetterPolicy != nil {
+		pbDeadLetter = cfg.DeadLetterPolicy.toProto()
+	}
 	return &pb.Subscription{
 		Name:                     name,
 		Topic:                    cfg.Topic.name,
@@ -241,6 +253,7 @@
 		MessageRetentionDuration: retentionDuration,
 		Labels:                   cfg.Labels,
 		ExpirationPolicy:         expirationPolicyToProto(cfg.ExpirationPolicy),
+		DeadLetterPolicy:         pbDeadLetter,
 	}
 }
 
@@ -260,6 +273,7 @@
 			return SubscriptionConfig{}, err
 		}
 	}
+	dlp := protoToDLP(pbSub.DeadLetterPolicy)
 	subC := SubscriptionConfig{
 		Topic:               newTopic(c, pbSub.Topic),
 		AckDeadline:         time.Second * time.Duration(pbSub.AckDeadlineSeconds),
@@ -267,6 +281,7 @@
 		RetentionDuration:   rd,
 		Labels:              pbSub.Labels,
 		ExpirationPolicy:    expirationPolicy,
+		DeadLetterPolicy:    dlp,
 	}
 	pc := protoToPushConfig(pbSub.PushConfig)
 	if pc != nil {
@@ -294,6 +309,35 @@
 	return pc
 }
 
+// DeadLetterPolicy specifies the conditions for dead lettering messages in
+// a subscription.
+//
+// It is EXPERIMENTAL and a part of a closed alpha that may not be
+// accessible to all users.
+type DeadLetterPolicy struct {
+	DeadLetterTopic     string
+	MaxDeliveryAttempts int
+}
+
+func (dlp *DeadLetterPolicy) toProto() *pb.DeadLetterPolicy {
+	if dlp == nil {
+		return nil
+	}
+	return &pb.DeadLetterPolicy{
+		DeadLetterTopic:     dlp.DeadLetterTopic,
+		MaxDeliveryAttempts: int32(dlp.MaxDeliveryAttempts),
+	}
+}
+func protoToDLP(pbDLP *pb.DeadLetterPolicy) *DeadLetterPolicy {
+	if pbDLP == nil {
+		return nil
+	}
+	return &DeadLetterPolicy{
+		DeadLetterTopic:     pbDLP.GetDeadLetterTopic(),
+		MaxDeliveryAttempts: int(pbDLP.MaxDeliveryAttempts),
+	}
+}
+
 // ReceiveSettings configure the Receive method.
 // A zero ReceiveSettings will result in values equivalent to DefaultReceiveSettings.
 type ReceiveSettings struct {
@@ -428,6 +472,12 @@
 	// If non-zero, Expiration is changed.
 	ExpirationPolicy optional.Duration
 
+	// If non-nil, DeadLetterPolicy is changed.
+	//
+	// It is EXPERIMENTAL and a part of a closed alpha that may not be
+	// accessible to all users.
+	DeadLetterPolicy *DeadLetterPolicy
+
 	// If non-nil, the current set of labels is completely
 	// replaced by the new set.
 	// This field has beta status. It is not subject to the stability guarantee
@@ -477,6 +527,10 @@
 		psub.ExpirationPolicy = expirationPolicyToProto(cfg.ExpirationPolicy)
 		paths = append(paths, "expiration_policy")
 	}
+	if cfg.DeadLetterPolicy != nil {
+		psub.DeadLetterPolicy = cfg.DeadLetterPolicy.toProto()
+		paths = append(paths, "dead_letter_policy")
+	}
 	if cfg.Labels != nil {
 		psub.Labels = cfg.Labels
 		paths = append(paths, "labels")
diff --git a/pubsub/subscription_test.go b/pubsub/subscription_test.go
index dfe24d8..56e3183 100644
--- a/pubsub/subscription_test.go
+++ b/pubsub/subscription_test.go
@@ -22,6 +22,7 @@
 
 	"cloud.google.com/go/internal/testutil"
 	"cloud.google.com/go/pubsub/pstest"
+	"github.com/golang/protobuf/ptypes"
 	"google.golang.org/api/iterator"
 	"google.golang.org/api/option"
 	pb "google.golang.org/genproto/googleapis/pubsub/v1"
@@ -313,3 +314,49 @@
 		t.Errorf("Roundtrip to Proto failed\ngot: - want: +\n%s", diff)
 	}
 }
+
+func TestDeadLettering_toProto(t *testing.T) {
+	in := &DeadLetterPolicy{
+		MaxDeliveryAttempts: 10,
+		DeadLetterTopic:     "projects/p/topics/t",
+	}
+	got := in.toProto()
+	want := &pb.DeadLetterPolicy{
+		DeadLetterTopic:     "projects/p/topics/t",
+		MaxDeliveryAttempts: 10,
+	}
+	if diff := testutil.Diff(got, want); diff != "" {
+		t.Errorf("Roundtrip to Proto failed\ngot: - want: +\n%s", diff)
+	}
+}
+
+// Check if incoming ReceivedMessages are properly converted to Message structs
+// that expose the DeliveryAttempt field when dead lettering is enabled/disabled.
+func TestDeadLettering_toMessage(t *testing.T) {
+	// If dead lettering is disabled, DeliveryAttempt should default to 0.
+	receivedMsg := &pb.ReceivedMessage{
+		AckId: "1234",
+		Message: &pb.PubsubMessage{
+			Data:        []byte("some message"),
+			MessageId:   "id-1234",
+			PublishTime: ptypes.TimestampNow(),
+		},
+	}
+	got, err := toMessage(receivedMsg)
+	if err != nil {
+		t.Errorf("toMessage failed: %v", err)
+	}
+	if got.DeliveryAttempt != 0 {
+		t.Errorf("toMessage with dead-lettering disabled failed\ngot: %d, want 0", got.DeliveryAttempt)
+	}
+
+	// If dead lettering is enabled, toMessage should properly pass through the DeliveryAttempt field.
+	receivedMsg.DeliveryAttempt = 10
+	got, err = toMessage(receivedMsg)
+	if err != nil {
+		t.Errorf("toMessage failed: %v", err)
+	}
+	if got.DeliveryAttempt != int(receivedMsg.DeliveryAttempt) {
+		t.Errorf("toMessage with dead-lettered enabled failed\ngot: %d, want %d", got.DeliveryAttempt, receivedMsg.DeliveryAttempt)
+	}
+}