feat(bigtable): Support AuthorizedView in data and admin client (#9515)

* feat(bigtable): Support AuthorizedView in data and admin client

Change-Id: I1c04c6abb7a0ecaaa5a95334b9e6934f638b5afb

* fix: Address review comments

Change-Id: Id22fe08bc0a830cab83f30d892f6d78c269d2895

* feat: Add AuthorizedView IAM

Change-Id: I9781e0882f1b152228edb276f12284d5ccf64315

* fix: fix request headers for AuthorizedViews

Change-Id: I5dc6c0c79067903fe5c71c0f9d51e40d3f5233ca

* fix: resolve vet failures

Change-Id: Ie1e82d3a8389d26217f0fc10ffa9ca16f3b72b6d

* Expose FamilySubset struct and a small fix

Change-Id: Ia07847f4c43197d8498ead0e225651fe2b4713d7

* Address review comments

Change-Id: I99f87a46cd96d96710aac4532a61361dcc7e2a27

* Let Get method return Info struct and reordering

Change-Id: I974abdf400050d65b76a2fed34685e20d2e7781f

* fix: Remove GetSubsetView() and reordering

Change-Id: I28880079e167d37cad1fc6740c089e51be10f347

---------

Co-authored-by: trollyxia <lixiachen@google.com>
diff --git a/bigtable/admin.go b/bigtable/admin.go
index 37d2eed..a2cd0d9 100644
--- a/bigtable/admin.go
+++ b/bigtable/admin.go
@@ -124,6 +124,10 @@
 	return fmt.Sprintf("projects/%s/instances/%s/clusters/%s/backups/%s", ac.project, instance, cluster, backup)
 }
 
+func (ac *AdminClient) authorizedViewPath(table, authorizedView string) string {
+	return fmt.Sprintf("%s/tables/%s/authorizedViews/%s", ac.instancePrefix(), table, authorizedView)
+}
+
 // EncryptionInfo represents the encryption info of a table.
 type EncryptionInfo struct {
 	Status        *Status
@@ -881,6 +885,11 @@
 	return iam.InternalNewHandleGRPCClient(ac.tClient, ac.backupPath(cluster, ac.instance, backup))
 }
 
+// AuthorizedViewIAM creates an IAM Handle specific to a given Table and AuthorizedView.
+func (ac *AdminClient) AuthorizedViewIAM(table, authorizedView string) *iam.Handle {
+	return iam.InternalNewHandleGRPCClient(ac.tClient, ac.authorizedViewPath(table, authorizedView))
+}
+
 const instanceAdminAddr = "bigtableadmin.googleapis.com:443"
 const mtlsInstanceAdminAddr = "bigtableadmin.mtls.googleapis.com:443"
 
@@ -2175,3 +2184,263 @@
 	_, err := ac.tClient.UpdateBackup(ctx, req)
 	return err
 }
+
+// AuthorizedViewConf contains information about an authorized view.
+type AuthorizedViewConf struct {
+	TableID          string
+	AuthorizedViewID string
+
+	// Types that are valid to be assigned to AuthorizedView:
+	//  *SubsetViewConf
+	AuthorizedView     isAuthorizedView
+	DeletionProtection DeletionProtection
+}
+
+// A private interface that currently only implemented by SubsetViewConf, ensuring that only SubsetViewConf instances are accepted as an AuthorizedView.
+// In the future if a new type of AuthorizedView is introduced, it should also implements this interface.
+type isAuthorizedView interface {
+	isAuthorizedView()
+}
+
+func (av AuthorizedViewConf) proto() *btapb.AuthorizedView {
+	var avp btapb.AuthorizedView
+
+	switch dp := av.DeletionProtection; dp {
+	case Protected:
+		avp.DeletionProtection = true
+	case Unprotected:
+		avp.DeletionProtection = false
+	default:
+		break
+	}
+
+	switch avt := av.AuthorizedView.(type) {
+	case *SubsetViewConf:
+		avp.AuthorizedView = &btapb.AuthorizedView_SubsetView_{
+			SubsetView: avt.proto(),
+		}
+	default:
+		break
+	}
+	return &avp
+}
+
+// FamilySubset represents a subset of a column family.
+type FamilySubset struct {
+	Qualifiers        [][]byte
+	QualifierPrefixes [][]byte
+}
+
+// SubsetViewConf contains configuration specific to an authorized view of subset view type.
+type SubsetViewConf struct {
+	RowPrefixes   [][]byte
+	FamilySubsets map[string]FamilySubset
+}
+
+func (*SubsetViewConf) isAuthorizedView() {}
+
+// AddRowPrefix adds a new row prefix to the subset view.
+func (s *SubsetViewConf) AddRowPrefix(prefix []byte) {
+	s.RowPrefixes = append(s.RowPrefixes, prefix)
+}
+
+func (s *SubsetViewConf) getOrCreateFamilySubset(familyName string) FamilySubset {
+	if s.FamilySubsets == nil {
+		s.FamilySubsets = make(map[string]FamilySubset)
+	}
+	if _, ok := s.FamilySubsets[familyName]; !ok {
+		s.FamilySubsets[familyName] = FamilySubset{}
+	}
+	return s.FamilySubsets[familyName]
+}
+
+func (s SubsetViewConf) proto() *btapb.AuthorizedView_SubsetView {
+	var p btapb.AuthorizedView_SubsetView
+	p.RowPrefixes = append(p.RowPrefixes, s.RowPrefixes...)
+	if p.FamilySubsets == nil {
+		p.FamilySubsets = make(map[string]*btapb.AuthorizedView_FamilySubsets)
+	}
+	for familyName, subset := range s.FamilySubsets {
+		p.FamilySubsets[familyName] = &btapb.AuthorizedView_FamilySubsets{
+			Qualifiers:        subset.Qualifiers,
+			QualifierPrefixes: subset.QualifierPrefixes,
+		}
+	}
+	return &p
+}
+
+// AddFamilySubsetQualifier adds an individual column qualifier to be included in a subset view.
+func (s *SubsetViewConf) AddFamilySubsetQualifier(familyName string, qualifier []byte) {
+	fs := s.getOrCreateFamilySubset(familyName)
+	fs.Qualifiers = append(fs.Qualifiers, qualifier)
+	s.FamilySubsets[familyName] = fs
+}
+
+// AddFamilySubsetQualifierPrefix adds a prefix for column qualifiers to be included in a subset view.
+func (s *SubsetViewConf) AddFamilySubsetQualifierPrefix(familyName string, qualifierPrefix []byte) {
+	fs := s.getOrCreateFamilySubset(familyName)
+	fs.QualifierPrefixes = append(fs.QualifierPrefixes, qualifierPrefix)
+	s.FamilySubsets[familyName] = fs
+}
+
+// CreateAuthorizedView creates a new authorized view in a table.
+func (ac *AdminClient) CreateAuthorizedView(ctx context.Context, conf *AuthorizedViewConf) error {
+	if conf.TableID == "" || conf.AuthorizedViewID == "" {
+		return errors.New("both AuthorizedViewID and TableID are required")
+	}
+	if _, ok := conf.AuthorizedView.(*SubsetViewConf); !ok {
+		return errors.New("SubsetView must be specified in AuthorizedViewConf")
+	}
+
+	ctx = mergeOutgoingMetadata(ctx, ac.md)
+	req := &btapb.CreateAuthorizedViewRequest{
+		Parent:           fmt.Sprintf("%s/tables/%s", ac.instancePrefix(), conf.TableID),
+		AuthorizedViewId: conf.AuthorizedViewID,
+		AuthorizedView:   conf.proto(),
+	}
+	_, err := ac.tClient.CreateAuthorizedView(ctx, req)
+	return err
+}
+
+// AuthorizedViewInfo contains authorized view metadata. This struct is read-only.
+type AuthorizedViewInfo struct {
+	TableID          string
+	AuthorizedViewID string
+
+	AuthorizedView     isAuthorizedViewInfo
+	DeletionProtection DeletionProtection
+}
+
+type isAuthorizedViewInfo interface {
+	isAuthorizedViewInfo()
+}
+
+// SubsetViewInfo contains read-only SubsetView metadata.
+type SubsetViewInfo struct {
+	RowPrefixes   [][]byte
+	FamilySubsets map[string]FamilySubset
+}
+
+func (*SubsetViewInfo) isAuthorizedViewInfo() {}
+
+func (s *SubsetViewInfo) fillInfo(internal *btapb.AuthorizedView_SubsetView) {
+	s.RowPrefixes = [][]byte{}
+	s.RowPrefixes = append(s.RowPrefixes, internal.RowPrefixes...)
+	if s.FamilySubsets == nil {
+		s.FamilySubsets = make(map[string]FamilySubset)
+	}
+	for k, v := range internal.FamilySubsets {
+		s.FamilySubsets[k] = FamilySubset{
+			Qualifiers:        v.Qualifiers,
+			QualifierPrefixes: v.QualifierPrefixes,
+		}
+	}
+}
+
+// AuthorizedViewInfo retrieves information about an authorized view.
+func (ac *AdminClient) AuthorizedViewInfo(ctx context.Context, tableID, authorizedViewID string) (*AuthorizedViewInfo, error) {
+	ctx = mergeOutgoingMetadata(ctx, ac.md)
+	req := &btapb.GetAuthorizedViewRequest{
+		Name: fmt.Sprintf("%s/tables/%s/authorizedViews/%s", ac.instancePrefix(), tableID, authorizedViewID),
+	}
+	var res *btapb.AuthorizedView
+
+	err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
+		var err error
+		res, err = ac.tClient.GetAuthorizedView(ctx, req)
+		return err
+	}, retryOptions...)
+
+	if err != nil {
+		return nil, err
+	}
+
+	av := &AuthorizedViewInfo{TableID: tableID, AuthorizedViewID: authorizedViewID}
+	if res.DeletionProtection {
+		av.DeletionProtection = Protected
+	} else {
+		av.DeletionProtection = Unprotected
+	}
+	if res.GetSubsetView() != nil {
+		s := SubsetViewInfo{}
+		s.fillInfo(res.GetSubsetView())
+		av.AuthorizedView = &s
+	}
+	return av, nil
+}
+
+// AuthorizedViews returns a list of the authorized views in the table.
+func (ac *AdminClient) AuthorizedViews(ctx context.Context, tableID string) ([]string, error) {
+	names := []string{}
+	prefix := fmt.Sprintf("%s/tables/%s", ac.instancePrefix(), tableID)
+
+	req := &btapb.ListAuthorizedViewsRequest{
+		Parent: prefix,
+		View:   btapb.AuthorizedView_NAME_ONLY,
+	}
+	var res *btapb.ListAuthorizedViewsResponse
+	err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
+		var err error
+		res, err = ac.tClient.ListAuthorizedViews(ctx, req)
+		return err
+	}, retryOptions...)
+	if err != nil {
+		return nil, err
+	}
+
+	for _, av := range res.AuthorizedViews {
+		names = append(names, strings.TrimPrefix(av.Name, prefix+"/authorizedViews/"))
+	}
+	return names, nil
+}
+
+// UpdateAuthorizedViewConf contains all the information necessary to update or partial update an authorized view.
+type UpdateAuthorizedViewConf struct {
+	AuthorizedViewConf AuthorizedViewConf
+	IgnoreWarnings     bool
+}
+
+// UpdateAuthorizedView updates an authorized view in a table according to the given configuration.
+func (ac *AdminClient) UpdateAuthorizedView(ctx context.Context, conf UpdateAuthorizedViewConf) error {
+	ctx = mergeOutgoingMetadata(ctx, ac.md)
+	if conf.AuthorizedViewConf.TableID == "" || conf.AuthorizedViewConf.AuthorizedViewID == "" {
+		return errors.New("both AuthorizedViewID and TableID is required")
+	}
+	av := conf.AuthorizedViewConf.proto()
+	av.Name = ac.authorizedViewPath(conf.AuthorizedViewConf.TableID, conf.AuthorizedViewConf.AuthorizedViewID)
+
+	updateMask := &field_mask.FieldMask{
+		Paths: []string{},
+	}
+	if conf.AuthorizedViewConf.DeletionProtection != None {
+		updateMask.Paths = append(updateMask.Paths, "deletion_protection")
+	}
+	if _, ok := conf.AuthorizedViewConf.AuthorizedView.(*SubsetViewConf); ok {
+		updateMask.Paths = append(updateMask.Paths, "subset_view")
+	}
+	req := &btapb.UpdateAuthorizedViewRequest{
+		AuthorizedView: av,
+		UpdateMask:     updateMask,
+		IgnoreWarnings: conf.IgnoreWarnings,
+	}
+	lro, err := ac.tClient.UpdateAuthorizedView(ctx, req)
+	if err != nil {
+		return fmt.Errorf("error from update authorized view: %w", err)
+	}
+	var res btapb.AuthorizedView
+	op := longrunning.InternalNewOperation(ac.lroClient, lro)
+	if err = op.Wait(ctx, &res); err != nil {
+		return fmt.Errorf("error from operation: %v", err)
+	}
+	return nil
+}
+
+// DeleteAuthorizedView deletes an authorized view in a table.
+func (ac *AdminClient) DeleteAuthorizedView(ctx context.Context, tableID, authorizedViewID string) error {
+	ctx = mergeOutgoingMetadata(ctx, ac.md)
+	req := &btapb.DeleteAuthorizedViewRequest{
+		Name: ac.authorizedViewPath(tableID, authorizedViewID),
+	}
+	_, err := ac.tClient.DeleteAuthorizedView(ctx, req)
+	return err
+}
diff --git a/bigtable/admin_test.go b/bigtable/admin_test.go
index 95f8640..0485012 100644
--- a/bigtable/admin_test.go
+++ b/bigtable/admin_test.go
@@ -41,6 +41,11 @@
 
 	copyBackupReq   *btapb.CopyBackupRequest
 	copyBackupError error
+
+	createAuthorizedViewReq   *btapb.CreateAuthorizedViewRequest
+	createAuthorizedViewError error
+	updateAuthorizedViewReq   *btapb.UpdateAuthorizedViewRequest
+	updateAuthorizedViewError error
 }
 
 func (c *mockTableAdminClock) CreateTable(
@@ -70,6 +75,30 @@
 	return nil, c.copyBackupError
 }
 
+func (c *mockTableAdminClock) CreateAuthorizedView(
+	ctx context.Context, in *btapb.CreateAuthorizedViewRequest, opts ...grpc.CallOption,
+) (*longrunning.Operation, error) {
+	c.createAuthorizedViewReq = in
+	return &longrunning.Operation{
+		Done: true,
+		Result: &longrunning.Operation_Response{
+			Response: &anypb.Any{TypeUrl: "google.bigtable.admin.v2.AuthorizedView"},
+		},
+	}, c.createAuthorizedViewError
+}
+
+func (c *mockTableAdminClock) UpdateAuthorizedView(
+	ctx context.Context, in *btapb.UpdateAuthorizedViewRequest, opts ...grpc.CallOption,
+) (*longrunning.Operation, error) {
+	c.updateAuthorizedViewReq = in
+	return &longrunning.Operation{
+		Done: true,
+		Result: &longrunning.Operation_Response{
+			Response: &anypb.Any{TypeUrl: "google.bigtable.admin.v2.AuthorizedView"},
+		},
+	}, c.updateAuthorizedViewError
+}
+
 func setupTableClient(t *testing.T, ac btapb.BigtableTableAdminClient) *AdminClient {
 	ctx := context.Background()
 	c, err := NewAdminClient(ctx, "my-cool-project", "my-cool-instance")
@@ -277,6 +306,120 @@
 	}
 }
 
+func TestTableAdmin_CreateAuthorizedView_DeletionProtection_Protected(t *testing.T) {
+	mock := &mockTableAdminClock{}
+	c := setupTableClient(t, mock)
+
+	err := c.CreateTableFromConf(context.Background(), &TableConf{TableID: "my-cool-table"})
+	if err != nil {
+		t.Fatalf("CreateTableFromConf failed: %v", err)
+	}
+
+	deletionProtection := Protected
+	err = c.CreateAuthorizedView(context.Background(), &AuthorizedViewConf{
+		TableID:            "my-cool-table",
+		AuthorizedViewID:   "my-cool-authorized-view",
+		AuthorizedView:     &SubsetViewConf{},
+		DeletionProtection: deletionProtection,
+	})
+	if err != nil {
+		t.Fatalf("CreateAuthorizedView failed: %v", err)
+	}
+	createAuthorizedViewReq := mock.createAuthorizedViewReq
+	if !cmp.Equal(createAuthorizedViewReq.Parent, "projects/my-cool-project/instances/my-cool-instance/tables/my-cool-table") {
+		t.Errorf("Unexpected parent: %v, expected %v", createAuthorizedViewReq.Parent, "projects/my-cool-project/instances/my-cool-instance/tables/my-cool-table")
+	}
+	if !cmp.Equal(createAuthorizedViewReq.AuthorizedViewId, "my-cool-authorized-view") {
+		t.Errorf("Unexpected authorized view ID: %v, expected %v", createAuthorizedViewReq.Parent, "my-cool-authorized-view")
+	}
+	if !cmp.Equal(createAuthorizedViewReq.AuthorizedView.DeletionProtection, true) {
+		t.Errorf("Unexpected authorized view deletion protection: %v, expected %v", createAuthorizedViewReq.AuthorizedView.DeletionProtection, true)
+	}
+}
+
+func TestTableAdmin_CreateAuthorizedView_DeletionProtection_Unprotected(t *testing.T) {
+	mock := &mockTableAdminClock{}
+	c := setupTableClient(t, mock)
+
+	deletionProtection := Unprotected
+	err := c.CreateAuthorizedView(context.Background(), &AuthorizedViewConf{
+		TableID:            "my-cool-table",
+		AuthorizedViewID:   "my-cool-authorized-view",
+		AuthorizedView:     &SubsetViewConf{},
+		DeletionProtection: deletionProtection,
+	})
+	if err != nil {
+		t.Fatalf("CreateAuthorizedView failed: %v", err)
+	}
+	createAuthorizedViewReq := mock.createAuthorizedViewReq
+	if !cmp.Equal(createAuthorizedViewReq.Parent, "projects/my-cool-project/instances/my-cool-instance/tables/my-cool-table") {
+		t.Errorf("Unexpected parent: %v, expected %v", createAuthorizedViewReq.Parent, "projects/my-cool-project/instances/my-cool-instance/tables/my-cool-table")
+	}
+	if !cmp.Equal(createAuthorizedViewReq.AuthorizedViewId, "my-cool-authorized-view") {
+		t.Errorf("Unexpected authorized view ID: %v, expected %v", createAuthorizedViewReq.Parent, "my-cool-authorized-view")
+	}
+	if !cmp.Equal(createAuthorizedViewReq.AuthorizedView.DeletionProtection, false) {
+		t.Errorf("Unexpected authorized view deletion protection: %v, expected %v", createAuthorizedViewReq.AuthorizedView.DeletionProtection, false)
+	}
+}
+
+func TestTableAdmin_UpdateAuthorizedViewWithDeletionProtection(t *testing.T) {
+	mock := &mockTableAdminClock{}
+	c := setupTableClient(t, mock)
+	deletionProtection := Protected
+
+	// Check if the deletion protection updates correctly
+	err := c.UpdateAuthorizedView(context.Background(), UpdateAuthorizedViewConf{
+		AuthorizedViewConf: AuthorizedViewConf{
+			TableID:            "my-cool-table",
+			AuthorizedViewID:   "my-cool-authorized-view",
+			DeletionProtection: deletionProtection,
+		},
+	})
+	if err != nil {
+		t.Fatalf("UpdateAuthorizedView failed: %v", err)
+	}
+	updateAuthorizedViewReq := mock.updateAuthorizedViewReq
+	if !cmp.Equal(updateAuthorizedViewReq.AuthorizedView.Name, "projects/my-cool-project/instances/my-cool-instance/tables/my-cool-table/authorizedViews/my-cool-authorized-view") {
+		t.Errorf("UpdateAuthorizedViewRequest does not match: AuthorizedViewName: %v, expected %v", updateAuthorizedViewReq.AuthorizedView.Name, "projects/my-cool-project/instances/my-cool-instance/tables/my-cool-table/authorizedViews/my-cool-authorized-view")
+	}
+	if !cmp.Equal(updateAuthorizedViewReq.AuthorizedView.DeletionProtection, true) {
+		t.Errorf("UpdateAuthorizedViewRequest does not match: DeletionProtection: %v, expected %v", updateAuthorizedViewReq.AuthorizedView.DeletionProtection, true)
+	}
+	if !cmp.Equal(len(updateAuthorizedViewReq.UpdateMask.Paths), 1) {
+		t.Errorf("UpdateAuthorizedViewRequest does not match: UpdateMask has length of %d, expected %v", len(updateAuthorizedViewReq.UpdateMask.Paths), 1)
+	}
+	if !cmp.Equal(updateAuthorizedViewReq.UpdateMask.Paths[0], "deletion_protection") {
+		t.Errorf("UpdateAuthorizedViewRequest does not match: updateAuthorizedViewReq.UpdateMask.Paths[0]: %v, expected: %v", updateAuthorizedViewReq.UpdateMask.Paths[0], "deletion_protection")
+	}
+}
+
+func TestTableAdmin_UpdateAuthorizedViewWithSubsetView(t *testing.T) {
+	mock := &mockTableAdminClock{}
+	c := setupTableClient(t, mock)
+
+	err := c.UpdateAuthorizedView(context.Background(), UpdateAuthorizedViewConf{
+		AuthorizedViewConf: AuthorizedViewConf{
+			TableID:          "my-cool-table",
+			AuthorizedViewID: "my-cool-authorized-view",
+			AuthorizedView:   &SubsetViewConf{},
+		},
+	})
+	if err != nil {
+		t.Fatalf("UpdateAuthorizedView failed: %v", err)
+	}
+	updateAuthorizedViewReq := mock.updateAuthorizedViewReq
+	if !cmp.Equal(updateAuthorizedViewReq.AuthorizedView.Name, "projects/my-cool-project/instances/my-cool-instance/tables/my-cool-table/authorizedViews/my-cool-authorized-view") {
+		t.Errorf("UpdateAuthorizedViewRequest does not match: AuthorizedViewName: %v, expected %v", updateAuthorizedViewReq.AuthorizedView.Name, "projects/my-cool-project/instances/my-cool-instance/tables/my-cool-table/authorizedViews/my-cool-authorized-view")
+	}
+	if !cmp.Equal(len(updateAuthorizedViewReq.UpdateMask.Paths), 1) {
+		t.Errorf("UpdateAuthorizedViewRequest does not match: UpdateMask has length of %d, expected %v", len(updateAuthorizedViewReq.UpdateMask.Paths), 1)
+	}
+	if !cmp.Equal(updateAuthorizedViewReq.UpdateMask.Paths[0], "subset_view") {
+		t.Errorf("UpdateAuthorizedViewRequest does not match: updateAuthorizedViewReq.UpdateMask.Paths[0]: %v, expected: %v", updateAuthorizedViewReq.UpdateMask.Paths[0], "subset_view")
+	}
+}
+
 type mockAdminClock struct {
 	btapb.BigtableInstanceAdminClient
 
diff --git a/bigtable/bigtable.go b/bigtable/bigtable.go
index 5a1bdbb..d86f5dd 100644
--- a/bigtable/bigtable.go
+++ b/bigtable/bigtable.go
@@ -148,6 +148,10 @@
 	return fmt.Sprintf("projects/%s/instances/%s/tables/%s", c.project, c.instance, table)
 }
 
+func (c *Client) fullAuthorizedViewName(table string, authorizedView string) string {
+	return fmt.Sprintf("projects/%s/instances/%s/tables/%s/authorizedViews/%s", c.project, c.instance, table, authorizedView)
+}
+
 func (c *Client) requestParamsHeaderValue(table string) string {
 	return fmt.Sprintf("table_name=%s&app_profile_id=%s", url.QueryEscape(c.fullTableName(table)), url.QueryEscape(c.appProfile))
 }
@@ -161,6 +165,20 @@
 	return metadata.NewOutgoingContext(ctx, metadata.Join(allMDs...))
 }
 
+// TableAPI interface allows existing data APIs to be applied to either an authorized view or a table.
+type TableAPI interface {
+	ReadRows(ctx context.Context, arg RowSet, f func(Row) bool, opts ...ReadOption) error
+	ReadRow(ctx context.Context, row string, opts ...ReadOption) (Row, error)
+	Apply(ctx context.Context, row string, m *Mutation, opts ...ApplyOption) error
+	ApplyBulk(ctx context.Context, rowKeys []string, muts []*Mutation, opts ...ApplyOption) ([]error, error)
+	SampleRowKeys(ctx context.Context) ([]string, error)
+	ApplyReadModifyWrite(ctx context.Context, row string, m *ReadModifyWrite) (Row, error)
+}
+
+type tableImpl struct {
+	Table
+}
+
 // A Table refers to a table.
 //
 // A Table is safe to use concurrently.
@@ -169,7 +187,8 @@
 	table string
 
 	// Metadata to be sent with each request.
-	md metadata.MD
+	md             metadata.MD
+	authorizedView string
 }
 
 // Open opens a table.
@@ -184,6 +203,51 @@
 	}
 }
 
+// OpenTable opens a table.
+func (c *Client) OpenTable(table string) TableAPI {
+	return &tableImpl{Table{
+		c:     c,
+		table: table,
+		md: metadata.Join(metadata.Pairs(
+			resourcePrefixHeader, c.fullTableName(table),
+			requestParamsHeader, c.requestParamsHeaderValue(table),
+		), btopt.WithFeatureFlags()),
+	}}
+}
+
+// OpenAuthorizedView opens an authorized view.
+func (c *Client) OpenAuthorizedView(table, authorizedView string) TableAPI {
+	return &tableImpl{Table{
+		c:     c,
+		table: table,
+		md: metadata.Join(metadata.Pairs(
+			resourcePrefixHeader, c.fullAuthorizedViewName(table, authorizedView),
+			requestParamsHeader, c.requestParamsHeaderValue(table),
+		), btopt.WithFeatureFlags()),
+		authorizedView: authorizedView,
+	}}
+}
+
+func (ti *tableImpl) ReadRows(ctx context.Context, arg RowSet, f func(Row) bool, opts ...ReadOption) error {
+	return ti.Table.ReadRows(ctx, arg, f, opts...)
+}
+
+func (ti *tableImpl) Apply(ctx context.Context, row string, m *Mutation, opts ...ApplyOption) error {
+	return ti.Table.Apply(ctx, row, m, opts...)
+}
+
+func (ti *tableImpl) ApplyBulk(ctx context.Context, rowKeys []string, muts []*Mutation, opts ...ApplyOption) ([]error, error) {
+	return ti.Table.ApplyBulk(ctx, rowKeys, muts, opts...)
+}
+
+func (ti *tableImpl) SampleRowKeys(ctx context.Context) ([]string, error) {
+	return ti.Table.SampleRowKeys(ctx)
+}
+
+func (ti *tableImpl) ApplyReadModifyWrite(ctx context.Context, row string, m *ReadModifyWrite) (Row, error) {
+	return ti.Table.ApplyReadModifyWrite(ctx, row, m)
+}
+
 // TODO(dsymonds): Read method that returns a sequence of ReadItems.
 
 // ReadRows reads rows from a table. f is called for each row.
@@ -202,9 +266,13 @@
 	attrMap := make(map[string]interface{})
 	err = gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
 		req := &btpb.ReadRowsRequest{
-			TableName:    t.c.fullTableName(t.table),
 			AppProfileId: t.c.appProfile,
 		}
+		if t.authorizedView == "" {
+			req.TableName = t.c.fullTableName(t.table)
+		} else {
+			req.AuthorizedViewName = t.c.fullAuthorizedViewName(t.table, t.authorizedView)
+		}
 
 		if arg != nil {
 			if !arg.valid() {
@@ -827,11 +895,15 @@
 	var callOptions []gax.CallOption
 	if m.cond == nil {
 		req := &btpb.MutateRowRequest{
-			TableName:    t.c.fullTableName(t.table),
 			AppProfileId: t.c.appProfile,
 			RowKey:       []byte(row),
 			Mutations:    m.ops,
 		}
+		if t.authorizedView == "" {
+			req.TableName = t.c.fullTableName(t.table)
+		} else {
+			req.AuthorizedViewName = t.c.fullAuthorizedViewName(t.table, t.authorizedView)
+		}
 		if mutationsAreRetryable(m.ops) {
 			callOptions = retryOptions
 		}
@@ -848,11 +920,15 @@
 	}
 
 	req := &btpb.CheckAndMutateRowRequest{
-		TableName:       t.c.fullTableName(t.table),
 		AppProfileId:    t.c.appProfile,
 		RowKey:          []byte(row),
 		PredicateFilter: m.cond.proto(),
 	}
+	if t.authorizedView == "" {
+		req.TableName = t.c.fullTableName(t.table)
+	} else {
+		req.AuthorizedViewName = t.c.fullAuthorizedViewName(t.table, t.authorizedView)
+	}
 	if m.mtrue != nil {
 		if m.mtrue.cond != nil {
 			return errors.New("bigtable: conditional mutations cannot be nested")
@@ -1086,10 +1162,14 @@
 		entries[i] = entryErr.Entry
 	}
 	req := &btpb.MutateRowsRequest{
-		TableName:    t.c.fullTableName(t.table),
 		AppProfileId: t.c.appProfile,
 		Entries:      entries,
 	}
+	if t.authorizedView == "" {
+		req.TableName = t.c.fullTableName(t.table)
+	} else {
+		req.AuthorizedViewName = t.c.fullAuthorizedViewName(t.table, t.authorizedView)
+	}
 	stream, err := t.c.client.MutateRows(ctx, req)
 	if err != nil {
 		return err
@@ -1172,11 +1252,15 @@
 func (t *Table) ApplyReadModifyWrite(ctx context.Context, row string, m *ReadModifyWrite) (Row, error) {
 	ctx = mergeOutgoingMetadata(ctx, t.md)
 	req := &btpb.ReadModifyWriteRowRequest{
-		TableName:    t.c.fullTableName(t.table),
 		AppProfileId: t.c.appProfile,
 		RowKey:       []byte(row),
 		Rules:        m.ops,
 	}
+	if t.authorizedView == "" {
+		req.TableName = t.c.fullTableName(t.table)
+	} else {
+		req.AuthorizedViewName = t.c.fullAuthorizedViewName(t.table, t.authorizedView)
+	}
 	res, err := t.c.client.ReadModifyWriteRow(ctx, req)
 	if err != nil {
 		return nil, err
@@ -1236,9 +1320,13 @@
 	err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
 		sampledRowKeys = nil
 		req := &btpb.SampleRowKeysRequest{
-			TableName:    t.c.fullTableName(t.table),
 			AppProfileId: t.c.appProfile,
 		}
+		if t.authorizedView == "" {
+			req.TableName = t.c.fullTableName(t.table)
+		} else {
+			req.AuthorizedViewName = t.c.fullAuthorizedViewName(t.table, t.authorizedView)
+		}
 		ctx, cancel := context.WithCancel(ctx) // for aborting the stream
 		defer cancel()
 
diff --git a/bigtable/integration_test.go b/bigtable/integration_test.go
index 4a796e8..73f1250 100644
--- a/bigtable/integration_test.go
+++ b/bigtable/integration_test.go
@@ -1999,6 +1999,80 @@
 	}
 }
 
+func TestIntegration_AuthorizedViewIAM(t *testing.T) {
+	testEnv, err := NewIntegrationEnv()
+	if err != nil {
+		t.Fatalf("IntegrationEnv: %v", err)
+	}
+	defer testEnv.Close()
+
+	if !testEnv.Config().UseProd {
+		t.Skip("emulator doesn't support IAM Policy creation")
+	}
+	timeout := 5 * time.Minute
+	ctx, cancel := context.WithTimeout(context.Background(), timeout)
+	defer cancel()
+
+	adminClient, err := testEnv.NewAdminClient()
+	if err != nil {
+		t.Fatalf("NewAdminClient: %v", err)
+	}
+	defer adminClient.Close()
+
+	table := testEnv.Config().Table
+
+	defer deleteTable(ctx, t, adminClient, table)
+	if err := adminClient.CreateTable(ctx, table); err != nil {
+		t.Fatalf("Creating table: %v", err)
+	}
+
+	// Create authorized view.
+	opts := &uid.Options{Sep: '_'}
+	authorizedViewUUID := uid.NewSpace("authorizedView", opts)
+	authorizedView := authorizedViewUUID.New()
+
+	defer adminClient.DeleteAuthorizedView(ctx, table, authorizedView)
+
+	if err = adminClient.CreateAuthorizedView(ctx, &AuthorizedViewConf{
+		TableID:            table,
+		AuthorizedViewID:   authorizedView,
+		AuthorizedView:     &SubsetViewConf{},
+		DeletionProtection: Unprotected,
+	}); err != nil {
+		t.Fatalf("Creating authorizedView: %v", err)
+	}
+	iamHandle := adminClient.AuthorizedViewIAM(table, authorizedView)
+	// Get authorized view policy.
+	p, err := iamHandle.Policy(ctx)
+	if err != nil {
+		t.Errorf("iamHandle.Policy: %v", err)
+	}
+	// The resource is new, so the policy should be empty.
+	if got := p.Roles(); len(got) > 0 {
+		t.Errorf("got roles %v, want none", got)
+	}
+	// Set authorized view policy.
+	member := "domain:google.com"
+	// Add a member, set the policy, then check that the member is present.
+	p.Add(member, iam.Viewer)
+	if err = iamHandle.SetPolicy(ctx, p); err != nil {
+		t.Errorf("iamHandle.SetPolicy: %v", err)
+	}
+	p, err = iamHandle.Policy(ctx)
+	if err != nil {
+		t.Errorf("iamHandle.Policy: %v", err)
+	}
+	if got, want := p.Members(iam.Viewer), []string{member}; !testutil.Equal(got, want) {
+		t.Errorf("iamHandle.Policy: got %v, want %v", got, want)
+	}
+	// Test authorized view permissions.
+	permissions := []string{"bigtable.authorizedViews.get", "bigtable.authorizedViews.update"}
+	_, err = iamHandle.TestPermissions(ctx, permissions)
+	if err != nil {
+		t.Errorf("iamHandle.TestPermissions: %v", err)
+	}
+}
+
 func TestIntegration_AdminCreateInstance(t *testing.T) {
 	if instanceToCreate == "" {
 		t.Skip("instanceToCreate not set, skipping instance creation testing")
@@ -3421,6 +3495,335 @@
 	}
 }
 
+func TestIntegration_AdminAuthorizedView(t *testing.T) {
+	testEnv, err := NewIntegrationEnv()
+	if err != nil {
+		t.Fatalf("IntegrationEnv: %v", err)
+	}
+	defer testEnv.Close()
+
+	if !testEnv.Config().UseProd {
+		t.Skip("emulator doesn't support authorizedViews")
+	}
+
+	timeout := 15 * time.Minute
+	ctx, cancel := context.WithTimeout(context.Background(), timeout)
+	defer cancel()
+
+	adminClient, err := testEnv.NewAdminClient()
+	if err != nil {
+		t.Fatalf("NewAdminClient: %v", err)
+	}
+	defer adminClient.Close()
+
+	tblConf := TableConf{
+		TableID: testEnv.Config().Table,
+		Families: map[string]GCPolicy{
+			"fam1": MaxVersionsPolicy(1),
+			"fam2": MaxVersionsPolicy(2),
+		},
+	}
+	if err := adminClient.CreateTableFromConf(ctx, &tblConf); err != nil {
+		t.Fatalf("Creating table from TableConf: %v", err)
+	}
+	// Delete the table at the end of the test. Schedule ahead of time
+	// in case the client fails
+	defer deleteTable(ctx, t, adminClient, tblConf.TableID)
+
+	// Create authorized view
+	authorizedViewUUID := uid.NewSpace("authorizedView-", &uid.Options{})
+	authorizedView := authorizedViewUUID.New()
+	defer adminClient.DeleteAuthorizedView(ctx, tblConf.TableID, authorizedView)
+
+	authorizedViewConf := AuthorizedViewConf{
+		TableID:          tblConf.TableID,
+		AuthorizedViewID: authorizedView,
+		AuthorizedView: &SubsetViewConf{
+			RowPrefixes: [][]byte{[]byte("r1")},
+		},
+		DeletionProtection: Protected,
+	}
+	if err = adminClient.CreateAuthorizedView(ctx, &authorizedViewConf); err != nil {
+		t.Fatalf("Creating authorized view: %v", err)
+	}
+
+	// List authorized views
+	authorizedViews, err := adminClient.AuthorizedViews(ctx, tblConf.TableID)
+	if err != nil {
+		t.Fatalf("Listing authorized views: %v", err)
+	}
+	if got, want := len(authorizedViews), 1; got != want {
+		t.Fatalf("Listing authorized views count: %d, want: != %d", got, want)
+	}
+	if got, want := authorizedViews[0], authorizedView; got != want {
+		t.Errorf("AuthorizedView Name: %s, want: %s", got, want)
+	}
+
+	// Get authorized view
+	avInfo, err := adminClient.AuthorizedViewInfo(ctx, tblConf.TableID, authorizedView)
+	if err != nil {
+		t.Fatalf("Getting authorized view: %v", err)
+	}
+	if got, want := avInfo.AuthorizedView.(*SubsetViewInfo), authorizedViewConf.AuthorizedView.(*SubsetViewConf); cmp.Equal(got, want) {
+		t.Errorf("SubsetViewConf: %v, want: %v", got, want)
+	}
+
+	// Cannot delete the authorized view because it is deletion protected
+	if err = adminClient.DeleteAuthorizedView(ctx, tblConf.TableID, authorizedView); err == nil {
+		t.Fatalf("Expect error when deleting authorized view")
+	}
+
+	// Update authorized view
+	newAuthorizedViewConf := AuthorizedViewConf{
+		TableID:            tblConf.TableID,
+		AuthorizedViewID:   authorizedView,
+		DeletionProtection: Unprotected,
+	}
+	err = adminClient.UpdateAuthorizedView(ctx, UpdateAuthorizedViewConf{
+		AuthorizedViewConf: newAuthorizedViewConf,
+	})
+	if err != nil {
+		t.Fatalf("UpdateAuthorizedView failed: %v", err)
+	}
+
+	// Check that updated authorized view has the correct deletion protection
+	avInfo, err = adminClient.AuthorizedViewInfo(ctx, tblConf.TableID, authorizedView)
+	if err != nil {
+		t.Fatalf("Getting authorized view: %v", err)
+	}
+	if got, want := avInfo.DeletionProtection, Unprotected; got != want {
+		t.Errorf("AuthorizedView deletion protection: %v, want: %v", got, want)
+	}
+	// Check that the subset_view field doesn't change
+	if got, want := avInfo.AuthorizedView.(*SubsetViewInfo), authorizedViewConf.AuthorizedView.(*SubsetViewConf); cmp.Equal(got, want) {
+		t.Errorf("SubsetViewConf: %v, want: %v", got, want)
+	}
+
+	// Delete authorized view
+	if err = adminClient.DeleteAuthorizedView(ctx, tblConf.TableID, authorizedView); err != nil {
+		t.Fatalf("DeleteAuthorizedView: %v", err)
+	}
+
+	// Verify the authorized view was deleted.
+	authorizedViews, err = adminClient.AuthorizedViews(ctx, tblConf.TableID)
+	if err != nil {
+		t.Fatalf("Listing authorized views: %v", err)
+	}
+	if got, want := len(authorizedViews), 0; got != want {
+		t.Fatalf("Listing authorized views count: %d, want: != %d", got, want)
+	}
+}
+
+func TestIntegration_DataAuthorizedView(t *testing.T) {
+	testEnv, err := NewIntegrationEnv()
+	if err != nil {
+		t.Fatalf("IntegrationEnv: %v", err)
+	}
+	defer testEnv.Close()
+
+	if !testEnv.Config().UseProd {
+		t.Skip("emulator doesn't support authorizedViews")
+	}
+
+	timeout := 15 * time.Minute
+	ctx, cancel := context.WithTimeout(context.Background(), timeout)
+	defer cancel()
+
+	adminClient, err := testEnv.NewAdminClient()
+	if err != nil {
+		t.Fatalf("NewAdminClient: %v", err)
+	}
+	defer adminClient.Close()
+
+	tblConf := TableConf{
+		TableID: testEnv.Config().Table,
+		Families: map[string]GCPolicy{
+			"fam1": MaxVersionsPolicy(1),
+			"fam2": MaxVersionsPolicy(2),
+		},
+	}
+	if err := adminClient.CreateTableFromConf(ctx, &tblConf); err != nil {
+		t.Fatalf("Creating table from TableConf: %v", err)
+	}
+	// Delete the table at the end of the test. Schedule ahead of time
+	// in case the client fails
+	defer deleteTable(ctx, t, adminClient, tblConf.TableID)
+
+	// Create authorized view
+	authorizedViewUUID := uid.NewSpace("authorizedView-", &uid.Options{})
+	authorizedView := authorizedViewUUID.New()
+	defer adminClient.DeleteAuthorizedView(ctx, tblConf.TableID, authorizedView)
+
+	authorizedViewConf := AuthorizedViewConf{
+		TableID:          tblConf.TableID,
+		AuthorizedViewID: authorizedView,
+		AuthorizedView: &SubsetViewConf{
+			RowPrefixes: [][]byte{[]byte("r1")},
+			FamilySubsets: map[string]FamilySubset{
+				"fam1": {
+					QualifierPrefixes: [][]byte{[]byte("col")},
+				},
+				"fam2": {
+					Qualifiers: [][]byte{[]byte("col")},
+				},
+			},
+		},
+		DeletionProtection: Unprotected,
+	}
+	if err = adminClient.CreateAuthorizedView(ctx, &authorizedViewConf); err != nil {
+		t.Fatalf("Creating authorized view: %v", err)
+	}
+
+	client, err := testEnv.NewClient()
+	if err != nil {
+		t.Fatalf("NewClient: %v", err)
+	}
+	defer client.Close()
+	av := client.OpenAuthorizedView(tblConf.TableID, authorizedView)
+	tbl := client.OpenTable(tblConf.TableID)
+
+	prefix1 := "r1"
+	prefix2 := "r2" // outside of the authorized view
+	mut1 := NewMutation()
+	mut1.Set("fam1", "col1", 1000, []byte("1"))
+	mut2 := NewMutation()
+	mut2.Set("fam1", "col2", 1000, []byte("1"))
+	mut3 := NewMutation()
+	mut3.Set("fam2", "column", 1000, []byte("1")) // outside of the authorized view
+
+	// Test mutation
+	if err := av.Apply(ctx, prefix1, mut1); err != nil {
+		t.Fatalf("Mutating row from an authorized view: %v", err)
+	}
+	if err := av.Apply(ctx, prefix2, mut1); err == nil {
+		t.Fatalf("Expect error when mutating a row outside of the authorized view: %v", err)
+	}
+	if err := tbl.Apply(ctx, prefix2, mut1); err != nil {
+		t.Fatalf("Mutating row from a table: %v", err)
+	}
+
+	// Test bulk mutations
+	status, err := av.ApplyBulk(ctx, []string{prefix1, prefix2, prefix1}, []*Mutation{mut2, mut2, mut3})
+	if err != nil {
+		t.Fatalf("Mutating rows from an authorized view: %v", err)
+	}
+	if status == nil {
+		t.Fatalf("Expect error for bad bulk mutation outside of the authorized view")
+	} else if status[0] != nil || status[1] == nil || status[2] == nil {
+		t.Fatalf("Expect error for bad bulk mutation outside of the authorized view")
+	}
+
+	// Test ReadRow
+	gotRow, err := av.ReadRow(ctx, "r1")
+	if err != nil {
+		t.Fatalf("Reading row from an authorized view: %v", err)
+	}
+	wantRow := Row{
+		"fam1": []ReadItem{
+			{Row: "r1", Column: "fam1:col1", Timestamp: 1000, Value: []byte("1")},
+			{Row: "r1", Column: "fam1:col2", Timestamp: 1000, Value: []byte("1")},
+		},
+	}
+	if !testutil.Equal(gotRow, wantRow) {
+		t.Fatalf("Error reading row from authorized view.\n Got %v\n Want %v", gotRow, wantRow)
+	}
+	gotRow, err = av.ReadRow(ctx, "r2")
+	if err != nil {
+		t.Fatalf("Reading row from an authorized view: %v", err)
+	}
+	if len(gotRow) != 0 {
+		t.Fatalf("Expect empty result when reading row from outside an authorized view")
+	}
+	gotRow, err = tbl.ReadRow(ctx, "r2")
+	if err != nil {
+		t.Fatalf("Reading row from a table: %v", err)
+	}
+	if len(gotRow) != 1 {
+		t.Fatalf("Invalid row count when reading from a table: %d, want: != %d", len(gotRow), 1)
+	}
+
+	// Test ReadRows
+	var elt []string
+	f := func(row Row) bool {
+		for _, ris := range row {
+			for _, ri := range ris {
+				elt = append(elt, formatReadItem(ri))
+			}
+		}
+		return true
+	}
+	if err = av.ReadRows(ctx, RowRange{}, f); err != nil {
+		t.Fatalf("Reading rows from an authorized view: %v", err)
+	}
+	want := "r1-col1-1,r1-col2-1"
+	if got := strings.Join(elt, ","); got != want {
+		t.Fatalf("Error bulk reading from authorized view.\n Got %v\n Want %v", got, want)
+	}
+	elt = nil
+	if err = tbl.ReadRows(ctx, RowRange{}, f); err != nil {
+		t.Fatalf("Reading rows from a table: %v", err)
+	}
+	want = "r1-col1-1,r1-col2-1,r2-col1-1"
+	if got := strings.Join(elt, ","); got != want {
+		t.Fatalf("Error bulk reading from table.\n Got %v\n Want %v", got, want)
+	}
+
+	// Test ReadModifyWrite
+	rmw := NewReadModifyWrite()
+	rmw.AppendValue("fam1", "col1", []byte("1"))
+	gotRow, err = av.ApplyReadModifyWrite(ctx, "r1", rmw)
+	if err != nil {
+		t.Fatalf("Applying ReadModifyWrite from an authorized view: %v", err)
+	}
+	wantRow = Row{
+		"fam1": []ReadItem{
+			{Row: "r1", Column: "fam1:col1", Value: []byte("11")},
+		},
+	}
+	// Make sure the modified cell returned by the RMW operation has a timestamp.
+	if gotRow["fam1"][0].Timestamp == 0 {
+		t.Fatalf("RMW returned cell timestamp: got %v, want > 0", gotRow["fam1"][0].Timestamp)
+	}
+	clearTimestamps(gotRow)
+	if !testutil.Equal(gotRow, wantRow) {
+		t.Fatalf("Error applying ReadModifyWrite from authorized view.\n Got %v\n Want %v", gotRow, wantRow)
+	}
+	if _, err = av.ApplyReadModifyWrite(ctx, "r2", rmw); err == nil {
+		t.Fatalf("Expect error applying ReadModifyWrite from outside an authorized view")
+	}
+
+	// Test SampleRowKeys
+	presplitTable := fmt.Sprintf("presplit-table-%d", time.Now().Unix())
+	if err := adminClient.CreatePresplitTable(ctx, presplitTable, []string{"r0", "r11", "r12", "r2"}); err != nil {
+		t.Fatal(err)
+	}
+	defer adminClient.DeleteTable(ctx, presplitTable)
+	if err := adminClient.CreateColumnFamily(ctx, presplitTable, "fam1"); err != nil {
+		t.Fatal(err)
+	}
+	defer adminClient.DeleteAuthorizedView(ctx, presplitTable, authorizedView)
+	if err = adminClient.CreateAuthorizedView(ctx, &AuthorizedViewConf{
+		TableID:          presplitTable,
+		AuthorizedViewID: authorizedView,
+		AuthorizedView: &SubsetViewConf{
+			RowPrefixes: [][]byte{[]byte("r1")},
+		},
+		DeletionProtection: Unprotected,
+	}); err != nil {
+		t.Fatalf("Creating authorized view: %v", err)
+	}
+
+	av = client.OpenAuthorizedView(presplitTable, authorizedView)
+	sampleKeys, err := av.SampleRowKeys(ctx)
+	if err != nil {
+		t.Fatalf("Sampling row keys from an authorized view: %v", err)
+	}
+	want = "r11,r12,r2"
+	if got := strings.Join(sampleKeys, ","); got != want {
+		t.Fatalf("Error sample row keys from an authorized view.\n Got %v\n Want %v", got, want)
+	}
+}
+
 // TestIntegration_DirectPathFallback tests the CFE fallback when the directpath net is blackholed.
 func TestIntegration_DirectPathFallback(t *testing.T) {
 	ctx := context.Background()