fix(pubsub/pstest): add missing support for filter and retry policy (#2901)
* fix(pubsub/pstest): add missing support for filter and retry policy
* cleanup context usage
diff --git a/pubsub/pstest/fake.go b/pubsub/pstest/fake.go
index f9b042e..21a07ba 100644
--- a/pubsub/pstest/fake.go
+++ b/pubsub/pstest/fake.go
@@ -463,6 +463,12 @@
case "dead_letter_policy":
sub.proto.DeadLetterPolicy = req.Subscription.DeadLetterPolicy
+ case "retry_policy":
+ sub.proto.RetryPolicy = req.Subscription.RetryPolicy
+
+ case "filter":
+ sub.proto.Filter = req.Subscription.Filter
+
default:
return nil, status.Errorf(codes.InvalidArgument, "unknown field name %q", path)
}
diff --git a/pubsub/pstest/fake_test.go b/pubsub/pstest/fake_test.go
index 612828a..d03e2b5 100644
--- a/pubsub/pstest/fake_test.go
+++ b/pubsub/pstest/fake_test.go
@@ -784,6 +784,72 @@
}
}
+func TestUpdateRetryPolicy(t *testing.T) {
+ ctx := context.Background()
+ pclient, sclient, _, cleanup := newFake(ctx, t)
+ defer cleanup()
+
+ top := mustCreateTopic(ctx, t, pclient, &pb.Topic{Name: "projects/P/topics/T"})
+ sub := mustCreateSubscription(ctx, t, sclient, &pb.Subscription{
+ AckDeadlineSeconds: minAckDeadlineSecs,
+ Name: "projects/P/subscriptions/S",
+ Topic: top.Name,
+ RetryPolicy: &pb.RetryPolicy{
+ MinimumBackoff: ptypes.DurationProto(10 * time.Second),
+ MaximumBackoff: ptypes.DurationProto(60 * time.Second),
+ },
+ })
+
+ update := &pb.Subscription{
+ AckDeadlineSeconds: sub.AckDeadlineSeconds,
+ Name: sub.Name,
+ Topic: top.Name,
+ RetryPolicy: &pb.RetryPolicy{
+ MinimumBackoff: ptypes.DurationProto(20 * time.Second),
+ MaximumBackoff: ptypes.DurationProto(100 * time.Second),
+ },
+ }
+
+ updated := mustUpdateSubscription(ctx, t, sclient, &pb.UpdateSubscriptionRequest{
+ Subscription: update,
+ UpdateMask: &field_mask.FieldMask{Paths: []string{"retry_policy"}},
+ })
+
+ if got, want := updated.RetryPolicy, update.RetryPolicy; testutil.Diff(got, want) != "" {
+ t.Fatalf("got %v, want %v", got, want)
+ }
+}
+
+func TestUpdateFilter(t *testing.T) {
+ ctx := context.Background()
+ pclient, sclient, _, cleanup := newFake(ctx, t)
+ defer cleanup()
+
+ top := mustCreateTopic(ctx, t, pclient, &pb.Topic{Name: "projects/P/topics/T"})
+ sub := mustCreateSubscription(ctx, t, sclient, &pb.Subscription{
+ AckDeadlineSeconds: minAckDeadlineSecs,
+ Name: "projects/P/subscriptions/S",
+ Topic: top.Name,
+ Filter: "some-filter",
+ })
+
+ update := &pb.Subscription{
+ AckDeadlineSeconds: sub.AckDeadlineSeconds,
+ Name: sub.Name,
+ Topic: top.Name,
+ Filter: "new-filter",
+ }
+
+ updated := mustUpdateSubscription(ctx, t, sclient, &pb.UpdateSubscriptionRequest{
+ Subscription: update,
+ UpdateMask: &field_mask.FieldMask{Paths: []string{"filter"}},
+ })
+
+ if got, want := updated.Filter, update.Filter; got != want {
+ t.Fatalf("got %v, want %v", got, want)
+ }
+}
+
func mustStartStreamingPull(ctx context.Context, t *testing.T, sc pb.SubscriberClient, sub *pb.Subscription) pb.Subscriber_StreamingPullClient {
spc, err := sc.StreamingPull(ctx)
if err != nil {