bigquery: add range partitioning for tables

This feature adds the ability to define table partitioning based
on integer ranges.  Previously, only time-based partitioning was
available for organizing table storage.

Partitioning can be defined by invoking table creation directly, or
as a byproduct of creating a new table from a load or query job.

Change-Id: I4659c9fc0d14a66e6a7819e7cd0d13e31abd2f94
Reviewed-on: https://code-review.googlesource.com/c/gocloud/+/45890
Reviewed-by: kokoro <noreply+kokoro@google.com>
Reviewed-by: Alex Hong <hongalex@google.com>
diff --git a/bigquery/integration_test.go b/bigquery/integration_test.go
index 8e72860..a86000c 100644
--- a/bigquery/integration_test.go
+++ b/bigquery/integration_test.go
@@ -396,6 +396,65 @@
 
 }
 
+func TestIntegration_RangePartitioning(t *testing.T) {
+	if client == nil {
+		t.Skip("Integration tests skipped")
+	}
+	ctx := context.Background()
+	table := dataset.Table(tableIDs.New())
+
+	schema := Schema{
+		{Name: "name", Type: StringFieldType},
+		{Name: "somevalue", Type: IntegerFieldType},
+	}
+
+	wantedRange := &RangePartitioningRange{
+		Start:    10,
+		End:      135,
+		Interval: 25,
+	}
+
+	wantedPartitioning := &RangePartitioning{
+		Field: "somevalue",
+		Range: wantedRange,
+	}
+
+	err := table.Create(context.Background(), &TableMetadata{
+		Schema:            schema,
+		RangePartitioning: wantedPartitioning,
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer table.Delete(ctx)
+	md, err := table.Metadata(ctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if md.RangePartitioning == nil {
+		t.Fatal("expected range partitioning, got nil")
+	}
+	got := md.RangePartitioning.Field
+	if wantedPartitioning.Field != got {
+		t.Errorf("RangePartitioning Field: got %v, want %v", got, wantedPartitioning.Field)
+	}
+	if md.RangePartitioning.Range == nil {
+		t.Fatal("expected a range definition, got nil")
+	}
+	gotInt64 := md.RangePartitioning.Range.Start
+	if gotInt64 != wantedRange.Start {
+		t.Errorf("Range.Start: got %v, wanted %v", gotInt64, wantedRange.Start)
+	}
+	gotInt64 = md.RangePartitioning.Range.End
+	if gotInt64 != wantedRange.End {
+		t.Errorf("Range.End: got %v, wanted %v", gotInt64, wantedRange.End)
+	}
+	gotInt64 = md.RangePartitioning.Range.Interval
+	if gotInt64 != wantedRange.Interval {
+		t.Errorf("Range.Interval: got %v, wanted %v", gotInt64, wantedRange.Interval)
+	}
+}
 func TestIntegration_RemoveTimePartitioning(t *testing.T) {
 	if client == nil {
 		t.Skip("Integration tests skipped")
diff --git a/bigquery/load.go b/bigquery/load.go
index b275707..26d4978 100644
--- a/bigquery/load.go
+++ b/bigquery/load.go
@@ -44,6 +44,9 @@
 	// If non-nil, the destination table is partitioned by time.
 	TimePartitioning *TimePartitioning
 
+	// If non-nil, the destination table is partitioned by integer range.
+	RangePartitioning *RangePartitioning
+
 	// Clustering specifies the data clustering configuration for the destination table.
 	Clustering *Clustering
 
@@ -68,6 +71,7 @@
 			WriteDisposition:                   string(l.WriteDisposition),
 			DestinationTable:                   l.Dst.toBQ(),
 			TimePartitioning:                   l.TimePartitioning.toBQ(),
+			RangePartitioning:                  l.RangePartitioning.toBQ(),
 			Clustering:                         l.Clustering.toBQ(),
 			DestinationEncryptionConfiguration: l.DestinationEncryptionConfig.toBQ(),
 			SchemaUpdateOptions:                l.SchemaUpdateOptions,
@@ -85,6 +89,7 @@
 		WriteDisposition:            TableWriteDisposition(q.Load.WriteDisposition),
 		Dst:                         bqToTable(q.Load.DestinationTable, c),
 		TimePartitioning:            bqToTimePartitioning(q.Load.TimePartitioning),
+		RangePartitioning:           bqToRangePartitioning(q.Load.RangePartitioning),
 		Clustering:                  bqToClustering(q.Load.Clustering),
 		DestinationEncryptionConfig: bqToEncryptionConfig(q.Load.DestinationEncryptionConfiguration),
 		SchemaUpdateOptions:         q.Load.SchemaUpdateOptions,
diff --git a/bigquery/load_test.go b/bigquery/load_test.go
index dcb73fa..3b6f59f 100644
--- a/bigquery/load_test.go
+++ b/bigquery/load_test.go
@@ -272,6 +272,57 @@
 				return j
 			}(),
 		},
+		{
+			dst: c.Dataset("dataset-id").Table("table-id"),
+			src: func() *ReaderSource {
+				r := NewReaderSource(strings.NewReader("foo"))
+				return r
+			}(),
+			config: LoadConfig{
+				TimePartitioning: &TimePartitioning{
+					Field: "somefield",
+				},
+			},
+			want: func() *bq.Job {
+				j := defaultLoadJob()
+				j.Configuration.Load.SourceUris = nil
+				j.Configuration.Load.TimePartitioning = &bq.TimePartitioning{
+					Field: "somefield",
+					Type:  "DAY",
+				}
+				return j
+			}(),
+		},
+		{
+			dst: c.Dataset("dataset-id").Table("table-id"),
+			src: func() *ReaderSource {
+				r := NewReaderSource(strings.NewReader("foo"))
+				return r
+			}(),
+			config: LoadConfig{
+				RangePartitioning: &RangePartitioning{
+					Field: "somefield",
+					Range: &RangePartitioningRange{
+						Start:    1,
+						End:      2,
+						Interval: 3,
+					},
+				},
+			},
+			want: func() *bq.Job {
+				j := defaultLoadJob()
+				j.Configuration.Load.SourceUris = nil
+				j.Configuration.Load.RangePartitioning = &bq.RangePartitioning{
+					Field: "somefield",
+					Range: &bq.RangePartitioningRange{
+						Start:    1,
+						End:      2,
+						Interval: 3,
+					},
+				}
+				return j
+			}(),
+		},
 	}
 
 	for i, tc := range testCases {
diff --git a/bigquery/query.go b/bigquery/query.go
index 735a54c..39c1df0 100644
--- a/bigquery/query.go
+++ b/bigquery/query.go
@@ -105,6 +105,10 @@
 	// for the destination table.
 	TimePartitioning *TimePartitioning
 
+	// RangePartitioning specifies integer range-based partitioning
+	// for the destination table.
+	RangePartitioning *RangePartitioning
+
 	// Clustering specifies the data clustering configuration for the destination table.
 	Clustering *Clustering
 
@@ -137,6 +141,7 @@
 		Priority:                           string(qc.Priority),
 		MaximumBytesBilled:                 qc.MaxBytesBilled,
 		TimePartitioning:                   qc.TimePartitioning.toBQ(),
+		RangePartitioning:                  qc.RangePartitioning.toBQ(),
 		Clustering:                         qc.Clustering.toBQ(),
 		DestinationEncryptionConfiguration: qc.DestinationEncryptionConfig.toBQ(),
 		SchemaUpdateOptions:                qc.SchemaUpdateOptions,
@@ -208,6 +213,7 @@
 		MaxBytesBilled:              qq.MaximumBytesBilled,
 		UseLegacySQL:                qq.UseLegacySql == nil || *qq.UseLegacySql,
 		TimePartitioning:            bqToTimePartitioning(qq.TimePartitioning),
+		RangePartitioning:           bqToRangePartitioning(qq.RangePartitioning),
 		Clustering:                  bqToClustering(qq.Clustering),
 		DestinationEncryptionConfig: bqToEncryptionConfig(qq.DestinationEncryptionConfiguration),
 		SchemaUpdateOptions:         qq.SchemaUpdateOptions,
diff --git a/bigquery/query_test.go b/bigquery/query_test.go
index f56eeef..f50678b 100644
--- a/bigquery/query_test.go
+++ b/bigquery/query_test.go
@@ -275,6 +275,52 @@
 				return j
 			}(),
 		},
+		{
+			dst: c.Dataset("dataset-id").Table("table-id"),
+			src: &QueryConfig{
+				Q:                "query string",
+				DefaultProjectID: "def-project-id",
+				DefaultDatasetID: "def-dataset-id",
+				TimePartitioning: &TimePartitioning{},
+			},
+			want: func() *bq.Job {
+				j := defaultQueryJob()
+				j.Configuration.Query.ForceSendFields = nil
+				j.Configuration.Query.TimePartitioning = &bq.TimePartitioning{
+					Type: "DAY",
+				}
+				return j
+			}(),
+		},
+		{
+			dst: c.Dataset("dataset-id").Table("table-id"),
+			src: &QueryConfig{
+				Q:                "query string",
+				DefaultProjectID: "def-project-id",
+				DefaultDatasetID: "def-dataset-id",
+				RangePartitioning: &RangePartitioning{
+					Field: "foo",
+					Range: &RangePartitioningRange{
+						Start:    1,
+						End:      2,
+						Interval: 3,
+					},
+				},
+			},
+			want: func() *bq.Job {
+				j := defaultQueryJob()
+				j.Configuration.Query.ForceSendFields = nil
+				j.Configuration.Query.RangePartitioning = &bq.RangePartitioning{
+					Field: "foo",
+					Range: &bq.RangePartitioningRange{
+						Start:    1,
+						End:      2,
+						Interval: 3,
+					},
+				}
+				return j
+			}(),
+		},
 	}
 	for i, tc := range testCases {
 		query := c.Query("")
diff --git a/bigquery/table.go b/bigquery/table.go
index 2eb5843..dcea1d3 100644
--- a/bigquery/table.go
+++ b/bigquery/table.go
@@ -63,9 +63,14 @@
 	// Deprecated: use UseLegacySQL.
 	UseStandardSQL bool
 
-	// If non-nil, the table is partitioned by time.
+	// If non-nil, the table is partitioned by time. Only one of
+	// time partitioning or range partitioning can be specified.
 	TimePartitioning *TimePartitioning
 
+	// If non-nil, the table is partitioned by integer range.  Only one of
+	// time partitioning or range partitioning can be specified.
+	RangePartitioning *RangePartitioning
+
 	// If set to true, queries that reference this table must specify a
 	// partition filter (e.g. a WHERE clause) that can be used to eliminate
 	// partitions. Used to prevent unintentional full data scans on large
@@ -208,6 +213,68 @@
 	}
 }
 
+// RangePartitioning indicates an integer-range based storage organization strategy.
+type RangePartitioning struct {
+	// The field by which the table is partitioned.
+	// This field must be a top-level field, and must be typed as an
+	// INTEGER/INT64.
+	Field string
+	// The details of how partitions are mapped onto the integer range.
+	Range *RangePartitioningRange
+}
+
+// RangePartitioningRange defines the boundaries and width of partitioned values.
+type RangePartitioningRange struct {
+	// The start value of defined range of values, inclusive of the specified value.
+	Start int64
+	// The end of the defined range of values, exclusive of the defined value.
+	End int64
+	// The width of each interval range.
+	Interval int64
+}
+
+func (rp *RangePartitioning) toBQ() *bq.RangePartitioning {
+	if rp == nil {
+		return nil
+	}
+	return &bq.RangePartitioning{
+		Field: rp.Field,
+		Range: rp.Range.toBQ(),
+	}
+}
+
+func bqToRangePartitioning(q *bq.RangePartitioning) *RangePartitioning {
+	if q == nil {
+		return nil
+	}
+	return &RangePartitioning{
+		Field: q.Field,
+		Range: bqToRangePartitioningRange(q.Range),
+	}
+}
+
+func bqToRangePartitioningRange(br *bq.RangePartitioningRange) *RangePartitioningRange {
+	if br == nil {
+		return nil
+	}
+	return &RangePartitioningRange{
+		Start:    br.Start,
+		End:      br.End,
+		Interval: br.Interval,
+	}
+}
+
+func (rpr *RangePartitioningRange) toBQ() *bq.RangePartitioningRange {
+	if rpr == nil {
+		return nil
+	}
+	return &bq.RangePartitioningRange{
+		Start:    rpr.Start,
+		End:      rpr.End,
+		Interval: rpr.Interval,
+	}
+}
+
 // Clustering governs the organization of data within a partitioned table.
 // For more information, see https://cloud.google.com/bigquery/docs/clustered-tables
 type Clustering struct {
@@ -344,6 +411,7 @@
 		return nil, errors.New("bigquery: UseLegacy/StandardSQL requires ViewQuery")
 	}
 	t.TimePartitioning = tm.TimePartitioning.toBQ()
+	t.RangePartitioning = tm.RangePartitioning.toBQ()
 	t.Clustering = tm.Clustering.toBQ()
 	t.RequirePartitionFilter = tm.RequirePartitionFilter
 
@@ -432,6 +500,7 @@
 		md.UseLegacySQL = t.View.UseLegacySql
 	}
 	md.TimePartitioning = bqToTimePartitioning(t.TimePartitioning)
+	md.RangePartitioning = bqToRangePartitioning(t.RangePartitioning)
 	md.Clustering = bqToClustering(t.Clustering)
 	if t.StreamingBuffer != nil {
 		md.StreamingBuffer = &StreamingBuffer{
diff --git a/bigquery/table_test.go b/bigquery/table_test.go
index 73e4bf9..dad4189 100644
--- a/bigquery/table_test.go
+++ b/bigquery/table_test.go
@@ -200,6 +200,35 @@
 			},
 		},
 		{
+			&TableMetadata{
+				RangePartitioning: &RangePartitioning{
+					Field: "ofNumbers",
+					Range: &RangePartitioningRange{
+						Start:    1,
+						End:      100,
+						Interval: 5,
+					},
+				},
+				Clustering: &Clustering{
+					Fields: []string{"cfield1"},
+				},
+			},
+			&bq.Table{
+
+				RangePartitioning: &bq.RangePartitioning{
+					Field: "ofNumbers",
+					Range: &bq.RangePartitioningRange{
+						Start:    1,
+						End:      100,
+						Interval: 5,
+					},
+				},
+				Clustering: &bq.Clustering{
+					Fields: []string{"cfield1"},
+				},
+			},
+		},
+		{
 			&TableMetadata{ExpirationTime: NeverExpire},
 			&bq.Table{ExpirationTime: 0},
 		},