bigquery: Table.Create takes TableMetadata, not options

BREAKING CHANGES:

- Table.Create takes TableMetadata
- TableMetadata.View renamed to ViewQuery
- TableMetadata.ID renamed to FullID

See https://code-review.googlesource.com/c/gocloud/+/17130
for the rationale of using a struct instead of options for
Dataset.Create.

This CL does the same for Table.Create.

- Remove CreateTableOptions.

- Table.Create takes a second argument, TableMetadata.

- TableMetadata reorganized into writable and read-only sections.

Also, the two renames mentioned above.

Change-Id: I6aa83850dc4449c2b88393d97442748021be6ddd
Reviewed-on: https://code-review.googlesource.com/17150
Reviewed-by: kokoro <noreply+kokoro@google.com>
Reviewed-by: Ross Light <light@google.com>
Reviewed-by: Michael Darakananda <pongad@google.com>
diff --git a/bigquery/create_table_test.go b/bigquery/create_table_test.go
deleted file mode 100644
index 8f586ab..0000000
--- a/bigquery/create_table_test.go
+++ /dev/null
@@ -1,122 +0,0 @@
-// Copyright 2015 Google Inc. All Rights Reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//      http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package bigquery
-
-import (
-	"testing"
-	"time"
-
-	"github.com/google/go-cmp/cmp"
-
-	"cloud.google.com/go/internal/testutil"
-
-	"golang.org/x/net/context"
-	bq "google.golang.org/api/bigquery/v2"
-)
-
-type createTableRecorder struct {
-	conf *createTableConf
-	service
-}
-
-func (rec *createTableRecorder) createTable(ctx context.Context, conf *createTableConf) error {
-	rec.conf = conf
-	return nil
-}
-
-func TestCreateTableOptions(t *testing.T) {
-	s := &createTableRecorder{}
-	c := &Client{
-		projectID: "p",
-		service:   s,
-	}
-	ds := c.Dataset("d")
-	table := ds.Table("t")
-	exp := time.Now()
-	q := "query"
-	if err := table.Create(context.Background(), TableExpiration(exp), ViewQuery(q), UseStandardSQL()); err != nil {
-		t.Fatalf("err calling Table.Create: %v", err)
-	}
-	want := createTableConf{
-		projectID:      "p",
-		datasetID:      "d",
-		tableID:        "t",
-		expiration:     exp,
-		viewQuery:      q,
-		useStandardSQL: true,
-	}
-	equal := func(x, y createTableConf) bool {
-		return testutil.Equal(x, y, cmp.AllowUnexported(createTableConf{}))
-	}
-
-	if !equal(*s.conf, want) {
-		t.Errorf("createTableConf: got:\n%v\nwant:\n%v", *s.conf, want)
-	}
-
-	sc := Schema{fieldSchema("desc", "name", "STRING", false, true)}
-	if err := table.Create(context.Background(), TableExpiration(exp), sc); err != nil {
-		t.Fatalf("err calling Table.Create: %v", err)
-	}
-	want = createTableConf{
-		projectID:  "p",
-		datasetID:  "d",
-		tableID:    "t",
-		expiration: exp,
-		// No need for an elaborate schema, that is tested in schema_test.go.
-		schema: &bq.TableSchema{
-			Fields: []*bq.TableFieldSchema{
-				bqTableFieldSchema("desc", "name", "STRING", "REQUIRED"),
-			},
-		},
-	}
-	if !equal(*s.conf, want) {
-		t.Errorf("createTableConf: got:\n%v\nwant:\n%v", *s.conf, want)
-	}
-
-	partitionCases := []struct {
-		timePartitioning   TimePartitioning
-		expectedExpiration time.Duration
-	}{
-		{TimePartitioning{}, time.Duration(0)},
-		{TimePartitioning{time.Second}, time.Second},
-	}
-
-	for _, c := range partitionCases {
-		if err := table.Create(context.Background(), c.timePartitioning); err != nil {
-			t.Fatalf("err calling Table.Create: %v", err)
-		}
-		want = createTableConf{
-			projectID:        "p",
-			datasetID:        "d",
-			tableID:          "t",
-			timePartitioning: &TimePartitioning{c.expectedExpiration},
-		}
-		if !equal(*s.conf, want) {
-			t.Errorf("createTableConf: got:\n%v\nwant:\n%v", *s.conf, want)
-		}
-	}
-}
-
-func TestCreateTableOptionsLegacySQL(t *testing.T) {
-	c := &Client{
-		projectID: "p",
-		service:   &bigqueryService{},
-	}
-	ds := c.Dataset("d")
-	table := ds.Table("t")
-	if err := table.Create(context.Background(), UseStandardSQL(), UseLegacySQL()); err == nil {
-		t.Fatal("no error using both standard and legacy SQL options")
-	}
-}
diff --git a/bigquery/examples_test.go b/bigquery/examples_test.go
index a781265..81cd76c 100644
--- a/bigquery/examples_test.go
+++ b/bigquery/examples_test.go
@@ -17,6 +17,7 @@
 import (
 	"fmt"
 	"os"
+	"time"
 
 	"cloud.google.com/go/bigquery"
 	"golang.org/x/net/context"
@@ -390,13 +391,13 @@
 		// TODO: Handle error.
 	}
 	t := client.Dataset("my_dataset").Table("new-table")
-	if err := t.Create(ctx); err != nil {
+	if err := t.Create(ctx, nil); err != nil {
 		// TODO: Handle error.
 	}
 }
 
-// If you know your table's schema initially, pass a Schema to Create.
-func ExampleTable_Create_schema() {
+// Initialize a new table by passing TableMetadata to Table.Create.
+func ExampleTable_Create_initialize() {
 	ctx := context.Background()
 	// Infer table schema from a Go type.
 	schema, err := bigquery.InferSchema(Item{})
@@ -408,7 +409,12 @@
 		// TODO: Handle error.
 	}
 	t := client.Dataset("my_dataset").Table("new-table")
-	if err := t.Create(ctx, schema); err != nil {
+	if err := t.Create(ctx,
+		&bigquery.TableMetadata{
+			Name:           "My New Table",
+			Schema:         schema,
+			ExpirationTime: time.Now().Add(24 * time.Hour),
+		}); err != nil {
 		// TODO: Handle error.
 	}
 }
diff --git a/bigquery/integration_test.go b/bigquery/integration_test.go
index d8583ea..33c5d42 100644
--- a/bigquery/integration_test.go
+++ b/bigquery/integration_test.go
@@ -98,7 +98,10 @@
 	schema := Schema{
 		{Name: "rec", Type: RecordFieldType, Schema: Schema{}},
 	}
-	err := table.Create(context.Background(), schema, TableExpiration(time.Now().Add(5*time.Minute)))
+	err := table.Create(context.Background(), &TableMetadata{
+		Schema:         schema,
+		ExpirationTime: time.Now().Add(5 * time.Minute),
+	})
 	if err == nil {
 		t.Fatal("want error, got nil")
 	}
@@ -117,8 +120,12 @@
 
 	// Test that standard SQL views work.
 	view := dataset.Table("t_view_standardsql")
-	query := ViewQuery(fmt.Sprintf("SELECT APPROX_COUNT_DISTINCT(name) FROM `%s.%s.%s`", dataset.ProjectID, dataset.DatasetID, table.TableID))
-	err := view.Create(context.Background(), UseStandardSQL(), query)
+	query := fmt.Sprintf("SELECT APPROX_COUNT_DISTINCT(name) FROM `%s.%s.%s`",
+		dataset.ProjectID, dataset.DatasetID, table.TableID)
+	err := view.Create(context.Background(), &TableMetadata{
+		ViewQuery:      query,
+		UseStandardSQL: true,
+	})
 	if err != nil {
 		t.Fatalf("table.create: Did not expect an error, got: %v", err)
 	}
@@ -138,8 +145,8 @@
 		t.Fatal(err)
 	}
 	// TODO(jba): check md more thorougly.
-	if got, want := md.ID, fmt.Sprintf("%s:%s.%s", dataset.ProjectID, dataset.DatasetID, table.TableID); got != want {
-		t.Errorf("metadata.ID: got %q, want %q", got, want)
+	if got, want := md.FullID, fmt.Sprintf("%s:%s.%s", dataset.ProjectID, dataset.DatasetID, table.TableID); got != want {
+		t.Errorf("metadata.FullID: got %q, want %q", got, want)
 	}
 	if got, want := md.Type, RegularTable; got != want {
 		t.Errorf("metadata.Type: got %v, want %v", got, want)
@@ -163,7 +170,11 @@
 	}
 	for i, c := range partitionCases {
 		table := dataset.Table(fmt.Sprintf("t_metadata_partition_%v", i))
-		err = table.Create(context.Background(), schema, c.timePartitioning, TableExpiration(time.Now().Add(5*time.Minute)))
+		err = table.Create(context.Background(), &TableMetadata{
+			Schema:           schema,
+			TimePartitioning: &c.timePartitioning,
+			ExpirationTime:   time.Now().Add(5 * time.Minute),
+		})
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -1119,7 +1130,7 @@
 }
 
 func TestIntegration_TableUseLegacySQL(t *testing.T) {
-	// Test the UseLegacySQL and UseStandardSQL options for CreateTable.
+	// Test UseLegacySQL and UseStandardSQL for Table.Create.
 	if client == nil {
 		t.Skip("Integration tests skipped")
 	}
@@ -1128,15 +1139,12 @@
 	defer table.Delete(ctx)
 	for i, test := range useLegacySqlTests {
 		view := dataset.Table(fmt.Sprintf("t_view_%d", i))
-		vq := ViewQuery(fmt.Sprintf("SELECT word from %s", test.t))
-		opts := []CreateTableOption{vq}
-		if test.std {
-			opts = append(opts, UseStandardSQL())
+		tm := &TableMetadata{
+			ViewQuery:      fmt.Sprintf("SELECT word from %s", test.t),
+			UseStandardSQL: test.std,
+			UseLegacySQL:   test.legacy,
 		}
-		if test.legacy {
-			opts = append(opts, UseLegacySQL())
-		}
-		err := view.Create(ctx, opts...)
+		err := view.Create(ctx, tm)
 		gotErr := err != nil
 		if gotErr && !test.err {
 			t.Errorf("%+v:\nunexpected error: %v", test, err)
@@ -1183,7 +1191,10 @@
 func newTable(t *testing.T, s Schema) *Table {
 	name := fmt.Sprintf("t%d", time.Now().UnixNano())
 	table := dataset.Table(name)
-	err := table.Create(context.Background(), s, TableExpiration(testTableExpiration))
+	err := table.Create(context.Background(), &TableMetadata{
+		Schema:         s,
+		ExpirationTime: testTableExpiration,
+	})
 	if err != nil {
 		t.Fatal(err)
 	}
diff --git a/bigquery/schema.go b/bigquery/schema.go
index 0c8f464..b8674ea 100644
--- a/bigquery/schema.go
+++ b/bigquery/schema.go
@@ -77,11 +77,6 @@
 	return &bq.TableSchema{Fields: fields}
 }
 
-// customizeCreateTable allows a Schema to be used directly as an option to CreateTable.
-func (s Schema) customizeCreateTable(conf *createTableConf) {
-	conf.schema = s.asTableSchema()
-}
-
 func convertTableFieldSchema(tfs *bq.TableFieldSchema) *FieldSchema {
 	fs := &FieldSchema{
 		Description: tfs.Description,
diff --git a/bigquery/service.go b/bigquery/service.go
index 6579aa7..7c84a31 100644
--- a/bigquery/service.go
+++ b/bigquery/service.go
@@ -44,7 +44,7 @@
 	listJobs(ctx context.Context, projectId string, maxResults int, pageToken string, all bool, state string) ([]JobInfo, string, error)
 
 	// Tables
-	createTable(ctx context.Context, conf *createTableConf) error
+	createTable(ctx context.Context, projectID, datasetID, tableID string, tm *TableMetadata) error
 	getTableMetadata(ctx context.Context, projectID, datasetID, tableID string) (*TableMetadata, error)
 	deleteTable(ctx context.Context, projectID, datasetID, tableID string) error
 
@@ -502,63 +502,89 @@
 	return tables, res.NextPageToken, nil
 }
 
-type createTableConf struct {
-	projectID, datasetID, tableID string
-	expiration                    time.Time
-	viewQuery                     string
-	schema                        *bq.TableSchema
-	useStandardSQL                bool
-	useLegacySQL                  bool
-	timePartitioning              *TimePartitioning
-}
-
 // createTable creates a table in the BigQuery service.
-// expiration is an optional time after which the table will be deleted and its storage reclaimed.
-// If viewQuery is non-empty, the created table will be of type VIEW.
+// If tm.ViewQuery is non-empty, the created table will be of type VIEW.
 // Note: expiration can only be set during table creation.
 // Note: after table creation, a view can be modified only if its table was initially created with a view.
-func (s *bigqueryService) createTable(ctx context.Context, conf *createTableConf) error {
-	if conf.useStandardSQL && conf.useLegacySQL {
-		return errors.New("bigquery: cannot provide both UseStandardSQL and UseLegacySQL")
+func (s *bigqueryService) createTable(ctx context.Context, projectID, datasetID, tableID string, tm *TableMetadata) error {
+	table, err := bqTableFromMetadata(tm)
+	if err != nil {
+		return err
 	}
-	table := &bq.Table{
-		// TODO(jba): retry? Is this always idempotent?
-		TableReference: &bq.TableReference{
-			ProjectId: conf.projectID,
-			DatasetId: conf.datasetID,
-			TableId:   conf.tableID,
-		},
+	table.TableReference = &bq.TableReference{
+		ProjectId: projectID,
+		DatasetId: datasetID,
+		TableId:   tableID,
 	}
-	if !conf.expiration.IsZero() {
-		table.ExpirationTime = conf.expiration.UnixNano() / 1e6
+	req := s.s.Tables.Insert(projectID, datasetID, table).Context(ctx)
+	setClientHeader(req.Header())
+	_, err = req.Do()
+	return err
+}
+
+func bqTableFromMetadata(tm *TableMetadata) (*bq.Table, error) {
+	t := &bq.Table{}
+	if tm == nil {
+		return t, nil
 	}
-	// TODO(jba): make it impossible to provide both a view query and a schema.
-	if conf.viewQuery != "" {
-		table.View = &bq.ViewDefinition{
-			Query: conf.viewQuery,
+	if tm.Schema != nil && tm.ViewQuery != "" {
+		return nil, errors.New("bigquery: provide Schema or ViewQuery, not both")
+	}
+	t.FriendlyName = tm.Name
+	t.Description = tm.Description
+	if tm.Schema != nil {
+		t.Schema = tm.Schema.asTableSchema()
+	}
+	if tm.ViewQuery != "" {
+		if tm.UseStandardSQL && tm.UseLegacySQL {
+			return nil, errors.New("bigquery: cannot provide both UseStandardSQL and UseLegacySQL")
 		}
-		if conf.useStandardSQL {
-			table.View.UseLegacySql = false
-			table.View.ForceSendFields = append(table.View.ForceSendFields, "UseLegacySql")
+		t.View = &bq.ViewDefinition{Query: tm.ViewQuery}
+		if tm.UseLegacySQL {
+			t.View.UseLegacySql = true
 		}
-		if conf.useLegacySQL {
-			table.View.UseLegacySql = true
+		if tm.UseStandardSQL {
+			t.View.UseLegacySql = false
+			t.View.ForceSendFields = append(t.View.ForceSendFields, "UseLegacySql")
 		}
+	} else if tm.UseLegacySQL || tm.UseStandardSQL {
+		return nil, errors.New("bigquery: UseLegacy/StandardSQL requires ViewQuery")
 	}
-	if conf.schema != nil {
-		table.Schema = conf.schema
-	}
-	if conf.timePartitioning != nil {
-		table.TimePartitioning = &bq.TimePartitioning{
+	if tm.TimePartitioning != nil {
+		t.TimePartitioning = &bq.TimePartitioning{
 			Type:         "DAY",
-			ExpirationMs: int64(conf.timePartitioning.Expiration.Seconds() * 1000),
+			ExpirationMs: int64(tm.TimePartitioning.Expiration / time.Millisecond),
 		}
 	}
+	if !tm.ExpirationTime.IsZero() {
+		t.ExpirationTime = tm.ExpirationTime.UnixNano() / 1e6
+	}
 
-	req := s.s.Tables.Insert(conf.projectID, conf.datasetID, table).Context(ctx)
-	setClientHeader(req.Header())
-	_, err := req.Do()
-	return err
+	if tm.FullID != "" {
+		return nil, errors.New("cannot set FullID on create")
+	}
+	if tm.Type != "" {
+		return nil, errors.New("cannot set Type on create")
+	}
+	if !tm.CreationTime.IsZero() {
+		return nil, errors.New("cannot set CreationTime on create")
+	}
+	if !tm.LastModifiedTime.IsZero() {
+		return nil, errors.New("cannot set LastModifiedTime on create")
+	}
+	if tm.NumBytes != 0 {
+		return nil, errors.New("cannot set NumBytes on create")
+	}
+	if tm.NumRows != 0 {
+		return nil, errors.New("cannot set NumRows on create")
+	}
+	if tm.StreamingBuffer != nil {
+		return nil, errors.New("cannot set StreamingBuffer on create")
+	}
+	if tm.ETag != "" {
+		return nil, errors.New("cannot set ETag on create")
+	}
+	return t, nil
 }
 
 func (s *bigqueryService) getTableMetadata(ctx context.Context, projectID, datasetID, tableID string) (*TableMetadata, error) {
@@ -586,7 +612,7 @@
 		Description:      t.Description,
 		Name:             t.FriendlyName,
 		Type:             TableType(t.Type),
-		ID:               t.Id,
+		FullID:           t.Id,
 		NumBytes:         t.NumBytes,
 		NumRows:          t.NumRows,
 		ExpirationTime:   unixMillisToTime(t.ExpirationTime),
@@ -598,7 +624,8 @@
 		md.Schema = convertTableSchema(t.Schema)
 	}
 	if t.View != nil {
-		md.View = t.View.Query
+		md.ViewQuery = t.View.Query
+		md.UseLegacySQL = t.View.UseLegacySql
 	}
 	if t.TimePartitioning != nil {
 		md.TimePartitioning = &TimePartitioning{
@@ -729,21 +756,11 @@
 	if dm == nil {
 		return ds, nil
 	}
-	if dm.Name != "" {
-		ds.FriendlyName = dm.Name
-	}
-	if dm.Description != "" {
-		ds.Description = dm.Description
-	}
-	if dm.Location != "" {
-		ds.Location = dm.Location
-	}
-	if dm.DefaultTableExpiration != 0 {
-		ds.DefaultTableExpirationMs = int64(dm.DefaultTableExpiration / time.Millisecond)
-	}
-	if dm.Labels != nil {
-		ds.Labels = dm.Labels
-	}
+	ds.FriendlyName = dm.Name
+	ds.Description = dm.Description
+	ds.Location = dm.Location
+	ds.DefaultTableExpirationMs = int64(dm.DefaultTableExpiration / time.Millisecond)
+	ds.Labels = dm.Labels
 	if !dm.CreationTime.IsZero() {
 		return nil, errors.New("bigquery: Dataset.CreationTime is not writable")
 	}
diff --git a/bigquery/service_test.go b/bigquery/service_test.go
index 07feaae..bc214ba 100644
--- a/bigquery/service_test.go
+++ b/bigquery/service_test.go
@@ -59,15 +59,15 @@
 			&TableMetadata{
 				Description:      "desc",
 				Name:             "fname",
-				View:             "view-query",
-				ID:               "id",
+				ViewQuery:        "view-query",
+				FullID:           "id",
 				Type:             ExternalTable,
 				ExpirationTime:   aTime.Truncate(time.Millisecond),
 				CreationTime:     aTime.Truncate(time.Millisecond),
 				LastModifiedTime: aTime.Truncate(time.Millisecond),
 				NumBytes:         123,
 				NumRows:          7,
-				TimePartitioning: &TimePartitioning{Expiration: time.Duration(7890) * time.Millisecond},
+				TimePartitioning: &TimePartitioning{Expiration: 7890 * time.Millisecond},
 				StreamingBuffer: &StreamingBuffer{
 					EstimatedBytes:  11,
 					EstimatedRows:   3,
@@ -78,8 +78,104 @@
 		},
 	} {
 		got := bqTableToMetadata(test.in)
-		if !testutil.Equal(got, test.want) {
-			t.Errorf("%v:\ngot  %+v\nwant %+v", test.in, got, test.want)
+		if diff := testutil.Diff(got, test.want); diff != "" {
+			t.Errorf("%+v:\n, -got, +want:\n%s", test.in, diff)
+		}
+	}
+}
+
+func TestBQTableFromMetadata(t *testing.T) {
+	aTime := time.Date(2017, 1, 26, 0, 0, 0, 0, time.Local)
+	aTimeMillis := aTime.UnixNano() / 1e6
+	sc := Schema{fieldSchema("desc", "name", "STRING", false, true)}
+
+	for _, test := range []struct {
+		in   *TableMetadata
+		want *bq.Table
+	}{
+		{nil, &bq.Table{}},
+		{&TableMetadata{}, &bq.Table{}},
+		{
+			&TableMetadata{
+				Name:           "n",
+				Description:    "d",
+				Schema:         sc,
+				ExpirationTime: aTime,
+			},
+			&bq.Table{
+				FriendlyName: "n",
+				Description:  "d",
+				Schema: &bq.TableSchema{
+					Fields: []*bq.TableFieldSchema{
+						bqTableFieldSchema("desc", "name", "STRING", "REQUIRED"),
+					},
+				},
+				ExpirationTime: aTimeMillis,
+			},
+		},
+		{
+			&TableMetadata{
+				ViewQuery:        "q",
+				UseLegacySQL:     true,
+				TimePartitioning: &TimePartitioning{},
+			},
+			&bq.Table{
+				View: &bq.ViewDefinition{
+					Query:        "q",
+					UseLegacySql: true,
+				},
+				TimePartitioning: &bq.TimePartitioning{
+					Type:         "DAY",
+					ExpirationMs: 0,
+				},
+			},
+		},
+		{
+			&TableMetadata{
+				ViewQuery:        "q",
+				UseStandardSQL:   true,
+				TimePartitioning: &TimePartitioning{time.Second},
+			},
+			&bq.Table{
+				View: &bq.ViewDefinition{
+					Query:           "q",
+					UseLegacySql:    false,
+					ForceSendFields: []string{"UseLegacySql"},
+				},
+				TimePartitioning: &bq.TimePartitioning{
+					Type:         "DAY",
+					ExpirationMs: 1000,
+				},
+			},
+		},
+	} {
+		got, err := bqTableFromMetadata(test.in)
+		if err != nil {
+			t.Fatalf("%+v: %v", test.in, err)
+		}
+		if diff := testutil.Diff(got, test.want); diff != "" {
+			t.Errorf("%+v:\n-got, +want:\n%s", test.in, diff)
+		}
+	}
+
+	// Errors
+	for _, in := range []*TableMetadata{
+		{Schema: sc, ViewQuery: "q"}, // can't have both schema and query
+		{UseLegacySQL: true},         // UseLegacySQL without query
+		{UseStandardSQL: true},       // UseStandardSQL without query
+		// read-only fields
+		{FullID: "x"},
+		{Type: "x"},
+		{CreationTime: aTime},
+		{LastModifiedTime: aTime},
+		{NumBytes: 1},
+		{NumRows: 1},
+		{StreamingBuffer: &StreamingBuffer{}},
+		{ETag: "x"},
+	} {
+		_, err := bqTableFromMetadata(in)
+		if err == nil {
+			t.Errorf("%+v: got nil, want error", in)
 		}
 	}
 }
diff --git a/bigquery/table.go b/bigquery/table.go
index 41aa1fe..8c29e89 100644
--- a/bigquery/table.go
+++ b/bigquery/table.go
@@ -39,18 +39,38 @@
 
 // TableMetadata contains information about a BigQuery table.
 type TableMetadata struct {
-	Description string // The user-friendly description of this table.
-	Name        string // The user-friendly name for this table.
-	Schema      Schema
-	View        string
+	// The following fields can be set when creating a table.
 
-	ID   string // An opaque ID uniquely identifying the table.
-	Type TableType
+	// The user-friendly name for the table.
+	Name string
+
+	// The user-friendly description of the table.
+	Description string
+
+	// The table schema. If provided on create, ViewQuery must be empty.
+	Schema Schema
+
+	// The query to use for a view. If provided on create, Schema must be nil.
+	ViewQuery string
+
+	// Use Legacy SQL for the view query. The default.
+	// At most one of UseLegacySQL and UseStandardSQL can be true.
+	UseLegacySQL bool
+
+	// Use Legacy SQL for the view query.
+	UseStandardSQL bool
+
+	// If non-nil, the table is partitioned by time.
+	TimePartitioning *TimePartitioning
 
 	// The time when this table expires. If not set, the table will persist
 	// indefinitely. Expired tables will be deleted and their storage reclaimed.
 	ExpirationTime time.Time
 
+	// All the fields below are read-only.
+
+	FullID           string // An opaque ID uniquely identifying the table.
+	Type             TableType
 	CreationTime     time.Time
 	LastModifiedTime time.Time
 
@@ -62,9 +82,6 @@
 	// This does not include data that is being buffered during a streaming insert.
 	NumRows uint64
 
-	// The time-based partitioning settings for this table.
-	TimePartitioning *TimePartitioning
-
 	// Contains information regarding this table's streaming buffer, if one is
 	// present. This field will be nil if the table is not being streamed to or if
 	// there is no data in the streaming buffer.
@@ -115,6 +132,14 @@
 	ExternalTable TableType = "EXTERNAL"
 )
 
+// TimePartitioning describes the time-based date partitioning on a table.
+// For more information see: https://cloud.google.com/bigquery/docs/creating-partitioned-tables.
+type TimePartitioning struct {
+	// The amount of time to keep the storage for a partition.
+	// If the duration is empty (0), the data in the partitions do not expire.
+	Expiration time.Duration
+}
+
 // StreamingBuffer holds information about the streaming buffer.
 type StreamingBuffer struct {
 	// A lower-bound estimate of the number of bytes currently in the streaming
@@ -148,18 +173,9 @@
 }
 
 // Create creates a table in the BigQuery service.
-// To create a table with a schema, pass in a Schema to Create;
-// Schema is a valid CreateTableOption.
-func (t *Table) Create(ctx context.Context, options ...CreateTableOption) error {
-	conf := &createTableConf{
-		projectID: t.ProjectID,
-		datasetID: t.DatasetID,
-		tableID:   t.TableID,
-	}
-	for _, o := range options {
-		o.customizeCreateTable(conf)
-	}
-	return t.c.service.createTable(ctx, conf)
+// Pass in a TableMetadata value to configure the dataset.
+func (t *Table) Create(ctx context.Context, tm *TableMetadata) error {
+	return t.c.service.createTable(ctx, t.ProjectID, t.DatasetID, t.TableID, tm)
 }
 
 // Metadata fetches the metadata for the table.
@@ -172,63 +188,6 @@
 	return t.c.service.deleteTable(ctx, t.ProjectID, t.DatasetID, t.TableID)
 }
 
-// A CreateTableOption is an optional argument to CreateTable.
-type CreateTableOption interface {
-	customizeCreateTable(*createTableConf)
-}
-
-type tableExpiration time.Time
-
-// TableExpiration returns a CreateTableOption that will cause the created table to be deleted after the expiration time.
-func TableExpiration(exp time.Time) CreateTableOption { return tableExpiration(exp) }
-
-func (opt tableExpiration) customizeCreateTable(conf *createTableConf) {
-	conf.expiration = time.Time(opt)
-}
-
-type viewQuery string
-
-// ViewQuery returns a CreateTableOption that causes the created table to be a virtual table defined by the supplied query.
-// For more information see: https://cloud.google.com/bigquery/querying-data#views
-func ViewQuery(query string) CreateTableOption { return viewQuery(query) }
-
-func (opt viewQuery) customizeCreateTable(conf *createTableConf) {
-	conf.viewQuery = string(opt)
-}
-
-type useStandardSQL struct{}
-
-// UseStandardSQL returns a CreateTableOption to set the table to use standard SQL.
-// The default setting is false (using legacy SQL).
-func UseStandardSQL() CreateTableOption { return useStandardSQL{} }
-
-func (opt useStandardSQL) customizeCreateTable(conf *createTableConf) {
-	conf.useStandardSQL = true
-}
-
-type useLegacySQL struct{}
-
-// UseLegacySQL returns a CreateTableOption to set the table to use legacy SQL.
-// This is currently the default.
-func UseLegacySQL() CreateTableOption { return useLegacySQL{} }
-
-func (opt useLegacySQL) customizeCreateTable(conf *createTableConf) {
-	conf.useLegacySQL = true
-}
-
-// TimePartitioning is a CreateTableOption that can be used to set time-based
-// date partitioning on a table.
-// For more information see: https://cloud.google.com/bigquery/docs/creating-partitioned-tables
-type TimePartitioning struct {
-	// (Optional) The amount of time to keep the storage for a partition.
-	// If the duration is empty (0), the data in the partitions do not expire.
-	Expiration time.Duration
-}
-
-func (opt TimePartitioning) customizeCreateTable(conf *createTableConf) {
-	conf.timePartitioning = &opt
-}
-
 // Read fetches the contents of the table.
 func (t *Table) Read(ctx context.Context) *RowIterator {
 	return newRowIterator(ctx, t.c.service, &readTableConf{