feat(bigquery managedwriter): schema conversion support (#4357)

This is the first of multiple PRs to build up the functionality of a new
thick client over the new BigQuery Storage API's write mechanism.

This PR exposes schema conversion between the main bigquery package and
the bigquery storage API.

Towards: https://github.com/googleapis/google-cloud-go/issues/4366
diff --git a/bigquery/storage/managedwriter/adapt/doc.go b/bigquery/storage/managedwriter/adapt/doc.go
new file mode 100644
index 0000000..c06d303
--- /dev/null
+++ b/bigquery/storage/managedwriter/adapt/doc.go
@@ -0,0 +1,19 @@
+// Copyright 2021 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package adapt adds functionality related to converting bigquery representations
+// like schema and data type representations.
+//
+// It is EXPERIMENTAL and subject to change or removal without notice.
+package adapt
diff --git a/bigquery/storage/managedwriter/adapt/schemaconversion.go b/bigquery/storage/managedwriter/adapt/schemaconversion.go
new file mode 100644
index 0000000..8de2257
--- /dev/null
+++ b/bigquery/storage/managedwriter/adapt/schemaconversion.go
@@ -0,0 +1,140 @@
+// Copyright 2021 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package adapt
+
+import (
+	"fmt"
+
+	"cloud.google.com/go/bigquery"
+	storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1beta2"
+)
+
+var fieldTypeMap = map[bigquery.FieldType]storagepb.TableFieldSchema_Type{
+	bigquery.StringFieldType:     storagepb.TableFieldSchema_STRING,
+	bigquery.BytesFieldType:      storagepb.TableFieldSchema_BYTES,
+	bigquery.IntegerFieldType:    storagepb.TableFieldSchema_INT64,
+	bigquery.FloatFieldType:      storagepb.TableFieldSchema_DOUBLE,
+	bigquery.BooleanFieldType:    storagepb.TableFieldSchema_BOOL,
+	bigquery.TimestampFieldType:  storagepb.TableFieldSchema_TIMESTAMP,
+	bigquery.RecordFieldType:     storagepb.TableFieldSchema_STRUCT,
+	bigquery.DateFieldType:       storagepb.TableFieldSchema_DATE,
+	bigquery.TimeFieldType:       storagepb.TableFieldSchema_TIME,
+	bigquery.DateTimeFieldType:   storagepb.TableFieldSchema_DATETIME,
+	bigquery.NumericFieldType:    storagepb.TableFieldSchema_NUMERIC,
+	bigquery.BigNumericFieldType: storagepb.TableFieldSchema_BIGNUMERIC,
+	bigquery.GeographyFieldType:  storagepb.TableFieldSchema_GEOGRAPHY,
+}
+
+func bqFieldToProto(in *bigquery.FieldSchema) (*storagepb.TableFieldSchema, error) {
+	if in == nil {
+		return nil, nil
+	}
+	out := &storagepb.TableFieldSchema{
+		Name:        in.Name,
+		Description: in.Description,
+	}
+
+	// Type conversion.
+	typ, ok := fieldTypeMap[in.Type]
+	if !ok {
+		return nil, fmt.Errorf("could not convert field (%s) due to unknown type value: %s", in.Name, in.Type)
+	}
+	out.Type = typ
+
+	// Mode conversion.  Repeated trumps required.
+	out.Mode = storagepb.TableFieldSchema_NULLABLE
+	if in.Repeated {
+		out.Mode = storagepb.TableFieldSchema_REPEATED
+	}
+	if !in.Repeated && in.Required {
+		out.Mode = storagepb.TableFieldSchema_REQUIRED
+	}
+
+	for _, s := range in.Schema {
+		subField, err := bqFieldToProto(s)
+		if err != nil {
+			return nil, err
+		}
+		out.Fields = append(out.Fields, subField)
+	}
+	return out, nil
+}
+
+func protoToBQField(in *storagepb.TableFieldSchema) (*bigquery.FieldSchema, error) {
+	if in == nil {
+		return nil, nil
+	}
+	out := &bigquery.FieldSchema{
+		Name:        in.GetName(),
+		Description: in.GetDescription(),
+		Repeated:    in.GetMode() == storagepb.TableFieldSchema_REPEATED,
+		Required:    in.GetMode() == storagepb.TableFieldSchema_REQUIRED,
+	}
+
+	typeResolved := false
+	for k, v := range fieldTypeMap {
+		if v == in.GetType() {
+			out.Type = k
+			typeResolved = true
+			break
+		}
+	}
+	if !typeResolved {
+		return nil, fmt.Errorf("could not convert proto type to bigquery type: %v", in.GetType().String())
+	}
+
+	for _, s := range in.Fields {
+		subField, err := protoToBQField(s)
+		if err != nil {
+			return nil, err
+		}
+		out.Schema = append(out.Schema, subField)
+	}
+	return out, nil
+}
+
+// BQSchemaToStorageTableSchema converts a bigquery Schema into the protobuf-based TableSchema used
+// by the BigQuery Storage WriteClient.
+func BQSchemaToStorageTableSchema(in bigquery.Schema) (*storagepb.TableSchema, error) {
+	if in == nil {
+		return nil, nil
+	}
+	out := &storagepb.TableSchema{}
+	for _, s := range in {
+		converted, err := bqFieldToProto(s)
+		if err != nil {
+			return nil, err
+		}
+		out.Fields = append(out.Fields, converted)
+	}
+	return out, nil
+}
+
+// StorageTableSchemaToBQSchema converts a TableSchema from the BigQuery Storage WriteClient
+// into the equivalent BigQuery Schema.
+func StorageTableSchemaToBQSchema(in *storagepb.TableSchema) (bigquery.Schema, error) {
+	if in == nil {
+		return nil, nil
+	}
+	var out bigquery.Schema
+	for _, s := range in.Fields {
+		converted, err := protoToBQField(s)
+		if err != nil {
+			return nil, err
+		}
+		out = append(out, converted)
+	}
+	return out, nil
+}
diff --git a/bigquery/storage/managedwriter/adapt/schemaconversion_test.go b/bigquery/storage/managedwriter/adapt/schemaconversion_test.go
new file mode 100644
index 0000000..aab85fa
--- /dev/null
+++ b/bigquery/storage/managedwriter/adapt/schemaconversion_test.go
@@ -0,0 +1,203 @@
+// Copyright 2021 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package adapt
+
+import (
+	"testing"
+
+	"cloud.google.com/go/bigquery"
+	"cloud.google.com/go/internal/testutil"
+	"github.com/google/go-cmp/cmp"
+	storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1beta2"
+	"google.golang.org/protobuf/testing/protocmp"
+)
+
+func TestFieldConversions(t *testing.T) {
+	testCases := []struct {
+		desc  string
+		bq    *bigquery.FieldSchema
+		proto *storagepb.TableFieldSchema
+	}{
+		{
+			desc:  "nil",
+			bq:    nil,
+			proto: nil,
+		},
+		{
+			desc: "string field",
+			bq: &bigquery.FieldSchema{
+				Name:        "name",
+				Type:        bigquery.StringFieldType,
+				Description: "description",
+			},
+			proto: &storagepb.TableFieldSchema{
+				Name:        "name",
+				Type:        storagepb.TableFieldSchema_STRING,
+				Description: "description",
+				Mode:        storagepb.TableFieldSchema_NULLABLE,
+			},
+		},
+		{
+			desc: "required integer field",
+			bq: &bigquery.FieldSchema{
+				Name:        "name",
+				Type:        bigquery.IntegerFieldType,
+				Description: "description",
+				Required:    true,
+			},
+			proto: &storagepb.TableFieldSchema{
+				Name:        "name",
+				Type:        storagepb.TableFieldSchema_INT64,
+				Description: "description",
+				Mode:        storagepb.TableFieldSchema_REQUIRED,
+			},
+		},
+		{
+			desc: "struct with repeated bytes subfield",
+			bq: &bigquery.FieldSchema{
+				Name:        "name",
+				Type:        bigquery.RecordFieldType,
+				Description: "description",
+				Required:    true,
+				Schema: bigquery.Schema{
+					&bigquery.FieldSchema{
+						Name:        "inner1",
+						Repeated:    true,
+						Description: "repeat",
+						Type:        bigquery.BytesFieldType,
+					},
+				},
+			},
+			proto: &storagepb.TableFieldSchema{
+				Name:        "name",
+				Type:        storagepb.TableFieldSchema_STRUCT,
+				Description: "description",
+				Mode:        storagepb.TableFieldSchema_REQUIRED,
+				Fields: []*storagepb.TableFieldSchema{
+					{
+						Name:        "inner1",
+						Mode:        storagepb.TableFieldSchema_REPEATED,
+						Description: "repeat",
+						Type:        storagepb.TableFieldSchema_BYTES,
+					},
+				},
+			},
+		},
+	}
+
+	for _, tc := range testCases {
+		// first, bq to proto
+		converted, err := bqFieldToProto(tc.bq)
+		if err != nil {
+			t.Errorf("case (%s) failed conversion from bq: %v", tc.desc, err)
+		}
+		if diff := cmp.Diff(converted, tc.proto, protocmp.Transform()); diff != "" {
+			t.Errorf("conversion to proto diff (%s):\n%v", tc.desc, diff)
+		}
+		// reverse conversion, proto to bq
+		reverse, err := protoToBQField(tc.proto)
+		if err != nil {
+			t.Errorf("case (%s) failed conversion from proto: %v", tc.desc, err)
+		}
+		if diff := cmp.Diff(reverse, tc.bq); diff != "" {
+			t.Errorf("conversion to BQ diff (%s):\n%v", tc.desc, diff)
+		}
+	}
+}
+
+func TestSchemaConversion(t *testing.T) {
+
+	testCases := []struct {
+		description   string
+		bqSchema      bigquery.Schema
+		storageSchema *storagepb.TableSchema
+	}{
+		{
+			description:   "nil",
+			bqSchema:      nil,
+			storageSchema: nil,
+		},
+		{
+			description: "scalars",
+			bqSchema: bigquery.Schema{
+				{Name: "f1", Type: bigquery.StringFieldType},
+				{Name: "f2", Type: bigquery.IntegerFieldType},
+				{Name: "f3", Type: bigquery.BooleanFieldType},
+			},
+			storageSchema: &storagepb.TableSchema{
+				Fields: []*storagepb.TableFieldSchema{
+					{Name: "f1", Type: storagepb.TableFieldSchema_STRING, Mode: storagepb.TableFieldSchema_NULLABLE},
+					{Name: "f2", Type: storagepb.TableFieldSchema_INT64, Mode: storagepb.TableFieldSchema_NULLABLE},
+					{Name: "f3", Type: storagepb.TableFieldSchema_BOOL, Mode: storagepb.TableFieldSchema_NULLABLE},
+				},
+			},
+		},
+		{
+			description: "array",
+			bqSchema: bigquery.Schema{
+				{Name: "arr", Type: bigquery.NumericFieldType, Repeated: true},
+				{Name: "big", Type: bigquery.BigNumericFieldType, Required: true},
+			},
+			storageSchema: &storagepb.TableSchema{
+				Fields: []*storagepb.TableFieldSchema{
+					{Name: "arr", Type: storagepb.TableFieldSchema_NUMERIC, Mode: storagepb.TableFieldSchema_REPEATED},
+					{Name: "big", Type: storagepb.TableFieldSchema_BIGNUMERIC, Mode: storagepb.TableFieldSchema_REQUIRED},
+				},
+			},
+		},
+		{
+			description: "nested",
+			bqSchema: bigquery.Schema{
+				{Name: "struct1", Type: bigquery.RecordFieldType, Schema: []*bigquery.FieldSchema{
+					{Name: "leaf1", Type: bigquery.DateFieldType},
+					{Name: "leaf2", Type: bigquery.DateTimeFieldType},
+				}},
+				{Name: "field2", Type: bigquery.StringFieldType},
+			},
+			storageSchema: &storagepb.TableSchema{
+				Fields: []*storagepb.TableFieldSchema{
+					{Name: "struct1",
+						Type: storagepb.TableFieldSchema_STRUCT,
+						Mode: storagepb.TableFieldSchema_NULLABLE,
+						Fields: []*storagepb.TableFieldSchema{
+							{Name: "leaf1", Type: storagepb.TableFieldSchema_DATE, Mode: storagepb.TableFieldSchema_NULLABLE},
+							{Name: "leaf2", Type: storagepb.TableFieldSchema_DATETIME, Mode: storagepb.TableFieldSchema_NULLABLE},
+						}},
+					{Name: "field2", Type: storagepb.TableFieldSchema_STRING, Mode: storagepb.TableFieldSchema_NULLABLE},
+				},
+			},
+		},
+	}
+	for _, tc := range testCases {
+
+		// BQ -> Storage
+		storageS, err := BQSchemaToStorageTableSchema(tc.bqSchema)
+		if err != nil {
+			t.Errorf("BQSchemaToStorageTableSchema(%s): %v", tc.description, err)
+		}
+		if diff := testutil.Diff(storageS, tc.storageSchema); diff != "" {
+			t.Fatalf("BQSchemaToStorageTableSchema(%s): -got, +want:\n%s", tc.description, diff)
+		}
+
+		// Storage -> BQ
+		bqS, err := StorageTableSchemaToBQSchema(tc.storageSchema)
+		if err != nil {
+			t.Errorf("StorageTableSchemaToBQSchema(%s): %v", tc.description, err)
+		}
+		if diff := testutil.Diff(bqS, tc.bqSchema); diff != "" {
+			t.Fatalf("StorageTableSchemaToBQSchema(%s): -got, +want:\n%s", tc.description, diff)
+		}
+	}
+}
diff --git a/bigquery/storage/managedwriter/doc.go b/bigquery/storage/managedwriter/doc.go
new file mode 100644
index 0000000..a8e580b
--- /dev/null
+++ b/bigquery/storage/managedwriter/doc.go
@@ -0,0 +1,22 @@
+// Copyright 2021 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package managedwriter will be a thick client around the storage API's BigQueryWriteClient.
+//
+// It is EXPERIMENTAL and subject to change or removal without notice.
+//
+// Currently, the BigQueryWriteClient this library targets is exposed in the storage v1beta2 endpoint, and is
+// a successor to the streaming interface.  API method tabledata.insertAll is the primary backend method, and
+// the Inserter abstraction is the equivalent to this in the cloud.google.com/go/bigquery package.
+package managedwriter