feat(bigquery managedwriter): schema conversion support (#4357)
This is the first of multiple PRs to build up the functionality of a new
thick client over the new BigQuery Storage API's write mechanism.
This PR exposes schema conversion between the main bigquery package and
the bigquery storage API.
Towards: https://github.com/googleapis/google-cloud-go/issues/4366
diff --git a/bigquery/storage/managedwriter/adapt/doc.go b/bigquery/storage/managedwriter/adapt/doc.go
new file mode 100644
index 0000000..c06d303
--- /dev/null
+++ b/bigquery/storage/managedwriter/adapt/doc.go
@@ -0,0 +1,19 @@
+// Copyright 2021 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package adapt provides functionality for converting BigQuery representations,
+// such as schemas and data types, to and from their storage API equivalents.
+//
+// It is EXPERIMENTAL and subject to change or removal without notice.
+package adapt
diff --git a/bigquery/storage/managedwriter/adapt/schemaconversion.go b/bigquery/storage/managedwriter/adapt/schemaconversion.go
new file mode 100644
index 0000000..8de2257
--- /dev/null
+++ b/bigquery/storage/managedwriter/adapt/schemaconversion.go
@@ -0,0 +1,140 @@
+// Copyright 2021 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package adapt
+
+import (
+ "fmt"
+
+ "cloud.google.com/go/bigquery"
+ storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1beta2"
+)
+
+// fieldTypeMap maps bigquery FieldType values to their storage API
+// TableFieldSchema_Type equivalents.  RECORD maps to STRUCT; every other
+// entry is a direct 1:1 mapping.  Any FieldType absent from this map causes
+// conversion to fail with an error (see bqFieldToProto).
+var fieldTypeMap = map[bigquery.FieldType]storagepb.TableFieldSchema_Type{
+ bigquery.StringFieldType: storagepb.TableFieldSchema_STRING,
+ bigquery.BytesFieldType: storagepb.TableFieldSchema_BYTES,
+ bigquery.IntegerFieldType: storagepb.TableFieldSchema_INT64,
+ bigquery.FloatFieldType: storagepb.TableFieldSchema_DOUBLE,
+ bigquery.BooleanFieldType: storagepb.TableFieldSchema_BOOL,
+ bigquery.TimestampFieldType: storagepb.TableFieldSchema_TIMESTAMP,
+ bigquery.RecordFieldType: storagepb.TableFieldSchema_STRUCT,
+ bigquery.DateFieldType: storagepb.TableFieldSchema_DATE,
+ bigquery.TimeFieldType: storagepb.TableFieldSchema_TIME,
+ bigquery.DateTimeFieldType: storagepb.TableFieldSchema_DATETIME,
+ bigquery.NumericFieldType: storagepb.TableFieldSchema_NUMERIC,
+ bigquery.BigNumericFieldType: storagepb.TableFieldSchema_BIGNUMERIC,
+ bigquery.GeographyFieldType: storagepb.TableFieldSchema_GEOGRAPHY,
+}
+
+// bqFieldToProto converts a single bigquery FieldSchema into the storage
+// API's TableFieldSchema proto representation.  A nil input returns
+// (nil, nil).  Nested fields (RECORD/STRUCT) are converted recursively.
+// An error is returned if the field's type has no entry in fieldTypeMap.
+func bqFieldToProto(in *bigquery.FieldSchema) (*storagepb.TableFieldSchema, error) {
+ if in == nil {
+ return nil, nil
+ }
+ out := &storagepb.TableFieldSchema{
+ Name: in.Name,
+ Description: in.Description,
+ }
+
+ // Type conversion.
+ typ, ok := fieldTypeMap[in.Type]
+ if !ok {
+ return nil, fmt.Errorf("could not convert field (%s) due to unknown type value: %s", in.Name, in.Type)
+ }
+ out.Type = typ
+
+ // Mode conversion. Repeated trumps required: a repeated field is always
+ // REPEATED; otherwise Required selects REQUIRED, defaulting to NULLABLE.
+ out.Mode = storagepb.TableFieldSchema_NULLABLE
+ if in.Repeated {
+ out.Mode = storagepb.TableFieldSchema_REPEATED
+ }
+ if !in.Repeated && in.Required {
+ out.Mode = storagepb.TableFieldSchema_REQUIRED
+ }
+
+ // Recurse into subfields (populated for RECORD-typed fields).
+ for _, s := range in.Schema {
+ subField, err := bqFieldToProto(s)
+ if err != nil {
+ return nil, err
+ }
+ out.Fields = append(out.Fields, subField)
+ }
+ return out, nil
+}
+
+// protoToBQField converts a storage API TableFieldSchema proto back into the
+// bigquery package's FieldSchema representation.  A nil input returns
+// (nil, nil).  Nested fields are converted recursively.  An error is
+// returned for proto types with no mapping back to a bigquery FieldType.
+func protoToBQField(in *storagepb.TableFieldSchema) (*bigquery.FieldSchema, error) {
+ if in == nil {
+ return nil, nil
+ }
+ out := &bigquery.FieldSchema{
+ Name: in.GetName(),
+ Description: in.GetDescription(),
+ Repeated: in.GetMode() == storagepb.TableFieldSchema_REPEATED,
+ Required: in.GetMode() == storagepb.TableFieldSchema_REQUIRED,
+ }
+
+ // Reverse lookup via linear scan of fieldTypeMap; the map is small and
+ // fixed, so this is cheap relative to maintaining a second inverse map.
+ typeResolved := false
+ for k, v := range fieldTypeMap {
+ if v == in.GetType() {
+ out.Type = k
+ typeResolved = true
+ break
+ }
+ }
+ if !typeResolved {
+ return nil, fmt.Errorf("could not convert proto type to bigquery type: %v", in.GetType().String())
+ }
+
+ // Recurse into subfields (populated for STRUCT-typed fields).
+ for _, s := range in.Fields {
+ subField, err := protoToBQField(s)
+ if err != nil {
+ return nil, err
+ }
+ out.Schema = append(out.Schema, subField)
+ }
+ return out, nil
+}
+
+// BQSchemaToStorageTableSchema converts a bigquery Schema into the protobuf-based TableSchema used
+// by the BigQuery Storage WriteClient.  A nil Schema yields (nil, nil).  An error is returned if
+// any field (at any nesting depth) uses a type with no storage API equivalent.
+func BQSchemaToStorageTableSchema(in bigquery.Schema) (*storagepb.TableSchema, error) {
+ if in == nil {
+ return nil, nil
+ }
+ out := &storagepb.TableSchema{}
+ for _, s := range in {
+ converted, err := bqFieldToProto(s)
+ if err != nil {
+ return nil, err
+ }
+ out.Fields = append(out.Fields, converted)
+ }
+ return out, nil
+}
+
+// StorageTableSchemaToBQSchema converts a TableSchema from the BigQuery Storage WriteClient
+// into the equivalent BigQuery Schema.  A nil TableSchema yields (nil, nil).  An error is
+// returned if any field (at any nesting depth) uses a proto type with no bigquery equivalent.
+func StorageTableSchemaToBQSchema(in *storagepb.TableSchema) (bigquery.Schema, error) {
+ if in == nil {
+ return nil, nil
+ }
+ var out bigquery.Schema
+ for _, s := range in.Fields {
+ converted, err := protoToBQField(s)
+ if err != nil {
+ return nil, err
+ }
+ out = append(out, converted)
+ }
+ return out, nil
+}
diff --git a/bigquery/storage/managedwriter/adapt/schemaconversion_test.go b/bigquery/storage/managedwriter/adapt/schemaconversion_test.go
new file mode 100644
index 0000000..aab85fa
--- /dev/null
+++ b/bigquery/storage/managedwriter/adapt/schemaconversion_test.go
@@ -0,0 +1,203 @@
+// Copyright 2021 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package adapt
+
+import (
+ "testing"
+
+ "cloud.google.com/go/bigquery"
+ "cloud.google.com/go/internal/testutil"
+ "github.com/google/go-cmp/cmp"
+ storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1beta2"
+ "google.golang.org/protobuf/testing/protocmp"
+)
+
+// TestFieldConversions exercises field-level conversion in both directions
+// (bqFieldToProto and protoToBQField) over nil, scalar, required, and
+// nested/repeated cases, expecting each round trip to match exactly.
+func TestFieldConversions(t *testing.T) {
+ testCases := []struct {
+ desc string
+ bq *bigquery.FieldSchema
+ proto *storagepb.TableFieldSchema
+ }{
+ {
+ desc: "nil",
+ bq: nil,
+ proto: nil,
+ },
+ {
+ desc: "string field",
+ bq: &bigquery.FieldSchema{
+ Name: "name",
+ Type: bigquery.StringFieldType,
+ Description: "description",
+ },
+ proto: &storagepb.TableFieldSchema{
+ Name: "name",
+ Type: storagepb.TableFieldSchema_STRING,
+ Description: "description",
+ Mode: storagepb.TableFieldSchema_NULLABLE,
+ },
+ },
+ {
+ desc: "required integer field",
+ bq: &bigquery.FieldSchema{
+ Name: "name",
+ Type: bigquery.IntegerFieldType,
+ Description: "description",
+ Required: true,
+ },
+ proto: &storagepb.TableFieldSchema{
+ Name: "name",
+ Type: storagepb.TableFieldSchema_INT64,
+ Description: "description",
+ Mode: storagepb.TableFieldSchema_REQUIRED,
+ },
+ },
+ {
+ desc: "struct with repeated bytes subfield",
+ bq: &bigquery.FieldSchema{
+ Name: "name",
+ Type: bigquery.RecordFieldType,
+ Description: "description",
+ Required: true,
+ Schema: bigquery.Schema{
+ &bigquery.FieldSchema{
+ Name: "inner1",
+ Repeated: true,
+ Description: "repeat",
+ Type: bigquery.BytesFieldType,
+ },
+ },
+ },
+ proto: &storagepb.TableFieldSchema{
+ Name: "name",
+ Type: storagepb.TableFieldSchema_STRUCT,
+ Description: "description",
+ Mode: storagepb.TableFieldSchema_REQUIRED,
+ Fields: []*storagepb.TableFieldSchema{
+ {
+ Name: "inner1",
+ Mode: storagepb.TableFieldSchema_REPEATED,
+ Description: "repeat",
+ Type: storagepb.TableFieldSchema_BYTES,
+ },
+ },
+ },
+ },
+ }
+
+ for _, tc := range testCases {
+ // first, bq to proto
+ converted, err := bqFieldToProto(tc.bq)
+ if err != nil {
+ t.Errorf("case (%s) failed conversion from bq: %v", tc.desc, err)
+ }
+ // NOTE(review): on conversion error the diff below still runs with a nil
+ // `converted`, producing a second (redundant) failure for the same case.
+ if diff := cmp.Diff(converted, tc.proto, protocmp.Transform()); diff != "" {
+ t.Errorf("conversion to proto diff (%s):\n%v", tc.desc, diff)
+ }
+ // reverse conversion, proto to bq
+ reverse, err := protoToBQField(tc.proto)
+ if err != nil {
+ t.Errorf("case (%s) failed conversion from proto: %v", tc.desc, err)
+ }
+ if diff := cmp.Diff(reverse, tc.bq); diff != "" {
+ t.Errorf("conversion to BQ diff (%s):\n%v", tc.desc, diff)
+ }
+ }
+}
+
+// TestSchemaConversion exercises whole-schema conversion in both directions
+// (BQSchemaToStorageTableSchema and StorageTableSchemaToBQSchema) over nil,
+// scalar, repeated/required, and nested cases.
+func TestSchemaConversion(t *testing.T) {
+
+ testCases := []struct {
+ description string
+ bqSchema bigquery.Schema
+ storageSchema *storagepb.TableSchema
+ }{
+ {
+ description: "nil",
+ bqSchema: nil,
+ storageSchema: nil,
+ },
+ {
+ description: "scalars",
+ bqSchema: bigquery.Schema{
+ {Name: "f1", Type: bigquery.StringFieldType},
+ {Name: "f2", Type: bigquery.IntegerFieldType},
+ {Name: "f3", Type: bigquery.BooleanFieldType},
+ },
+ storageSchema: &storagepb.TableSchema{
+ Fields: []*storagepb.TableFieldSchema{
+ {Name: "f1", Type: storagepb.TableFieldSchema_STRING, Mode: storagepb.TableFieldSchema_NULLABLE},
+ {Name: "f2", Type: storagepb.TableFieldSchema_INT64, Mode: storagepb.TableFieldSchema_NULLABLE},
+ {Name: "f3", Type: storagepb.TableFieldSchema_BOOL, Mode: storagepb.TableFieldSchema_NULLABLE},
+ },
+ },
+ },
+ {
+ description: "array",
+ bqSchema: bigquery.Schema{
+ {Name: "arr", Type: bigquery.NumericFieldType, Repeated: true},
+ {Name: "big", Type: bigquery.BigNumericFieldType, Required: true},
+ },
+ storageSchema: &storagepb.TableSchema{
+ Fields: []*storagepb.TableFieldSchema{
+ {Name: "arr", Type: storagepb.TableFieldSchema_NUMERIC, Mode: storagepb.TableFieldSchema_REPEATED},
+ {Name: "big", Type: storagepb.TableFieldSchema_BIGNUMERIC, Mode: storagepb.TableFieldSchema_REQUIRED},
+ },
+ },
+ },
+ {
+ description: "nested",
+ bqSchema: bigquery.Schema{
+ {Name: "struct1", Type: bigquery.RecordFieldType, Schema: []*bigquery.FieldSchema{
+ {Name: "leaf1", Type: bigquery.DateFieldType},
+ {Name: "leaf2", Type: bigquery.DateTimeFieldType},
+ }},
+ {Name: "field2", Type: bigquery.StringFieldType},
+ },
+ storageSchema: &storagepb.TableSchema{
+ Fields: []*storagepb.TableFieldSchema{
+ {Name: "struct1",
+ Type: storagepb.TableFieldSchema_STRUCT,
+ Mode: storagepb.TableFieldSchema_NULLABLE,
+ Fields: []*storagepb.TableFieldSchema{
+ {Name: "leaf1", Type: storagepb.TableFieldSchema_DATE, Mode: storagepb.TableFieldSchema_NULLABLE},
+ {Name: "leaf2", Type: storagepb.TableFieldSchema_DATETIME, Mode: storagepb.TableFieldSchema_NULLABLE},
+ }},
+ {Name: "field2", Type: storagepb.TableFieldSchema_STRING, Mode: storagepb.TableFieldSchema_NULLABLE},
+ },
+ },
+ },
+ }
+ for _, tc := range testCases {
+
+ // BQ -> Storage
+ storageS, err := BQSchemaToStorageTableSchema(tc.bqSchema)
+ if err != nil {
+ t.Errorf("BQSchemaToStorageTableSchema(%s): %v", tc.description, err)
+ }
+ // NOTE(review): Fatalf here aborts all remaining cases, while the error
+ // checks above use Errorf — consider making failure handling consistent.
+ if diff := testutil.Diff(storageS, tc.storageSchema); diff != "" {
+ t.Fatalf("BQSchemaToStorageTableSchema(%s): -got, +want:\n%s", tc.description, diff)
+ }
+
+ // Storage -> BQ
+ bqS, err := StorageTableSchemaToBQSchema(tc.storageSchema)
+ if err != nil {
+ t.Errorf("StorageTableSchemaToBQSchema(%s): %v", tc.description, err)
+ }
+ if diff := testutil.Diff(bqS, tc.bqSchema); diff != "" {
+ t.Fatalf("StorageTableSchemaToBQSchema(%s): -got, +want:\n%s", tc.description, diff)
+ }
+ }
+}
diff --git a/bigquery/storage/managedwriter/doc.go b/bigquery/storage/managedwriter/doc.go
new file mode 100644
index 0000000..a8e580b
--- /dev/null
+++ b/bigquery/storage/managedwriter/doc.go
@@ -0,0 +1,22 @@
+// Copyright 2021 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package managedwriter will be a thick client around the storage API's BigQueryWriteClient.
+//
+// It is EXPERIMENTAL and subject to change or removal without notice.
+//
+// Currently, the BigQueryWriteClient this library targets is exposed in the storage v1beta2 endpoint.
+// It is the successor to the older streaming interface, which is backed by the tabledata.insertAll API
+// method and surfaced in the cloud.google.com/go/bigquery package via the Inserter abstraction.
+package managedwriter