test(storage): add test for uploading with an emulator (#4641)

* Added check that the hit endpoint does not change

* Use subtests

* polish

* test verifies that server receives correct calls

* timeout overall in test

* timeout per subtest

* cleanup
diff --git a/storage/storage_test.go b/storage/storage_test.go
index b13a928..dbf23bc 100644
--- a/storage/storage_test.go
+++ b/storage/storage_test.go
@@ -1418,6 +1418,22 @@
 			WantReadHost:        "fake.gcs.com:8080",
 			WantScheme:          "http",
 		},
+		{
+			desc:                "Endpoint overrides emulator host when host is specified as scheme://host:port",
+			CustomEndpoint:      "http://localhost:8080/storage/v1",
+			StorageEmulatorHost: "https://localhost:9000",
+			WantRawBasePath:     "http://localhost:8080/storage/v1",
+			WantReadHost:        "localhost:8080",
+			WantScheme:          "http",
+		},
+		{
+			desc:                "Endpoint overrides emulator host when host is specified as host:port",
+			CustomEndpoint:      "http://localhost:8080/storage/v1",
+			StorageEmulatorHost: "localhost:9000",
+			WantRawBasePath:     "http://localhost:8080/storage/v1",
+			WantReadHost:        "localhost:8080",
+			WantScheme:          "http",
+		},
 	}
 	ctx := context.Background()
 	for _, tc := range testCases {
@@ -1439,3 +1455,193 @@
 	}
 	os.Setenv("STORAGE_EMULATOR_HOST", originalStorageEmulatorHost)
 }
+
+// Create a client using a combination of custom endpoint and STORAGE_EMULATOR_HOST
+// env variable and verify that the client hits the correct endpoint for several
+// different operations performe in sequence.
+// Verifies also that raw.BasePath, readHost and scheme are not changed
+// after running the operations.
+func TestOperationsWithEndpoint(t *testing.T) {
+	originalStorageEmulatorHost := os.Getenv("STORAGE_EMULATOR_HOST")
+	defer os.Setenv("STORAGE_EMULATOR_HOST", originalStorageEmulatorHost)
+
+	gotURL := make(chan string, 1)
+	gotHost := make(chan string, 1)
+	gotMethod := make(chan string, 1)
+
+	timedOut := make(chan bool, 1)
+
+	hClient, closeServer := newTestServer(func(w http.ResponseWriter, r *http.Request) {
+		done := make(chan bool, 1)
+		io.Copy(ioutil.Discard, r.Body)
+		fmt.Fprintf(w, "{}")
+		go func() {
+			gotHost <- r.Host
+			gotURL <- r.RequestURI
+			gotMethod <- r.Method
+			done <- true
+		}()
+
+		select {
+		case <-timedOut:
+		case <-done:
+		}
+
+	})
+	defer closeServer()
+
+	testCases := []struct {
+		desc                string
+		CustomEndpoint      string
+		StorageEmulatorHost string
+		wantScheme          string
+		wantHost            string
+	}{
+		{
+			desc:                "No endpoint or emulator host specified",
+			CustomEndpoint:      "",
+			StorageEmulatorHost: "",
+			wantScheme:          "https",
+			wantHost:            "storage.googleapis.com",
+		},
+		{
+			desc:                "emulator host specified",
+			CustomEndpoint:      "",
+			StorageEmulatorHost: "https://" + "addr",
+			wantScheme:          "https",
+			wantHost:            "addr",
+		},
+		{
+			desc:                "endpoint specified",
+			CustomEndpoint:      "https://" + "end" + "/storage/v1/",
+			StorageEmulatorHost: "",
+			wantScheme:          "https",
+			wantHost:            "end",
+		},
+		{
+			desc:                "both emulator and endpoint specified",
+			CustomEndpoint:      "https://" + "end" + "/storage/v1/",
+			StorageEmulatorHost: "http://host",
+			wantScheme:          "https",
+			wantHost:            "end",
+		},
+	}
+
+	for _, tc := range testCases {
+		ctx := context.Background()
+		t.Run(tc.desc, func(t *testing.T) {
+			timeout := time.After(time.Second)
+			done := make(chan bool, 1)
+			go func() {
+				os.Setenv("STORAGE_EMULATOR_HOST", tc.StorageEmulatorHost)
+
+				c, err := NewClient(ctx, option.WithHTTPClient(hClient), option.WithEndpoint(tc.CustomEndpoint))
+				if err != nil {
+					t.Errorf("error creating client: %v", err)
+					return
+				}
+				originalRawBasePath := c.raw.BasePath
+				originalReadHost := c.readHost
+				originalScheme := c.scheme
+
+				operations := []struct {
+					desc       string
+					runOp      func() error
+					wantURL    string
+					wantMethod string
+				}{
+					{
+						desc: "Create a bucket",
+						runOp: func() error {
+							return c.Bucket("test-bucket").Create(ctx, "pid", nil)
+						},
+						wantURL:    "/storage/v1/b?alt=json&prettyPrint=false&project=pid",
+						wantMethod: "POST",
+					},
+					{
+						desc: "Upload an object",
+						runOp: func() error {
+							w := c.Bucket("test-bucket").Object("file").NewWriter(ctx)
+							_, err = io.Copy(w, strings.NewReader("copyng into bucket"))
+							if err != nil {
+								return err
+							}
+							return w.Close()
+						},
+						wantURL:    "/upload/storage/v1/b/test-bucket/o?alt=json&name=file&prettyPrint=false&projection=full&uploadType=multipart",
+						wantMethod: "POST",
+					},
+					{
+						desc: "Download an object",
+						runOp: func() error {
+							rc, err := c.Bucket("test-bucket").Object("file").NewReader(ctx)
+							if err != nil {
+								return err
+							}
+
+							_, err = io.Copy(ioutil.Discard, rc)
+							if err != nil {
+								return err
+							}
+							return rc.Close()
+						},
+						wantURL:    "/test-bucket/file",
+						wantMethod: "GET",
+					},
+					{
+						desc: "Delete bucket",
+						runOp: func() error {
+							return c.Bucket("test-bucket").Delete(ctx)
+						},
+						wantURL:    "/storage/v1/b/test-bucket?alt=json&prettyPrint=false",
+						wantMethod: "DELETE",
+					},
+				}
+
+				// Check that the calls made to the server are as expected
+				// given the operations performed
+				for _, op := range operations {
+					if err := op.runOp(); err != nil {
+						t.Errorf("%s: %v", op.desc, err)
+					}
+					u, method := <-gotURL, <-gotMethod
+					if u != op.wantURL {
+						t.Errorf("%s: unexpected request URL\ngot  %q\nwant %q",
+							op.desc, u, op.wantURL)
+					}
+					if method != op.wantMethod {
+						t.Errorf("%s: unexpected request method\ngot  %q\nwant %q",
+							op.desc, method, op.wantMethod)
+					}
+
+					if got := <-gotHost; got != tc.wantHost {
+						t.Errorf("%s: unexpected request host\ngot  %q\nwant %q",
+							op.desc, got, tc.wantHost)
+					}
+				}
+
+				// Check that the client fields have not changed
+				if c.raw.BasePath != originalRawBasePath {
+					t.Errorf("raw.BasePath changed\n\tgot:\t\t%v\n\toriginal:\t%v",
+						c.raw.BasePath, originalRawBasePath)
+				}
+				if c.readHost != originalReadHost {
+					t.Errorf("readHost changed\n\tgot:\t\t%v\n\toriginal:\t%v",
+						c.readHost, originalReadHost)
+				}
+				if c.scheme != originalScheme {
+					t.Errorf("scheme changed\n\tgot:\t\t%v\n\toriginal:\t%v",
+						c.scheme, originalScheme)
+				}
+				done <- true
+			}()
+			select {
+			case <-timeout:
+				t.Errorf("test timeout")
+				timedOut <- true
+			case <-done:
+			}
+		})
+
+	}
+}