fix(pubsub/pstest): add missing support for filter and retry policy (#2901)

* fix(pubsub/pstest): add missing support for filter and retry policy

* cleanup context usage
diff --git a/pubsub/pstest/fake.go b/pubsub/pstest/fake.go
index f9b042e..21a07ba 100644
--- a/pubsub/pstest/fake.go
+++ b/pubsub/pstest/fake.go
@@ -463,6 +463,12 @@
 		case "dead_letter_policy":
 			sub.proto.DeadLetterPolicy = req.Subscription.DeadLetterPolicy
 
+		case "retry_policy":
+			sub.proto.RetryPolicy = req.Subscription.RetryPolicy
+
+		case "filter":
+			sub.proto.Filter = req.Subscription.Filter
+
 		default:
 			return nil, status.Errorf(codes.InvalidArgument, "unknown field name %q", path)
 		}
diff --git a/pubsub/pstest/fake_test.go b/pubsub/pstest/fake_test.go
index 612828a..d03e2b5 100644
--- a/pubsub/pstest/fake_test.go
+++ b/pubsub/pstest/fake_test.go
@@ -784,6 +784,72 @@
 	}
 }
 
+func TestUpdateRetryPolicy(t *testing.T) {
+	ctx := context.Background()
+	pclient, sclient, _, cleanup := newFake(ctx, t)
+	defer cleanup()
+
+	top := mustCreateTopic(ctx, t, pclient, &pb.Topic{Name: "projects/P/topics/T"})
+	sub := mustCreateSubscription(ctx, t, sclient, &pb.Subscription{
+		AckDeadlineSeconds: minAckDeadlineSecs,
+		Name:               "projects/P/subscriptions/S",
+		Topic:              top.Name,
+		RetryPolicy: &pb.RetryPolicy{
+			MinimumBackoff: ptypes.DurationProto(10 * time.Second),
+			MaximumBackoff: ptypes.DurationProto(60 * time.Second),
+		},
+	})
+
+	update := &pb.Subscription{
+		AckDeadlineSeconds: sub.AckDeadlineSeconds,
+		Name:               sub.Name,
+		Topic:              top.Name,
+		RetryPolicy: &pb.RetryPolicy{
+			MinimumBackoff: ptypes.DurationProto(20 * time.Second),
+			MaximumBackoff: ptypes.DurationProto(100 * time.Second),
+		},
+	}
+
+	updated := mustUpdateSubscription(ctx, t, sclient, &pb.UpdateSubscriptionRequest{
+		Subscription: update,
+		UpdateMask:   &field_mask.FieldMask{Paths: []string{"retry_policy"}},
+	})
+
+	if got, want := updated.RetryPolicy, update.RetryPolicy; testutil.Diff(got, want) != "" {
+		t.Fatalf("got %v, want %v", got, want)
+	}
+}
+
+func TestUpdateFilter(t *testing.T) {
+	ctx := context.Background()
+	pclient, sclient, _, cleanup := newFake(ctx, t)
+	defer cleanup()
+
+	top := mustCreateTopic(ctx, t, pclient, &pb.Topic{Name: "projects/P/topics/T"})
+	sub := mustCreateSubscription(ctx, t, sclient, &pb.Subscription{
+		AckDeadlineSeconds: minAckDeadlineSecs,
+		Name:               "projects/P/subscriptions/S",
+		Topic:              top.Name,
+		Filter:             "some-filter",
+	})
+
+	update := &pb.Subscription{
+		AckDeadlineSeconds: sub.AckDeadlineSeconds,
+		Name:               sub.Name,
+		Topic:              top.Name,
+		Filter:             "new-filter",
+	}
+
+	updated := mustUpdateSubscription(ctx, t, sclient, &pb.UpdateSubscriptionRequest{
+		Subscription: update,
+		UpdateMask:   &field_mask.FieldMask{Paths: []string{"filter"}},
+	})
+
+	if got, want := updated.Filter, update.Filter; got != want {
+		t.Fatalf("got %v, want %v", got, want)
+	}
+}
+
 func mustStartStreamingPull(ctx context.Context, t *testing.T, sc pb.SubscriberClient, sub *pb.Subscription) pb.Subscriber_StreamingPullClient {
 	spc, err := sc.StreamingPull(ctx)
 	if err != nil {