blob: 16306dce4795e472227daee3b24fd065401e09c1 [file] [log] [blame]
// Copyright 2017 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package profiler is a client for the Google Cloud Profiler service.
//
// This package is still experimental and subject to change.
//
// Calling Start starts a goroutine that collects profiles and uploads
// them to the Cloud Profiler server, at the cadence specified by
// the server.
//
// The caller should provide the target string in the config so Cloud
// Profiler knows how to group the profile data. Otherwise the target
// string is set to "unknown".
//
// Optionally DebugLogging can be set in the config to enable detailed
// logging from profiler.
//
// Start should only be called once. The first call will start
// the profiling goroutine. Any additional calls will be ignored.
package profiler
import (
"bytes"
"errors"
"fmt"
"log"
"runtime/pprof"
"sort"
"strings"
"sync"
"time"
gcemd "cloud.google.com/go/compute/metadata"
"cloud.google.com/go/internal/version"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
gax "github.com/googleapis/gax-go"
"golang.org/x/net/context"
"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
"google.golang.org/api/option"
"google.golang.org/api/transport"
pb "google.golang.org/genproto/googleapis/devtools/cloudprofiler/v2"
edpb "google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
grpcmd "google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)
// Package-level state shared by Start and the helpers in this file.
var (
// config is the active profiler configuration. initializeConfig overwrites
// it with the caller's settings and fills in defaults.
config = &Config{}
// startOnce guards the body of Start so that it runs at most once.
startOnce sync.Once
// getProjectID, getInstanceName, getZone, startCPUProfile, stopCPUProfile,
// writeHeapProfile and sleep are overrideable for testing.
// By default the first three read from the compute metadata package, the
// profile functions come from runtime/pprof, and sleep is gax.Sleep.
getProjectID = gcemd.ProjectID
getInstanceName = gcemd.InstanceName
getZone = gcemd.Zone
startCPUProfile = pprof.StartCPUProfile
stopCPUProfile = pprof.StopCPUProfile
writeHeapProfile = pprof.WriteHeapProfile
sleep = gax.Sleep
)
const (
// apiAddress is the default API endpoint, used when Config.APIAddr is empty.
apiAddress = "cloudprofiler.googleapis.com:443"
// xGoogAPIMetadata and deploymentKeyMetadata are the outgoing gRPC metadata
// keys that client.insertMetadata populates.
xGoogAPIMetadata = "x-goog-api-client"
deploymentKeyMetadata = "x-profiler-deployment-key-bin"
// zoneNameLabel and instanceLabel are the deployment label keys set by
// initializeDeployment.
zoneNameLabel = "zone"
instanceLabel = "instance"
// scope is the OAuth2 scope requested for the credentials obtained in Start.
scope = "https://www.googleapis.com/auth/monitoring.write"
// initialBackoff is the first pause of the retry backoff used by
// createProfile.
initialBackoff = time.Second
// Ensure the agent will recover within 1 hour.
maxBackoff = time.Hour
backoffMultiplier = 1.3 // Backoff envelope increases by this factor on each retry.
// retryInfoMetadata is the trailing-metadata key under which
// abortedBackoffDuration looks for a serialized google.rpc.RetryInfo.
retryInfoMetadata = "google.rpc.retryinfo-bin"
)
// Config is the profiler configuration.
//
// Start copies the supplied value into the package-level configuration;
// empty Target and APIAddr fields are then replaced with their defaults.
type Config struct {
// Target groups related deployments together, defaults to "unknown".
Target string
// DebugLogging enables detailed debug logging from profiler.
DebugLogging bool
// ProjectID is the ID of the cloud project to use instead of
// the one read from the VM metadata server. Typically for testing.
ProjectID string
// InstanceName is the name of the VM instance to use instead of
// the one read from the VM metadata server. Typically for testing.
InstanceName string
// ZoneName is the name of the zone to use instead of
// the one read from the VM metadata server. Typically for testing.
ZoneName string
// APIAddr is the address (host:port) of the profiler agent API
// endpoint, which is dialed over gRPC. Defaults to the production
// environment, overridable for testing.
APIAddr string
}
// Start starts a goroutine to collect and upload profiles.
// See package level documentation for details.
//
// Only the first call does any work: it copies cfg into the package
// configuration, obtains application default credentials, dials the
// profiler API and resolves the deployment. If any of those steps fails,
// the error is returned and no goroutine is started. Every later call is a
// no-op that returns nil, even when the first call failed.
func Start(cfg *Config) error {
	var err error
	startOnce.Do(func() {
		initializeConfig(cfg)

		ctx := context.Background()

		var ts oauth2.TokenSource
		ts, err = google.DefaultTokenSource(ctx, scope)
		if err != nil {
			debugLog("failed to get application default credentials: %v", err)
			return
		}

		opts := []option.ClientOption{
			option.WithEndpoint(config.APIAddr),
			option.WithTokenSource(ts),
			option.WithScopes(scope),
		}

		var conn *grpc.ClientConn
		conn, err = transport.DialGRPC(ctx, opts...)
		if err != nil {
			debugLog("failed to dial GRPC: %v", err)
			return
		}

		var d *pb.Deployment
		d, err = initializeDeployment()
		if err != nil {
			debugLog("failed to initialize deployment: %v", err)
			// The connection is never handed to the polling goroutine on
			// this path, so release it here instead of leaking it.
			if cerr := conn.Close(); cerr != nil {
				debugLog("failed to close connection: %v", cerr)
			}
			return
		}

		// From here on the connection is owned by the polling goroutine,
		// which runs for the lifetime of the process.
		a, ctx := initializeResources(ctx, conn, d)
		go pollProfilerService(ctx, a)
	})
	return err
}
// debugLog writes a formatted message through the standard logger, but only
// when config.DebugLogging is enabled; otherwise it does nothing.
func debugLog(format string, e ...interface{}) {
	if !config.DebugLogging {
		return
	}
	log.Printf(format, e...)
}
// agent polls Cloud Profiler server for instructions on behalf of
// a task, and collects and uploads profiles as requested.
type agent struct {
// client wraps the gRPC stub used for the CreateProfile and
// UpdateProfile calls.
client *client
// deployment identifies this task to the server; it is sent in each
// CreateProfile request and its labels are attached to uploaded profiles.
deployment *pb.Deployment
// NOTE(review): creationErrorCount is not read or written by any code
// visible in this file — confirm whether it is still needed.
creationErrorCount int64
}
// abortedBackoffDuration retrieves the retry duration from gRPC trailing
// metadata, which is set by Cloud Profiler server.
//
// The first value stored under retryInfoMetadata is decoded as a
// google.rpc.RetryInfo message and its RetryDelay is returned. An error is
// returned when the key is absent, the value cannot be unmarshaled, the
// delay is not a valid duration, or the delay is negative.
func abortedBackoffDuration(md grpcmd.MD) (time.Duration, error) {
	elem := md[retryInfoMetadata]
	if len(elem) == 0 {
		return 0, errors.New("no retry info")
	}

	var retryInfo edpb.RetryInfo
	if err := proto.Unmarshal([]byte(elem[0]), &retryInfo); err != nil {
		return 0, err
	}

	// Named delay rather than time so the time package is not shadowed.
	delay, err := ptypes.Duration(retryInfo.RetryDelay)
	if err != nil {
		return 0, err
	}
	if delay < 0 {
		return 0, errors.New("negative retry duration")
	}
	return delay, nil
}
// retryer decides how long to wait before a failed CreateProfile call is
// retried.
type retryer struct {
// backoff supplies the pause used when no server-specified delay is
// available.
backoff gax.Backoff
// md is the gRPC trailing metadata that Retry inspects for a
// server-specified retry delay when the call fails with codes.Aborted.
md grpcmd.MD
}
// Retry reports how long to pause before the next attempt and whether to
// retry at all; it always asks for a retry.
//
// When err carries a gRPC status with code Aborted, the delay found in the
// trailing metadata r.md is preferred. If that delay cannot be extracted,
// or for any other error, the next pause of the exponential backoff is used.
func (r *retryer) Retry(err error) (time.Duration, bool) {
	if st, _ := status.FromError(err); st != nil && st.Code() == codes.Aborted {
		pause, derr := abortedBackoffDuration(r.md)
		if derr == nil {
			return pause, true
		}
		debugLog("failed to get backoff duration: %v", derr)
	}
	return r.backoff.Pause(), true
}
// createProfile talks to Cloud Profiler server to create profile. In
// case of error, the goroutine will sleep and retry. Sleep duration may
// be specified by the server. Otherwise it will be an exponentially
// increasing value, bounded by maxBackoff.
//
// The returned profile is nil if gax.Invoke returns without a successful
// CreateProfile call.
func (a *agent) createProfile(ctx context.Context) *pb.Profile {
	req := pb.CreateProfileRequest{
		Deployment:  a.deployment,
		ProfileType: []pb.ProfileType{pb.ProfileType_CPU, pb.ProfileType_HEAP},
	}

	// One retryer serves every attempt of this call. gRPC is pointed at the
	// retryer's own md field, so each failed attempt's trailing metadata is
	// what Retry inspects. Handing the retryer a copy of a local MD value
	// would leave it reading the trailers of an earlier attempt, because
	// grpc.Trailer replaces the map it is given rather than filling it in.
	r := &retryer{
		backoff: gax.Backoff{
			Initial:    initialBackoff,
			Max:        maxBackoff,
			Multiplier: backoffMultiplier,
		},
		md: grpcmd.New(map[string]string{}),
	}

	var p *pb.Profile
	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
		var err error
		p, err = a.client.client.CreateProfile(ctx, &req, grpc.Trailer(&r.md))
		return err
	}, gax.WithRetry(func() gax.Retryer {
		return r
	}))
	if err != nil {
		// The retryer never declines a retry, so an error here means Invoke
		// itself gave up (presumably because ctx ended — confirm against
		// the gax version in use).
		debugLog("failed to create profile: %v", err)
	}
	return p
}
// profileAndUpload collects the profile described by p and uploads it.
//
// For a CPU profile it profiles for the duration given in p.Duration; for
// a heap profile it writes the heap profile immediately. Any other profile
// type, or any failure while collecting, is logged via debugLog and nothing
// is uploaded. The collected bytes and the deployment labels are stored on
// p before it is sent with UpdateProfile; an upload failure is only logged.
func (a *agent) profileAndUpload(ctx context.Context, p *pb.Profile) {
	var buf bytes.Buffer

	switch kind := p.GetProfileType(); kind {
	case pb.ProfileType_HEAP:
		if err := writeHeapProfile(&buf); err != nil {
			debugLog("failed to write heap profile: %v", err)
			return
		}
	case pb.ProfileType_CPU:
		d, err := ptypes.Duration(p.Duration)
		if err != nil {
			debugLog("failed to get profile duration: %v", err)
			return
		}
		if err = startCPUProfile(&buf); err != nil {
			debugLog("failed to start CPU profile: %v", err)
			return
		}
		// The result of sleep is discarded; profiling is stopped either way.
		sleep(ctx, d)
		stopCPUProfile()
	default:
		debugLog("unexpected profile type: %v", kind)
		return
	}

	p.ProfileBytes = buf.Bytes()
	p.Labels = a.deployment.Labels

	// Upload profile, discard profile in case of error.
	upload := pb.UpdateProfileRequest{Profile: p}
	if _, err := a.client.client.UpdateProfile(ctx, &upload); err != nil {
		debugLog("failed to upload profile: %v", err)
	}
}
// client is a client for interacting with Cloud Profiler API.
// It pairs the generated gRPC stub with the metadata values that
// insertMetadata attaches to an outgoing context.
type client struct {
// gRPC API client.
client pb.ProfilerServiceClient
// Metadata for google API to be sent with each request.
// Populated by setXGoogHeader.
xGoogHeader []string
// Metadata for Cloud Profiler API to be sent with each request.
// Populated by setProfilerHeader.
profilerHeader []string
}
// setProfilerHeader computes the deployment key that is later sent in the
// `x-profiler-deployment-key-bin` header and stores it on the client.
// Intended for use by Cloud Profiler agents.
//
// The key has the form "<project>##<target>##<labels>", where <labels> is
// the "#"-joined, sorted list of "name|value" pairs from d.Labels. Sorting
// keeps the key stable despite random map iteration order.
func (c *client) setProfilerHeader(d *pb.Deployment) {
	pairs := make([]string, 0, len(d.Labels))
	for name, value := range d.Labels {
		pairs = append(pairs, fmt.Sprintf("%s|%s", name, value))
	}
	sort.Strings(pairs)

	parts := []string{d.ProjectId, d.Target, strings.Join(pairs, "#")}
	c.profilerHeader = []string{strings.Join(parts, "##")}
}
// setXGoogHeader builds the value of the `x-goog-api-client` header and
// stores it on the client. Intended for use by Google-written clients.
//
// The value always starts with the Go and client-library versions, is
// followed by any caller-supplied key/value pairs, and ends with the gax
// and grpc versions.
func (c *client) setXGoogHeader(keyval ...string) {
	kv := make([]string, 0, len(keyval)+8)
	kv = append(kv, "gl-go", version.Go(), "gccl", version.Repo)
	kv = append(kv, keyval...)
	kv = append(kv, "gax", gax.Version, "grpc", grpc.Version)
	c.xGoogHeader = []string{gax.XGoogHeader(kv...)}
}
// insertMetadata returns a context derived from ctx whose outgoing gRPC
// metadata carries the client's x-goog-api-client and deployment-key
// headers. Any outgoing metadata already on ctx is copied first, so the
// original metadata is left unmodified.
func (c *client) insertMetadata(ctx context.Context) context.Context {
	existing, _ := grpcmd.FromOutgoingContext(ctx)
	out := existing.Copy()
	out[xGoogAPIMetadata] = c.xGoogHeader
	out[deploymentKeyMetadata] = c.profilerHeader
	return grpcmd.NewOutgoingContext(ctx, out)
}
// initializeDeployment builds the Deployment that describes this task.
//
// The project ID, instance name and zone are taken from config when set;
// each one left empty is looked up through getProjectID, getInstanceName
// and getZone respectively, in that order. The first lookup error aborts
// the function. The zone and instance become labels of the deployment.
func initializeDeployment() (*pb.Deployment, error) {
	// pick returns the configured override when there is one and only
	// falls back to the lookup function otherwise.
	pick := func(override string, lookup func() (string, error)) (string, error) {
		if override == "" {
			return lookup()
		}
		return override, nil
	}

	projectID, err := pick(config.ProjectID, getProjectID)
	if err != nil {
		return nil, err
	}
	instance, err := pick(config.InstanceName, getInstanceName)
	if err != nil {
		return nil, err
	}
	zone, err := pick(config.ZoneName, getZone)
	if err != nil {
		return nil, err
	}

	return &pb.Deployment{
		ProjectId: projectID,
		Target:    config.Target,
		Labels: map[string]string{
			zoneNameLabel: zone,
			instanceLabel: instance,
		},
	}, nil
}
// initializeResources creates the agent for deployment d on top of conn.
// It also returns a context derived from ctx that carries the client's
// request metadata, for use with the agent's API calls.
func initializeResources(ctx context.Context, conn *grpc.ClientConn, d *pb.Deployment) (*agent, context.Context) {
	a := &agent{
		client:     &client{client: pb.NewProfilerServiceClient(conn)},
		deployment: d,
	}
	a.client.setXGoogHeader()
	a.client.setProfilerHeader(d)
	return a, a.client.insertMetadata(ctx)
}
// initializeConfig copies cfg into the package-level config and fills in
// defaults: an empty Target becomes "unknown" and an empty APIAddr becomes
// apiAddress.
//
// A nil cfg is treated as an empty Config instead of being dereferenced,
// so that it yields the defaults rather than a panic.
func initializeConfig(cfg *Config) {
	if cfg == nil {
		cfg = &Config{}
	}
	*config = *cfg

	if config.Target == "" {
		config.Target = "unknown"
	}
	if config.APIAddr == "" {
		config.APIAddr = apiAddress
	}
}
// pollProfilerService runs forever: each iteration asks Cloud Profiler
// server for the next profile to collect, then collects and uploads it.
// The function never returns.
func pollProfilerService(ctx context.Context, a *agent) {
	for {
		a.profileAndUpload(ctx, a.createProfile(ctx))
	}
}