blob: c584d50977cb931c2f173934f8a446f48e327bcb [file] [log] [blame]
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package gensupport
import (
"context"
"fmt"
"io"
"io/ioutil"
"net/http"
"reflect"
"strings"
"testing"
"time"
)
type unexpectedReader struct{}
func (unexpectedReader) Read([]byte) (int, error) {
return 0, fmt.Errorf("unexpected read in test")
}
// event is an expected request/response pair
type event struct {
// the byte range header that should be present in a request.
byteRange string
// the http status code to send in response.
responseStatus int
}
// interruptibleTransport is configured with a canned set of requests/responses.
// It records the incoming data, unless the corresponding event is configured to return
// http.StatusServiceUnavailable.
type interruptibleTransport struct {
events []event
buf []byte
bodies bodyTracker
}
// bodyTracker keeps track of response bodies that have not been closed.
type bodyTracker map[io.ReadCloser]struct{}
func (bt bodyTracker) Add(body io.ReadCloser) {
bt[body] = struct{}{}
}
func (bt bodyTracker) Close(body io.ReadCloser) {
delete(bt, body)
}
type trackingCloser struct {
io.Reader
tracker bodyTracker
}
func (tc *trackingCloser) Close() error {
tc.tracker.Close(tc)
return nil
}
func (tc *trackingCloser) Open() {
tc.tracker.Add(tc)
}
func (t *interruptibleTransport) RoundTrip(req *http.Request) (*http.Response, error) {
if len(t.events) == 0 {
panic("ran out of events, but got a request")
}
ev := t.events[0]
t.events = t.events[1:]
if got, want := req.Header.Get("Content-Range"), ev.byteRange; got != want {
return nil, fmt.Errorf("byte range: got %s; want %s", got, want)
}
if ev.responseStatus != http.StatusServiceUnavailable {
buf, err := ioutil.ReadAll(req.Body)
if err != nil {
return nil, fmt.Errorf("error reading from request data: %v", err)
}
t.buf = append(t.buf, buf...)
}
tc := &trackingCloser{unexpectedReader{}, t.bodies}
tc.Open()
h := http.Header{}
status := ev.responseStatus
// Support "X-GUploader-No-308" like Google:
if status == 308 && req.Header.Get("X-GUploader-No-308") == "yes" {
status = 200
h.Set("X-Http-Status-Code-Override", "308")
}
res := &http.Response{
StatusCode: status,
Header: h,
Body: tc,
}
return res, nil
}
// progressRecorder records updates, and calls f for every invocation of ProgressUpdate.
type progressRecorder struct {
updates []int64
f func()
}
func (pr *progressRecorder) ProgressUpdate(current int64) {
pr.updates = append(pr.updates, current)
if pr.f != nil {
pr.f()
}
}
func TestInterruptedTransferChunks(t *testing.T) {
type testCase struct {
name string
data string
chunkSize int
events []event
wantProgress []int64
}
for _, tc := range []testCase{
{
name: "large",
data: strings.Repeat("a", 300),
chunkSize: 90,
events: []event{
{"bytes 0-89/*", http.StatusServiceUnavailable},
{"bytes 0-89/*", 308},
{"bytes 90-179/*", 308},
{"bytes 180-269/*", http.StatusServiceUnavailable},
{"bytes 180-269/*", 308},
{"bytes 270-299/300", 200},
},
wantProgress: []int64{90, 180, 270, 300},
},
{
name: "small",
data: strings.Repeat("a", 20),
chunkSize: 10,
events: []event{
{"bytes 0-9/*", http.StatusServiceUnavailable},
{"bytes 0-9/*", 308},
{"bytes 10-19/*", http.StatusServiceUnavailable},
{"bytes 10-19/*", 308},
// 0 byte final request demands a byte range with leading asterix.
{"bytes */20", http.StatusServiceUnavailable},
{"bytes */20", 200},
},
wantProgress: []int64{10, 20},
},
} {
t.Run(tc.name, func(t *testing.T) {
media := strings.NewReader(tc.data)
tr := &interruptibleTransport{
buf: make([]byte, 0, len(tc.data)),
events: tc.events,
bodies: bodyTracker{},
}
pr := progressRecorder{}
rx := &ResumableUpload{
Client: &http.Client{Transport: tr},
Media: NewMediaBuffer(media, tc.chunkSize),
MediaType: "text/plain",
Callback: pr.ProgressUpdate,
}
oldBackoff := backoff
backoff = func() Backoff { return new(NoPauseBackoff) }
defer func() { backoff = oldBackoff }()
res, err := rx.Upload(context.Background())
if err == nil {
res.Body.Close()
}
if err != nil || res == nil || res.StatusCode != http.StatusOK {
if res == nil {
t.Fatalf("Upload not successful, res=nil: %v", err)
} else {
t.Fatalf("Upload not successful, statusCode=%v, err=%v", res.StatusCode, err)
}
}
if !reflect.DeepEqual(tr.buf, []byte(tc.data)) {
t.Fatalf("transferred contents:\ngot %s\nwant %s", tr.buf, tc.data)
}
if !reflect.DeepEqual(pr.updates, tc.wantProgress) {
t.Fatalf("progress updates: got %v, want %v", pr.updates, tc.wantProgress)
}
if len(tr.events) > 0 {
t.Fatalf("did not observe all expected events. leftover events: %v", tr.events)
}
if len(tr.bodies) > 0 {
t.Errorf("unclosed request bodies: %v", tr.bodies)
}
})
}
}
func TestCancelUploadFast(t *testing.T) {
const (
chunkSize = 90
mediaSize = 300
)
media := strings.NewReader(strings.Repeat("a", mediaSize))
tr := &interruptibleTransport{
buf: make([]byte, 0, mediaSize),
// Shouldn't really need an event, but sometimes the test loses the
// race. So, this is just a filler event.
events: []event{{"bytes 0-9/*", http.StatusServiceUnavailable}},
}
pr := progressRecorder{}
rx := &ResumableUpload{
Client: &http.Client{Transport: tr},
Media: NewMediaBuffer(media, chunkSize),
MediaType: "text/plain",
Callback: pr.ProgressUpdate,
}
oldBackoff := backoff
backoff = func() Backoff { return new(NoPauseBackoff) }
defer func() { backoff = oldBackoff }()
ctx, cancelFunc := context.WithCancel(context.Background())
cancelFunc() // stop the upload that hasn't started yet
res, err := rx.Upload(ctx)
if err != context.Canceled {
t.Fatalf("Upload err: got: %v; want: context cancelled", err)
}
if res != nil {
t.Fatalf("Upload result: got: %v; want: nil", res)
}
if pr.updates != nil {
t.Errorf("progress updates: got %v; want: nil", pr.updates)
}
}
func TestCancelUploadBasic(t *testing.T) {
const (
chunkSize = 90
mediaSize = 300
)
media := strings.NewReader(strings.Repeat("a", mediaSize))
tr := &interruptibleTransport{
buf: make([]byte, 0, mediaSize),
events: []event{
{"bytes 0-89/*", http.StatusServiceUnavailable},
{"bytes 0-89/*", 308},
{"bytes 90-179/*", 308},
{"bytes 180-269/*", 308}, // Upload should be cancelled before this event.
},
bodies: bodyTracker{},
}
ctx, cancelFunc := context.WithCancel(context.Background())
numUpdates := 0
pr := progressRecorder{f: func() {
numUpdates++
if numUpdates >= 2 {
cancelFunc()
}
}}
rx := &ResumableUpload{
Client: &http.Client{Transport: tr},
Media: NewMediaBuffer(media, chunkSize),
MediaType: "text/plain",
Callback: pr.ProgressUpdate,
}
oldBackoff := backoff
backoff = func() Backoff { return new(PauseOneSecond) }
defer func() { backoff = oldBackoff }()
res, err := rx.Upload(ctx)
if err != context.Canceled {
t.Fatalf("Upload err: got: %v; want: context cancelled", err)
}
if res != nil {
t.Fatalf("Upload result: got: %v; want: nil", res)
}
if got, want := tr.buf, []byte(strings.Repeat("a", chunkSize*2)); !reflect.DeepEqual(got, want) {
t.Fatalf("transferred contents:\ngot %s\nwant %s", got, want)
}
if got, want := pr.updates, []int64{chunkSize, chunkSize * 2}; !reflect.DeepEqual(got, want) {
t.Fatalf("progress updates: got %v; want: %v", got, want)
}
if len(tr.bodies) > 0 {
t.Errorf("unclosed request bodies: %v", tr.bodies)
}
}
func TestRetry_Bounded(t *testing.T) {
const (
chunkSize = 90
mediaSize = 300
)
media := strings.NewReader(strings.Repeat("a", mediaSize))
tr := &interruptibleTransport{
buf: make([]byte, 0, mediaSize),
events: []event{
{"bytes 0-89/*", http.StatusServiceUnavailable},
{"bytes 0-89/*", http.StatusServiceUnavailable},
},
bodies: bodyTracker{},
}
rx := &ResumableUpload{
Client: &http.Client{Transport: tr},
Media: NewMediaBuffer(media, chunkSize),
MediaType: "text/plain",
Callback: func(int64) {},
}
oldRetryDeadline := retryDeadline
retryDeadline = time.Second
defer func() { retryDeadline = oldRetryDeadline }()
oldBackoff := backoff
backoff = func() Backoff { return new(PauseForeverBackoff) }
defer func() { backoff = oldBackoff }()
resCode := make(chan int, 1)
go func() {
resp, err := rx.Upload(context.Background())
if err != nil {
t.Error(err)
return
}
resCode <- resp.StatusCode
}()
select {
case <-time.After(5 * time.Second):
t.Fatal("timed out waiting for Upload to complete")
case got := <-resCode:
if want, got := http.StatusServiceUnavailable, got; got != want {
t.Fatalf("want %d, got %d", want, got)
}
}
}
func TestRetry_EachChunkHasItsOwnRetryDeadline(t *testing.T) {
const (
chunkSize = 90
mediaSize = 300
)
media := strings.NewReader(strings.Repeat("a", mediaSize))
tr := &interruptibleTransport{
buf: make([]byte, 0, mediaSize),
events: []event{
{"bytes 0-89/*", http.StatusServiceUnavailable},
// cum: 1s sleep
{"bytes 0-89/*", http.StatusServiceUnavailable},
// cum: 2s sleep
{"bytes 0-89/*", http.StatusServiceUnavailable},
// cum: 3s sleep
{"bytes 0-89/*", http.StatusServiceUnavailable},
// cum: 4s sleep
{"bytes 0-89/*", 308},
// cum: 1s sleep <-- resets because it's a new chunk
{"bytes 90-179/*", 308},
// cum: 1s sleep <-- resets because it's a new chunk
{"bytes 180-269/*", 308},
// cum: 1s sleep <-- resets because it's a new chunk
{"bytes 270-299/300", 200},
},
bodies: bodyTracker{},
}
rx := &ResumableUpload{
Client: &http.Client{Transport: tr},
Media: NewMediaBuffer(media, chunkSize),
MediaType: "text/plain",
Callback: func(int64) {},
}
oldRetryDeadline := retryDeadline
retryDeadline = 5 * time.Second
defer func() { retryDeadline = oldRetryDeadline }()
oldBackoff := backoff
backoff = func() Backoff { return new(PauseOneSecond) }
defer func() { backoff = oldBackoff }()
resCode := make(chan int, 1)
go func() {
resp, err := rx.Upload(context.Background())
if err != nil {
t.Error(err)
return
}
resCode <- resp.StatusCode
}()
select {
case <-time.After(15 * time.Second):
t.Fatal("timed out waiting for Upload to complete")
case got := <-resCode:
if want := http.StatusOK; got != want {
t.Fatalf("want %d, got %d", want, got)
}
}
}