| // Copyright 2019 Google LLC | 
 | // | 
 | // Licensed under the Apache License, Version 2.0 (the "License"); | 
 | // you may not use this file except in compliance with the License. | 
 | // You may obtain a copy of the License at | 
 | // | 
 | //      http://www.apache.org/licenses/LICENSE-2.0 | 
 | // | 
 | // Unless required by applicable law or agreed to in writing, software | 
 | // distributed under the License is distributed on an "AS IS" BASIS, | 
 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
 | // See the License for the specific language governing permissions and | 
 | // limitations under the License. | 
 | package storage_test | 
 |  | 
 | import ( | 
 | 	"context" | 
 | 	"fmt" | 
 | 	"io/ioutil" | 
 | 	"net/http" | 
 | 	"net/http/httptest" | 
 | 	"strings" | 
 | 	"sync/atomic" | 
 | 	"testing" | 
 | 	"time" | 
 |  | 
 | 	"golang.org/x/oauth2" | 
 |  | 
 | 	"cloud.google.com/go/storage" | 
 | 	"google.golang.org/api/googleapi" | 
 | 	"google.golang.org/api/option" | 
 | ) | 
 |  | 
 | func TestIndefiniteRetries(t *testing.T) { | 
 | 	if testing.Short() { | 
 | 		t.Skip("A long running test for retries") | 
 | 	} | 
 |  | 
 | 	uploadRoute := "/upload" | 
 |  | 
 | 	var resumableUploadIDs atomic.Value | 
 | 	resumableUploadIDs.Store(make(map[string]time.Time)) | 
 |  | 
 | 	lookupUploadID := func(resumableUploadID string) bool { | 
 | 		_, ok := resumableUploadIDs.Load().(map[string]time.Time)[resumableUploadID] | 
 | 		return ok | 
 | 	} | 
 |  | 
 | 	memoizeUploadID := func(resumableUploadID string) { | 
 | 		resumableUploadIDs.Load().(map[string]time.Time)[resumableUploadID] = time.Now().UTC() | 
 | 	} | 
 |  | 
 | 	cst := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | 
 | 		resumableUploadID := r.URL.Query().Get("upload_id") | 
 | 		path := r.URL.Path | 
 |  | 
 | 		switch { | 
 | 		case path == "/b": // Bucket creation | 
 | 			w.Write([]byte(`{"kind":"storage#bucket","id":"bucket","name":"bucket"}`)) | 
 | 			return | 
 |  | 
 | 		case (strings.HasPrefix(path, "/b/") || strings.HasPrefix(path, "/upload/storage/v1/b/")) && strings.HasSuffix(path, "/o"): | 
 | 			if resumableUploadID == "" { | 
 | 				uploadID := time.Now().Format(time.RFC3339Nano) | 
 | 				w.Header().Set("X-GUploader-UploadID", uploadID) | 
 | 				// Now for the resumable upload URL. | 
 | 				w.Header().Set("Location", fmt.Sprintf("http://%s?upload_id=%s", r.Host+uploadRoute, uploadID)) | 
 | 			} else { | 
 | 				w.Write([]byte(`{"kind":"storage#object","bucket":"bucket","name":"bucket"}`)) | 
 | 			} | 
 | 			return | 
 |  | 
 | 		case path == uploadRoute: | 
 | 			start, completedUpload, spamThem := parseContentRange(r.Header) | 
 |  | 
 | 			if resumableUploadID != "" { | 
 | 				if !lookupUploadID(resumableUploadID) { | 
 | 					if start == "0" { | 
 | 						// First time that we are encountering this upload | 
 | 						// and it is at byte 0, so memoize the uploadID. | 
 | 						memoizeUploadID(resumableUploadID) | 
 | 					} else { | 
 | 						// If the start and end range are non-zero this is the exact | 
 | 						// error in https://github.com/googleapis/google-cloud-go/issues/1507 | 
 | 						// mismatched_content_start (Invalid request. According to the Content-Range header, | 
 | 						// the upload offset is 1082130432 byte(s), which exceeds already uploaded size of 0 byte(s).) | 
 | 						errStr := fmt.Sprintf("mismatched_content_start (Invalid request. According to the Content-Range header,"+ | 
 | 							"the upload offset is %s byte(s), which exceeds already uploaded size of 0 byte(s).)\n%s", start, r.Header["Content-Range"]) | 
 | 						http.Error(w, errStr, http.StatusServiceUnavailable) | 
 | 						return | 
 | 					} | 
 | 				} | 
 | 			} | 
 | 			if spamThem { | 
 | 				// Reproduce https://github.com/googleapis/google-cloud-go/issues/1507 | 
 | 				// by sending then a retryable error on the last byte. | 
 | 				w.WriteHeader(http.StatusTooManyRequests) | 
 | 				return | 
 | 			} | 
 | 			if completedUpload { | 
 | 				// Completed the upload. | 
 | 				return | 
 | 			} | 
 |  | 
 | 			// Consume the body since we can accept this body. | 
 | 			ioutil.ReadAll(r.Body) | 
 | 			w.Header().Set("X-Http-Status-Code-Override", "308") | 
 | 			return | 
 |  | 
 | 		default: | 
 | 			http.Error(w, "Unimplemented", http.StatusNotFound) | 
 | 			return | 
 | 		} | 
 | 	})) | 
 | 	defer cst.Close() | 
 |  | 
 | 	hc := &http.Client{ | 
 | 		Transport: &oauth2.Transport{ | 
 | 			Source: new(tokenSupplier), | 
 | 		}, | 
 | 	} | 
 |  | 
 | 	ctx, cancel := context.WithCancel(context.Background()) | 
 | 	defer cancel() | 
 |  | 
 | 	opts := []option.ClientOption{option.WithHTTPClient(hc), option.WithEndpoint(cst.URL)} | 
 |  | 
 | 	sc, err := storage.NewClient(ctx, opts...) | 
 | 	if err != nil { | 
 | 		t.Fatalf("Failed to create storage client: %v", err) | 
 | 	} | 
 | 	defer sc.Close() | 
 |  | 
 | 	obj := sc.Bucket("issue-1507").Object("object") | 
 | 	w := obj.NewWriter(ctx) | 
 |  | 
 | 	maxFileSize := 1 << 20 | 
 | 	w.ChunkSize = maxFileSize / 4 | 
 |  | 
 | 	for i := 0; i < maxFileSize; { | 
 | 		nowStr := time.Now().Format(time.RFC3339Nano) | 
 | 		n, err := fmt.Fprintf(w, "%s", nowStr) | 
 | 		if err != nil { | 
 | 			t.Fatalf("Failed to write to object: %v", err) | 
 | 		} | 
 | 		i += n | 
 | 	} | 
 |  | 
 | 	closeDone := make(chan error, 1) | 
 | 	go func() { | 
 | 		// Invoking w.Close() to ensure that this triggers completion of the upload. | 
 | 		closeDone <- w.Close() | 
 | 	}() | 
 |  | 
 | 	// Given that the ExponentialBackoff is 30 seconds from a start of 100ms, | 
 | 	// let's wait for a maximum of 5 minutes to account for (2**n) increments | 
 | 	// between [100ms, 30s]. | 
 | 	maxWait := 5 * time.Minute | 
 | 	select { | 
 | 	case <-time.After(maxWait): | 
 | 		t.Fatalf("Test took longer than %s to return", maxWait) | 
 | 	case err := <-closeDone: | 
 | 		ge, ok := err.(*googleapi.Error) | 
 | 		if !ok { | 
 | 			t.Fatalf("Got error (%v) of type %T, expected *googleapi.Error", err, err) | 
 | 		} | 
 | 		if ge.Code != http.StatusTooManyRequests { | 
 | 			t.Fatalf("Got unexpected error: %#v\nWant statusCode of %d", ge, http.StatusTooManyRequests) | 
 | 		} | 
 | 	} | 
 | } | 
 |  | 
 | type tokenSupplier int | 
 |  | 
 | func (ts *tokenSupplier) Token() (*oauth2.Token, error) { | 
 | 	return &oauth2.Token{ | 
 | 		AccessToken:  "access-token", | 
 | 		TokenType:    "Bearer", | 
 | 		RefreshToken: "refresh-token", | 
 | 		Expiry:       time.Now().Add(time.Hour), | 
 | 	}, nil | 
 | } | 
 |  | 
 | func parseContentRange(hdr http.Header) (start string, completed, spamThem bool) { | 
 | 	cRange := strings.TrimPrefix(hdr.Get("Content-Range"), "bytes ") | 
 | 	rangeSplits := strings.Split(cRange, "/") | 
 | 	prelude := rangeSplits[0] | 
 |  | 
 | 	if rangeSplits[1] != "*" { // They've uploaded the last byte. | 
 | 		// Reproduce https://github.com/googleapis/google-cloud-go/issues/1507 | 
 | 		// by sending then a retryable error on the last byte. | 
 | 		spamThem = true | 
 | 	} | 
 | 	if len(prelude) == 0 || prelude == "*" { | 
 | 		// Completed the upload. | 
 | 		completed = true | 
 | 		return | 
 | 	} | 
 | 	startEndSplit := strings.Split(prelude, "-") | 
 | 	start = startEndSplit[0] | 
 | 	return | 
 | } |