// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package main

import (
"container/list"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"math"
"math/rand"
"net/http"
"os"
"strconv"
"strings"
"time"
bigquery "google.golang.org/api/bigquery/v2"
storage "google.golang.org/api/storage/v1"
)

const (
GB = 1 << 30
MaxBackoff = 30000
BaseBackoff = 250
BackoffGrowthFactor = 1.8
BackoffGrowthDamper = 0.25
JobStatusDone = "DONE"
DatasetAlreadyExists = "Already Exists: Dataset"
TableWriteEmptyDisposition = "WRITE_EMPTY"
)

func init() {
scope := fmt.Sprintf("%s %s %s", bigquery.BigqueryScope,
storage.DevstorageReadOnlyScope,
"https://www.googleapis.com/auth/userinfo.profile")
registerDemo("bigquery", scope, bqMain)
}

// This example demonstrates loading objects from Google Cloud Storage into
// BigQuery. Objects are specified by their bucket and a name prefix. Each
// object will be loaded into a new table identified by the object name minus
// any file extension. All tables are added to the specified dataset (one will
// be created if necessary). Currently, tables will not be overwritten and an
// attempt to load an object into a dataset that already contains its table
// will emit an error message indicating the table already exists.
// A schema file must be provided and it will be applied to every object/table.
// Example usage:
//   go-api-demo -clientid="my-clientid" -secret="my-secret" bq myProject
//     myDataBucket datafile2013070 DataFiles2013
//     ./datafile_schema.json 100
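//
// For illustration, a schema file for such CSV objects might look like the
// following (the field names here are hypothetical; any valid BigQuery
// TableSchema JSON can be used):
//
//   {"fields": [
//       {"name": "timestamp", "type": "TIMESTAMP"},
//       {"name": "value", "type": "FLOAT"}
//   ]}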
//
// This will load all objects (e.g. all data files from July 2013) from
// gs://myDataBucket into a (possibly new) BigQuery dataset named DataFiles2013
// using the schema file provided and allowing up to 100 bad records. Assuming
// each object is named like datafileYYYYMMDD.csv.gz and all of July's files are
// stored in the bucket, 9 tables will be created named like datafile201307DD
// where DD ranges from 01 to 09, inclusive.
// When the program completes, it will emit a results line similar to:
//
// 9 files loaded in 3m58s (18m2.708s). Size: 7.18GB Rows: 7130725
//
// The total elapsed time from the start of the first job to the end of the last
// job (effectively wall clock time) is shown. In parentheses is the aggregate time
// taken to load all tables.
func bqMain(client *http.Client, argv []string) {
if len(argv) != 6 {
fmt.Fprintln(os.Stderr,
"Usage: bq project_id bucket prefix dataset schema max_bad_records")
return
}
var (
project = argv[0]
bucket = argv[1]
objPrefix = argv[2]
datasetId = argv[3]
schemaFile = argv[4]
)
badRecords, err := strconv.ParseInt(argv[5], 10, 64)
if err != nil {
fmt.Fprintln(os.Stderr, err)
return
}
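// Seed the RNG used to add jitter to the job-monitoring backoff.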
rand.Seed(time.Now().UnixNano())
service, err := storage.New(client)
if err != nil {
log.Fatalf("Unable to create Storage service: %v", err)
}
// Get the list of objects in the bucket matching the specified prefix.
list := service.Objects.List(bucket)
list.Prefix(objPrefix)
objects, err := list.Do()
if err != nil {
fmt.Fprintln(os.Stderr, err)
return
}
// Create the wrapper and insert the (new) dataset.
dataset, err := newBQDataset(client, project, datasetId)
if err != nil {
fmt.Fprintln(os.Stderr, err)
return
}
if err = dataset.insert(true); err != nil {
fmt.Fprintln(os.Stderr, err)
return
}
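// A single tableSource is reused for every object; only its id and uri are
// updated in the loop below.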
objectSource := &tableSource{
maxBadRecords: badRecords,
disposition: TableWriteEmptyDisposition,
}
// Load the schema from disk.
f, err := ioutil.ReadFile(schemaFile)
if err != nil {
fmt.Fprintln(os.Stderr, err)
return
}
if err = json.Unmarshal(f, &objectSource.schema); err != nil {
fmt.Fprintln(os.Stderr, err)
return
}
// Assumes all objects have a .csv, .csv.gz, or no extension.
tableIdFromObject := func(name string) string {
return strings.TrimSuffix(strings.TrimSuffix(name, ".gz"), ".csv")
}
// A jobset is a way to group a collection of jobs together for monitoring.
// For this example, we just use the name of the bucket and object prefix.
jobset := fmt.Sprintf("%s:%s", bucket, objPrefix)
fmt.Fprintf(os.Stderr, "\nLoading %d objects.\n", len(objects.Items))
// Load each object into a table of the same name (minus any extension).
// A successful insert call will inject the job into our queue for monitoring.
for _, o := range objects.Items {
objectSource.id = tableIdFromObject(o.Name)
objectSource.uri = fmt.Sprintf("gs://%s/%s", o.Bucket, o.Name)
if err = dataset.load(jobset, objectSource); err != nil {
fmt.Fprintln(os.Stderr, err)
}
}
dataset.monitor(jobset)
}

// bqDataset wraps the BigQuery service and dataset and provides some helper
// functions.
type bqDataset struct {
project string
id string
bq *bigquery.Service
dataset *bigquery.Dataset
jobsets map[string]*list.List
}

func newBQDataset(client *http.Client, dsProj string, dsId string) (*bqDataset,
error) {
service, err := bigquery.New(client)
if err != nil {
log.Fatalf("Unable to create BigQuery service: %v", err)
}
return &bqDataset{
project: dsProj,
id: dsId,
bq: service,
dataset: &bigquery.Dataset{
DatasetReference: &bigquery.DatasetReference{
DatasetId: dsId,
ProjectId: dsProj,
},
},
jobsets: make(map[string]*list.List),
}, nil
}
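
// insert creates the dataset in BigQuery. When existsOK is true, an
// "Already Exists" error from the API is not treated as a failure.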
func (ds *bqDataset) insert(existsOK bool) error {
call := ds.bq.Datasets.Insert(ds.project, ds.dataset)
_, err := call.Do()
if err != nil && (!existsOK || !strings.Contains(err.Error(),
DatasetAlreadyExists)) {
return err
}
return nil
}
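
// tableSource describes a single Cloud Storage object to be loaded into a
// BigQuery table.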
type tableSource struct {
id string
uri string
schema bigquery.TableSchema
maxBadRecords int64
disposition string
}
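
// load creates a BigQuery load job for the object described by source and,
// on success, adds the job to the named jobset for later monitoring.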
func (ds *bqDataset) load(jobset string, source *tableSource) error {
job := &bigquery.Job{
Configuration: &bigquery.JobConfiguration{
Load: &bigquery.JobConfigurationLoad{
DestinationTable: &bigquery.TableReference{
DatasetId: ds.dataset.DatasetReference.DatasetId,
ProjectId: ds.project,
TableId: source.id,
},
MaxBadRecords: source.maxBadRecords,
Schema: &source.schema,
SourceUris: []string{source.uri},
WriteDisposition: source.disposition,
},
},
}
call := ds.bq.Jobs.Insert(ds.project, job)
job, err := call.Do()
if err != nil {
return err
}
_, ok := ds.jobsets[jobset]
if !ok {
ds.jobsets[jobset] = list.New()
}
ds.jobsets[jobset].PushBack(job)
return nil
}
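
// getJob fetches the current state of the job with the given id.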
func (ds *bqDataset) getJob(id string) (*bigquery.Job, error) {
return ds.bq.Jobs.Get(ds.project, id).Do()
}
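
// monitor polls every job in the named jobset until each one reports DONE or
// an error, growing the backoff each time the whole queue has been cycled
// through, and finally prints the aggregate load statistics.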
func (ds *bqDataset) monitor(jobset string) {
jobq, ok := ds.jobsets[jobset]
if !ok {
return
}
var backoff float64 = BaseBackoff
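// pause sleeps for the current backoff interval (in milliseconds). When grow
// is true, the interval is first increased by BackoffGrowthFactor, reduced by
// a random fraction of up to BackoffGrowthDamper to add jitter, and capped at
// MaxBackoff.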
pause := func(grow bool) {
if grow {
backoff *= BackoffGrowthFactor
backoff -= (backoff * rand.Float64() * BackoffGrowthDamper)
backoff = math.Min(backoff, MaxBackoff)
fmt.Fprintf(os.Stderr, "[%s] Checking remaining %d jobs...\n", jobset,
1+jobq.Len())
}
time.Sleep(time.Duration(backoff) * time.Millisecond)
}
var stats jobStats
// Track a 'head' pending job in the queue so we can detect when we have
// cycled through every remaining job.
head := ""
// Loop until all jobs are done - with either success or error.
for jobq.Len() > 0 {
jel := jobq.Front()
job := jel.Value.(*bigquery.Job)
jobq.Remove(jel)
jid := job.JobReference.JobId
loop := false
// Check and possibly pick a new head job id.
if len(head) == 0 {
head = jid
} else {
if jid == head {
loop = true
}
}
// Retrieve the job's current status.
pause(loop)
j, err := ds.getJob(jid)
if err != nil {
fmt.Fprintln(os.Stderr, err)
// In the case of a transient API error, we want to keep the job.
if j == nil {
jobq.PushBack(job)
} else {
// Must reset head tracker if job is discarded.
if loop {
head = ""
backoff = BaseBackoff
}
}
continue
}
// Reassign with the updated job data from Get; the call succeeded, so j
// holds the refreshed job.
job = j
if job.Status.State != JobStatusDone {
jobq.PushBack(job)
continue
}
if res := job.Status.ErrorResult; res != nil {
fmt.Fprintln(os.Stderr, res.Message)
} else {
stat := job.Statistics
lstat := stat.Load
stats.files += 1
stats.bytesIn += lstat.InputFileBytes
stats.bytesOut += lstat.OutputBytes
stats.rows += lstat.OutputRows
stats.elapsed +=
time.Duration(stat.EndTime-stat.StartTime) * time.Millisecond
if stats.start.IsZero() {
stats.start = time.Unix(stat.StartTime/1000, 0)
} else {
t := time.Unix(stat.StartTime/1000, 0)
if stats.start.Sub(t) > 0 {
stats.start = t
}
}
if stats.finish.IsZero() {
stats.finish = time.Unix(stat.EndTime/1000, 0)
} else {
t := time.Unix(stat.EndTime/1000, 0)
if t.Sub(stats.finish) > 0 {
stats.finish = t
}
}
}
// When the head job is processed, reset the backoff since the loads
// run in BigQuery in parallel.
if loop {
head = ""
backoff = BaseBackoff
}
}
fmt.Fprintf(os.Stderr, "%#v\n", stats)
}

type jobStats struct {
// Number of files (sources) loaded.
files int64
// Bytes read from source (possibly compressed).
bytesIn int64
// Bytes loaded into BigQuery (uncompressed).
bytesOut int64
// Rows loaded into BigQuery.
rows int64
// Time taken to load source into table.
elapsed time.Duration
// Start time of the job.
start time.Time
// End time of the job.
finish time.Time
}
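
// GoString implements fmt.GoStringer so that printing the stats with %#v (as
// monitor does) yields the human-readable results line.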
func (s jobStats) GoString() string {
return fmt.Sprintf("\n%d files loaded in %v (%v). Size: %.2fGB Rows: %d\n",
s.files, s.finish.Sub(s.start), s.elapsed, float64(s.bytesOut)/GB,
s.rows)
}