blob: c10d1e495bca1430f4c8cc0547f67226829ace29 [file] [log] [blame]
// Copyright 2024 Google LLC
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
// continuousReads is a benchmark that continuously downloads the same set of
// objects from GCS until the timeout is reached.
// - Initialize structs to populate with the benchmark results.
// - Select a random API to use to upload and download the object, unless an
// API is set in the command-line,
// - Select a random size for the objects; this size will
// be between two values configured in the command-line.
// - Create a directory of objects of that size in GCS, uploading random
// contents directly from memory. The number of objects created is equal to
// the command-line configuration for the number of objects in a directory.
// - Grab a storage client from the pool.
// - Check if the timeout is exceeded; if so, compile p50, p90 and p99 and return.
// - Queue a goroutine that performs a download. This goroutine will run as
// soon as there is an available worker. The number of workers that run
// concurrently is set in the command-line.
// - This goroutine chooses the object to download by receiving an object name
// from the channel. At the end of the download, it will return that name to
// the channel. That way, no goroutines are reading from the same object at
// the same time.
// Note that since the same objects are downloaded continuously, the downloads
// will overwrite previous downloads of the same object.
// - The time taken is saved in a slice that is used to compile the percentiles
// once the timeout is exceeded.
type continuousReads struct {
opts *benchmarkOptions
bucketName string
directoryPath string
numWorkers int
objects chan string // list of objects to synchronize reads
results []time.Duration
resultMu sync.Mutex
api benchmarkAPI
objectSize int64 // every object must be of the same size
func (r *continuousReads) setup(ctx context.Context) error {
// Check for context cancellation
select {
case <-ctx.Done():
return nil // don't error out here since we expect to finish with a cancellation
// Select API first as it will be the same for all writes/reads
api := opts.api
if api == mixedAPIs {
switch rand.Intn(4) {
case 0:
api = xmlAPI
case 1:
api = jsonAPI
case 2:
api = grpcAPI
case 3:
api = directPath
r.api = api
// Select object size
objectSize := opts.objectSize
if objectSize == 0 {
objectSize = randomInt64(opts.minObjectSize, opts.maxObjectSize)
r.objectSize = objectSize
// Make a temp dir for this run
dir, err := os.MkdirTemp("", "benchmark-experiment-")
if err != nil {
return err
// Create contents
r.directoryPath = dir
objects, err := generateDirInGCS(ctx, path.Base(dir), objectSize)
if err != nil {
return err
r.objects = *objects
return nil
// cleanup deletes objects on disk and in GCS. It does not accept a context as
// it should run to completion to ensure full clean up of resources.
func (r *continuousReads) cleanup() error {
// Clean temp dir
if err := os.RemoveAll(r.directoryPath); err != nil {
return err
// Delete uploaded objects
return deleteDirectoryFromGCS(r.bucketName, path.Base(r.directoryPath))
func (r *continuousReads) run(ctx context.Context) error {
benchGroup, ctx := errgroup.WithContext(ctx)
for {
select {
case <-ctx.Done():
err := benchGroup.Wait()
if errorIsDeadLineExceeded(err) {
return nil
return err
benchGroup.Go(func() error {
client := getClient(ctx, r.api)
object := <-r.objects
var span trace.Span
ctx, span = otel.GetTracerProvider().Tracer(tracerName).Start(ctx, "continuous_reads")
span.SetAttributes(attribute.KeyValue{"workload", attribute.StringValue("9")},
attribute.KeyValue{"api", attribute.StringValue(string(r.api))},
attribute.KeyValue{"object_size", attribute.Int64Value(r.objectSize)})
defer span.End()
// Download full object (range is not supported)
rangeStart := int64(0)
rangeLength := int64(-1)
timeTaken, err := downloadBenchmark(ctx, downloadOpts{
client: client,
objectSize: r.objectSize,
bucket: r.bucketName,
object: object,
rangeStart: rangeStart,
rangeLength: rangeLength,
downloadToDirectory: r.directoryPath,
timeout: r.opts.timeoutPerOp,
if err != nil {
r.results = append(r.results, timeTaken)
r.objects <- object
return err
func (r *continuousReads) compileResults() {
// TO-DO: switch to slices.Sort(r.results) when Go<1.21 support is dropped
sort.Slice(r.results, func(i, j int) bool {
return r.results[i] < r.results[j]
l := len(r.results)
percentiles := map[string]time.Duration{
"p50": r.results[l/2],
"p90": r.results[l*9/10],
"p99": r.results[l*99/100],
for name, value := range percentiles {
result := benchmarkResult{
objectSize: r.objectSize,
isRead: true,
elapsedTime: value,
metricName: name,
result.selectReadParams(*r.opts, r.api)
result.params.rangeOffset = 0
results <- result