blob: 0c4d7c023b00d2709193110f22e6336acd831169 [file] [log] [blame]
// Copyright 2016 Google LLC.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package bytestream provides a client for any service that exposes a ByteStream API.
//
// Note: This package is a work-in-progress. Backwards-incompatible changes should be expected.
package bytestream
// This file contains the client implementation of Bytestream declared at:
// https://github.com/googleapis/googleapis/blob/master/google/bytestream/bytestream.proto
import (
"context"
"fmt"
"math/rand"
"time"
"google.golang.org/grpc"
pb "google.golang.org/genproto/googleapis/bytestream"
)
const (
	// MaxBufSize is the maximum buffer size (in bytes) received in a read
	// chunk or sent in a write chunk. It equals 2 MiB.
	MaxBufSize = 2 << 20

	// backoffBase is the shortest delay Read waits after receiving an empty chunk.
	backoffBase = 10 * time.Millisecond
	// backoffMax is the longest delay Read waits between two receive attempts.
	backoffMax = time.Second
	// maxTries bounds the number of receive attempts made by a single Read call
	// while its buffer is empty.
	maxTries = 5
)
// Client is the Go wrapper around a ByteStreamClient and provides an interface to it.
// Create one with NewClient.
type Client struct {
	// client is the generated ByteStream client used to start Read and Write calls.
	client pb.ByteStreamClient
	// options are the call options passed along with every call started through this Client.
	options []grpc.CallOption
}
// NewClient returns a bytestream.Client that issues its calls over cc.
// The given options are remembered and supplied to every call the Client starts.
func NewClient(cc *grpc.ClientConn, options ...grpc.CallOption) *Client {
	c := new(Client)
	c.client = pb.NewByteStreamClient(cc)
	c.options = options
	return c
}
// Reader reads from a byte stream.
// Create one with Client.NewReader or Client.NewReaderAt.
type Reader struct {
	// ctx is the context the Reader was created with; Read watches it while backing off.
	ctx context.Context
	// c is the Client that created this Reader.
	c *Client
	// readClient is the receiving side of the read call; Close sets it to nil.
	readClient pb.ByteStream_ReadClient
	// resourceName is the name of the resource being read.
	resourceName string
	// err is the first error seen by Read; once set, every later Read returns it.
	err error
	// buf holds received bytes that have not yet been copied out to a caller.
	buf []byte
}
// ResourceName gets the resource name this Reader is reading.
// It is the name that was supplied when the Reader was created.
func (r *Reader) ResourceName() string {
	return r.resourceName
}
// Read implements io.Reader.
//
// If bytes are already buffered, Read copies as many as fit into p and keeps
// the rest for the next call. Otherwise it receives the next chunk from the
// stream. A chunk with no data is retried, up to maxTries receive attempts per
// call, waiting between attempts for a jittered delay that starts at
// backoffBase and never exceeds backoffMax. If every attempt yields an empty
// chunk, Read returns (0, nil) and the caller may simply call Read again.
//
// An error returned by the stream, or by the context while waiting, is
// remembered and returned by every subsequent call. Calling Read after Close
// with nothing left in the buffer returns an error rather than panicking.
func (r *Reader) Read(p []byte) (int, error) {
	if r.err != nil {
		return 0, r.err
	}
	if len(r.buf) == 0 && r.readClient == nil {
		// Close sets readClient to nil; calling Recv on it would panic.
		return 0, fmt.Errorf("bytestream: Read called on a closed Reader")
	}

	var backoffDelay time.Duration
	for tries := 0; len(r.buf) == 0 && tries < maxTries; tries++ {
		// No data in buffer.
		resp, err := r.readClient.Recv()
		if err != nil {
			r.err = err
			return 0, err
		}
		r.buf = resp.Data
		if len(r.buf) != 0 {
			break
		}
		if tries == maxTries-1 {
			// This was the last attempt; waiting now would only delay the return.
			break
		}

		// Back off: scale the previous delay by a random factor in (0.78, 1.3],
		// then clamp it to [backoffBase, backoffMax].
		if backoffDelay < backoffBase {
			backoffDelay = backoffBase
		} else {
			backoffDelay = time.Duration(float64(backoffDelay) * 1.3 * (1 - 0.4*rand.Float64()))
		}
		if backoffDelay > backoffMax {
			backoffDelay = backoffMax
		}

		// Use an explicit timer so it can be released as soon as the context ends.
		timer := time.NewTimer(backoffDelay)
		select {
		case <-timer.C:
		case <-r.ctx.Done():
			timer.Stop()
			r.err = r.ctx.Err()
			return 0, r.err
		}
	}

	// Copy from buffer, keeping whatever did not fit for the next call.
	n := copy(p, r.buf)
	r.buf = r.buf[n:]
	return n, nil
}
// Close implements io.Closer.
// It closes the send side of the underlying read call and forgets the call,
// so that calling Close again is a no-op returning nil.
func (r *Reader) Close() error {
	stream := r.readClient
	if stream == nil {
		// Already closed.
		return nil
	}
	r.readClient = nil
	return stream.CloseSend()
}
// NewReader creates a new Reader to read a resource.
// It is shorthand for NewReaderAt with an offset of zero.
func (c *Client) NewReader(ctx context.Context, resourceName string) (*Reader, error) {
	const startOffset int64 = 0
	return c.NewReaderAt(ctx, resourceName, startOffset)
}
// NewReaderAt creates a new Reader to read a resource from the given offset.
//
// It starts a Read call for resourceName at offset, using ctx and the Client's
// call options. If the call cannot be started, the error is returned and the
// Reader is nil.
func (c *Client) NewReaderAt(ctx context.Context, resourceName string, offset int64) (*Reader, error) {
	req := &pb.ReadRequest{
		ResourceName: resourceName,
		ReadOffset:   offset,
	}
	stream, err := c.client.Read(ctx, req, c.options...)
	if err != nil {
		return nil, err
	}

	reader := &Reader{
		ctx:          ctx,
		c:            c,
		resourceName: resourceName,
		readClient:   stream,
	}
	return reader, nil
}
// Writer writes to a byte stream.
// Create one with Client.NewWriter and call Close when writing is done.
type Writer struct {
	// ctx is the context the Writer was created with.
	ctx context.Context
	// writeClient is the sending side of the write call.
	writeClient pb.ByteStream_WriteClient
	// resourceName is the name of the resource being written.
	resourceName string
	// offset counts the bytes sent so far and is used as the offset of the next request.
	offset int64
	// err is the first error seen by Write or Close; once set, every later Write returns it.
	err error
}
// ResourceName gets the resource name this Writer is writing.
// It is the name that was supplied when the Writer was created.
func (w *Writer) ResourceName() string {
	return w.resourceName
}
// Write implements io.Writer.
//
// p is sent in consecutive requests of at most MaxBufSize bytes each, and the
// Writer's offset advances after every successful send. If a send fails, the
// error is remembered (so later calls fail immediately) and Write reports how
// many bytes of p were sent before the failure.
func (w *Writer) Write(p []byte) (int, error) {
	if w.err != nil {
		return 0, w.err
	}

	sent := 0
	for rest := p; len(rest) > 0; rest = p[sent:] {
		chunk := rest
		if len(chunk) > MaxBufSize {
			chunk = chunk[:MaxBufSize]
		}

		req := &pb.WriteRequest{
			WriteOffset: w.offset,
			Data:        chunk,
		}
		// Bytestream only requires the resourceName to be sent in the first WriteRequest.
		if w.offset == 0 {
			req.ResourceName = w.resourceName
		}

		if err := w.writeClient.Send(req); err != nil {
			w.err = err
			return sent, err
		}
		w.offset += int64(len(chunk))
		sent += len(chunk)
	}
	return sent, nil
}
// Close implements io.Closer. It is the caller's responsibility to call Close() when writing is done.
//
// Close sends a final request marked FinishWrite at the current offset, closes
// the stream and waits for the response. It returns an error if either step
// fails, if no response is received, or if the committed size in the response
// differs from the number of bytes sent by Write.
//
// Errors from the stream are wrapped with %w so callers can still inspect the
// underlying error with errors.Is or errors.As. The outcome is also recorded on
// the Writer, so a failed Close makes later Write calls fail.
func (w *Writer) Close() error {
	finish := &pb.WriteRequest{
		ResourceName: w.resourceName,
		WriteOffset:  w.offset,
		FinishWrite:  true,
		Data:         nil,
	}
	if err := w.writeClient.Send(finish); err != nil {
		w.err = err
		return fmt.Errorf("Send(WriteRequest< FinishWrite >) failed: %w", err)
	}

	resp, err := w.writeClient.CloseAndRecv()
	if err != nil {
		w.err = err
		return fmt.Errorf("CloseAndRecv: %w", err)
	}

	// err is nil here; it is only set if the response is missing or inconsistent.
	if resp == nil {
		err = fmt.Errorf("expected a response on close, got %v", resp)
	} else if resp.CommittedSize != w.offset {
		err = fmt.Errorf("server only wrote %d bytes, want %d", resp.CommittedSize, w.offset)
	}
	w.err = err
	return err
}
// NewWriter creates a new Writer to write a resource.
//
// resourceName specifies the name of the resource.
// The resource will be available after Close has been called.
//
// The Write call is started with ctx and the Client's call options; if it
// cannot be started, the error is returned and the Writer is nil.
//
// It is the caller's responsibility to call Close when writing is done.
//
// TODO: There is currently no way to resume a write. Maybe NewWriter should begin with a call to QueryWriteStatus.
func (c *Client) NewWriter(ctx context.Context, resourceName string) (*Writer, error) {
	stream, err := c.client.Write(ctx, c.options...)
	if err != nil {
		return nil, err
	}

	writer := &Writer{
		ctx:          ctx,
		writeClient:  stream,
		resourceName: resourceName,
	}
	return writer, nil
}