profiler: Add Cloud Profiler runtime agent for Go.

Cloud Profiler Go agent talks to Cloud Profiler server, and collects and uploads profiles following the server's instruction.

Expect the user to call StartProfiling() to start an endless goroutine
to handle the work. See package level documentation in profiler.go for
usage details.

Change-Id: Iabee5430c963df175ece1096fc481610e5441b8e
Reviewed-on: https://code-review.googlesource.com/13590
Reviewed-by: kokoro <noreply+kokoro@google.com>
Reviewed-by: Jonathan Amsterdam <jba@google.com>
diff --git a/profiler/mocks/mock_profiler_client.go b/profiler/mocks/mock_profiler_client.go
new file mode 100644
index 0000000..13e9e77
--- /dev/null
+++ b/profiler/mocks/mock_profiler_client.go
@@ -0,0 +1,78 @@
+// Copyright 2017 Google Inc. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Automatically generated by MockGen. DO NOT EDIT!
+// Source: google.golang.org/genproto/googleapis/devtools/cloudprofiler/v2 (interfaces: ProfilerServiceClient)
+
+package mocks
+
+import (
+	gomock "github.com/golang/mock/gomock"
+	context "golang.org/x/net/context"
+	v2 "google.golang.org/genproto/googleapis/devtools/cloudprofiler/v2"
+	grpc "google.golang.org/grpc"
+)
+
+// Mock of ProfilerServiceClient interface
+type MockProfilerServiceClient struct {
+	ctrl     *gomock.Controller
+	recorder *_MockProfilerServiceClientRecorder
+}
+
+// Recorder for MockProfilerServiceClient (not exported)
+type _MockProfilerServiceClientRecorder struct {
+	mock *MockProfilerServiceClient
+}
+
+func NewMockProfilerServiceClient(ctrl *gomock.Controller) *MockProfilerServiceClient {
+	mock := &MockProfilerServiceClient{ctrl: ctrl}
+	mock.recorder = &_MockProfilerServiceClientRecorder{mock}
+	return mock
+}
+
+func (_m *MockProfilerServiceClient) EXPECT() *_MockProfilerServiceClientRecorder {
+	return _m.recorder
+}
+
+func (_m *MockProfilerServiceClient) CreateProfile(_param0 context.Context, _param1 *v2.CreateProfileRequest, _param2 ...grpc.CallOption) (*v2.Profile, error) {
+	_s := []interface{}{_param0, _param1}
+	for _, _x := range _param2 {
+		_s = append(_s, _x)
+	}
+	ret := _m.ctrl.Call(_m, "CreateProfile", _s...)
+	ret0, _ := ret[0].(*v2.Profile)
+	ret1, _ := ret[1].(error)
+	return ret0, ret1
+}
+
+func (_mr *_MockProfilerServiceClientRecorder) CreateProfile(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call {
+	_s := append([]interface{}{arg0, arg1}, arg2...)
+	return _mr.mock.ctrl.RecordCall(_mr.mock, "CreateProfile", _s...)
+}
+
+func (_m *MockProfilerServiceClient) UpdateProfile(_param0 context.Context, _param1 *v2.UpdateProfileRequest, _param2 ...grpc.CallOption) (*v2.Profile, error) {
+	_s := []interface{}{_param0, _param1}
+	for _, _x := range _param2 {
+		_s = append(_s, _x)
+	}
+	ret := _m.ctrl.Call(_m, "UpdateProfile", _s...)
+	ret0, _ := ret[0].(*v2.Profile)
+	ret1, _ := ret[1].(error)
+	return ret0, ret1
+}
+
+func (_mr *_MockProfilerServiceClientRecorder) UpdateProfile(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call {
+	_s := append([]interface{}{arg0, arg1}, arg2...)
+	return _mr.mock.ctrl.RecordCall(_mr.mock, "UpdateProfile", _s...)
+}
diff --git a/profiler/profiler.go b/profiler/profiler.go
new file mode 100644
index 0000000..16306dc
--- /dev/null
+++ b/profiler/profiler.go
@@ -0,0 +1,394 @@
+// Copyright 2017 Google Inc. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package profiler is a client for the Google Cloud Profiler service.
+//
+// This package is still experimental and subject to change.
+//
+// Calling Start will start a goroutine to collect profiles and
+// upload to Cloud Profiler server, at the rhythm specified by
+// the server.
+//
+// The caller should provide the target string in the config so Cloud
+// Profiler knows how to group the profile data. Otherwise the target
+// string is set to "unknown".
+//
+// Optionally DebugLogging can be set in the config to enable detailed
+// logging from profiler.
+//
+// Start should only be called once. The first call will start
+// the profiling goroutine. Any additional calls will be ignored.
+package profiler
+
+import (
+	"bytes"
+	"errors"
+	"fmt"
+	"log"
+	"runtime/pprof"
+	"sort"
+	"strings"
+	"sync"
+	"time"
+
+	gcemd "cloud.google.com/go/compute/metadata"
+	"cloud.google.com/go/internal/version"
+	"github.com/golang/protobuf/proto"
+	"github.com/golang/protobuf/ptypes"
+	gax "github.com/googleapis/gax-go"
+	"golang.org/x/net/context"
+	"golang.org/x/oauth2"
+	"golang.org/x/oauth2/google"
+	"google.golang.org/api/option"
+	"google.golang.org/api/transport"
+	pb "google.golang.org/genproto/googleapis/devtools/cloudprofiler/v2"
+	edpb "google.golang.org/genproto/googleapis/rpc/errdetails"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	grpcmd "google.golang.org/grpc/metadata"
+	"google.golang.org/grpc/status"
+)
+
+var (
+	config    = &Config{}
+	startOnce sync.Once
+	// getProjectID, getInstanceName, getZone, startCPUProfile, stopCPUProfile,
+	// writeHeapProfile and sleep are overrideable for testing.
+	getProjectID     = gcemd.ProjectID
+	getInstanceName  = gcemd.InstanceName
+	getZone          = gcemd.Zone
+	startCPUProfile  = pprof.StartCPUProfile
+	stopCPUProfile   = pprof.StopCPUProfile
+	writeHeapProfile = pprof.WriteHeapProfile
+	sleep            = gax.Sleep
+)
+
+const (
+	apiAddress            = "cloudprofiler.googleapis.com:443"
+	xGoogAPIMetadata      = "x-goog-api-client"
+	deploymentKeyMetadata = "x-profiler-deployment-key-bin"
+	zoneNameLabel         = "zone"
+	instanceLabel         = "instance"
+	scope                 = "https://www.googleapis.com/auth/monitoring.write"
+
+	initialBackoff = time.Second
+	// Ensure the agent will recover within 1 hour.
+	maxBackoff        = time.Hour
+	backoffMultiplier = 1.3 // Backoff envelope increases by this factor on each retry.
+	retryInfoMetadata = "google.rpc.retryinfo-bin"
+)
+
+// Config is the profiler configuration.
+type Config struct {
+	// Target groups related deployments together, defaults to "unknown".
+	Target string
+	// DebugLogging enables detailed debug logging from profiler.
+	DebugLogging bool
+	// ProjectID is the ID of the cloud project to use instead of
+	// the one read from the VM metadata server. Typically for testing.
+	ProjectID string
+	// InstanceName is the name of the VM instance to use instead of
+	// the one read from the VM metadata server. Typically for testing.
+	InstanceName string
+	// ZoneName is the name of the zone to use instead of
+	// the one read from the VM metadata server. Typically for testing.
+	ZoneName string
+	// APIAddr is the HTTP endpoint to use to connect to the profiler
+	// agent API. Defaults to the production environment, overridable
+	// for testing.
+	APIAddr string
+}
+
+// Start starts a goroutine to collect and upload profiles.
+// See package level documentation for details.
+func Start(cfg *Config) error {
+	var err error
+	startOnce.Do(func() {
+		initializeConfig(cfg)
+
+		ctx := context.Background()
+
+		var ts oauth2.TokenSource
+		ts, err = google.DefaultTokenSource(ctx, scope)
+		if err != nil {
+			debugLog("failed to get application default credentials: %v", err)
+			return
+		}
+
+		opts := []option.ClientOption{
+			option.WithEndpoint(config.APIAddr),
+			option.WithTokenSource(ts),
+			option.WithScopes(scope),
+		}
+
+		var conn *grpc.ClientConn
+		conn, err = transport.DialGRPC(ctx, opts...)
+		if err != nil {
+			debugLog("failed to dial GRPC: %v", err)
+			return
+		}
+
+		var d *pb.Deployment
+		d, err = initializeDeployment()
+		if err != nil {
+			debugLog("failed to initialize deployment: %v", err)
+			return
+		}
+
+		a, ctx := initializeResources(ctx, conn, d)
+		go pollProfilerService(ctx, a)
+	})
+
+	return err
+}
+
+func debugLog(format string, e ...interface{}) {
+	if config.DebugLogging {
+		log.Printf(format, e...)
+	}
+}
+
+// agent polls Cloud Profiler server for instructions on behalf of
+// a task, and collects and uploads profiles as requested.
+type agent struct {
+	client             *client
+	deployment         *pb.Deployment
+	creationErrorCount int64
+}
+
+// abortedBackoffDuration retrieves the retry duration from gRPC trailing
+// metadata, which is set by Cloud Profiler server.
+func abortedBackoffDuration(md grpcmd.MD) (time.Duration, error) {
+	elem := md[retryInfoMetadata]
+	if len(elem) <= 0 {
+		return 0, errors.New("no retry info")
+	}
+
+	var retryInfo edpb.RetryInfo
+	if err := proto.Unmarshal([]byte(elem[0]), &retryInfo); err != nil {
+		return 0, err
+	} else if time, err := ptypes.Duration(retryInfo.RetryDelay); err != nil {
+		return 0, err
+	} else {
+		if time < 0 {
+			return 0, errors.New("negative retry duration")
+		}
+		return time, nil
+	}
+}
+
+type retryer struct {
+	backoff gax.Backoff
+	md      grpcmd.MD
+}
+
+func (r *retryer) Retry(err error) (time.Duration, bool) {
+	st, _ := status.FromError(err)
+	if st != nil && st.Code() == codes.Aborted {
+		dur, err := abortedBackoffDuration(r.md)
+		if err == nil {
+			return dur, true
+		}
+		debugLog("failed to get backoff duration: %v", err)
+	}
+	return r.backoff.Pause(), true
+}
+
+// createProfile talks to Cloud Profiler server to create profile. In
+// case of error, the goroutine will sleep and retry. Sleep duration may
+// be specified by the server. Otherwise it will be an exponentially
+// increasing value, bounded by maxBackoff.
+func (a *agent) createProfile(ctx context.Context) *pb.Profile {
+	req := pb.CreateProfileRequest{
+		Deployment:  a.deployment,
+		ProfileType: []pb.ProfileType{pb.ProfileType_CPU, pb.ProfileType_HEAP},
+	}
+
+	var p *pb.Profile
+	md := grpcmd.New(map[string]string{})
+
+	gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
+		var err error
+		p, err = a.client.client.CreateProfile(ctx, &req, grpc.Trailer(&md))
+		return err
+	}, gax.WithRetry(func() gax.Retryer {
+		return &retryer{
+			backoff: gax.Backoff{
+				Initial:    initialBackoff,
+				Max:        maxBackoff,
+				Multiplier: backoffMultiplier,
+			},
+			md: md,
+		}
+	}))
+
+	return p
+}
+
+func (a *agent) profileAndUpload(ctx context.Context, p *pb.Profile) {
+	var prof bytes.Buffer
+	pt := p.GetProfileType()
+
+	switch pt {
+	case pb.ProfileType_CPU:
+		duration, err := ptypes.Duration(p.Duration)
+		if err != nil {
+			debugLog("failed to get profile duration: %v", err)
+			return
+		}
+		if err := startCPUProfile(&prof); err != nil {
+			debugLog("failed to start CPU profile: %v", err)
+			return
+		}
+		sleep(ctx, duration)
+		stopCPUProfile()
+	case pb.ProfileType_HEAP:
+		if err := writeHeapProfile(&prof); err != nil {
+			debugLog("failed to write heap profile: %v", err)
+			return
+		}
+	default:
+		debugLog("unexpected profile type: %v", pt)
+		return
+	}
+
+	p.ProfileBytes = prof.Bytes()
+	p.Labels = a.deployment.Labels
+	req := pb.UpdateProfileRequest{Profile: p}
+
+	// Upload profile, discard profile in case of error.
+	_, err := a.client.client.UpdateProfile(ctx, &req)
+	if err != nil {
+		debugLog("failed to upload profile: %v", err)
+	}
+}
+
+// client is a client for interacting with Cloud Profiler API.
+type client struct {
+	// gRPC API client.
+	client pb.ProfilerServiceClient
+
+	// Metadata for google API to be sent with each request.
+	xGoogHeader []string
+
+	// Metadata for Cloud Profiler API to be sent with each request.
+	profilerHeader []string
+}
+
+// setProfilerHeader sets the unique key string for a deployment target in
+// the `x-profiler-deployment-key-bin` header passed on each request.
+// Intended for use by Cloud Profiler agents.
+func (c *client) setProfilerHeader(d *pb.Deployment) {
+	labels := make([]string, 0, len(d.Labels))
+	for k, v := range d.Labels {
+		labels = append(labels, fmt.Sprintf("%s|%s", k, v))
+	}
+	sort.Strings(labels)
+	key := d.ProjectId + "##" + d.Target + "##" + strings.Join(labels, "#")
+	c.profilerHeader = []string{key}
+}
+
+// setXGoogHeader sets the name and version of the application in
+// the `x-goog-api-client` header passed on each request. Intended for
+// use by Google-written clients.
+func (c *client) setXGoogHeader(keyval ...string) {
+	kv := append([]string{"gl-go", version.Go(), "gccl", version.Repo}, keyval...)
+	kv = append(kv, "gax", gax.Version, "grpc", grpc.Version)
+	c.xGoogHeader = []string{gax.XGoogHeader(kv...)}
+}
+
+func (c *client) insertMetadata(ctx context.Context) context.Context {
+	md, _ := grpcmd.FromOutgoingContext(ctx)
+	md = md.Copy()
+	md[xGoogAPIMetadata] = c.xGoogHeader
+	md[deploymentKeyMetadata] = c.profilerHeader
+	return grpcmd.NewOutgoingContext(ctx, md)
+}
+
+func initializeDeployment() (*pb.Deployment, error) {
+	var projectID, instance, zone string
+	var err error
+
+	if config.ProjectID != "" {
+		projectID = config.ProjectID
+	} else {
+		projectID, err = getProjectID()
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	if config.InstanceName != "" {
+		instance = config.InstanceName
+	} else {
+		instance, err = getInstanceName()
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	if config.ZoneName != "" {
+		zone = config.ZoneName
+	} else {
+		zone, err = getZone()
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	labels := make(map[string]string)
+	labels[zoneNameLabel] = zone
+	labels[instanceLabel] = instance
+
+	return &pb.Deployment{
+		ProjectId: projectID,
+		Target:    config.Target,
+		Labels:    labels,
+	}, nil
+}
+
+func initializeResources(ctx context.Context, conn *grpc.ClientConn, d *pb.Deployment) (*agent, context.Context) {
+	c := &client{
+		client: pb.NewProfilerServiceClient(conn),
+	}
+	c.setXGoogHeader()
+	c.setProfilerHeader(d)
+
+	ctx = c.insertMetadata(ctx)
+	return &agent{
+		client:     c,
+		deployment: d,
+	}, ctx
+}
+
+func initializeConfig(cfg *Config) {
+	*config = *cfg
+
+	if config.Target == "" {
+		config.Target = "unknown"
+	}
+	if config.APIAddr == "" {
+		config.APIAddr = apiAddress
+	}
+}
+
+// pollProfilerService starts an endless loop to poll Cloud Profiler
+// server for instructions, and collects and uploads profiles as
+// requested.
+func pollProfilerService(ctx context.Context, a *agent) {
+	for {
+		p := a.createProfile(ctx)
+		a.profileAndUpload(ctx, p)
+	}
+}
diff --git a/profiler/profiler_example_test.go b/profiler/profiler_example_test.go
new file mode 100644
index 0000000..f683fb1
--- /dev/null
+++ b/profiler/profiler_example_test.go
@@ -0,0 +1,32 @@
+// Copyright 2017 Google Inc. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package profiler_test
+
+import (
+	"cloud.google.com/go/profiler"
+)
+
+func ExampleStart() {
+	// The caller should provide the target string in the config so Cloud
+	// Profiler knows how to group the profile data. Otherwise the target
+	// string is set to "unknown".
+	//
+	// Optionally DebugLogging can be set in the config to enable detailed
+	// logging from profiler.
+	err := profiler.Start(&profiler.Config{Target: "my-target"})
+	if err != nil {
+		//TODO: Handle error.
+	}
+}
diff --git a/profiler/profiler_test.go b/profiler/profiler_test.go
new file mode 100644
index 0000000..6e76f74
--- /dev/null
+++ b/profiler/profiler_test.go
@@ -0,0 +1,361 @@
+// Copyright 2017 Google Inc. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package profiler
+
+import (
+	"errors"
+	"io"
+	"reflect"
+	"strings"
+	"testing"
+	"time"
+
+	"cloud.google.com/go/profiler/mocks"
+	"github.com/golang/mock/gomock"
+	"github.com/golang/protobuf/proto"
+	"github.com/golang/protobuf/ptypes"
+	gax "github.com/googleapis/gax-go"
+	"golang.org/x/net/context"
+	pb "google.golang.org/genproto/googleapis/devtools/cloudprofiler/v2"
+	edpb "google.golang.org/genproto/googleapis/rpc/errdetails"
+	"google.golang.org/grpc/codes"
+	grpcmd "google.golang.org/grpc/metadata"
+	"google.golang.org/grpc/status"
+)
+
+const (
+	testProjectID    = "test-project-ID"
+	testInstanceName = "test-instance-name"
+	testZoneName     = "test-zone-name"
+	testTarget       = "test-target"
+)
+
+func createTestDeployment() *pb.Deployment {
+	labels := make(map[string]string)
+	labels[zoneNameLabel] = testZoneName
+	labels[instanceLabel] = testInstanceName
+	return &pb.Deployment{
+		ProjectId: testProjectID,
+		Target:    testTarget,
+		Labels:    labels,
+	}
+}
+
+func createTestAgent(psc pb.ProfilerServiceClient) *agent {
+	c := &client{client: psc}
+	a := &agent{
+		client:     c,
+		deployment: createTestDeployment(),
+	}
+	return a
+}
+
+func createTrailers(dur time.Duration) map[string]string {
+	b, _ := proto.Marshal(&edpb.RetryInfo{
+		RetryDelay: ptypes.DurationProto(dur),
+	})
+	return map[string]string{
+		retryInfoMetadata: string(b),
+	}
+}
+
+func TestCreateProfile(t *testing.T) {
+	ctx := context.Background()
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+	mpc := mocks.NewMockProfilerServiceClient(ctrl)
+	a := createTestAgent(mpc)
+	p := &pb.Profile{Name: "test_profile"}
+	wantRequest := pb.CreateProfileRequest{
+		Deployment:  a.deployment,
+		ProfileType: []pb.ProfileType{pb.ProfileType_CPU, pb.ProfileType_HEAP},
+	}
+
+	mpc.EXPECT().CreateProfile(ctx, gomock.Eq(&wantRequest), gomock.Any()).Times(1).Return(p, nil)
+
+	gotP := a.createProfile(ctx)
+
+	if !reflect.DeepEqual(gotP, p) {
+		t.Errorf("CreateProfile() got wrong profile, got %v, want %v", gotP, p)
+	}
+}
+
+func TestProfileAndUpload(t *testing.T) {
+	ctx := context.Background()
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+
+	errFunc := func(io.Writer) error { return errors.New("") }
+	testDuration := time.Second * 5
+	tests := []struct {
+		profileType          pb.ProfileType
+		duration             *time.Duration
+		startCPUProfileFunc  func(io.Writer) error
+		writeHeapProfileFunc func(io.Writer) error
+		wantBytes            []byte
+	}{
+		{
+			profileType: pb.ProfileType_CPU,
+			duration:    &testDuration,
+			startCPUProfileFunc: func(w io.Writer) error {
+				w.Write([]byte{1})
+				return nil
+			},
+			writeHeapProfileFunc: errFunc,
+			wantBytes:            []byte{1},
+		},
+		{
+			profileType:          pb.ProfileType_CPU,
+			startCPUProfileFunc:  errFunc,
+			writeHeapProfileFunc: errFunc,
+		},
+		{
+			profileType: pb.ProfileType_CPU,
+			duration:    &testDuration,
+			startCPUProfileFunc: func(w io.Writer) error {
+				w.Write([]byte{2})
+				return nil
+			},
+			writeHeapProfileFunc: func(w io.Writer) error {
+				w.Write([]byte{3})
+				return nil
+			},
+			wantBytes: []byte{2},
+		},
+		{
+			profileType:         pb.ProfileType_HEAP,
+			startCPUProfileFunc: errFunc,
+			writeHeapProfileFunc: func(w io.Writer) error {
+				w.Write([]byte{4})
+				return nil
+			},
+			wantBytes: []byte{4},
+		},
+		{
+			profileType:          pb.ProfileType_HEAP,
+			startCPUProfileFunc:  errFunc,
+			writeHeapProfileFunc: errFunc,
+		},
+		{
+			profileType: pb.ProfileType_HEAP,
+			startCPUProfileFunc: func(w io.Writer) error {
+				w.Write([]byte{5})
+				return nil
+			},
+			writeHeapProfileFunc: func(w io.Writer) error {
+				w.Write([]byte{6})
+				return nil
+			},
+			wantBytes: []byte{6},
+		},
+		{
+			profileType: pb.ProfileType_PROFILE_TYPE_UNSPECIFIED,
+			startCPUProfileFunc: func(w io.Writer) error {
+				w.Write([]byte{7})
+				return nil
+			},
+			writeHeapProfileFunc: func(w io.Writer) error {
+				w.Write([]byte{8})
+				return nil
+			},
+		},
+	}
+
+	for _, tt := range tests {
+		mpc := mocks.NewMockProfilerServiceClient(ctrl)
+		a := createTestAgent(mpc)
+		startCPUProfile = tt.startCPUProfileFunc
+		stopCPUProfile = func() {}
+		writeHeapProfile = tt.writeHeapProfileFunc
+		var gotSleep *time.Duration
+		sleep = func(ctx context.Context, d time.Duration) error {
+			gotSleep = &d
+			return nil
+		}
+		p := &pb.Profile{ProfileType: tt.profileType}
+		if tt.duration != nil {
+			p.Duration = ptypes.DurationProto(*tt.duration)
+		}
+
+		if tt.wantBytes != nil {
+			wantProfile := &pb.Profile{
+				ProfileType: p.ProfileType,
+				Duration:    p.Duration,
+			}
+			wantProfile.Labels = a.deployment.Labels
+			wantProfile.ProfileBytes = tt.wantBytes
+			wantRequest := pb.UpdateProfileRequest{
+				Profile: wantProfile,
+			}
+			mpc.EXPECT().UpdateProfile(ctx, gomock.Eq(&wantRequest)).Times(1)
+		} else {
+			mpc.EXPECT().UpdateProfile(gomock.Any(), gomock.Any()).MaxTimes(0)
+		}
+
+		a.profileAndUpload(ctx, p)
+
+		if tt.duration == nil {
+			if gotSleep != nil {
+				t.Errorf("profileAndUpload(%v) slept for: %v, want no sleep", p, gotSleep)
+			}
+		} else {
+			if gotSleep == nil {
+				t.Errorf("profileAndUpload(%v) didn't sleep, want sleep for: %v", p, tt.duration)
+			} else if *gotSleep != *tt.duration {
+				t.Errorf("profileAndUpload(%v) slept for wrong duration, got: %v, want: %v", p, gotSleep, tt.duration)
+			}
+		}
+	}
+}
+
+func TestRetry(t *testing.T) {
+	normalDuration := time.Second * 3
+	negativeDuration := time.Second * -3
+
+	tests := []struct {
+		trailers  map[string]string
+		wantPause *time.Duration
+	}{
+		{
+			createTrailers(normalDuration),
+			&normalDuration,
+		},
+		{
+			createTrailers(negativeDuration),
+			nil,
+		},
+		{
+			map[string]string{retryInfoMetadata: "wrong format"},
+			nil,
+		},
+		{
+			map[string]string{},
+			nil,
+		},
+	}
+
+	for _, tt := range tests {
+		md := grpcmd.New(tt.trailers)
+		r := &retryer{
+			backoff: gax.Backoff{
+				Initial:    initialBackoff,
+				Max:        maxBackoff,
+				Multiplier: backoffMultiplier,
+			},
+			md: md,
+		}
+
+		pause, shouldRetry := r.Retry(status.Error(codes.Aborted, ""))
+
+		if !shouldRetry {
+			t.Error("retryer.Retry() returned shouldRetry false, want true")
+		}
+
+		if tt.wantPause != nil {
+			if pause != *tt.wantPause {
+				t.Errorf("retryer.Retry() returned wrong pause, got: %v, want: %v", pause, tt.wantPause)
+			}
+		} else {
+			if pause > initialBackoff {
+				t.Errorf("retryer.Retry() returned wrong pause, got: %v, want: < %v", pause, initialBackoff)
+			}
+		}
+	}
+
+	md := grpcmd.New(map[string]string{})
+
+	r := &retryer{
+		backoff: gax.Backoff{
+			Initial:    initialBackoff,
+			Max:        maxBackoff,
+			Multiplier: backoffMultiplier,
+		},
+		md: md,
+	}
+	for i := 0; i < 100; i++ {
+		pause, shouldRetry := r.Retry(errors.New(""))
+		if !shouldRetry {
+			t.Errorf("retryer.Retry() called %v times, returned shouldRetry false, want true", i)
+		}
+		if pause > maxBackoff {
+			t.Errorf("retryer.Retry() called %v times, returned wrong pause, got: %v, want: < %v", i, pause, maxBackoff)
+		}
+	}
+}
+
+func TestInitializeResources(t *testing.T) {
+	d := createTestDeployment()
+	ctx := context.Background()
+
+	a, ctx := initializeResources(ctx, nil, d)
+
+	if xg := a.client.xGoogHeader; len(xg) == 0 {
+		t.Errorf("initializeResources() sets empty xGoogHeader")
+	} else {
+		if !strings.Contains(xg[0], "gl-go/") {
+			t.Errorf("initializeResources() sets wrong xGoogHeader, got: %v, want gl-go key", xg[0])
+		}
+		if !strings.Contains(xg[0], "gccl/") {
+			t.Errorf("initializeResources() sets wrong xGoogHeader, got: %v, want gccl key", xg[0])
+		}
+		if !strings.Contains(xg[0], "gax/") {
+			t.Errorf("initializeResources() sets wrong xGoogHeader, got: %v, want gax key", xg[0])
+		}
+		if !strings.Contains(xg[0], "grpc/") {
+			t.Errorf("initializeResources() sets wrong xGoogHeader, got: %v, want grpc key", xg[0])
+		}
+	}
+
+	wantPH := "test-project-ID##test-target##instance|test-instance-name#zone|test-zone-name"
+	if ph := a.client.profilerHeader; len(ph) == 0 {
+		t.Errorf("initializeResources() sets empty profilerHeader")
+	} else if ph[0] != wantPH {
+		t.Errorf("initializeResources() sets wrong profilerHeader, got: %v, want: %v", ph[0], wantPH)
+	}
+
+	md, _ := grpcmd.FromOutgoingContext(ctx)
+
+	if !reflect.DeepEqual(md[xGoogAPIMetadata], a.client.xGoogHeader) {
+		t.Errorf("md[%v] = %v, want equal xGoogHeader = %v", xGoogAPIMetadata, md[xGoogAPIMetadata], a.client.xGoogHeader)
+	}
+	if !reflect.DeepEqual(md[deploymentKeyMetadata], a.client.profilerHeader) {
+		t.Errorf("md[%v] = %v, want equal profilerHeader = %v", deploymentKeyMetadata, md[deploymentKeyMetadata], a.client.profilerHeader)
+	}
+}
+
+func TestInitializeDeployment(t *testing.T) {
+	getProjectID = func() (string, error) {
+		return testProjectID, nil
+	}
+	getInstanceName = func() (string, error) {
+		return testInstanceName, nil
+	}
+	getZone = func() (string, error) {
+		return testZoneName, nil
+	}
+
+	config = &Config{Target: testTarget}
+	d, err := initializeDeployment()
+
+	if err != nil {
+		t.Errorf("initializeDeployment() got error: %v, want no error", err)
+	}
+
+	want := createTestDeployment()
+
+	if !reflect.DeepEqual(d, want) {
+		t.Errorf("initializeDeployment() got wrong deployment, got: %v, want %v", d, want)
+	}
+}