pubsub: add Subscription.PushConfig.AuthenticationMethod

Adds Subscription.PushConfig.AuthenticationMethod which is
a one-of field hence the use of an interface
`AuthenticationMethod`. This field is optional
and only part of a closed alpha.
The only currently supported AuthenticationMethod is
"OIDCToken" (OpenID Connect Token), which has been added.

This change adds an example too as well roundtrip tests.

Updates #1361

Change-Id: I6de49ad57dbe813bec66b84671ffca6094b6ca5c
Reviewed-on: https://code-review.googlesource.com/c/gocloud/+/40771
Reviewed-by: kokoro <noreply+kokoro@google.com>
Reviewed-by: Alex Hong <hongalex@google.com>
diff --git a/pubsub/example_test.go b/pubsub/example_test.go
index 22793eb..5ede19e 100644
--- a/pubsub/example_test.go
+++ b/pubsub/example_test.go
@@ -309,6 +309,28 @@
 	_ = subConfig // TODO: Use SubscriptionConfig.
 }
 
+func ExampleSubscription_Update_pushConfigAuthenticationMethod() {
+	ctx := context.Background()
+	client, err := pubsub.NewClient(ctx, "project-id")
+	if err != nil {
+		// TODO: Handle error.
+	}
+	sub := client.Subscription("subName")
+	subConfig, err := sub.Update(ctx, pubsub.SubscriptionConfigToUpdate{
+		PushConfig: &pubsub.PushConfig{
+			Endpoint: "https://example.com/push",
+			AuthenticationMethod: &pubsub.OIDCToken{
+				ServiceAccountEmail: "foo@example.com",
+				Audience:            "client-12345",
+			},
+		},
+	})
+	if err != nil {
+		// TODO: Handle error.
+	}
+	_ = subConfig // TODO: Use SubscriptionConfig.
+}
+
 func ExampleSubscription_CreateSnapshot() {
 	ctx := context.Background()
 	client, err := pubsub.NewClient(ctx, "project-id")
diff --git a/pubsub/integration_test.go b/pubsub/integration_test.go
index afa0e4b..99fd37a 100644
--- a/pubsub/integration_test.go
+++ b/pubsub/integration_test.go
@@ -413,7 +413,18 @@
 	defer topic.Stop()
 
 	var sub *Subscription
-	if sub, err = client.CreateSubscription(ctx, subIDs.New(), SubscriptionConfig{Topic: topic}); err != nil {
+	projID := testutil.ProjID()
+	sCfg := SubscriptionConfig{
+		Topic: topic,
+		PushConfig: PushConfig{
+			Endpoint: "https://" + projID + ".appspot.com/_ah/push-handlers/push",
+			AuthenticationMethod: &OIDCToken{
+				Audience:            "client-12345",
+				ServiceAccountEmail: "foo@example.com",
+			},
+		},
+	}
+	if sub, err = client.CreateSubscription(ctx, subIDs.New(), sCfg); err != nil {
 		t.Fatalf("CreateSub error: %v", err)
 	}
 	defer sub.Delete(ctx)
@@ -433,10 +444,13 @@
 		t.Fatalf("\ngot: - want: +\n%s", diff)
 	}
 	// Add a PushConfig and change other fields.
-	projID := testutil.ProjID()
 	pc := PushConfig{
 		Endpoint:   "https://" + projID + ".appspot.com/_ah/push-handlers/push",
 		Attributes: map[string]string{"x-goog-version": "v1"},
+		AuthenticationMethod: &OIDCToken{
+			Audience:            "client-12345",
+			ServiceAccountEmail: "foo@example.com",
+		},
 	}
 	got, err = sub.Update(ctx, SubscriptionConfigToUpdate{
 		PushConfig:          &pc,
diff --git a/pubsub/subscription.go b/pubsub/subscription.go
index a24e70e..863ee22 100644
--- a/pubsub/subscription.go
+++ b/pubsub/subscription.go
@@ -116,13 +116,80 @@
 
 	// Endpoint configuration attributes. See https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions#pushconfig for more details.
 	Attributes map[string]string
+
+	// AuthenticationMethod is used by push endpoints to verify the source
+	// of push requests.
+	// It can be used with push endpoints that are private by default to
+	// allow requests only from the Cloud Pub/Sub system, for example.
+	// This field is optional and should be set only by users interested in
+	// authenticated push.
+	//
+	// It is EXPERIMENTAL and a part of a closed alpha that may not be
+	// accessible to all users. This field is subject to change or removal
+	// without notice.
+	AuthenticationMethod AuthenticationMethod
 }
 
 func (pc *PushConfig) toProto() *pb.PushConfig {
-	return &pb.PushConfig{
+	if pc == nil {
+		return nil
+	}
+	pbCfg := &pb.PushConfig{
 		Attributes:   pc.Attributes,
 		PushEndpoint: pc.Endpoint,
 	}
+	if authMethod := pc.AuthenticationMethod; authMethod != nil {
+		switch am := authMethod.(type) {
+		case *OIDCToken:
+			pbCfg.AuthenticationMethod = am.toProto()
+		default: // TODO: add others here when GAIC adds more definitions.
+		}
+	}
+	return pbCfg
+}
+
+// AuthenticationMethod is used by push points to verify the source of push requests.
+// This interface defines fields that are part of a closed alpha that may not be accessible
+// to all users.
+type AuthenticationMethod interface {
+	isAuthMethod() bool
+}
+
+// OIDCToken allows PushConfigs to be authenticated using
+// the OpenID Connect protocol https://openid.net/connect/
+type OIDCToken struct {
+	// Audience to be used when generating OIDC token. The audience claim
+	// identifies the recipients that the JWT is intended for. The audience
+	// value is a single case-sensitive string. Having multiple values (array)
+	// for the audience field is not supported. More info about the OIDC JWT
+	// token audience here: https://tools.ietf.org/html/rfc7519#section-4.1.3
+	// Note: if not specified, the Push endpoint URL will be used.
+	Audience string
+
+	// The service account email to be used for generating the OpenID Connect token.
+	// The caller of:
+	//  * CreateSubscription
+	//  * UpdateSubscription
+	//  * ModifyPushConfig
+	// calls must have the iam.serviceAccounts.actAs permission for the service account.
+	// See https://cloud.google.com/iam/docs/understanding-roles#service-accounts-roles.
+	ServiceAccountEmail string
+}
+
+var _ AuthenticationMethod = (*OIDCToken)(nil)
+
+func (oidcToken *OIDCToken) isAuthMethod() bool { return true }
+
+func (oidcToken *OIDCToken) toProto() *pb.PushConfig_OidcToken_ {
+	if oidcToken == nil {
+		return nil
+	}
+	return &pb.PushConfig_OidcToken_{
+		OidcToken: &pb.PushConfig_OidcToken{
+			Audience:            oidcToken.Audience,
+			ServiceAccountEmail: oidcToken.ServiceAccountEmail,
+		},
+	}
 }
 
 // SubscriptionConfig describes the configuration of a subscription.
@@ -164,11 +231,8 @@
 
 func (cfg *SubscriptionConfig) toProto(name string) *pb.Subscription {
 	var pbPushConfig *pb.PushConfig
-	if cfg.PushConfig.Endpoint != "" || len(cfg.PushConfig.Attributes) != 0 {
-		pbPushConfig = &pb.PushConfig{
-			Attributes:   cfg.PushConfig.Attributes,
-			PushEndpoint: cfg.PushConfig.Endpoint,
-		}
+	if cfg.PushConfig.Endpoint != "" || len(cfg.PushConfig.Attributes) != 0 || cfg.PushConfig.AuthenticationMethod != nil {
+		pbPushConfig = cfg.PushConfig.toProto()
 	}
 	var retentionDuration *durpb.Duration
 	if cfg.RetentionDuration != 0 {
@@ -202,18 +266,38 @@
 			return SubscriptionConfig{}, err
 		}
 	}
-	return SubscriptionConfig{
-		Topic:       newTopic(c, pbSub.Topic),
-		AckDeadline: time.Second * time.Duration(pbSub.AckDeadlineSeconds),
-		PushConfig: PushConfig{
-			Endpoint:   pbSub.PushConfig.PushEndpoint,
-			Attributes: pbSub.PushConfig.Attributes,
-		},
+	subC := SubscriptionConfig{
+		Topic:               newTopic(c, pbSub.Topic),
+		AckDeadline:         time.Second * time.Duration(pbSub.AckDeadlineSeconds),
 		RetainAckedMessages: pbSub.RetainAckedMessages,
 		RetentionDuration:   rd,
 		Labels:              pbSub.Labels,
 		ExpirationPolicy:    expirationPolicy,
-	}, nil
+	}
+	pc := protoToPushConfig(pbSub.PushConfig)
+	if pc != nil {
+		subC.PushConfig = *pc
+	}
+	return subC, nil
+}
+
+func protoToPushConfig(pbPc *pb.PushConfig) *PushConfig {
+	if pbPc == nil {
+		return nil
+	}
+	pc := &PushConfig{
+		Endpoint:   pbPc.PushEndpoint,
+		Attributes: pbPc.Attributes,
+	}
+	if am := pbPc.AuthenticationMethod; am != nil {
+		if oidcToken, ok := am.(*pb.PushConfig_OidcToken_); ok && oidcToken != nil && oidcToken.OidcToken != nil {
+			pc.AuthenticationMethod = &OIDCToken{
+				Audience:            oidcToken.OidcToken.GetAudience(),
+				ServiceAccountEmail: oidcToken.OidcToken.GetServiceAccountEmail(),
+			}
+		}
+	}
+	return pc
 }
 
 // ReceiveSettings configure the Receive method.
diff --git a/pubsub/subscription_test.go b/pubsub/subscription_test.go
index 903d799..dfe24d8 100644
--- a/pubsub/subscription_test.go
+++ b/pubsub/subscription_test.go
@@ -24,6 +24,7 @@
 	"cloud.google.com/go/pubsub/pstest"
 	"google.golang.org/api/iterator"
 	"google.golang.org/api/option"
+	pb "google.golang.org/genproto/googleapis/pubsub/v1"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
@@ -132,6 +133,13 @@
 	sub, err := client.CreateSubscription(ctx, "s", SubscriptionConfig{
 		Topic:            topic,
 		ExpirationPolicy: 30 * time.Hour,
+		PushConfig: PushConfig{
+			Endpoint: "https://example.com/push",
+			AuthenticationMethod: &OIDCToken{
+				ServiceAccountEmail: "foo@example.com",
+				Audience:            "client-12345",
+			},
+		},
 	})
 	if err != nil {
 		t.Fatal(err)
@@ -146,6 +154,13 @@
 		RetainAckedMessages: false,
 		RetentionDuration:   defaultRetentionDuration,
 		ExpirationPolicy:    30 * time.Hour,
+		PushConfig: PushConfig{
+			Endpoint: "https://example.com/push",
+			AuthenticationMethod: &OIDCToken{
+				ServiceAccountEmail: "foo@example.com",
+				Audience:            "client-12345",
+			},
+		},
 	}
 	if !testutil.Equal(cfg, want) {
 		t.Fatalf("\ngot  %+v\nwant %+v", cfg, want)
@@ -156,6 +171,13 @@
 		RetainAckedMessages: true,
 		Labels:              map[string]string{"label": "value"},
 		ExpirationPolicy:    72 * time.Hour,
+		PushConfig: &PushConfig{
+			Endpoint: "https://example.com/push",
+			AuthenticationMethod: &OIDCToken{
+				ServiceAccountEmail: "foo@example.com",
+				Audience:            "client-12345",
+			},
+		},
 	})
 	if err != nil {
 		t.Fatal(err)
@@ -167,6 +189,13 @@
 		RetentionDuration:   defaultRetentionDuration,
 		Labels:              map[string]string{"label": "value"},
 		ExpirationPolicy:    72 * time.Hour,
+		PushConfig: PushConfig{
+			Endpoint: "https://example.com/push",
+			AuthenticationMethod: &OIDCToken{
+				ServiceAccountEmail: "foo@example.com",
+				Audience:            "client-12345",
+			},
+		},
 	}
 	if !testutil.Equal(got, want) {
 		t.Fatalf("\ngot  %+v\nwant %+v", got, want)
@@ -261,3 +290,26 @@
 	}
 	return client, srv
 }
+
+func TestPushConfigAuthenticationMethod_toProto(t *testing.T) {
+	in := &PushConfig{
+		Endpoint: "https://example.com/push",
+		AuthenticationMethod: &OIDCToken{
+			ServiceAccountEmail: "foo@example.com",
+			Audience:            "client-12345",
+		},
+	}
+	got := in.toProto()
+	want := &pb.PushConfig{
+		PushEndpoint: "https://example.com/push",
+		AuthenticationMethod: &pb.PushConfig_OidcToken_{
+			OidcToken: &pb.PushConfig_OidcToken{
+				ServiceAccountEmail: "foo@example.com",
+				Audience:            "client-12345",
+			},
+		},
+	}
+	if diff := testutil.Diff(got, want); diff != "" {
+		t.Errorf("Roundtrip to Proto failed\ngot: - want: +\n%s", diff)
+	}
+}