test(bigquery): add integration test for Column ACLs (#3895)

* testing(bigquery): add integration test for Column ACLs

Now that we have a v1 PTM client, we can test the policy tag integration
with column ACLs.

This also plumbed in a new client as part of the integration testing
setup, much like we make a cloud storage client available.
diff --git a/bigquery/integration_test.go b/bigquery/integration_test.go
index 8aadccc..cedd4fa 100644
--- a/bigquery/integration_test.go
+++ b/bigquery/integration_test.go
@@ -30,6 +30,7 @@
 	"time"
 
 	"cloud.google.com/go/civil"
+	datacatalog "cloud.google.com/go/datacatalog/apiv1"
 	"cloud.google.com/go/httpreplay"
 	"cloud.google.com/go/iam"
 	"cloud.google.com/go/internal"
@@ -43,6 +44,7 @@
 	"google.golang.org/api/googleapi"
 	"google.golang.org/api/iterator"
 	"google.golang.org/api/option"
+	datacatalogpb "google.golang.org/genproto/googleapis/cloud/datacatalog/v1"
 )
 
 const replayFilename = "bigquery.replay"
@@ -50,10 +52,11 @@
 var record = flag.Bool("record", false, "record RPCs")
 
 var (
-	client        *Client
-	storageClient *storage.Client
-	dataset       *Dataset
-	schema        = Schema{
+	client                 *Client
+	storageClient          *storage.Client
+	policyTagManagerClient *datacatalog.PolicyTagManagerClient
+	dataset                *Dataset
+	schema                 = Schema{
 		{Name: "name", Type: StringFieldType},
 		{Name: "nums", Type: IntegerFieldType, Repeated: true},
 		{Name: "rec", Type: RecordFieldType, Schema: Schema{
@@ -119,6 +122,10 @@
 		if err != nil {
 			log.Fatal(err)
 		}
+		policyTagManagerClient, err = datacatalog.NewPolicyTagManagerClient(ctx)
+		if err != nil {
+			log.Fatal(err)
+		}
 		cleanup := initTestState(client, t)
 		return func() {
 			cleanup()
@@ -142,6 +149,7 @@
 		}
 		bqOpts := []option.ClientOption{option.WithTokenSource(ts)}
 		sOpts := []option.ClientOption{option.WithTokenSource(testutil.TokenSource(ctx, storage.ScopeFullControl))}
+		ptmOpts := []option.ClientOption{option.WithTokenSource(testutil.TokenSource(ctx, "https://www.googleapis.com/auth/cloud-platform"))}
 		cleanup := func() {}
 		now := time.Now().UTC()
 		if *record {
@@ -179,6 +187,7 @@
 			// incompatible with gRPC options.
 			bqOpts = append(bqOpts, grpcHeadersChecker.CallOptions()...)
 			sOpts = append(sOpts, grpcHeadersChecker.CallOptions()...)
+			ptmOpts = append(ptmOpts, grpcHeadersChecker.CallOptions()...)
 		}
 		var err error
 		client, err = NewClient(ctx, projID, bqOpts...)
@@ -189,6 +198,7 @@
 		if err != nil {
 			log.Fatalf("storage.NewClient: %v", err)
 		}
+		policyTagManagerClient, err = datacatalog.NewPolicyTagManagerClient(ctx, ptmOpts...)
 		c := initTestState(client, now)
 		return func() { c(); cleanup() }
 	}
@@ -889,6 +899,88 @@
 	}
 }
 
+// setupPolicyTag is a helper for setting up policy tags in the datacatalog service.
+//
+// It returns a string for a policy tag identifier and a cleanup function, or an error.
+func setupPolicyTag(ctx context.Context) (string, func(), error) {
+	location := "us"
+	req := &datacatalogpb.CreateTaxonomyRequest{
+		Parent: fmt.Sprintf("projects/%s/locations/%s", testutil.ProjID(), location),
+		Taxonomy: &datacatalogpb.Taxonomy{
+			DisplayName: "google-cloud-go bigquery testing taxonomy",
+			Description: "Taxonomy created for google-cloud-go integration tests",
+			ActivatedPolicyTypes: []datacatalogpb.Taxonomy_PolicyType{
+				datacatalogpb.Taxonomy_FINE_GRAINED_ACCESS_CONTROL,
+			},
+		},
+	}
+	resp, err := policyTagManagerClient.CreateTaxonomy(ctx, req)
+	if err != nil {
+		return "", nil, fmt.Errorf("datacatalog.CreateTaxonomy: %v", err)
+	}
+	taxonomyID := resp.GetName()
+	cleanupFunc := func() {
+		policyTagManagerClient.DeleteTaxonomy(ctx, &datacatalogpb.DeleteTaxonomyRequest{
+			Name: taxonomyID,
+		})
+	}
+
+	tagReq := &datacatalogpb.CreatePolicyTagRequest{
+		Parent: resp.GetName(),
+		PolicyTag: &datacatalogpb.PolicyTag{
+			DisplayName: "ExamplePolicyTag",
+		},
+	}
+	tagResp, err := policyTagManagerClient.CreatePolicyTag(ctx, tagReq)
+	if err != nil {
+		// we're failed to create tags, but we did create taxonomy. clean it up and signal error.
+		cleanupFunc()
+		return "", nil, fmt.Errorf("datacatalog.CreatePolicyTag: %v", err)
+	}
+	return tagResp.GetName(), cleanupFunc, nil
+}
+
+func TestIntegration_ColumnACLs(t *testing.T) {
+	if client == nil {
+		t.Skip("Integration tests skipped")
+	}
+	ctx := context.Background()
+	testSchema := Schema{
+		{Name: "name", Type: StringFieldType},
+		{Name: "ssn", Type: StringFieldType},
+		{Name: "acct_balance", Type: NumericFieldType},
+	}
+	table := newTable(t, testSchema)
+	defer table.Delete(ctx)
+
+	tagID, cleanupFunc, err := setupPolicyTag(ctx)
+	if err != nil {
+		t.Fatalf("failed to setup policy tag resources: %v", err)
+	}
+	defer cleanupFunc()
+	// amend the test schema to add a policy tag
+	testSchema[1].PolicyTags = &PolicyTagList{
+		Names: []string{tagID},
+	}
+
+	// Test: Amend an existing schema with a policy tag.
+	_, err = table.Update(ctx, TableMetadataToUpdate{
+		Schema: testSchema,
+	}, "")
+	if err != nil {
+		t.Errorf("update with policyTag failed: %v", err)
+	}
+
+	// Test: Create a new table with a policy tag defined.
+	newTable := dataset.Table(tableIDs.New())
+	if err = newTable.Create(ctx, &TableMetadata{
+		Schema:      schema,
+		Description: "foo",
+	}); err != nil {
+		t.Errorf("failed to create new table with policy tag: %v", err)
+	}
+}
+
 func TestIntegration_TableIAM(t *testing.T) {
 	if client == nil {
 		t.Skip("Integration tests skipped")