bigquery: add range partitioning for tables
This feature adds the ability to define tables partitioning based
on integer ranges. Previously, only time-partitioning was enabled
for organization table storage.
Partioning can be defined by invoking table creation directly, or
as a byproduct of creating a new table from a load or query job.
Change-Id: I4659c9fc0d14a66e6a7819e7cd0d13e31abd2f94
Reviewed-on: https://code-review.googlesource.com/c/gocloud/+/45890
Reviewed-by: kokoro <noreply+kokoro@google.com>
Reviewed-by: Alex Hong <hongalex@google.com>
diff --git a/bigquery/integration_test.go b/bigquery/integration_test.go
index 8e72860..a86000c 100644
--- a/bigquery/integration_test.go
+++ b/bigquery/integration_test.go
@@ -396,6 +396,65 @@
}
+func TestIntegration_RangePartitioning(t *testing.T) {
+ if client == nil {
+ t.Skip("Integration tests skipped")
+ }
+ ctx := context.Background()
+ table := dataset.Table(tableIDs.New())
+
+ schema := Schema{
+ {Name: "name", Type: StringFieldType},
+ {Name: "somevalue", Type: IntegerFieldType},
+ }
+
+ wantedRange := &RangePartitioningRange{
+ Start: 10,
+ End: 135,
+ Interval: 25,
+ }
+
+ wantedPartitioning := &RangePartitioning{
+ Field: "somevalue",
+ Range: wantedRange,
+ }
+
+ err := table.Create(context.Background(), &TableMetadata{
+ Schema: schema,
+ RangePartitioning: wantedPartitioning,
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer table.Delete(ctx)
+ md, err := table.Metadata(ctx)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ if md.RangePartitioning == nil {
+ t.Fatal("expected range partitioning, got nil")
+ }
+ got := md.RangePartitioning.Field
+ if wantedPartitioning.Field != got {
+ t.Errorf("RangePartitioning Field: got %v, want %v", got, wantedPartitioning.Field)
+ }
+ if md.RangePartitioning.Range == nil {
+ t.Fatal("expected a range definition, got nil")
+ }
+ gotInt64 := md.RangePartitioning.Range.Start
+ if gotInt64 != wantedRange.Start {
+ t.Errorf("Range.Start: got %v, wanted %v", gotInt64, wantedRange.Start)
+ }
+ gotInt64 = md.RangePartitioning.Range.End
+ if gotInt64 != wantedRange.End {
+ t.Errorf("Range.End: got %v, wanted %v", gotInt64, wantedRange.End)
+ }
+ gotInt64 = md.RangePartitioning.Range.Interval
+ if gotInt64 != wantedRange.Interval {
+ t.Errorf("Range.Interval: got %v, wanted %v", gotInt64, wantedRange.Interval)
+ }
+}
func TestIntegration_RemoveTimePartitioning(t *testing.T) {
if client == nil {
t.Skip("Integration tests skipped")
diff --git a/bigquery/load.go b/bigquery/load.go
index b275707..26d4978 100644
--- a/bigquery/load.go
+++ b/bigquery/load.go
@@ -44,6 +44,9 @@
// If non-nil, the destination table is partitioned by time.
TimePartitioning *TimePartitioning
+ // If non-nil, the destination table is partitioned by integer range.
+ RangePartitioning *RangePartitioning
+
// Clustering specifies the data clustering configuration for the destination table.
Clustering *Clustering
@@ -68,6 +71,7 @@
WriteDisposition: string(l.WriteDisposition),
DestinationTable: l.Dst.toBQ(),
TimePartitioning: l.TimePartitioning.toBQ(),
+ RangePartitioning: l.RangePartitioning.toBQ(),
Clustering: l.Clustering.toBQ(),
DestinationEncryptionConfiguration: l.DestinationEncryptionConfig.toBQ(),
SchemaUpdateOptions: l.SchemaUpdateOptions,
@@ -85,6 +89,7 @@
WriteDisposition: TableWriteDisposition(q.Load.WriteDisposition),
Dst: bqToTable(q.Load.DestinationTable, c),
TimePartitioning: bqToTimePartitioning(q.Load.TimePartitioning),
+ RangePartitioning: bqToRangePartitioning(q.Load.RangePartitioning),
Clustering: bqToClustering(q.Load.Clustering),
DestinationEncryptionConfig: bqToEncryptionConfig(q.Load.DestinationEncryptionConfiguration),
SchemaUpdateOptions: q.Load.SchemaUpdateOptions,
diff --git a/bigquery/load_test.go b/bigquery/load_test.go
index dcb73fa..3b6f59f 100644
--- a/bigquery/load_test.go
+++ b/bigquery/load_test.go
@@ -272,6 +272,57 @@
return j
}(),
},
+ {
+ dst: c.Dataset("dataset-id").Table("table-id"),
+ src: func() *ReaderSource {
+ r := NewReaderSource(strings.NewReader("foo"))
+ return r
+ }(),
+ config: LoadConfig{
+ TimePartitioning: &TimePartitioning{
+ Field: "somefield",
+ },
+ },
+ want: func() *bq.Job {
+ j := defaultLoadJob()
+ j.Configuration.Load.SourceUris = nil
+ j.Configuration.Load.TimePartitioning = &bq.TimePartitioning{
+ Field: "somefield",
+ Type: "DAY",
+ }
+ return j
+ }(),
+ },
+ {
+ dst: c.Dataset("dataset-id").Table("table-id"),
+ src: func() *ReaderSource {
+ r := NewReaderSource(strings.NewReader("foo"))
+ return r
+ }(),
+ config: LoadConfig{
+ RangePartitioning: &RangePartitioning{
+ Field: "somefield",
+ Range: &RangePartitioningRange{
+ Start: 1,
+ End: 2,
+ Interval: 3,
+ },
+ },
+ },
+ want: func() *bq.Job {
+ j := defaultLoadJob()
+ j.Configuration.Load.SourceUris = nil
+ j.Configuration.Load.RangePartitioning = &bq.RangePartitioning{
+ Field: "somefield",
+ Range: &bq.RangePartitioningRange{
+ Start: 1,
+ End: 2,
+ Interval: 3,
+ },
+ }
+ return j
+ }(),
+ },
}
for i, tc := range testCases {
diff --git a/bigquery/query.go b/bigquery/query.go
index 735a54c..39c1df0 100644
--- a/bigquery/query.go
+++ b/bigquery/query.go
@@ -105,6 +105,10 @@
// for the destination table.
TimePartitioning *TimePartitioning
+ // RangePartitioning specifies integer range-based partitioning
+ // for the destination table.
+ RangePartitioning *RangePartitioning
+
// Clustering specifies the data clustering configuration for the destination table.
Clustering *Clustering
@@ -137,6 +141,7 @@
Priority: string(qc.Priority),
MaximumBytesBilled: qc.MaxBytesBilled,
TimePartitioning: qc.TimePartitioning.toBQ(),
+ RangePartitioning: qc.RangePartitioning.toBQ(),
Clustering: qc.Clustering.toBQ(),
DestinationEncryptionConfiguration: qc.DestinationEncryptionConfig.toBQ(),
SchemaUpdateOptions: qc.SchemaUpdateOptions,
@@ -208,6 +213,7 @@
MaxBytesBilled: qq.MaximumBytesBilled,
UseLegacySQL: qq.UseLegacySql == nil || *qq.UseLegacySql,
TimePartitioning: bqToTimePartitioning(qq.TimePartitioning),
+ RangePartitioning: bqToRangePartitioning(qq.RangePartitioning),
Clustering: bqToClustering(qq.Clustering),
DestinationEncryptionConfig: bqToEncryptionConfig(qq.DestinationEncryptionConfiguration),
SchemaUpdateOptions: qq.SchemaUpdateOptions,
diff --git a/bigquery/query_test.go b/bigquery/query_test.go
index f56eeef..f50678b 100644
--- a/bigquery/query_test.go
+++ b/bigquery/query_test.go
@@ -275,6 +275,52 @@
return j
}(),
},
+ {
+ dst: c.Dataset("dataset-id").Table("table-id"),
+ src: &QueryConfig{
+ Q: "query string",
+ DefaultProjectID: "def-project-id",
+ DefaultDatasetID: "def-dataset-id",
+ TimePartitioning: &TimePartitioning{},
+ },
+ want: func() *bq.Job {
+ j := defaultQueryJob()
+ j.Configuration.Query.ForceSendFields = nil
+ j.Configuration.Query.TimePartitioning = &bq.TimePartitioning{
+ Type: "DAY",
+ }
+ return j
+ }(),
+ },
+ {
+ dst: c.Dataset("dataset-id").Table("table-id"),
+ src: &QueryConfig{
+ Q: "query string",
+ DefaultProjectID: "def-project-id",
+ DefaultDatasetID: "def-dataset-id",
+ RangePartitioning: &RangePartitioning{
+ Field: "foo",
+ Range: &RangePartitioningRange{
+ Start: 1,
+ End: 2,
+ Interval: 3,
+ },
+ },
+ },
+ want: func() *bq.Job {
+ j := defaultQueryJob()
+ j.Configuration.Query.ForceSendFields = nil
+ j.Configuration.Query.RangePartitioning = &bq.RangePartitioning{
+ Field: "foo",
+ Range: &bq.RangePartitioningRange{
+ Start: 1,
+ End: 2,
+ Interval: 3,
+ },
+ }
+ return j
+ }(),
+ },
}
for i, tc := range testCases {
query := c.Query("")
diff --git a/bigquery/table.go b/bigquery/table.go
index 2eb5843..dcea1d3 100644
--- a/bigquery/table.go
+++ b/bigquery/table.go
@@ -63,9 +63,14 @@
// Deprecated: use UseLegacySQL.
UseStandardSQL bool
- // If non-nil, the table is partitioned by time.
+ // If non-nil, the table is partitioned by time. Only one of
+ // time partitioning or range partitioning can be specified.
TimePartitioning *TimePartitioning
+ // It non-nil, the table is partitioned by integer range. Only one of
+ // time partitioning or range partitioning can be specified.
+ RangePartitioning *RangePartitioning
+
// If set to true, queries that reference this table must specify a
// partition filter (e.g. a WHERE clause) that can be used to eliminate
// partitions. Used to prevent unintentional full data scans on large
@@ -208,6 +213,68 @@
}
}
+// RangePartitioning indicates an integer-range based storage organization strategy.
+type RangePartitioning struct {
+ // The field by which the table is partitioned.
+ // This field must be a top-level field, and must be typed as an
+ // INTEGER/INT64.
+ Field string
+ // The details of how partitions are mapped onto the integer range.
+ Range *RangePartitioningRange
+}
+
+// RangePartitioningRange defines the boundaries and width of partitioned values.
+type RangePartitioningRange struct {
+ // The start value of defined range of values, inclusive of the specified value.
+ Start int64
+ // The end of the defined range of values, exclusive of the defined value.
+ End int64
+ // The width of each interval range.
+ Interval int64
+}
+
+func (rp *RangePartitioning) toBQ() *bq.RangePartitioning {
+ if rp == nil {
+ return nil
+ }
+ return &bq.RangePartitioning{
+ Field: rp.Field,
+ Range: rp.Range.toBQ(),
+ }
+}
+
+func bqToRangePartitioning(q *bq.RangePartitioning) *RangePartitioning {
+ if q == nil {
+ return nil
+ }
+ return &RangePartitioning{
+ Field: q.Field,
+ Range: bqToRangePartitioningRange(q.Range),
+ }
+}
+
+func bqToRangePartitioningRange(br *bq.RangePartitioningRange) *RangePartitioningRange {
+ if br == nil {
+ return nil
+ }
+ return &RangePartitioningRange{
+ Start: br.Start,
+ End: br.End,
+ Interval: br.Interval,
+ }
+}
+
+func (rpr *RangePartitioningRange) toBQ() *bq.RangePartitioningRange {
+ if rpr == nil {
+ return nil
+ }
+ return &bq.RangePartitioningRange{
+ Start: rpr.Start,
+ End: rpr.End,
+ Interval: rpr.Interval,
+ }
+}
+
// Clustering governs the organization of data within a partitioned table.
// For more information, see https://cloud.google.com/bigquery/docs/clustered-tables
type Clustering struct {
@@ -344,6 +411,7 @@
return nil, errors.New("bigquery: UseLegacy/StandardSQL requires ViewQuery")
}
t.TimePartitioning = tm.TimePartitioning.toBQ()
+ t.RangePartitioning = tm.RangePartitioning.toBQ()
t.Clustering = tm.Clustering.toBQ()
t.RequirePartitionFilter = tm.RequirePartitionFilter
@@ -432,6 +500,7 @@
md.UseLegacySQL = t.View.UseLegacySql
}
md.TimePartitioning = bqToTimePartitioning(t.TimePartitioning)
+ md.RangePartitioning = bqToRangePartitioning(t.RangePartitioning)
md.Clustering = bqToClustering(t.Clustering)
if t.StreamingBuffer != nil {
md.StreamingBuffer = &StreamingBuffer{
diff --git a/bigquery/table_test.go b/bigquery/table_test.go
index 73e4bf9..dad4189 100644
--- a/bigquery/table_test.go
+++ b/bigquery/table_test.go
@@ -200,6 +200,35 @@
},
},
{
+ &TableMetadata{
+ RangePartitioning: &RangePartitioning{
+ Field: "ofNumbers",
+ Range: &RangePartitioningRange{
+ Start: 1,
+ End: 100,
+ Interval: 5,
+ },
+ },
+ Clustering: &Clustering{
+ Fields: []string{"cfield1"},
+ },
+ },
+ &bq.Table{
+
+ RangePartitioning: &bq.RangePartitioning{
+ Field: "ofNumbers",
+ Range: &bq.RangePartitioningRange{
+ Start: 1,
+ End: 100,
+ Interval: 5,
+ },
+ },
+ Clustering: &bq.Clustering{
+ Fields: []string{"cfield1"},
+ },
+ },
+ },
+ {
&TableMetadata{ExpirationTime: NeverExpire},
&bq.Table{ExpirationTime: 0},
},