| // Copyright 2017 Google LLC |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| //go:build ignore |
| // +build ignore |
| |
| package main |
| |
| import ( |
| "bytes" |
| "context" |
| "encoding/json" |
| "flag" |
| "fmt" |
| "io/ioutil" |
| "log" |
| "os" |
| "strings" |
| "time" |
| |
| "cloud.google.com/go/bigquery" |
| "google.golang.org/api/iterator" |
| ) |
| |
| // profileTag is a simple annotation for benchmark runs. |
| type profileTag struct { |
| Key string `json:"key,omitempty" bigquery:"key"` |
| Value string `json:"value,omitempty" bigquery:"value"` |
| } |
| |
| type tags []*profileTag |
| |
| func (ts *tags) String() string { |
| var s strings.Builder |
| fp := len(*ts) |
| for i, t := range *ts { |
| s.WriteString(fmt.Sprintf("%s:%s", t.Key, t.Value)) |
| if i < fp-1 { |
| s.WriteString(",") |
| } |
| } |
| return s.String() |
| } |
| |
| func (ts *tags) Set(value string) error { |
| if value == "" { |
| return nil |
| } |
| parts := strings.SplitN(value, ":", 2) |
| if len(parts) == 2 { |
| // both a key and value |
| *ts = append(*ts, &profileTag{Key: parts[0], Value: parts[1]}) |
| } else { |
| *ts = append(*ts, &profileTag{Key: value}) |
| } |
| return nil |
| } |
| |
| // AsSlice is used to simplify schema inference. |
| func (ts *tags) AsSlice() []*profileTag { |
| var out []*profileTag |
| for _, v := range *ts { |
| out = append(out, v) |
| } |
| return out |
| } |
| |
| // profiledQuery provides metadata about query invocations and performance. |
| type profiledQuery struct { |
| // Used to describe a set of related queries. |
| GroupName string `json:"groupname" bigquery:"groupname"` |
| // User to describe a single query configuration. |
| Name string `json:"name" bigquery:"name"` |
| // Tags allow an arbitrary list of KV pairs for denoting specifics of a profile. |
| Tags []*profileTag `json:"tags" bigquery:"tags"` |
| // Persisted query configuration. |
| Query *bigquery.Query `json:"-" bigquery:"-"` |
| // Just the query string. |
| SQL string |
| // Timing details from multiple invocations. |
| Runs []*timingInfo `json:"runs" bigquery:"runs"` |
| // When this data was logged. |
| EventTime time.Time `json:"event_time" bigquery:"event_time"` |
| } |
| |
| // timingInfo provides measurements for a single invocation of a query. |
| type timingInfo struct { |
| // If the query failed in error, this retains a copy of the error string |
| ErrorString string `json:"errorstring,omitempty" bigquery:"errorstring"` |
| // Start time from the client perspective, e.q. calling Read() to insert and wait for an iterator |
| StartTime time.Time `json:"start_time,omitempty" bigquery:"start_time"` |
| // Measured when the Read() call returns. |
| QueryEndTime time.Time `json:"query_end_time,omitempty" bigquery:"query_end_time"` |
| // Measured when consumer receives the first row via the iterator. |
| FirstRowReturnedTime time.Time `json:"first_row_returned_time,omitempty" bigquery:"first_row_returned_time"` |
| // Measured when consumer receives iterator.Done |
| AllRowsReturnedTime time.Time `json:"all_rows_returned_time,omitempty" bigquery:"all_rows_returned_time"` |
| // Number of rows fetched through the iterator. |
| TotalRows int64 `json:"total_rows,omitempty" bigquery:"total_rows"` |
| } |
| |
| // Summary provides a human-readable string that summarizes the significant timing details. |
| func (t *timingInfo) Summary() string { |
| noVal := "NODATA" |
| var buf bytes.Buffer |
| fmt.Fprintf(&buf, "QUERYTIME ") |
| if !t.QueryEndTime.IsZero() { |
| fmt.Fprintf(&buf, "%v", t.QueryEndTime.Sub(t.StartTime)) |
| } else { |
| fmt.Fprintf(&buf, noVal) |
| } |
| |
| fmt.Fprintf(&buf, " FIRSTROW ") |
| if !t.FirstRowReturnedTime.IsZero() { |
| fmt.Fprintf(&buf, "%v (+%v)", t.FirstRowReturnedTime.Sub(t.StartTime), t.FirstRowReturnedTime.Sub(t.QueryEndTime)) |
| } else { |
| fmt.Fprintf(&buf, noVal) |
| } |
| |
| fmt.Fprintf(&buf, " ALLROWS ") |
| if !t.AllRowsReturnedTime.IsZero() { |
| fmt.Fprintf(&buf, "%v (+%v)", t.AllRowsReturnedTime.Sub(t.StartTime), t.AllRowsReturnedTime.Sub(t.FirstRowReturnedTime)) |
| } else { |
| fmt.Fprintf(&buf, noVal) |
| } |
| if t.TotalRows > 0 { |
| fmt.Fprintf(&buf, " ROWS %d", t.TotalRows) |
| } |
| if t.ErrorString != "" { |
| fmt.Fprintf(&buf, " ERRORED %s ", t.ErrorString) |
| } |
| return buf.String() |
| } |
| |
| // measureSelectQuery invokes a query given a config and returns timing information. |
| // |
| // This instrumentation is meant for the common query case. |
| func measureSelectQuery(ctx context.Context, q *bigquery.Query) *timingInfo { |
| timing := &timingInfo{ |
| StartTime: time.Now(), |
| } |
| it, err := q.Read(ctx) |
| timing.QueryEndTime = time.Now() |
| if err != nil { |
| timing.ErrorString = err.Error() |
| return timing |
| } |
| var row []bigquery.Value |
| var rowCount int64 |
| for { |
| err := it.Next(&row) |
| if rowCount == 0 { |
| timing.FirstRowReturnedTime = time.Now() |
| } |
| if err == iterator.Done { |
| timing.AllRowsReturnedTime = time.Now() |
| timing.TotalRows = rowCount |
| break |
| } |
| if err != nil { |
| timing.ErrorString = err.Error() |
| return timing |
| } |
| rowCount++ |
| } |
| return timing |
| } |
| |
| // runBenchmarks processes the input file and instruments the queries. |
| // It currently instruments queries serially to reduce variance due to concurrent execution on either the backend or in this client. |
| func runBenchmarks(ctx context.Context, client *bigquery.Client, filename string, tags *tags, reruns int) (profiles []*profiledQuery, err error) { |
| |
| queriesJSON, err := ioutil.ReadFile(filename) |
| if err != nil { |
| return nil, fmt.Errorf("failed to read queries files: %v", err) |
| } |
| |
| var benchmarkInput map[string]map[string]string |
| if err := json.Unmarshal(queriesJSON, &benchmarkInput); err != nil { |
| return nil, fmt.Errorf("failed to unmarshall queries data: %v", err) |
| } |
| |
| convertedTags := tags.AsSlice() |
| |
| for groupName, m := range benchmarkInput { |
| for id, sql := range m { |
| prof := &profiledQuery{ |
| GroupName: groupName, |
| Name: id, |
| SQL: sql, |
| Tags: convertedTags, |
| EventTime: time.Now(), |
| } |
| fmt.Printf("Measuring %s : %s", groupName, id) |
| query := client.Query(sql) |
| prof.Query = query |
| |
| for i := 0; i < reruns; i++ { |
| fmt.Printf(".") |
| prof.Runs = append(prof.Runs, measureSelectQuery(ctx, query)) |
| } |
| fmt.Println() |
| profiles = append(profiles, prof) |
| } |
| } |
| fmt.Println() |
| return profiles, nil |
| } |
| |
| // printResults prints information about collected query profiles. |
| func printResults(queries []*profiledQuery) { |
| for i, prof := range queries { |
| fmt.Printf("%d: (%s:%s)\n", i, prof.GroupName, prof.Name) |
| fmt.Printf("SQL: %s\n", prof.Query.Q) |
| fmt.Printf("MEASUREMENTS\n") |
| for j, timing := range prof.Runs { |
| fmt.Printf("\t\t(%d) %s\n", j, timing.Summary()) |
| } |
| fmt.Println() |
| } |
| } |
| |
| // prepareTable ensures a table exists, and optionally creates it if directed |
| func prepareTable(ctx context.Context, client *bigquery.Client, table string, create bool) (*bigquery.Table, error) { |
| // Ensure table exists before streaming results, and possibly create it if directed. |
| parts := strings.Split(table, ".") |
| if len(parts) != 3 { |
| return nil, fmt.Errorf("Expected table in p.d.t format, got: %s", table) |
| } |
| tRef := client.DatasetInProject(parts[0], parts[1]).Table(parts[2]) |
| // check with backend |
| _, err := tRef.Metadata(ctx) |
| if err != nil { |
| if create { |
| schema, err := bigquery.InferSchema(profiledQuery{}) |
| if err != nil { |
| return nil, fmt.Errorf("could not infer schema while creating table: %v", err) |
| } |
| createMeta := &bigquery.TableMetadata{ |
| Schema: schema.Relax(), |
| TimePartitioning: &bigquery.TimePartitioning{ |
| Type: bigquery.DayPartitioningType, |
| Field: "event_time", |
| }, |
| Clustering: &bigquery.Clustering{ |
| Fields: []string{"groupname", "name"}, |
| }, |
| } |
| if err2 := tRef.Create(ctx, createMeta); err2 != nil { |
| return nil, fmt.Errorf("could not create table: %v", err2) |
| } |
| return tRef, nil |
| } |
| return nil, fmt.Errorf("error while validating table existence: %v", err) |
| } |
| return tRef, nil |
| } |
| |
| // reportResults streams results into the designated table. |
| func reportResults(ctx context.Context, client *bigquery.Client, table *bigquery.Table, results []*profiledQuery) error { |
| inserter := table.Inserter() |
| |
| // Set a timeout on our context to bound retries |
| ctx, cancel := context.WithTimeout(ctx, 30*time.Second) |
| defer cancel() |
| if err := inserter.Put(ctx, results); err != nil { |
| return fmt.Errorf("reportResults: %v", err) |
| } |
| return nil |
| } |
| |
| func main() { |
| var reruns = flag.Int("reruns", 3, "number of reruns to issue for each query") |
| var queryfile = flag.String("queryfile", "benchmarked-queries.json", "path to file contain queries to be benchmarked.") |
| var projectID = flag.String("projectid", "", "project ID to use for running benchmarks. Uses GOOGLE_CLOUD_PROJECT env if not set.") |
| var reportTable = flag.String("table", "", "table to stream results into, specified in project.dataset.table format") |
| var createTable = flag.Bool("create_table", false, "create result table if it does not exist") |
| |
| var tags tags |
| flag.Var(&tags, "tag", "an optional key and value seperated by colon (:) character") |
| flag.Parse() |
| |
| // Validate flags. |
| if *reruns <= 0 { |
| log.Fatalf("--reruns should be a positive value") |
| } |
| projID := os.Getenv("GOOGLE_CLOUD_PROJECT") |
| if *projectID != "" { |
| projID = *projectID |
| } |
| if projID == "" { |
| log.Fatalf("must provide --projectid or set GOOGLE_CLOUD_PROJECT environment variable") |
| } |
| |
| // Setup context and client based on ADC. |
| ctx := context.Background() |
| client, err := bigquery.NewClient(ctx, projID) |
| if err != nil { |
| log.Fatalf("bigquery.NewClient: %v", err) |
| } |
| defer client.Close() |
| |
| // If we're going to stream results, let's make sure we can do that before running all the tests. |
| var table *bigquery.Table |
| if *reportTable != "" { |
| table, err = prepareTable(ctx, client, *reportTable, *createTable) |
| if err != nil { |
| log.Fatalf("prepareTable: %v", err) |
| } |
| } |
| start := time.Now() |
| profiles, err := runBenchmarks(ctx, client, *queryfile, &tags, *reruns) |
| if err != nil { |
| log.Fatalf("runBenchmarks: %v", err) |
| } |
| fmt.Printf("measurement time: %v\n\n", time.Now().Sub(start)) |
| if table != nil { |
| if err := reportResults(ctx, client, table, profiles); err != nil { |
| log.Fatalf("reportResults: %v", err) |
| } |
| } |
| printResults(profiles) |
| } |