pubsub: add dead lettering support
This change adds the fields necessary to create subscriptions
with a dead letter policy. This allows messages to be redelivered
to a separate dead letter topic after a configured number of
delivery attempts have failed (nack/ack_deadline expired).
Change-Id: I78da592caa68d84a24c696708129be3cc912a7e1
Reviewed-on: https://code-review.googlesource.com/c/gocloud/+/48790
Reviewed-by: kokoro <noreply+kokoro@google.com>
Reviewed-by: Cody Oss <codyoss@google.com>
diff --git a/pubsub/go.sum b/pubsub/go.sum
index 6a45c7c..14603e3 100644
--- a/pubsub/go.sum
+++ b/pubsub/go.sum
@@ -81,6 +81,7 @@
go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5 h1:58fnuSXlxZmFdJyvtTFVmVhcMLU6v5fEb/ok4wyqtNU=
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
diff --git a/pubsub/integration_test.go b/pubsub/integration_test.go
index a6254a3..d8e940a 100644
--- a/pubsub/integration_test.go
+++ b/pubsub/integration_test.go
@@ -1078,3 +1078,81 @@
t.Fatalf("\ngot: - want: +\n%s", diff)
}
}
+
+func TestIntegration_CreateSubscription_DeadLetterPolicy(t *testing.T) {
+ ctx := context.Background()
+ client := integrationTestClient(ctx, t)
+ defer client.Close()
+
+ topic, err := client.CreateTopic(ctx, topicIDs.New())
+ if err != nil {
+ t.Fatalf("CreateTopic error: %v", err)
+ }
+ defer topic.Delete(ctx)
+ defer topic.Stop()
+
+ deadLetterTopic, err := client.CreateTopic(ctx, topicIDs.New())
+ if err != nil {
+ t.Fatalf("CreateTopic error: %v", err)
+ }
+ defer deadLetterTopic.Delete(ctx)
+ defer deadLetterTopic.Stop()
+
+ // We don't set MaxDeliveryAttempts in DeadLetterPolicy so that we can test
+ // that MaxDeliveryAttempts defaults properly to 5 if not set.
+ cfg := SubscriptionConfig{
+ Topic: topic,
+ DeadLetterPolicy: &DeadLetterPolicy{
+ DeadLetterTopic: deadLetterTopic.String(),
+ },
+ }
+ var sub *Subscription
+ if sub, err = client.CreateSubscription(ctx, subIDs.New(), cfg); err != nil {
+ t.Fatalf("CreateSub error: %v", err)
+ }
+ defer sub.Delete(ctx)
+
+ got, err := sub.Config(ctx)
+ if err != nil {
+ t.Fatal(err)
+ }
+ want := SubscriptionConfig{
+ Topic: topic,
+ AckDeadline: 10 * time.Second,
+ RetainAckedMessages: false,
+ RetentionDuration: defaultRetentionDuration,
+ ExpirationPolicy: defaultExpirationPolicy,
+ DeadLetterPolicy: &DeadLetterPolicy{
+ DeadLetterTopic: deadLetterTopic.String(),
+ MaxDeliveryAttempts: 5,
+ },
+ }
+ if diff := testutil.Diff(got, want); diff != "" {
+ t.Fatalf("\ngot: - want: +\n%s", diff)
+ }
+
+ res := topic.Publish(ctx, &Message{
+ Data: []byte("failed message"),
+ })
+ if _, err := res.Get(ctx); err != nil {
+ t.Fatalf("Publish message error: %v", err)
+ }
+
+ ctx2, cancel := context.WithCancel(ctx)
+ numAttempts := 1
+ err = sub.Receive(ctx2, func(_ context.Context, m *Message) {
+ if numAttempts >= 5 {
+ cancel()
+ m.Ack()
+ return
+ }
+ if m.DeliveryAttempt != numAttempts {
+ t.Fatalf("Message delivery attempt: %d does not match numAttempts: %d\n", m.DeliveryAttempt, numAttempts)
+ }
+ numAttempts++
+ m.Nack()
+ })
+ if err != nil {
+ t.Fatalf("Streaming pull error: %v\n", err)
+ }
+}
diff --git a/pubsub/message.go b/pubsub/message.go
index c4b16e9..65003fb 100644
--- a/pubsub/message.go
+++ b/pubsub/message.go
@@ -46,6 +46,18 @@
// receiveTime is the time the message was received by the client.
receiveTime time.Time
+ // DeliveryAttempt is the number of times a message has been delivered.
+ // This is part of the dead lettering feature that forwards messages that
+ // fail to be processed (from nack/ack deadline timeout) to a dead letter topic.
+ // If dead lettering is enabled, this will be set on all attempts starting
+ // value 1. Otherwise, the value will be 0.
+ // This field is read-only.
+ //
+ // It is EXPERIMENTAL and a part of a closed alpha that may not be
+ // accessible to all users. This field is subject to change or removal
+ // without notice.
+ DeliveryAttempt int
+
// size is the approximate size of the message's data and attributes.
size int
@@ -64,12 +76,14 @@
if err != nil {
return nil, err
}
+
return &Message{
- ackID: resp.AckId,
- Data: resp.Message.Data,
- Attributes: resp.Message.Attributes,
- ID: resp.Message.MessageId,
- PublishTime: pubTime,
+ ackID: resp.AckId,
+ Data: resp.Message.Data,
+ Attributes: resp.Message.Attributes,
+ ID: resp.Message.MessageId,
+ PublishTime: pubTime,
+ DeliveryAttempt: int(resp.DeliveryAttempt),
}, nil
}
diff --git a/pubsub/subscription.go b/pubsub/subscription.go
index c7239e7..3d8229d 100644
--- a/pubsub/subscription.go
+++ b/pubsub/subscription.go
@@ -221,6 +221,14 @@
// The set of labels for the subscription.
Labels map[string]string
+
+ // DeadLetterPolicy specifies the conditions for dead lettering messages in
+ // a subscription. If not set, dead lettering is disabled.
+ //
+ // It is EXPERIMENTAL and a part of a closed alpha that may not be
+ // accessible to all users. This field is subject to change or removal
+ // without notice.
+ DeadLetterPolicy *DeadLetterPolicy
}
func (cfg *SubscriptionConfig) toProto(name string) *pb.Subscription {
@@ -232,6 +240,10 @@
if cfg.RetentionDuration != 0 {
retentionDuration = ptypes.DurationProto(cfg.RetentionDuration)
}
+ var pbDeadLetter *pb.DeadLetterPolicy
+ if cfg.DeadLetterPolicy != nil {
+ pbDeadLetter = cfg.DeadLetterPolicy.toProto()
+ }
return &pb.Subscription{
Name: name,
Topic: cfg.Topic.name,
@@ -241,6 +253,7 @@
MessageRetentionDuration: retentionDuration,
Labels: cfg.Labels,
ExpirationPolicy: expirationPolicyToProto(cfg.ExpirationPolicy),
+ DeadLetterPolicy: pbDeadLetter,
}
}
@@ -260,6 +273,7 @@
return SubscriptionConfig{}, err
}
}
+ dlp := protoToDLP(pbSub.DeadLetterPolicy)
subC := SubscriptionConfig{
Topic: newTopic(c, pbSub.Topic),
AckDeadline: time.Second * time.Duration(pbSub.AckDeadlineSeconds),
@@ -267,6 +281,7 @@
RetentionDuration: rd,
Labels: pbSub.Labels,
ExpirationPolicy: expirationPolicy,
+ DeadLetterPolicy: dlp,
}
pc := protoToPushConfig(pbSub.PushConfig)
if pc != nil {
@@ -294,6 +309,35 @@
return pc
}
+// DeadLetterPolicy specifies the conditions for dead lettering messages in
+// a subscription.
+//
+// It is EXPERIMENTAL and a part of a closed alpha that may not be
+// accessible to all users.
+type DeadLetterPolicy struct {
+ DeadLetterTopic string
+ MaxDeliveryAttempts int
+}
+
+func (dlp *DeadLetterPolicy) toProto() *pb.DeadLetterPolicy {
+ if dlp == nil {
+ return nil
+ }
+ return &pb.DeadLetterPolicy{
+ DeadLetterTopic: dlp.DeadLetterTopic,
+ MaxDeliveryAttempts: int32(dlp.MaxDeliveryAttempts),
+ }
+}
+func protoToDLP(pbDLP *pb.DeadLetterPolicy) *DeadLetterPolicy {
+ if pbDLP == nil {
+ return nil
+ }
+ return &DeadLetterPolicy{
+ DeadLetterTopic: pbDLP.GetDeadLetterTopic(),
+ MaxDeliveryAttempts: int(pbDLP.MaxDeliveryAttempts),
+ }
+}
+
// ReceiveSettings configure the Receive method.
// A zero ReceiveSettings will result in values equivalent to DefaultReceiveSettings.
type ReceiveSettings struct {
@@ -428,6 +472,12 @@
// If non-zero, Expiration is changed.
ExpirationPolicy optional.Duration
+ // If non-nil, DeadLetterPolicy is changed.
+ //
+ // It is EXPERIMENTAL and a part of a closed alpha that may not be
+ // accessible to all users.
+ DeadLetterPolicy *DeadLetterPolicy
+
// If non-nil, the current set of labels is completely
// replaced by the new set.
// This field has beta status. It is not subject to the stability guarantee
@@ -477,6 +527,10 @@
psub.ExpirationPolicy = expirationPolicyToProto(cfg.ExpirationPolicy)
paths = append(paths, "expiration_policy")
}
+ if cfg.DeadLetterPolicy != nil {
+ psub.DeadLetterPolicy = cfg.DeadLetterPolicy.toProto()
+ paths = append(paths, "dead_letter_policy")
+ }
if cfg.Labels != nil {
psub.Labels = cfg.Labels
paths = append(paths, "labels")
diff --git a/pubsub/subscription_test.go b/pubsub/subscription_test.go
index dfe24d8..56e3183 100644
--- a/pubsub/subscription_test.go
+++ b/pubsub/subscription_test.go
@@ -22,6 +22,7 @@
"cloud.google.com/go/internal/testutil"
"cloud.google.com/go/pubsub/pstest"
+ "github.com/golang/protobuf/ptypes"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
pb "google.golang.org/genproto/googleapis/pubsub/v1"
@@ -313,3 +314,49 @@
t.Errorf("Roundtrip to Proto failed\ngot: - want: +\n%s", diff)
}
}
+
+func TestDeadLettering_toProto(t *testing.T) {
+ in := &DeadLetterPolicy{
+ MaxDeliveryAttempts: 10,
+ DeadLetterTopic: "projects/p/topics/t",
+ }
+ got := in.toProto()
+ want := &pb.DeadLetterPolicy{
+ DeadLetterTopic: "projects/p/topics/t",
+ MaxDeliveryAttempts: 10,
+ }
+ if diff := testutil.Diff(got, want); diff != "" {
+ t.Errorf("Roundtrip to Proto failed\ngot: - want: +\n%s", diff)
+ }
+}
+
+// Check if incoming ReceivedMessages are properly converted to Message structs
+// that expose the DeliveryAttempt field when dead lettering is enabled/disabled.
+func TestDeadLettering_toMessage(t *testing.T) {
+ // If dead lettering is disabled, DeliveryAttempt should default to 0.
+ receivedMsg := &pb.ReceivedMessage{
+ AckId: "1234",
+ Message: &pb.PubsubMessage{
+ Data: []byte("some message"),
+ MessageId: "id-1234",
+ PublishTime: ptypes.TimestampNow(),
+ },
+ }
+ got, err := toMessage(receivedMsg)
+ if err != nil {
+ t.Errorf("toMessage failed: %v", err)
+ }
+ if got.DeliveryAttempt != 0 {
+ t.Errorf("toMessage with dead-lettering disabled failed\ngot: %d, want 0", got.DeliveryAttempt)
+ }
+
+ // If dead lettering is enabled, toMessage should properly pass through the DeliveryAttempt field.
+ receivedMsg.DeliveryAttempt = 10
+ got, err = toMessage(receivedMsg)
+ if err != nil {
+ t.Errorf("toMessage failed: %v", err)
+ }
+ if got.DeliveryAttempt != int(receivedMsg.DeliveryAttempt) {
+ t.Errorf("toMessage with dead-lettered enabled failed\ngot: %d, want %d", got.DeliveryAttempt, receivedMsg.DeliveryAttempt)
+ }
+}