blob: 11358725567e0090860873cce4c73624ebbb52cf [file] [log] [blame]
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package actions
import (
"context"
"fmt"
"log"
"cloud.google.com/go/spanner"
"cloud.google.com/go/spanner/apiv1/spannerpb"
"cloud.google.com/go/spanner/executor/apiv1/executorpb"
"cloud.google.com/go/spanner/test/cloudexecutor/executor/internal/outputstream"
"cloud.google.com/go/spanner/test/cloudexecutor/executor/internal/utility"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// WriteActionHandler holds the necessary components required for Write action.
type WriteActionHandler struct {
Action *executorpb.MutationAction
FlowContext *ExecutionFlowContext
OutcomeSender *outputstream.OutcomeSender
}
// ExecuteAction that execute a Write action request.
func (h *WriteActionHandler) ExecuteAction(ctx context.Context) error {
log.Printf("executing write action: %v", h.Action)
h.FlowContext.mu.Lock()
defer h.FlowContext.mu.Unlock()
m, err := createMutation(h.Action, h.FlowContext.tableMetadata)
if err != nil {
return h.OutcomeSender.FinishWithError(err)
}
_, err = h.FlowContext.DbClient.Apply(ctx, m)
if err != nil {
return h.OutcomeSender.FinishWithError(err)
}
return h.OutcomeSender.FinishSuccessfully()
}
// MutationActionHandler holds the necessary components required for Mutation action.
type MutationActionHandler struct {
Action *executorpb.MutationAction
FlowContext *ExecutionFlowContext
OutcomeSender *outputstream.OutcomeSender
}
// ExecuteAction that execute a Mutation action request.
func (h *MutationActionHandler) ExecuteAction(ctx context.Context) error {
log.Printf("Buffering mutation %v", h.Action)
h.FlowContext.mu.Lock()
defer h.FlowContext.mu.Unlock()
txn, err := h.FlowContext.getTransactionForWrite()
if err != nil {
return h.OutcomeSender.FinishWithError(err)
}
m, err := createMutation(h.Action, h.FlowContext.tableMetadata)
if err != nil {
return h.OutcomeSender.FinishWithError(err)
}
err = txn.BufferWrite(m)
if err != nil {
return h.OutcomeSender.FinishWithError(err)
}
return h.OutcomeSender.FinishSuccessfully()
}
// createMutation creates cloud spanner.Mutation from given executorpb.MutationAction.
func createMutation(action *executorpb.MutationAction, tableMetadata *utility.TableMetadataHelper) ([]*spanner.Mutation, error) {
prevTable := ""
var m []*spanner.Mutation
for _, mod := range action.Mod {
table := mod.GetTable()
if table == "" {
table = prevTable
}
if table == "" {
return nil, spanner.ToSpannerError(status.Error(codes.InvalidArgument, fmt.Sprintf("table name is missing from mod: action %s ", action.String())))
}
prevTable = table
log.Printf("executing mutation mod: \n%s", mod.String())
switch {
case mod.Insert != nil:
ia := mod.Insert
cloudRows, err := cloudValuesFromExecutorValueLists(ia.GetValues(), ia.GetType())
if err != nil {
return nil, err
}
for _, cloudRow := range cloudRows {
m = append(m, spanner.Insert(table, ia.GetColumn(), cloudRow))
}
case mod.Update != nil:
ua := mod.Update
cloudRows, err := cloudValuesFromExecutorValueLists(ua.GetValues(), ua.GetType())
if err != nil {
return nil, err
}
for _, cloudRow := range cloudRows {
m = append(m, spanner.Update(table, ua.GetColumn(), cloudRow))
}
case mod.InsertOrUpdate != nil:
ia := mod.InsertOrUpdate
cloudRows, err := cloudValuesFromExecutorValueLists(ia.GetValues(), ia.GetType())
if err != nil {
return nil, err
}
for _, cloudRow := range cloudRows {
m = append(m, spanner.InsertOrUpdate(table, ia.GetColumn(), cloudRow))
}
case mod.Replace != nil:
ia := mod.Replace
cloudRows, err := cloudValuesFromExecutorValueLists(ia.GetValues(), ia.GetType())
if err != nil {
return nil, err
}
for _, cloudRow := range cloudRows {
m = append(m, spanner.Replace(table, ia.GetColumn(), cloudRow))
}
case mod.DeleteKeys != nil:
keyColTypes, err := tableMetadata.GetKeyColumnTypes(table)
if err != nil {
return nil, err
}
keySet, err := utility.KeySetProtoToCloudKeySet(mod.DeleteKeys, keyColTypes)
m = append(m, spanner.Delete(table, keySet))
default:
return nil, spanner.ToSpannerError(status.Errorf(codes.InvalidArgument, "unsupported mod: %s", mod.String()))
}
}
return m, nil
}
// cloudValuesFromExecutorValueLists produces rows of Cloud Spanner values given []*executorpb.ValueList and []*spannerpb.Type.
// Each ValueList results in a row, and all of them should have the same column types.
func cloudValuesFromExecutorValueLists(valueLists []*executorpb.ValueList, types []*spannerpb.Type) ([][]any, error) {
var cloudRows [][]any
for _, rowValues := range valueLists {
log.Printf("Converting ValueList: %s\n", rowValues)
if len(rowValues.GetValue()) != len(types) {
return nil, spanner.ToSpannerError(status.Error(codes.InvalidArgument, "number of values should be equal to number of types"))
}
var cloudRow []any
for i, v := range rowValues.GetValue() {
isNull := false
switch v.GetValueType().(type) {
case *executorpb.Value_IsNull:
isNull = true
}
val, err := utility.ExecutorValueToSpannerValue(types[i], v, isNull)
if err != nil {
return nil, err
}
cloudRow = append(cloudRow, val)
}
cloudRows = append(cloudRows, cloudRow)
}
return cloudRows, nil
}