// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package adapt

import (
"encoding/base64"
"fmt"
"strings"
storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
"google.golang.org/protobuf/types/known/wrapperspb"
)
var bqModeToFieldLabelMapProto2 = map[storagepb.TableFieldSchema_Mode]descriptorpb.FieldDescriptorProto_Label{
storagepb.TableFieldSchema_NULLABLE: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL,
storagepb.TableFieldSchema_REPEATED: descriptorpb.FieldDescriptorProto_LABEL_REPEATED,
storagepb.TableFieldSchema_REQUIRED: descriptorpb.FieldDescriptorProto_LABEL_REQUIRED,
}
var bqModeToFieldLabelMapProto3 = map[storagepb.TableFieldSchema_Mode]descriptorpb.FieldDescriptorProto_Label{
storagepb.TableFieldSchema_NULLABLE: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL,
storagepb.TableFieldSchema_REPEATED: descriptorpb.FieldDescriptorProto_LABEL_REPEATED,
storagepb.TableFieldSchema_REQUIRED: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL,
}
func convertModeToLabel(mode storagepb.TableFieldSchema_Mode, useProto3 bool) *descriptorpb.FieldDescriptorProto_Label {
if useProto3 {
return bqModeToFieldLabelMapProto3[mode].Enum()
}
return bqModeToFieldLabelMapProto2[mode].Enum()
}
// Allows conversion between BQ schema type and FieldDescriptorProto's type.
var bqTypeToFieldTypeMap = map[storagepb.TableFieldSchema_Type]descriptorpb.FieldDescriptorProto_Type{
storagepb.TableFieldSchema_BIGNUMERIC: descriptorpb.FieldDescriptorProto_TYPE_BYTES,
storagepb.TableFieldSchema_BOOL: descriptorpb.FieldDescriptorProto_TYPE_BOOL,
storagepb.TableFieldSchema_BYTES: descriptorpb.FieldDescriptorProto_TYPE_BYTES,
storagepb.TableFieldSchema_DATE: descriptorpb.FieldDescriptorProto_TYPE_INT32,
storagepb.TableFieldSchema_DATETIME: descriptorpb.FieldDescriptorProto_TYPE_INT64,
storagepb.TableFieldSchema_DOUBLE: descriptorpb.FieldDescriptorProto_TYPE_DOUBLE,
storagepb.TableFieldSchema_GEOGRAPHY: descriptorpb.FieldDescriptorProto_TYPE_STRING,
storagepb.TableFieldSchema_INT64: descriptorpb.FieldDescriptorProto_TYPE_INT64,
storagepb.TableFieldSchema_NUMERIC: descriptorpb.FieldDescriptorProto_TYPE_BYTES,
storagepb.TableFieldSchema_STRING: descriptorpb.FieldDescriptorProto_TYPE_STRING,
storagepb.TableFieldSchema_STRUCT: descriptorpb.FieldDescriptorProto_TYPE_MESSAGE,
storagepb.TableFieldSchema_TIME: descriptorpb.FieldDescriptorProto_TYPE_INT64,
storagepb.TableFieldSchema_TIMESTAMP: descriptorpb.FieldDescriptorProto_TYPE_INT64,
}
// For the TableFieldSchema NULLABLE mode, we use wrapper types to allow for the
// proper representation of NULL values, as proto3 semantics would otherwise use the default value.
var bqTypeToWrapperMap = map[storagepb.TableFieldSchema_Type]string{
storagepb.TableFieldSchema_BIGNUMERIC: ".google.protobuf.BytesValue",
storagepb.TableFieldSchema_BOOL: ".google.protobuf.BoolValue",
storagepb.TableFieldSchema_BYTES: ".google.protobuf.BytesValue",
storagepb.TableFieldSchema_DATE: ".google.protobuf.Int32Value",
storagepb.TableFieldSchema_DATETIME: ".google.protobuf.Int64Value",
storagepb.TableFieldSchema_DOUBLE: ".google.protobuf.DoubleValue",
storagepb.TableFieldSchema_GEOGRAPHY: ".google.protobuf.StringValue",
storagepb.TableFieldSchema_INT64: ".google.protobuf.Int64Value",
storagepb.TableFieldSchema_NUMERIC: ".google.protobuf.BytesValue",
storagepb.TableFieldSchema_STRING: ".google.protobuf.StringValue",
storagepb.TableFieldSchema_TIME: ".google.protobuf.Int64Value",
storagepb.TableFieldSchema_TIMESTAMP: ".google.protobuf.Int64Value",
}
// filename used by the well-known types proto
var wellKnownTypesWrapperName = "google/protobuf/wrappers.proto"
// dependencyCache is used to reduce the number of unique messages we generate by caching based on the tableschema.
//
// keys are based on the base64-encoded serialized tableschema value.
type dependencyCache map[string]protoreflect.Descriptor
func (dm dependencyCache) get(schema *storagepb.TableSchema) protoreflect.Descriptor {
if dm == nil {
return nil
}
b, err := proto.Marshal(schema)
if err != nil {
return nil
}
encoded := base64.StdEncoding.EncodeToString(b)
	if desc, ok := dm[encoded]; ok {
return desc
}
return nil
}
func (dm dependencyCache) add(schema *storagepb.TableSchema, descriptor protoreflect.Descriptor) error {
if dm == nil {
return fmt.Errorf("cache is nil")
}
b, err := proto.Marshal(schema)
if err != nil {
return fmt.Errorf("failed to serialize tableschema: %v", err)
}
encoded := base64.StdEncoding.EncodeToString(b)
	dm[encoded] = descriptor
return nil
}
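// sketchDependencyCacheUsage is an illustrative sketch (not used elsewhere in
// this package) of the cache round trip described above: adding a schema and
// then fetching with an equivalent schema yields the stored descriptor, while
// a schema that was never added yields nil. The schema literal and the
// stand-in descriptor below are arbitrary values chosen for demonstration.
func sketchDependencyCacheUsage() protoreflect.Descriptor {
	dc := make(dependencyCache)
	ts := &storagepb.TableSchema{
		Fields: []*storagepb.TableFieldSchema{
			{Name: "id", Type: storagepb.TableFieldSchema_INT64, Mode: storagepb.TableFieldSchema_REQUIRED},
		},
	}
	// Stand-in descriptor; in real use this is the descriptor built for a STRUCT field's subfields.
	d := wrapperspb.File_google_protobuf_wrappers_proto.Messages().Get(0)
	if err := dc.add(ts, d); err != nil {
		return nil
	}
	return dc.get(ts)
}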
// StorageSchemaToProto2Descriptor builds a protoreflect.Descriptor for a given table schema using proto2 syntax.
func StorageSchemaToProto2Descriptor(inSchema *storagepb.TableSchema, scope string) (protoreflect.Descriptor, error) {
dc := make(dependencyCache)
// TODO: b/193064992 tracks support for wrapper types. In the interim, disable wrapper usage.
return storageSchemaToDescriptorInternal(inSchema, scope, &dc, false)
}
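// sketchFlatSchemaToProto2Descriptor is an illustrative sketch (not part of
// the package API) of converting a simple flat table schema. The column names
// and the "root" scope are invented for demonstration; the scope becomes the
// name of the generated message and the stem of its synthetic .proto filename.
func sketchFlatSchemaToProto2Descriptor() (protoreflect.Descriptor, error) {
	schema := &storagepb.TableSchema{
		Fields: []*storagepb.TableFieldSchema{
			{Name: "id", Type: storagepb.TableFieldSchema_INT64, Mode: storagepb.TableFieldSchema_REQUIRED},
			{Name: "username", Type: storagepb.TableFieldSchema_STRING, Mode: storagepb.TableFieldSchema_NULLABLE},
		},
	}
	return StorageSchemaToProto2Descriptor(schema, "root")
}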
// StorageSchemaToProto3Descriptor builds a protoreflect.Descriptor for a given table schema using proto3 syntax.
//
// NOTE: Currently the write API doesn't yet support proto3 behaviors (default value, wrapper types, etc), but this is provided for
// completeness.
func StorageSchemaToProto3Descriptor(inSchema *storagepb.TableSchema, scope string) (protoreflect.Descriptor, error) {
dc := make(dependencyCache)
return storageSchemaToDescriptorInternal(inSchema, scope, &dc, true)
}
// internal implementation of the conversion code.
func storageSchemaToDescriptorInternal(inSchema *storagepb.TableSchema, scope string, cache *dependencyCache, useProto3 bool) (protoreflect.Descriptor, error) {
if inSchema == nil {
return nil, newConversionError(scope, fmt.Errorf("no input schema was provided"))
}
var fields []*descriptorpb.FieldDescriptorProto
var deps []protoreflect.FileDescriptor
var fNumber int32
for _, f := range inSchema.GetFields() {
		fNumber++
currentScope := fmt.Sprintf("%s__%s", scope, f.GetName())
// If we're dealing with a STRUCT type, we must deal with sub messages.
// As multiple submessages may share the same type definition, we use a dependency cache
// and interrogate it / populate it as we're going.
if f.Type == storagepb.TableFieldSchema_STRUCT {
foundDesc := cache.get(&storagepb.TableSchema{Fields: f.GetFields()})
if foundDesc != nil {
// check to see if we already have this in current dependency list
haveDep := false
for _, curDep := range deps {
if foundDesc.ParentFile().FullName() == curDep.FullName() {
haveDep = true
break
}
}
// if dep is missing, add to current dependencies
if !haveDep {
deps = append(deps, foundDesc.ParentFile())
}
// construct field descriptor for the message
fdp, err := tableFieldSchemaToFieldDescriptorProto(f, fNumber, string(foundDesc.FullName()), useProto3)
if err != nil {
return nil, newConversionError(scope, fmt.Errorf("couldn't convert field to FieldDescriptorProto: %v", err))
}
fields = append(fields, fdp)
} else {
// Wrap the current struct's fields in a TableSchema outer message, and then build the submessage.
ts := &storagepb.TableSchema{
Fields: f.GetFields(),
}
desc, err := storageSchemaToDescriptorInternal(ts, currentScope, cache, useProto3)
if err != nil {
return nil, newConversionError(currentScope, fmt.Errorf("couldn't convert message: %v", err))
}
// Now that we have the submessage definition, we append it both to the local dependencies, as well
// as inserting it into the cache for possible reuse elsewhere.
deps = append(deps, desc.ParentFile())
err = cache.add(ts, desc)
if err != nil {
return nil, newConversionError(currentScope, fmt.Errorf("failed to add descriptor to dependency cache: %v", err))
}
fdp, err := tableFieldSchemaToFieldDescriptorProto(f, fNumber, currentScope, useProto3)
if err != nil {
return nil, newConversionError(currentScope, fmt.Errorf("couldn't compute field schema : %v", err))
}
fields = append(fields, fdp)
}
} else {
fd, err := tableFieldSchemaToFieldDescriptorProto(f, fNumber, currentScope, useProto3)
if err != nil {
return nil, newConversionError(currentScope, err)
}
fields = append(fields, fd)
}
}
// Start constructing a DescriptorProto.
dp := &descriptorpb.DescriptorProto{
Name: proto.String(scope),
Field: fields,
}
// Use the local dependencies to generate a list of filenames.
depNames := []string{
wellKnownTypesWrapperName,
}
for _, d := range deps {
depNames = append(depNames, d.ParentFile().Path())
}
// Now, construct a FileDescriptorProto.
fdp := &descriptorpb.FileDescriptorProto{
MessageType: []*descriptorpb.DescriptorProto{dp},
Name: proto.String(fmt.Sprintf("%s.proto", scope)),
Syntax: proto.String("proto3"),
Dependency: depNames,
}
if !useProto3 {
fdp.Syntax = proto.String("proto2")
}
// We'll need a FileDescriptorSet as we have a FileDescriptorProto for the current
// descriptor we're building, but we need to include all the referenced dependencies.
fds := &descriptorpb.FileDescriptorSet{
File: []*descriptorpb.FileDescriptorProto{
fdp,
protodesc.ToFileDescriptorProto(wrapperspb.File_google_protobuf_wrappers_proto),
},
}
for _, d := range deps {
fds.File = append(fds.File, protodesc.ToFileDescriptorProto(d))
}
// Load the set into a registry, then interrogate it for the descriptor corresponding to the top level message.
files, err := protodesc.NewFiles(fds)
if err != nil {
return nil, err
}
return files.FindDescriptorByName(protoreflect.FullName(scope))
}
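// sketchNestedSchemaToProto2Descriptor is an illustrative sketch (not part of
// the package API) of the STRUCT handling above: the inner fields are wrapped
// in their own TableSchema, converted into a submessage named after the
// nested scope ("root__address" here), cached for reuse, and referenced from
// the outer message's field descriptor. All names are invented for demonstration.
func sketchNestedSchemaToProto2Descriptor() (protoreflect.Descriptor, error) {
	schema := &storagepb.TableSchema{
		Fields: []*storagepb.TableFieldSchema{
			{Name: "id", Type: storagepb.TableFieldSchema_INT64, Mode: storagepb.TableFieldSchema_REQUIRED},
			{
				Name: "address",
				Type: storagepb.TableFieldSchema_STRUCT,
				Mode: storagepb.TableFieldSchema_NULLABLE,
				Fields: []*storagepb.TableFieldSchema{
					{Name: "city", Type: storagepb.TableFieldSchema_STRING, Mode: storagepb.TableFieldSchema_NULLABLE},
				},
			},
		},
	}
	return StorageSchemaToProto2Descriptor(schema, "root")
}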
// tableFieldSchemaToFieldDescriptorProto builds individual field descriptors for a proto message.
//
// For proto3, in cases where the mode is NULLABLE we use the well-known wrapper types.
// For proto2, we propagate the mode->label annotation as expected.
//
// Messages are always nullable, and repeated fields are as well.
func tableFieldSchemaToFieldDescriptorProto(field *storagepb.TableFieldSchema, idx int32, scope string, useProto3 bool) (*descriptorpb.FieldDescriptorProto, error) {
name := strings.ToLower(field.GetName())
if field.GetType() == storagepb.TableFieldSchema_STRUCT {
return &descriptorpb.FieldDescriptorProto{
Name: proto.String(name),
Number: proto.Int32(idx),
TypeName: proto.String(scope),
Label: convertModeToLabel(field.GetMode(), useProto3),
}, nil
}
	// For REQUIRED or REPEATED fields in proto3, or all fields in proto2, we can use the expected scalar types.
if field.GetMode() != storagepb.TableFieldSchema_NULLABLE || !useProto3 {
return &descriptorpb.FieldDescriptorProto{
Name: proto.String(name),
Number: proto.Int32(idx),
Type: bqTypeToFieldTypeMap[field.GetType()].Enum(),
Label: convertModeToLabel(field.GetMode(), useProto3),
}, nil
}
// For NULLABLE proto3 fields, use a wrapper type.
return &descriptorpb.FieldDescriptorProto{
Name: proto.String(name),
Number: proto.Int32(idx),
Type: descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum(),
TypeName: proto.String(bqTypeToWrapperMap[field.GetType()]),
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
}, nil
}
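// sketchNullableFieldDescriptors is an illustrative sketch (names invented) of
// the behavior above: the same NULLABLE STRING column maps to an optional
// scalar string field under proto2, but to an optional
// google.protobuf.StringValue wrapper message under proto3.
func sketchNullableFieldDescriptors() (*descriptorpb.FieldDescriptorProto, *descriptorpb.FieldDescriptorProto, error) {
	col := &storagepb.TableFieldSchema{
		Name: "nickname",
		Type: storagepb.TableFieldSchema_STRING,
		Mode: storagepb.TableFieldSchema_NULLABLE,
	}
	proto2FDP, err := tableFieldSchemaToFieldDescriptorProto(col, 1, "root", false)
	if err != nil {
		return nil, nil, err
	}
	proto3FDP, err := tableFieldSchemaToFieldDescriptorProto(col, 1, "root", true)
	if err != nil {
		return nil, nil, err
	}
	return proto2FDP, proto3FDP, nil
}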
// NormalizeDescriptor builds a self-contained DescriptorProto suitable for communicating schema
// information with the BigQuery Storage write API. It's primarily used for cases where users are
// interested in sending data using a predefined protocol buffer message.
//
// The storage API accepts a single DescriptorProto for decoding message data. In many cases, a message
// is composed of multiple independent messages, from the same .proto file or from multiple sources. Rather
// than requiring all of these messages to be communicated independently, this method rewrites the
// DescriptorProto to inline all referenced messages as nested submessages. As the backend only cares about the types
// and not the namespaces when decoding, this is sufficient for the needs of the API's representation.
//
// In addition to nesting messages, this method also handles some encapsulation of enum types to avoid possible
// conflicts due to ambiguities.
func NormalizeDescriptor(in protoreflect.MessageDescriptor) (*descriptorpb.DescriptorProto, error) {
return normalizeDescriptorInternal(in, newStringSet(), newStringSet(), newStringSet(), nil)
}
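// sketchNormalizeCompiledMessage is an illustrative sketch (not part of the
// package API) of feeding a compiled protocol buffer message's descriptor
// through NormalizeDescriptor. The wrapper type below is used purely because
// this package already imports it; any linked-in message descriptor works.
func sketchNormalizeCompiledMessage() (*descriptorpb.DescriptorProto, error) {
	md := (&wrapperspb.Int64Value{}).ProtoReflect().Descriptor()
	return NormalizeDescriptor(md)
}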
func normalizeDescriptorInternal(in protoreflect.MessageDescriptor, visitedTypes, enumTypes, structTypes *stringSet, root *descriptorpb.DescriptorProto) (*descriptorpb.DescriptorProto, error) {
if in == nil {
return nil, fmt.Errorf("no messagedescriptor provided")
}
resultDP := &descriptorpb.DescriptorProto{}
if root == nil {
root = resultDP
}
fullProtoName := string(in.FullName())
resultDP.Name = proto.String(normalizeName(fullProtoName))
visitedTypes.add(fullProtoName)
for i := 0; i < in.Fields().Len(); i++ {
inField := in.Fields().Get(i)
resultFDP := protodesc.ToFieldDescriptorProto(inField)
if inField.Kind() == protoreflect.MessageKind || inField.Kind() == protoreflect.GroupKind {
// Handle fields that reference messages.
// Groups are a proto2-ism which predated nested messages.
msgFullName := string(inField.Message().FullName())
if !skipNormalization(msgFullName) {
// for everything but well known types, normalize.
normName := normalizeName(string(msgFullName))
if structTypes.contains(msgFullName) {
resultFDP.TypeName = proto.String(normName)
} else {
if visitedTypes.contains(msgFullName) {
return nil, fmt.Errorf("recursize type not supported: %s", inField.FullName())
}
visitedTypes.add(msgFullName)
dp, err := normalizeDescriptorInternal(inField.Message(), visitedTypes, enumTypes, structTypes, root)
if err != nil {
return nil, fmt.Errorf("error converting message %s: %v", inField.FullName(), err)
}
root.NestedType = append(root.NestedType, dp)
visitedTypes.delete(msgFullName)
lastNested := root.GetNestedType()[len(root.GetNestedType())-1].GetName()
resultFDP.TypeName = proto.String(lastNested)
}
}
}
if inField.Kind() == protoreflect.EnumKind {
			// For enums, in order to avoid value conflicts, we always define
			// an enclosing message called enum_full_name_E that includes the actual
// enum.
enumFullName := string(inField.Enum().FullName())
enclosingTypeName := normalizeName(enumFullName) + "_E"
enumName := string(inField.Enum().Name())
actualFullName := fmt.Sprintf("%s.%s", enclosingTypeName, enumName)
if enumTypes.contains(enumFullName) {
resultFDP.TypeName = proto.String(actualFullName)
} else {
enumDP := protodesc.ToEnumDescriptorProto(inField.Enum())
enumDP.Name = proto.String(enumName)
resultDP.NestedType = append(resultDP.NestedType, &descriptorpb.DescriptorProto{
Name: proto.String(enclosingTypeName),
EnumType: []*descriptorpb.EnumDescriptorProto{enumDP},
})
resultFDP.TypeName = proto.String(actualFullName)
enumTypes.add(enumFullName)
}
}
resultDP.Field = append(resultDP.Field, resultFDP)
}
structTypes.add(fullProtoName)
return resultDP, nil
}
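// sketchNormalizeMessageWithEnums is an illustrative sketch (not part of the
// package API) of the enum handling above, using descriptorpb's own
// FieldDescriptorProto message purely because this package already imports it:
// its enum-typed fields (such as type and label) are re-parented under
// generated enclosing messages whose names end in "_E", while its
// message-typed fields are inlined as nested types on the root.
func sketchNormalizeMessageWithEnums() (*descriptorpb.DescriptorProto, error) {
	md := (&descriptorpb.FieldDescriptorProto{}).ProtoReflect().Descriptor()
	return NormalizeDescriptor(md)
}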
type stringSet struct {
m map[string]struct{}
}
func (s *stringSet) contains(k string) bool {
_, ok := s.m[k]
return ok
}
func (s *stringSet) add(k string) {
s.m[k] = struct{}{}
}
func (s *stringSet) delete(k string) {
delete(s.m, k)
}
func newStringSet() *stringSet {
return &stringSet{
m: make(map[string]struct{}),
}
}
func normalizeName(in string) string {
return strings.Replace(in, ".", "_", -1)
}
// these types don't get normalized into the fully-contained structure.
var normalizationSkipList = []string{
/*
TODO: when backend supports resolving well known types, this list should be enabled.
"google.protobuf.DoubleValue",
"google.protobuf.FloatValue",
"google.protobuf.Int64Value",
"google.protobuf.UInt64Value",
"google.protobuf.Int32Value",
"google.protobuf.Uint32Value",
"google.protobuf.BoolValue",
"google.protobuf.StringValue",
"google.protobuf.BytesValue",
*/
}
func skipNormalization(fullName string) bool {
for _, v := range normalizationSkipList {
if v == fullName {
return true
}
}
return false
}