blob: d9421fe68e63135b5ad8c6871fbd9b5e7060291b [file] [log] [blame]
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package actions
import (
"context"
"fmt"
"log"
"cloud.google.com/go/longrunning/autogen/longrunningpb"
"cloud.google.com/go/spanner"
database "cloud.google.com/go/spanner/admin/database/apiv1"
adminpb "cloud.google.com/go/spanner/admin/database/apiv1/databasepb"
instance "cloud.google.com/go/spanner/admin/instance/apiv1"
"cloud.google.com/go/spanner/admin/instance/apiv1/instancepb"
"cloud.google.com/go/spanner/executor/apiv1/executorpb"
"cloud.google.com/go/spanner/test/cloudexecutor/executor/internal/outputstream"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
spb "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/fieldmaskpb"
)
// AdminActionHandler holds the necessary components and options required for performing admin tasks.
type AdminActionHandler struct {
Action *executorpb.AdminAction
FlowContext *ExecutionFlowContext
OutcomeSender *outputstream.OutcomeSender
Options []option.ClientOption
}
// ExecuteAction execute admin actions by action case, using OutcomeSender to send status and results back.
func (h *AdminActionHandler) ExecuteAction(ctx context.Context) error {
log.Printf("executing admin action %v", h.Action)
h.FlowContext.mu.Lock()
defer h.FlowContext.mu.Unlock()
var err error
switch h.Action.GetAction().(type) {
case *executorpb.AdminAction_CreateCloudInstance:
err = executeCreateCloudInstance(ctx, h.Action.GetCreateCloudInstance(), h.FlowContext, h.Options, h.OutcomeSender)
case *executorpb.AdminAction_UpdateCloudInstance:
err = executeUpdateCloudInstance(ctx, h.Action.GetUpdateCloudInstance(), h.FlowContext, h.Options, h.OutcomeSender)
case *executorpb.AdminAction_DeleteCloudInstance:
err = executeDeleteCloudInstance(ctx, h.Action.GetDeleteCloudInstance(), h.FlowContext, h.Options, h.OutcomeSender)
case *executorpb.AdminAction_ListCloudInstances:
err = executeListCloudInstances(ctx, h.Action.GetListCloudInstances(), h.FlowContext, h.Options, h.OutcomeSender)
case *executorpb.AdminAction_ListInstanceConfigs:
err = executeListInstanceConfigs(ctx, h.Action.GetListInstanceConfigs(), h.FlowContext, h.Options, h.OutcomeSender)
case *executorpb.AdminAction_GetCloudInstanceConfig:
err = executeGetCloudInstanceConfig(ctx, h.Action.GetGetCloudInstanceConfig(), h.FlowContext, h.Options, h.OutcomeSender)
case *executorpb.AdminAction_GetCloudInstance:
err = executeGetCloudInstance(ctx, h.Action.GetGetCloudInstance(), h.FlowContext, h.Options, h.OutcomeSender)
case *executorpb.AdminAction_CreateUserInstanceConfig:
err = executeCreateUserInstanceConfig(ctx, h.Action.GetCreateUserInstanceConfig(), h.FlowContext, h.Options, h.OutcomeSender)
case *executorpb.AdminAction_DeleteUserInstanceConfig:
err = executeDeleteUserInstanceConfig(ctx, h.Action.GetDeleteUserInstanceConfig(), h.FlowContext, h.Options, h.OutcomeSender)
case *executorpb.AdminAction_CreateCloudDatabase:
err = executeCreateCloudDatabase(ctx, h.Action.GetCreateCloudDatabase(), h.FlowContext, h.Options, h.OutcomeSender)
case *executorpb.AdminAction_UpdateCloudDatabaseDdl:
err = executeUpdateCloudDatabaseDdl(ctx, h.Action.GetUpdateCloudDatabaseDdl(), h.FlowContext, h.Options, h.OutcomeSender)
case *executorpb.AdminAction_DropCloudDatabase:
err = executeDropCloudDatabase(ctx, h.Action.GetDropCloudDatabase(), h.FlowContext, h.Options, h.OutcomeSender)
case *executorpb.AdminAction_CreateCloudBackup:
err = executeCreateCloudBackup(ctx, h.Action.GetCreateCloudBackup(), h.FlowContext, h.Options, h.OutcomeSender)
case *executorpb.AdminAction_CopyCloudBackup:
err = executeCopyCloudBackup(ctx, h.Action.GetCopyCloudBackup(), h.FlowContext, h.Options, h.OutcomeSender)
case *executorpb.AdminAction_GetCloudBackup:
err = executeGetCloudBackup(ctx, h.Action.GetGetCloudBackup(), h.FlowContext, h.Options, h.OutcomeSender)
case *executorpb.AdminAction_UpdateCloudBackup:
err = executeUpdateCloudBackup(ctx, h.Action.GetUpdateCloudBackup(), h.FlowContext, h.Options, h.OutcomeSender)
case *executorpb.AdminAction_DeleteCloudBackup:
err = executeDeleteCloudBackup(ctx, h.Action.GetDeleteCloudBackup(), h.FlowContext, h.Options, h.OutcomeSender)
case *executorpb.AdminAction_ListCloudBackups:
err = executeListCloudBackups(ctx, h.Action.GetListCloudBackups(), h.FlowContext, h.Options, h.OutcomeSender)
case *executorpb.AdminAction_ListCloudBackupOperations:
err = executeListCloudBackupOperations(ctx, h.Action.GetListCloudBackupOperations(), h.FlowContext, h.Options, h.OutcomeSender)
case *executorpb.AdminAction_ListCloudDatabases:
err = executeListCloudDatabases(ctx, h.Action.GetListCloudDatabases(), h.FlowContext, h.Options, h.OutcomeSender)
case *executorpb.AdminAction_ListCloudDatabaseOperations:
err = executeListCloudDatabaseOperations(ctx, h.Action.GetListCloudDatabaseOperations(), h.FlowContext, h.Options, h.OutcomeSender)
case *executorpb.AdminAction_RestoreCloudDatabase:
err = executeRestoreCloudDatabase(ctx, h.Action.GetRestoreCloudDatabase(), h.FlowContext, h.Options, h.OutcomeSender)
case *executorpb.AdminAction_GetCloudDatabase:
err = executeGetCloudDatabase(ctx, h.Action.GetGetCloudDatabase(), h.FlowContext, h.Options, h.OutcomeSender)
case *executorpb.AdminAction_GetOperation:
err = executeGetOperation(ctx, h.Action.GetGetOperation(), h.FlowContext, h.Options, h.OutcomeSender)
case *executorpb.AdminAction_CancelOperation:
err = executeCancelOperation(ctx, h.Action.GetCancelOperation(), h.FlowContext, h.Options, h.OutcomeSender)
default:
err = spanner.ToSpannerError(status.Error(codes.Unimplemented, fmt.Sprintf("Not implemented yet: %v", h.Action)))
}
if err != nil {
return h.OutcomeSender.FinishWithError(err)
}
return nil
}
// execute action that creates a cloud instance.
func executeCreateCloudInstance(ctx context.Context, action *executorpb.CreateCloudInstanceAction, h *ExecutionFlowContext, opts []option.ClientOption, o *outputstream.OutcomeSender) error {
log.Printf("creating instance: %v", action)
projectID := action.GetProjectId()
instanceID := action.GetInstanceId()
instanceAdminClient, err := instance.NewInstanceAdminClient(ctx, opts...)
if err != nil {
return err
}
defer instanceAdminClient.Close()
op, err := instanceAdminClient.CreateInstance(ctx, &instancepb.CreateInstanceRequest{
Parent: fmt.Sprintf("projects/%s", action.GetProjectId()),
InstanceId: instanceID,
Instance: &instancepb.Instance{
Config: fmt.Sprintf("projects/%s/instanceConfigs/%s", projectID, action.GetInstanceConfigId()),
DisplayName: instanceID,
NodeCount: action.GetNodeCount(),
ProcessingUnits: action.GetProcessingUnits(),
Labels: action.GetLabels(),
},
})
if err != nil {
return err
}
// Wait for the instance creation to finish.
_, err = op.Wait(ctx)
if err != nil {
if status.Code(err) == codes.AlreadyExists {
return nil
}
return err
}
return o.FinishSuccessfully()
}
// execute action that updates a cloud instance.
func executeUpdateCloudInstance(ctx context.Context, action *executorpb.UpdateCloudInstanceAction, h *ExecutionFlowContext, opts []option.ClientOption, o *outputstream.OutcomeSender) error {
log.Printf("updating instance: %v", action)
projectID := action.GetProjectId()
instanceID := action.GetInstanceId()
instanceAdminClient, err := instance.NewInstanceAdminClient(ctx, opts...)
if err != nil {
return err
}
defer instanceAdminClient.Close()
instanceObj := &instancepb.Instance{Name: fmt.Sprintf("projects/%s/instances/%s", projectID, instanceID)}
var fieldsToUpdate []string
if action.DisplayName != nil {
fieldsToUpdate = append(fieldsToUpdate, "display_name")
instanceObj.DisplayName = instanceID
}
if action.NodeCount != nil {
fieldsToUpdate = append(fieldsToUpdate, "node_count")
instanceObj.NodeCount = action.GetNodeCount()
}
if action.ProcessingUnits != nil {
fieldsToUpdate = append(fieldsToUpdate, "processing_units")
instanceObj.ProcessingUnits = action.GetProcessingUnits()
}
if action.Labels != nil {
fieldsToUpdate = append(fieldsToUpdate, "labels")
instanceObj.Labels = action.GetLabels()
}
op, err := instanceAdminClient.UpdateInstance(ctx, &instancepb.UpdateInstanceRequest{
Instance: instanceObj,
FieldMask: &fieldmaskpb.FieldMask{
Paths: fieldsToUpdate,
},
})
if err != nil {
return err
}
// Wait for the instance update to finish.
_, err = op.Wait(ctx)
if err != nil {
return err
}
return o.FinishSuccessfully()
}
// execute action that deletes a cloud instance.
func executeDeleteCloudInstance(ctx context.Context, action *executorpb.DeleteCloudInstanceAction, h *ExecutionFlowContext, opts []option.ClientOption, o *outputstream.OutcomeSender) error {
log.Printf("deleting instance: %v", action)
projectID := action.GetProjectId()
instanceID := action.GetInstanceId()
instanceAdminClient, err := instance.NewInstanceAdminClient(ctx, opts...)
if err != nil {
return err
}
defer instanceAdminClient.Close()
err = instanceAdminClient.DeleteInstance(ctx, &instancepb.DeleteInstanceRequest{
Name: fmt.Sprintf("projects/%s/instances/%s", projectID, instanceID),
})
if err != nil {
return err
}
return o.FinishSuccessfully()
}
// execute action that lists cloud instances.
func executeListCloudInstances(ctx context.Context, action *executorpb.ListCloudInstancesAction, h *ExecutionFlowContext, opts []option.ClientOption, o *outputstream.OutcomeSender) error {
log.Printf("listing instance: %v", action)
projectID := action.GetProjectId()
instanceAdminClient, err := instance.NewInstanceAdminClient(ctx, opts...)
if err != nil {
return err
}
defer instanceAdminClient.Close()
listInstancesRequest := &instancepb.ListInstancesRequest{
Parent: fmt.Sprintf("projects/%s", projectID),
}
if action.PageSize != nil {
listInstancesRequest.PageSize = action.GetPageSize()
}
if action.Filter != nil {
listInstancesRequest.Filter = action.GetFilter()
}
if action.PageToken != nil {
listInstancesRequest.PageToken = action.GetPageToken()
}
iter := instanceAdminClient.ListInstances(ctx, listInstancesRequest)
var instances []*instancepb.Instance
for {
instanceObj, err := iter.Next()
if err == iterator.Done {
break
}
if err != nil {
return err
}
instances = append(instances, instanceObj)
}
spannerActionOutcome := &executorpb.SpannerActionOutcome{
Status: &spb.Status{Code: int32(codes.OK)},
AdminResult: &executorpb.AdminResult{
InstanceResponse: &executorpb.CloudInstanceResponse{
ListedInstances: instances,
},
},
}
err = o.SendOutcome(spannerActionOutcome)
if err != nil {
return err
}
return nil
}
// execute action that lists cloud instance configs.
func executeListInstanceConfigs(ctx context.Context, action *executorpb.ListCloudInstanceConfigsAction, h *ExecutionFlowContext, opts []option.ClientOption, o *outputstream.OutcomeSender) error {
log.Printf("listing instance configs: %v", action)
projectID := action.GetProjectId()
instanceAdminClient, err := instance.NewInstanceAdminClient(ctx, opts...)
if err != nil {
return err
}
defer instanceAdminClient.Close()
listInstanceConfigsRequest := &instancepb.ListInstanceConfigsRequest{
Parent: fmt.Sprintf("projects/%s", projectID),
}
if action.PageSize != nil {
listInstanceConfigsRequest.PageSize = action.GetPageSize()
}
if action.PageToken != nil {
listInstanceConfigsRequest.PageToken = action.GetPageToken()
}
iter := instanceAdminClient.ListInstanceConfigs(ctx, listInstanceConfigsRequest)
var instanceConfigs []*instancepb.InstanceConfig
for {
instanceConfig, err := iter.Next()
if err == iterator.Done {
break
}
if err != nil {
return err
}
instanceConfigs = append(instanceConfigs, instanceConfig)
}
spannerActionOutcome := &executorpb.SpannerActionOutcome{
Status: &spb.Status{Code: int32(codes.OK)},
AdminResult: &executorpb.AdminResult{
InstanceConfigResponse: &executorpb.CloudInstanceConfigResponse{
ListedInstanceConfigs: instanceConfigs,
},
},
}
err = o.SendOutcome(spannerActionOutcome)
if err != nil {
return err
}
return nil
}
// execute action that gets a cloud instance config.
func executeGetCloudInstanceConfig(ctx context.Context, action *executorpb.GetCloudInstanceConfigAction, h *ExecutionFlowContext, opts []option.ClientOption, o *outputstream.OutcomeSender) error {
log.Printf("getting instance config: %v", action)
projectID := action.GetProjectId()
instanceConfigID := action.GetInstanceConfigId()
instanceAdminClient, err := instance.NewInstanceAdminClient(ctx, opts...)
if err != nil {
return err
}
defer instanceAdminClient.Close()
instanceConfig, err := instanceAdminClient.GetInstanceConfig(ctx, &instancepb.GetInstanceConfigRequest{
Name: fmt.Sprintf("projects/%s/instanceConfigs/%s", projectID, instanceConfigID),
})
if err != nil {
return err
}
spannerActionOutcome := &executorpb.SpannerActionOutcome{
Status: &spb.Status{Code: int32(codes.OK)},
AdminResult: &executorpb.AdminResult{
InstanceConfigResponse: &executorpb.CloudInstanceConfigResponse{
InstanceConfig: instanceConfig,
},
},
}
err = o.SendOutcome(spannerActionOutcome)
if err != nil {
return err
}
return nil
}
// execute action that retrieves a cloud instance.
func executeGetCloudInstance(ctx context.Context, action *executorpb.GetCloudInstanceAction, h *ExecutionFlowContext, opts []option.ClientOption, o *outputstream.OutcomeSender) error {
log.Printf("retrieving instance: %v", action)
projectID := action.GetProjectId()
instanceID := action.GetInstanceId()
instanceAdminClient, err := instance.NewInstanceAdminClient(ctx, opts...)
if err != nil {
return err
}
defer instanceAdminClient.Close()
instanceObj, err := instanceAdminClient.GetInstance(ctx, &instancepb.GetInstanceRequest{
Name: fmt.Sprintf("projects/%s/instances/%s", projectID, instanceID),
})
if err != nil {
return err
}
spannerActionOutcome := &executorpb.SpannerActionOutcome{
Status: &spb.Status{Code: int32(codes.OK)},
AdminResult: &executorpb.AdminResult{
InstanceResponse: &executorpb.CloudInstanceResponse{
Instance: instanceObj,
},
},
}
err = o.SendOutcome(spannerActionOutcome)
if err != nil {
return err
}
return nil
}
// execute action that creates a user instance config.
func executeCreateUserInstanceConfig(ctx context.Context, action *executorpb.CreateUserInstanceConfigAction, h *ExecutionFlowContext, opts []option.ClientOption, o *outputstream.OutcomeSender) error {
log.Printf("Creating user instance config: %v", action)
projectID := action.GetProjectId()
baseConfigID := action.GetBaseConfigId()
userConfigID := action.GetUserConfigId()
instanceAdminClient, err := instance.NewInstanceAdminClient(ctx, opts...)
if err != nil {
return err
}
op, err := instanceAdminClient.CreateInstanceConfig(ctx, &instancepb.CreateInstanceConfigRequest{
Parent: fmt.Sprintf("projects/%s", projectID),
InstanceConfigId: userConfigID,
InstanceConfig: &instancepb.InstanceConfig{
Name: fmt.Sprintf("projects/%s/instanceConfigs/%s", projectID, userConfigID),
DisplayName: userConfigID,
Replicas: action.GetReplicas(),
BaseConfig: fmt.Sprintf("projects/%s/instanceConfigs/%s", projectID, baseConfigID),
},
})
if err != nil {
return err
}
_, err = op.Wait(ctx)
if err != nil {
return err
}
return o.FinishSuccessfully()
}
// execute action that deletes a user instance config.
func executeDeleteUserInstanceConfig(ctx context.Context, action *executorpb.DeleteUserInstanceConfigAction, h *ExecutionFlowContext, opts []option.ClientOption, o *outputstream.OutcomeSender) error {
log.Printf("deleting user instance config: %v", action)
instanceAdminClient, err := instance.NewInstanceAdminClient(ctx, opts...)
if err != nil {
return err
}
err = instanceAdminClient.DeleteInstanceConfig(ctx, &instancepb.DeleteInstanceConfigRequest{
Name: fmt.Sprintf("projects/%s/instanceConfigs/%s", action.GetProjectId(), action.GetUserConfigId()),
})
if err != nil {
return err
}
return o.FinishSuccessfully()
}
// execute action that creates a cloud database or cloud custom encrypted database.
func executeCreateCloudDatabase(ctx context.Context, action *executorpb.CreateCloudDatabaseAction, h *ExecutionFlowContext, opts []option.ClientOption, o *outputstream.OutcomeSender) error {
log.Printf("creating database: %v", action)
projectID := action.GetProjectId()
instanceID := action.GetInstanceId()
databaseID := action.GetDatabaseId()
databaseAdminClient, err := database.NewDatabaseAdminClient(ctx, opts...)
if err != nil {
return err
}
createDatabaseRequest := &adminpb.CreateDatabaseRequest{
Parent: fmt.Sprintf("projects/%s/instances/%s", projectID, instanceID),
CreateStatement: "CREATE DATABASE `" + databaseID + "`",
ExtraStatements: action.GetSdlStatement(),
}
if action.GetEncryptionConfig() != nil {
createDatabaseRequest.EncryptionConfig = action.GetEncryptionConfig()
}
op, err := databaseAdminClient.CreateDatabase(ctx, createDatabaseRequest)
if err != nil {
return err
}
if _, err := op.Wait(ctx); err != nil {
return err
}
return o.FinishSuccessfully()
}
// execute action that updates a cloud database.
func executeUpdateCloudDatabaseDdl(ctx context.Context, action *executorpb.UpdateCloudDatabaseDdlAction, h *ExecutionFlowContext, opts []option.ClientOption, o *outputstream.OutcomeSender) error {
log.Printf("updating database ddl %v", action)
dbPath := fmt.Sprintf("projects/%v/instances/%v/databases/%v", action.GetProjectId(), action.GetInstanceId(), action.GetDatabaseId())
databaseAdminClient, err := database.NewDatabaseAdminClient(ctx, opts...)
if err != nil {
return err
}
defer databaseAdminClient.Close()
op, err := databaseAdminClient.UpdateDatabaseDdl(ctx, &adminpb.UpdateDatabaseDdlRequest{
Database: dbPath,
Statements: action.GetSdlStatement(),
OperationId: action.GetOperationId(),
})
if err != nil {
return fmt.Errorf("UpdateDatabaseDdl: %w", err)
}
if err := op.Wait(ctx); err != nil {
return err
}
return o.FinishSuccessfully()
}
// execute action that drops a cloud database.
func executeDropCloudDatabase(ctx context.Context, action *executorpb.DropCloudDatabaseAction, h *ExecutionFlowContext, opts []option.ClientOption, o *outputstream.OutcomeSender) error {
log.Printf("dropping database: %v", action)
projectID := action.GetProjectId()
instanceID := action.GetInstanceId()
databaseID := action.GetDatabaseId()
databaseAdminClient, err := database.NewDatabaseAdminClient(ctx, opts...)
if err != nil {
return err
}
err = databaseAdminClient.DropDatabase(ctx, &adminpb.DropDatabaseRequest{
Database: fmt.Sprintf("projects/%s/instances/%s/databases/%s", projectID, instanceID, databaseID),
})
if err != nil {
return err
}
return o.FinishSuccessfully()
}
// execute action that creates a cloud database backup.
func executeCreateCloudBackup(ctx context.Context, action *executorpb.CreateCloudBackupAction, h *ExecutionFlowContext, opts []option.ClientOption, o *outputstream.OutcomeSender) error {
log.Printf("creating backup: %v", action)
projectID := action.GetProjectId()
instanceID := action.GetInstanceId()
databaseID := action.GetDatabaseId()
backupID := action.GetBackupId()
databaseAdminClient, err := database.NewDatabaseAdminClient(ctx, opts...)
if err != nil {
return err
}
op, err := databaseAdminClient.CreateBackup(ctx, &adminpb.CreateBackupRequest{
Parent: fmt.Sprintf("projects/%s/instances/%s", projectID, instanceID),
BackupId: backupID,
Backup: &adminpb.Backup{
Database: fmt.Sprintf("projects/%s/instances/%s/databases/%s", projectID, instanceID, databaseID),
VersionTime: action.GetVersionTime(),
ExpireTime: action.GetExpireTime(),
},
})
if err != nil {
return err
}
backup, err := op.Wait(ctx)
if err != nil {
return err
}
spannerActionOutcome := &executorpb.SpannerActionOutcome{
Status: &spb.Status{Code: int32(codes.OK)},
AdminResult: &executorpb.AdminResult{
BackupResponse: &executorpb.CloudBackupResponse{
Backup: backup,
},
},
}
return o.SendOutcome(spannerActionOutcome)
}
// execute action that copies a cloud database backup.
func executeCopyCloudBackup(ctx context.Context, action *executorpb.CopyCloudBackupAction, h *ExecutionFlowContext, opts []option.ClientOption, o *outputstream.OutcomeSender) error {
log.Printf("copying backup: %v", action)
projectID := action.GetProjectId()
instanceID := action.GetInstanceId()
backupID := action.GetBackupId()
sourceBackupID := action.GetSourceBackup()
databaseAdminClient, err := database.NewDatabaseAdminClient(ctx, opts...)
if err != nil {
return err
}
op, err := databaseAdminClient.CopyBackup(ctx, &adminpb.CopyBackupRequest{
Parent: fmt.Sprintf("projects/%s/instances/%s", projectID, instanceID),
BackupId: backupID,
SourceBackup: fmt.Sprintf("projects/%s/instances/%s/backups/%s", projectID, instanceID, sourceBackupID),
ExpireTime: action.GetExpireTime(),
})
if err != nil {
return err
}
backup, err := op.Wait(ctx)
if err != nil {
return err
}
spannerActionOutcome := &executorpb.SpannerActionOutcome{
Status: &spb.Status{Code: int32(codes.OK)},
AdminResult: &executorpb.AdminResult{
BackupResponse: &executorpb.CloudBackupResponse{
Backup: backup,
},
},
}
return o.SendOutcome(spannerActionOutcome)
}
// execute action that gets a cloud database backup.
func executeGetCloudBackup(ctx context.Context, action *executorpb.GetCloudBackupAction, h *ExecutionFlowContext, opts []option.ClientOption, o *outputstream.OutcomeSender) error {
log.Printf("getting backup: %v", action)
projectID := action.GetProjectId()
instanceID := action.GetInstanceId()
backupID := action.GetBackupId()
databaseAdminClient, err := database.NewDatabaseAdminClient(ctx, opts...)
if err != nil {
return err
}
backup, err := databaseAdminClient.GetBackup(ctx, &adminpb.GetBackupRequest{
Name: fmt.Sprintf("projects/%s/instances/%s/backups/%s", projectID, instanceID, backupID),
})
if err != nil {
return err
}
spannerActionOutcome := &executorpb.SpannerActionOutcome{
Status: &spb.Status{Code: int32(codes.OK)},
AdminResult: &executorpb.AdminResult{
BackupResponse: &executorpb.CloudBackupResponse{
Backup: backup,
},
},
}
return o.SendOutcome(spannerActionOutcome)
}
// execute action that updates a cloud database backup.
func executeUpdateCloudBackup(ctx context.Context, action *executorpb.UpdateCloudBackupAction, h *ExecutionFlowContext, opts []option.ClientOption, o *outputstream.OutcomeSender) error {
log.Printf("updating backup: %v", action)
projectID := action.GetProjectId()
instanceID := action.GetInstanceId()
backupID := action.GetBackupId()
databaseAdminClient, err := database.NewDatabaseAdminClient(ctx, opts...)
if err != nil {
return err
}
backup, err := databaseAdminClient.UpdateBackup(ctx, &adminpb.UpdateBackupRequest{
Backup: &adminpb.Backup{
ExpireTime: action.GetExpireTime(),
Name: fmt.Sprintf("projects/%s/instances/%s/backups/%s", projectID, instanceID, backupID),
},
UpdateMask: &fieldmaskpb.FieldMask{
Paths: []string{"expire_time"},
},
})
if err != nil {
return err
}
spannerActionOutcome := &executorpb.SpannerActionOutcome{
Status: &spb.Status{Code: int32(codes.OK)},
AdminResult: &executorpb.AdminResult{
BackupResponse: &executorpb.CloudBackupResponse{
Backup: backup,
},
},
}
return o.SendOutcome(spannerActionOutcome)
}
// execute action that deletes a cloud database backup.
func executeDeleteCloudBackup(ctx context.Context, action *executorpb.DeleteCloudBackupAction, h *ExecutionFlowContext, opts []option.ClientOption, o *outputstream.OutcomeSender) error {
log.Printf("deleting backup: %v", action)
projectID := action.GetProjectId()
instanceID := action.GetInstanceId()
backupID := action.GetBackupId()
databaseAdminClient, err := database.NewDatabaseAdminClient(ctx, opts...)
if err != nil {
return err
}
err = databaseAdminClient.DeleteBackup(ctx, &adminpb.DeleteBackupRequest{
Name: fmt.Sprintf("projects/%s/instances/%s/backups/%s", projectID, instanceID, backupID),
})
if err != nil {
return err
}
return o.FinishSuccessfully()
}
// execute action that lists cloud database backups.
func executeListCloudBackups(ctx context.Context, action *executorpb.ListCloudBackupsAction, h *ExecutionFlowContext, opts []option.ClientOption, o *outputstream.OutcomeSender) error {
log.Printf("listing backup: %v", action)
projectID := action.GetProjectId()
instanceID := action.GetInstanceId()
databaseAdminClient, err := database.NewDatabaseAdminClient(ctx, opts...)
if err != nil {
return err
}
iter := databaseAdminClient.ListBackups(ctx, &adminpb.ListBackupsRequest{
Parent: fmt.Sprintf("projects/%s/instances/%s", projectID, instanceID),
Filter: action.GetFilter(),
PageSize: action.GetPageSize(),
PageToken: action.GetPageToken(),
})
if err != nil {
return err
}
var backupList []*adminpb.Backup
for {
backup, err := iter.Next()
if err == iterator.Done {
break
}
if err != nil {
return err
}
backupList = append(backupList, backup)
}
spannerActionOutcome := &executorpb.SpannerActionOutcome{
Status: &spb.Status{Code: int32(codes.OK)},
AdminResult: &executorpb.AdminResult{
BackupResponse: &executorpb.CloudBackupResponse{
ListedBackups: backupList,
NextPageToken: "",
},
},
}
return o.SendOutcome(spannerActionOutcome)
}
// execute action that lists cloud database backup operations.
func executeListCloudBackupOperations(ctx context.Context, action *executorpb.ListCloudBackupOperationsAction, h *ExecutionFlowContext, opts []option.ClientOption, o *outputstream.OutcomeSender) error {
log.Printf("listing backup operation: %v", action)
projectID := action.GetProjectId()
instanceID := action.GetInstanceId()
databaseAdminClient, err := database.NewDatabaseAdminClient(ctx, opts...)
if err != nil {
return err
}
iter := databaseAdminClient.ListBackupOperations(ctx, &adminpb.ListBackupOperationsRequest{
Parent: fmt.Sprintf("projects/%s/instances/%s", projectID, instanceID),
Filter: action.GetFilter(),
PageSize: action.GetPageSize(),
PageToken: action.GetPageToken(),
})
if err != nil {
return err
}
var lro []*longrunningpb.Operation
for {
operation, err := iter.Next()
if err == iterator.Done {
break
}
if err != nil {
return err
}
lro = append(lro, operation)
}
spannerActionOutcome := &executorpb.SpannerActionOutcome{
Status: &spb.Status{Code: int32(codes.OK)},
AdminResult: &executorpb.AdminResult{
BackupResponse: &executorpb.CloudBackupResponse{
ListedBackupOperations: lro,
},
},
}
return o.SendOutcome(spannerActionOutcome)
}
// execute action that list cloud databases.
func executeListCloudDatabases(ctx context.Context, action *executorpb.ListCloudDatabasesAction, h *ExecutionFlowContext, opts []option.ClientOption, o *outputstream.OutcomeSender) error {
log.Printf("listing database: %v", action)
projectID := action.GetProjectId()
instanceID := action.GetInstanceId()
databaseAdminClient, err := database.NewDatabaseAdminClient(ctx, opts...)
if err != nil {
return err
}
iter := databaseAdminClient.ListDatabases(ctx, &adminpb.ListDatabasesRequest{
Parent: fmt.Sprintf("projects/%s/instances/%s", projectID, instanceID),
PageSize: action.GetPageSize(),
PageToken: action.GetPageToken(),
})
if err != nil {
return err
}
var databaseList []*adminpb.Database
for {
databaseObj, err := iter.Next()
if err == iterator.Done {
break
}
if err != nil {
return err
}
databaseList = append(databaseList, databaseObj)
}
spannerActionOutcome := &executorpb.SpannerActionOutcome{
Status: &spb.Status{Code: int32(codes.OK)},
AdminResult: &executorpb.AdminResult{
DatabaseResponse: &executorpb.CloudDatabaseResponse{
ListedDatabases: databaseList,
},
},
}
return o.SendOutcome(spannerActionOutcome)
}
// execute action that lists cloud database operations.
func executeListCloudDatabaseOperations(ctx context.Context, action *executorpb.ListCloudDatabaseOperationsAction, h *ExecutionFlowContext, opts []option.ClientOption, o *outputstream.OutcomeSender) error {
log.Printf("listing database operation: %v", action)
projectID := action.GetProjectId()
instanceID := action.GetInstanceId()
databaseAdminClient, err := database.NewDatabaseAdminClient(ctx, opts...)
if err != nil {
return err
}
iter := databaseAdminClient.ListDatabaseOperations(ctx, &adminpb.ListDatabaseOperationsRequest{
Parent: fmt.Sprintf("projects/%s/instances/%s", projectID, instanceID),
PageSize: action.GetPageSize(),
Filter: action.GetFilter(),
PageToken: action.GetPageToken(),
})
if err != nil {
return err
}
var lro []*longrunningpb.Operation
for {
operation, err := iter.Next()
if err == iterator.Done {
break
}
if err != nil {
return err
}
lro = append(lro, operation)
}
spannerActionOutcome := &executorpb.SpannerActionOutcome{
Status: &spb.Status{Code: int32(codes.OK)},
AdminResult: &executorpb.AdminResult{
DatabaseResponse: &executorpb.CloudDatabaseResponse{
ListedDatabaseOperations: lro,
NextPageToken: "",
},
},
}
return o.SendOutcome(spannerActionOutcome)
}
// execute action that restores a cloud database.
func executeRestoreCloudDatabase(ctx context.Context, action *executorpb.RestoreCloudDatabaseAction, h *ExecutionFlowContext, opts []option.ClientOption, o *outputstream.OutcomeSender) error {
log.Printf("restoring database: %v", action)
projectID := action.GetProjectId()
databaseInstanceID := action.GetDatabaseInstanceId()
databaseAdminClient, err := database.NewDatabaseAdminClient(ctx, opts...)
if err != nil {
return err
}
restoreOp, err := databaseAdminClient.RestoreDatabase(ctx, &adminpb.RestoreDatabaseRequest{
Parent: fmt.Sprintf("projects/%s/instances/%s", projectID, databaseInstanceID),
DatabaseId: fmt.Sprintf("projects/%s/instances/%s/databases/%s", projectID, databaseInstanceID, action.GetDatabaseId()),
Source: &adminpb.RestoreDatabaseRequest_Backup{
Backup: fmt.Sprintf("projects/%s/instances/%s/backups/%s", projectID, action.GetBackupInstanceId(), action.GetBackupId()),
},
EncryptionConfig: nil,
})
if err != nil {
return err
}
databaseObject, err := restoreOp.Wait(ctx)
if err != nil {
return err
}
spannerActionOutcome := &executorpb.SpannerActionOutcome{
Status: &spb.Status{Code: int32(codes.OK)},
AdminResult: &executorpb.AdminResult{
DatabaseResponse: &executorpb.CloudDatabaseResponse{
Database: databaseObject,
},
},
}
return o.SendOutcome(spannerActionOutcome)
}
// execute action that gets a cloud database.
func executeGetCloudDatabase(ctx context.Context, action *executorpb.GetCloudDatabaseAction, h *ExecutionFlowContext, opts []option.ClientOption, o *outputstream.OutcomeSender) error {
log.Printf("getting database: %v", action)
projectID := action.GetProjectId()
instanceID := action.GetInstanceId()
databaseID := action.GetDatabaseId()
databaseAdminClient, err := database.NewDatabaseAdminClient(ctx, opts...)
if err != nil {
return err
}
db, err := databaseAdminClient.GetDatabase(ctx, &adminpb.GetDatabaseRequest{
Name: fmt.Sprintf("projects/%s/instances/%s/databases/%s", projectID, instanceID, databaseID),
})
if err != nil {
return err
}
spannerActionOutcome := &executorpb.SpannerActionOutcome{
Status: &spb.Status{Code: int32(codes.OK)},
AdminResult: &executorpb.AdminResult{
DatabaseResponse: &executorpb.CloudDatabaseResponse{
Database: db,
},
},
}
return o.SendOutcome(spannerActionOutcome)
}
// execute action that gets an operation.
func executeGetOperation(ctx context.Context, action *executorpb.GetOperationAction, h *ExecutionFlowContext, opts []option.ClientOption, o *outputstream.OutcomeSender) error {
log.Printf("getting operation: %v", action)
databaseAdminClient, err := database.NewDatabaseAdminClient(ctx, opts...)
if err != nil {
return err
}
operationResult, err := databaseAdminClient.GetOperation(ctx, &longrunningpb.GetOperationRequest{
Name: action.GetOperation(),
})
if err != nil {
return err
}
spannerActionOutcome := &executorpb.SpannerActionOutcome{
Status: &spb.Status{Code: int32(codes.OK)},
AdminResult: &executorpb.AdminResult{
OperationResponse: &executorpb.OperationResponse{
Operation: operationResult,
},
},
}
return o.SendOutcome(spannerActionOutcome)
}
// execute action that cancels an operation.
func executeCancelOperation(ctx context.Context, action *executorpb.CancelOperationAction, h *ExecutionFlowContext, opts []option.ClientOption, o *outputstream.OutcomeSender) error {
log.Printf("cancelling operation: %v", action)
databaseAdminClient, err := database.NewDatabaseAdminClient(ctx, opts...)
if err != nil {
return err
}
err = databaseAdminClient.LROClient.CancelOperation(ctx, &longrunningpb.CancelOperationRequest{
Name: action.GetOperation(),
})
if err != nil {
return err
}
return o.FinishSuccessfully()
}