blob: e79102164099ef47755c44a3cd992ff6d6d93a86 [file] [log] [blame]
// Copyright 2017 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package profiler
import (
"bytes"
"compress/gzip"
"context"
"errors"
"fmt"
"io"
"log"
"math/rand"
"os"
"runtime"
"strings"
"sync"
"testing"
"time"
gcemd "cloud.google.com/go/compute/metadata"
"cloud.google.com/go/internal/testutil"
"cloud.google.com/go/profiler/mocks"
"cloud.google.com/go/profiler/testdata"
"github.com/golang/mock/gomock"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/google/pprof/profile"
gax "github.com/googleapis/gax-go/v2"
gtransport "google.golang.org/api/transport/grpc"
pb "google.golang.org/genproto/googleapis/devtools/cloudprofiler/v2"
edpb "google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc/codes"
grpcmd "google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)
const (
testProjectID = "test-project-ID"
testInstance = "test-instance"
testZone = "test-zone"
testService = "test-service"
testSvcVersion = "test-service-version"
testProfileDuration = time.Second * 10
testServerTimeout = time.Second * 15
)
func createTestDeployment() *pb.Deployment {
labels := map[string]string{
zoneNameLabel: testZone,
versionLabel: testSvcVersion,
}
return &pb.Deployment{
ProjectId: testProjectID,
Target: testService,
Labels: labels,
}
}
func createTestAgent(psc pb.ProfilerServiceClient) *agent {
return &agent{
client: psc,
deployment: createTestDeployment(),
profileLabels: map[string]string{instanceLabel: testInstance},
profileTypes: []pb.ProfileType{pb.ProfileType_CPU, pb.ProfileType_HEAP, pb.ProfileType_THREADS},
}
}
func createTrailers(dur time.Duration) map[string]string {
b, _ := proto.Marshal(&edpb.RetryInfo{
RetryDelay: ptypes.DurationProto(dur),
})
return map[string]string{
retryInfoMetadata: string(b),
}
}
func TestCreateProfile(t *testing.T) {
ctx := context.Background()
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mpc := mocks.NewMockProfilerServiceClient(ctrl)
a := createTestAgent(mpc)
p := &pb.Profile{Name: "test_profile"}
wantRequest := pb.CreateProfileRequest{
Parent: "projects/" + a.deployment.ProjectId,
Deployment: a.deployment,
ProfileType: a.profileTypes,
}
mpc.EXPECT().CreateProfile(ctx, gomock.Eq(&wantRequest), gomock.Any()).Times(1).Return(p, nil)
gotP := a.createProfile(ctx)
if !testutil.Equal(gotP, p) {
t.Errorf("CreateProfile() got wrong profile, got %v, want %v", gotP, p)
}
}
func TestProfileAndUpload(t *testing.T) {
oldStartCPUProfile, oldStopCPUProfile, oldWriteHeapProfile, oldSleep := startCPUProfile, stopCPUProfile, writeHeapProfile, sleep
defer func() {
startCPUProfile, stopCPUProfile, writeHeapProfile, sleep = oldStartCPUProfile, oldStopCPUProfile, oldWriteHeapProfile, oldSleep
}()
ctx := context.Background()
ctrl := gomock.NewController(t)
defer ctrl.Finish()
var heapCollected1, heapCollected2, heapUploaded, allocUploaded bytes.Buffer
testdata.HeapProfileCollected1.Write(&heapCollected1)
testdata.HeapProfileCollected2.Write(&heapCollected2)
testdata.HeapProfileUploaded.Write(&heapUploaded)
testdata.AllocProfileUploaded.Write(&allocUploaded)
callCount := 0
writeTwoHeapFunc := func(w io.Writer) error {
callCount++
if callCount%2 == 1 {
w.Write(heapCollected1.Bytes())
return nil
}
w.Write(heapCollected2.Bytes())
return nil
}
errFunc := func(io.Writer) error { return errors.New("") }
testDuration := time.Second * 5
tests := []struct {
profileType pb.ProfileType
duration *time.Duration
startCPUProfileFunc func(io.Writer) error
writeHeapProfileFunc func(io.Writer) error
wantBytes []byte
}{
{
profileType: pb.ProfileType_CPU,
duration: &testDuration,
startCPUProfileFunc: func(w io.Writer) error {
w.Write([]byte{1})
return nil
},
writeHeapProfileFunc: errFunc,
wantBytes: []byte{1},
},
{
profileType: pb.ProfileType_CPU,
startCPUProfileFunc: errFunc,
writeHeapProfileFunc: errFunc,
},
{
profileType: pb.ProfileType_CPU,
duration: &testDuration,
startCPUProfileFunc: func(w io.Writer) error {
w.Write([]byte{2})
return nil
},
writeHeapProfileFunc: func(w io.Writer) error {
w.Write([]byte{3})
return nil
},
wantBytes: []byte{2},
},
{
profileType: pb.ProfileType_HEAP,
startCPUProfileFunc: errFunc,
writeHeapProfileFunc: func(w io.Writer) error {
w.Write(heapCollected1.Bytes())
return nil
},
wantBytes: heapUploaded.Bytes(),
},
{
profileType: pb.ProfileType_HEAP_ALLOC,
startCPUProfileFunc: errFunc,
writeHeapProfileFunc: writeTwoHeapFunc,
duration: &testDuration,
wantBytes: allocUploaded.Bytes(),
},
{
profileType: pb.ProfileType_HEAP,
startCPUProfileFunc: errFunc,
writeHeapProfileFunc: errFunc,
},
{
profileType: pb.ProfileType_HEAP,
startCPUProfileFunc: func(w io.Writer) error {
w.Write([]byte{5})
return nil
},
writeHeapProfileFunc: func(w io.Writer) error {
w.Write(heapCollected1.Bytes())
return nil
},
wantBytes: heapUploaded.Bytes(),
},
{
profileType: pb.ProfileType_PROFILE_TYPE_UNSPECIFIED,
startCPUProfileFunc: func(w io.Writer) error {
w.Write([]byte{7})
return nil
},
writeHeapProfileFunc: func(w io.Writer) error {
w.Write(heapCollected1.Bytes())
return nil
},
},
}
for _, tt := range tests {
mpc := mocks.NewMockProfilerServiceClient(ctrl)
a := createTestAgent(mpc)
startCPUProfile = tt.startCPUProfileFunc
stopCPUProfile = func() {}
writeHeapProfile = tt.writeHeapProfileFunc
var gotSleep *time.Duration
sleep = func(ctx context.Context, d time.Duration) error {
gotSleep = &d
return nil
}
p := &pb.Profile{ProfileType: tt.profileType}
if tt.duration != nil {
p.Duration = ptypes.DurationProto(*tt.duration)
}
if tt.wantBytes != nil {
wantProfile := &pb.Profile{
ProfileType: p.ProfileType,
Duration: p.Duration,
ProfileBytes: tt.wantBytes,
Labels: a.profileLabels,
}
wantRequest := pb.UpdateProfileRequest{
Profile: wantProfile,
}
mpc.EXPECT().UpdateProfile(ctx, gomock.Eq(&wantRequest)).Times(1)
} else {
mpc.EXPECT().UpdateProfile(gomock.Any(), gomock.Any()).MaxTimes(0)
}
a.profileAndUpload(ctx, p)
if tt.duration == nil {
if gotSleep != nil {
t.Errorf("profileAndUpload(%v) slept for: %v, want no sleep", p, gotSleep)
}
} else {
if gotSleep == nil {
t.Errorf("profileAndUpload(%v) didn't sleep, want sleep for: %v", p, tt.duration)
} else if *gotSleep != *tt.duration {
t.Errorf("profileAndUpload(%v) slept for wrong duration, got: %v, want: %v", p, gotSleep, tt.duration)
}
}
}
}
func TestRetry(t *testing.T) {
normalDuration := time.Second * 3
negativeDuration := time.Second * -3
tests := []struct {
trailers map[string]string
wantPause *time.Duration
}{
{
createTrailers(normalDuration),
&normalDuration,
},
{
createTrailers(negativeDuration),
nil,
},
{
map[string]string{retryInfoMetadata: "wrong format"},
nil,
},
{
map[string]string{},
nil,
},
}
for _, tt := range tests {
md := grpcmd.New(tt.trailers)
r := &retryer{
backoff: gax.Backoff{
Initial: initialBackoff,
Max: maxBackoff,
Multiplier: backoffMultiplier,
},
md: md,
}
pause, shouldRetry := r.Retry(status.Error(codes.Aborted, ""))
if !shouldRetry {
t.Error("retryer.Retry() returned shouldRetry false, want true")
}
if tt.wantPause != nil {
if pause != *tt.wantPause {
t.Errorf("retryer.Retry() returned wrong pause, got: %v, want: %v", pause, tt.wantPause)
}
} else {
if pause > initialBackoff {
t.Errorf("retryer.Retry() returned wrong pause, got: %v, want: < %v", pause, initialBackoff)
}
}
}
md := grpcmd.New(map[string]string{})
r := &retryer{
backoff: gax.Backoff{
Initial: initialBackoff,
Max: maxBackoff,
Multiplier: backoffMultiplier,
},
md: md,
}
for i := 0; i < 100; i++ {
pause, shouldRetry := r.Retry(errors.New(""))
if !shouldRetry {
t.Errorf("retryer.Retry() called %v times, returned shouldRetry false, want true", i)
}
if pause > maxBackoff {
t.Errorf("retryer.Retry() called %v times, returned wrong pause, got: %v, want: < %v", i, pause, maxBackoff)
}
}
}
func TestWithXGoogHeader(t *testing.T) {
ctx := withXGoogHeader(context.Background())
md, _ := grpcmd.FromOutgoingContext(ctx)
if xg := md[xGoogAPIMetadata]; len(xg) == 0 {
t.Errorf("withXGoogHeader() sets empty xGoogHeader")
} else {
if !strings.Contains(xg[0], "gl-go/") {
t.Errorf("withXGoogHeader() got: %v, want gl-go key", xg[0])
}
if !strings.Contains(xg[0], "gccl/") {
t.Errorf("withXGoogHeader() got: %v, want gccl key", xg[0])
}
if !strings.Contains(xg[0], "gax/") {
t.Errorf("withXGoogHeader() got: %v, want gax key", xg[0])
}
if !strings.Contains(xg[0], "grpc/") {
t.Errorf("withXGoogHeader() got: %v, want grpc key", xg[0])
}
}
}
func TestInitializeAgent(t *testing.T) {
oldConfig, oldMutexEnabled := config, mutexEnabled
defer func() {
config, mutexEnabled = oldConfig, oldMutexEnabled
}()
for _, tt := range []struct {
config Config
enableMutex bool
wantProfileTypes []pb.ProfileType
wantDeploymentLabels map[string]string
wantProfileLabels map[string]string
}{
{
config: Config{ServiceVersion: testSvcVersion, Zone: testZone},
wantProfileTypes: []pb.ProfileType{pb.ProfileType_CPU, pb.ProfileType_HEAP, pb.ProfileType_THREADS, pb.ProfileType_HEAP_ALLOC},
wantDeploymentLabels: map[string]string{zoneNameLabel: testZone, versionLabel: testSvcVersion, languageLabel: "go"},
wantProfileLabels: map[string]string{},
},
{
config: Config{Zone: testZone},
wantProfileTypes: []pb.ProfileType{pb.ProfileType_CPU, pb.ProfileType_HEAP, pb.ProfileType_THREADS, pb.ProfileType_HEAP_ALLOC},
wantDeploymentLabels: map[string]string{zoneNameLabel: testZone, languageLabel: "go"},
wantProfileLabels: map[string]string{},
},
{
config: Config{ServiceVersion: testSvcVersion},
wantProfileTypes: []pb.ProfileType{pb.ProfileType_CPU, pb.ProfileType_HEAP, pb.ProfileType_THREADS, pb.ProfileType_HEAP_ALLOC},
wantDeploymentLabels: map[string]string{versionLabel: testSvcVersion, languageLabel: "go"},
wantProfileLabels: map[string]string{},
},
{
config: Config{Instance: testInstance},
wantProfileTypes: []pb.ProfileType{pb.ProfileType_CPU, pb.ProfileType_HEAP, pb.ProfileType_THREADS, pb.ProfileType_HEAP_ALLOC},
wantDeploymentLabels: map[string]string{languageLabel: "go"},
wantProfileLabels: map[string]string{instanceLabel: testInstance},
},
{
config: Config{Instance: testInstance},
enableMutex: true,
wantProfileTypes: []pb.ProfileType{pb.ProfileType_CPU, pb.ProfileType_HEAP, pb.ProfileType_THREADS, pb.ProfileType_HEAP_ALLOC, pb.ProfileType_CONTENTION},
wantDeploymentLabels: map[string]string{languageLabel: "go"},
wantProfileLabels: map[string]string{instanceLabel: testInstance},
},
{
config: Config{NoHeapProfiling: true},
wantProfileTypes: []pb.ProfileType{pb.ProfileType_CPU, pb.ProfileType_THREADS, pb.ProfileType_HEAP_ALLOC},
wantDeploymentLabels: map[string]string{languageLabel: "go"},
wantProfileLabels: map[string]string{},
},
{
config: Config{NoHeapProfiling: true, NoGoroutineProfiling: true, NoAllocProfiling: true},
wantProfileTypes: []pb.ProfileType{pb.ProfileType_CPU},
wantDeploymentLabels: map[string]string{languageLabel: "go"},
wantProfileLabels: map[string]string{},
},
} {
config = tt.config
config.ProjectID = testProjectID
config.Service = testService
mutexEnabled = tt.enableMutex
a := initializeAgent(nil)
wantDeployment := &pb.Deployment{
ProjectId: testProjectID,
Target: testService,
Labels: tt.wantDeploymentLabels,
}
if !testutil.Equal(a.deployment, wantDeployment) {
t.Errorf("initializeAgent() got deployment: %v, want %v", a.deployment, wantDeployment)
}
if !testutil.Equal(a.profileLabels, tt.wantProfileLabels) {
t.Errorf("initializeAgent() got profile labels: %v, want %v", a.profileLabels, tt.wantProfileLabels)
}
if !testutil.Equal(a.profileTypes, tt.wantProfileTypes) {
t.Errorf("initializeAgent() got profile types: %v, want %v", a.profileTypes, tt.wantProfileTypes)
}
}
}
func TestInitializeConfig(t *testing.T) {
oldConfig, oldGAEService, oldGAEVersion, oldKnativeService, oldKnativeVersion, oldEnvProjectID, oldGetProjectID, oldGetInstanceName, oldGetZone, oldOnGCE := config, os.Getenv("GAE_SERVICE"), os.Getenv("GAE_VERSION"), os.Getenv("K_SERVICE"), os.Getenv("K_REVISION"), os.Getenv("GOOGLE_CLOUD_PROJECT"), getProjectID, getInstanceName, getZone, onGCE
defer func() {
config, getProjectID, getInstanceName, getZone, onGCE = oldConfig, oldGetProjectID, oldGetInstanceName, oldGetZone, oldOnGCE
if err := os.Setenv("GAE_SERVICE", oldGAEService); err != nil {
t.Fatal(err)
}
if err := os.Setenv("GAE_VERSION", oldGAEVersion); err != nil {
t.Fatal(err)
}
if err := os.Setenv("K_SERVICE", oldKnativeService); err != nil {
t.Fatal(err)
}
if err := os.Setenv("K_REVISION", oldKnativeVersion); err != nil {
t.Fatal(err)
}
if err := os.Setenv("GOOGLE_CLOUD_PROJECT", oldEnvProjectID); err != nil {
t.Fatal(err)
}
}()
const (
testGAEService = "test-gae-service"
testGAEVersion = "test-gae-version"
testKnativeService = "test-knative-service"
testKnativeVersion = "test-knative-version"
testGCEProjectID = "test-gce-project-id"
testEnvProjectID = "test-env-project-id"
)
for _, tt := range []struct {
desc string
config Config
wantConfig Config
wantErrorString string
onGAE bool
onKnative bool
onGCE bool
envProjectID bool
}{
{
"accepts service name",
Config{Service: testService},
Config{Service: testService, ProjectID: testGCEProjectID, Zone: testZone, Instance: testInstance},
"",
false,
false,
true,
false,
},
{
"env project overrides GCE project",
Config{Service: testService},
Config{Service: testService, ProjectID: testEnvProjectID, Zone: testZone, Instance: testInstance},
"",
false,
false,
true,
true,
},
{
"requires service name",
Config{},
Config{},
"service name must be configured",
false,
false,
true,
false,
},
{
"requires valid service name",
Config{Service: "Service"},
Config{Service: "Service"},
"service name \"Service\" does not match regular expression ^[a-z]([-a-z0-9_.]{0,253}[a-z0-9])?$",
false,
false,
true,
false,
},
{
"accepts service name from config and service version from GAE",
Config{Service: testService},
Config{Service: testService, ServiceVersion: testGAEVersion, ProjectID: testGCEProjectID, Zone: testZone, Instance: testInstance},
"",
true,
false,
true,
false,
},
{
"reads both service name and version from GAE env vars",
Config{},
Config{Service: testGAEService, ServiceVersion: testGAEVersion, ProjectID: testGCEProjectID, Zone: testZone, Instance: testInstance},
"",
true,
false,
true,
false,
},
{
"reads both service name and version from Knative env vars",
Config{},
Config{Service: testKnativeService, ServiceVersion: testKnativeVersion, ProjectID: testGCEProjectID, Zone: testZone, Instance: testInstance},
"",
false,
true,
true,
false,
},
{
"accepts service version from config",
Config{Service: testService, ServiceVersion: testSvcVersion},
Config{Service: testService, ServiceVersion: testSvcVersion, ProjectID: testGCEProjectID, Zone: testZone, Instance: testInstance},
"",
false,
false,
true,
false,
},
{
"configured version has priority over GAE-provided version",
Config{Service: testService, ServiceVersion: testSvcVersion},
Config{Service: testService, ServiceVersion: testSvcVersion, ProjectID: testGCEProjectID, Zone: testZone, Instance: testInstance},
"",
true,
false,
true,
false,
},
{
"configured version has priority over Knative-provided version",
Config{Service: testService, ServiceVersion: testSvcVersion},
Config{Service: testService, ServiceVersion: testSvcVersion, ProjectID: testGCEProjectID, Zone: testZone, Instance: testInstance},
"",
false,
true,
true,
false,
},
{
"GAE version has priority over Knative-provided version",
Config{},
Config{Service: testGAEService, ServiceVersion: testGAEVersion, ProjectID: testGCEProjectID, Zone: testZone, Instance: testInstance},
"",
true,
true,
true,
false,
},
{
"configured project ID has priority over metadata-provided project ID",
Config{Service: testService, ProjectID: testProjectID},
Config{Service: testService, ProjectID: testProjectID, Zone: testZone, Instance: testInstance},
"",
false,
false,
true,
false,
},
{
"configured project ID has priority over environment project ID",
Config{Service: testService, ProjectID: testProjectID},
Config{Service: testService, ProjectID: testProjectID},
"",
false,
false,
false,
true,
},
{
"requires project ID if not on GCE",
Config{Service: testService},
Config{Service: testService},
"project ID must be specified in the configuration if running outside of GCP",
false,
false,
false,
false,
},
{
"configured zone has priority over metadata-provided zone",
Config{Service: testService, ProjectID: testProjectID, Zone: testZone + "-override"},
Config{Service: testService, ProjectID: testProjectID, Zone: testZone + "-override", Instance: testInstance},
"",
false,
false,
true,
false,
},
{
"configured instance has priority over metadata-provided instance",
Config{Service: testService, ProjectID: testProjectID, Instance: testInstance + "-override"},
Config{Service: testService, ProjectID: testProjectID, Zone: testZone, Instance: testInstance + "-override"},
"",
false,
false,
true,
false,
},
} {
t.Logf("Running test: %s", tt.desc)
gaeEnvService, gaeEnvVersion := "", ""
if tt.onGAE {
gaeEnvService, gaeEnvVersion = testGAEService, testGAEVersion
}
if err := os.Setenv("GAE_SERVICE", gaeEnvService); err != nil {
t.Fatal(err)
}
if err := os.Setenv("GAE_VERSION", gaeEnvVersion); err != nil {
t.Fatal(err)
}
knEnvService, knEnvVersion := "", ""
if tt.onKnative {
knEnvService, knEnvVersion = testKnativeService, testKnativeVersion
}
if err := os.Setenv("K_SERVICE", knEnvService); err != nil {
t.Fatal(err)
}
if err := os.Setenv("K_REVISION", knEnvVersion); err != nil {
t.Fatal(err)
}
if tt.onGCE {
onGCE = func() bool { return true }
getProjectID = func() (string, error) { return testGCEProjectID, nil }
getZone = func() (string, error) { return testZone, nil }
getInstanceName = func() (string, error) { return testInstance, nil }
} else {
onGCE = func() bool { return false }
getProjectID = func() (string, error) { return "", fmt.Errorf("test get project id error") }
getZone = func() (string, error) { return "", fmt.Errorf("test get zone error") }
getInstanceName = func() (string, error) { return "", fmt.Errorf("test get instance error") }
}
envProjectID := ""
if tt.envProjectID {
envProjectID = testEnvProjectID
}
if err := os.Setenv("GOOGLE_CLOUD_PROJECT", envProjectID); err != nil {
t.Fatal(err)
}
errorString := ""
if err := initializeConfig(tt.config); err != nil {
errorString = err.Error()
}
if !strings.Contains(errorString, tt.wantErrorString) {
t.Errorf("initializeConfig(%v) got error: %v, want contain %v", tt.config, errorString, tt.wantErrorString)
}
if tt.wantErrorString == "" {
tt.wantConfig.APIAddr = apiAddress
}
if config != tt.wantConfig {
t.Errorf("initializeConfig(%v) got: %v, want %v", tt.config, config, tt.wantConfig)
}
}
for _, tt := range []struct {
desc string
wantErr bool
getProjectIDError error
getZoneError error
getInstanceError error
}{
{
desc: "metadata returns error for project ID",
wantErr: true,
getProjectIDError: errors.New("fake get project ID error"),
},
{
desc: "metadata returns error for zone",
wantErr: true,
getZoneError: errors.New("fake get zone error"),
},
{
desc: "metadata returns error for instance",
wantErr: true,
getInstanceError: errors.New("fake get instance error"),
},
{
desc: "metadata returns NotDefinedError for instance",
getInstanceError: gcemd.NotDefinedError("fake GCE metadata NotDefinedError error"),
},
} {
onGCE = func() bool { return true }
getProjectID = func() (string, error) { return testGCEProjectID, tt.getProjectIDError }
getZone = func() (string, error) { return testZone, tt.getZoneError }
getInstanceName = func() (string, error) { return testInstance, tt.getInstanceError }
if err := initializeConfig(Config{Service: testService}); (err != nil) != tt.wantErr {
t.Errorf("%s: initializeConfig() got error: %v, want error %t", tt.desc, err, tt.wantErr)
}
}
}
type fakeProfilerServer struct {
count int
gotProfiles map[string][]byte
done chan bool
}
func (fs *fakeProfilerServer) CreateProfile(ctx context.Context, in *pb.CreateProfileRequest) (*pb.Profile, error) {
fs.count++
switch fs.count {
case 1:
return &pb.Profile{Name: "testCPU", ProfileType: pb.ProfileType_CPU, Duration: ptypes.DurationProto(testProfileDuration)}, nil
case 2:
return &pb.Profile{Name: "testHeap", ProfileType: pb.ProfileType_HEAP}, nil
default:
select {}
}
}
func (fs *fakeProfilerServer) UpdateProfile(ctx context.Context, in *pb.UpdateProfileRequest) (*pb.Profile, error) {
switch in.Profile.ProfileType {
case pb.ProfileType_CPU:
fs.gotProfiles["CPU"] = in.Profile.ProfileBytes
case pb.ProfileType_HEAP:
fs.gotProfiles["HEAP"] = in.Profile.ProfileBytes
fs.done <- true
}
return in.Profile, nil
}
func (fs *fakeProfilerServer) CreateOfflineProfile(_ context.Context, _ *pb.CreateOfflineProfileRequest) (*pb.Profile, error) {
return nil, status.Error(codes.Unimplemented, "")
}
func profileeLoop(quit chan bool) {
for {
select {
case <-quit:
return
default:
profileeWork()
}
}
}
func profileeWork() {
data := make([]byte, 10*1024*1024)
rand.Read(data)
var b bytes.Buffer
gz := gzip.NewWriter(&b)
if _, err := gz.Write(data); err != nil {
log.Println("failed to write to gzip stream", err)
return
}
if err := gz.Flush(); err != nil {
log.Println("failed to flush to gzip stream", err)
return
}
if err := gz.Close(); err != nil {
log.Println("failed to close gzip stream", err)
}
}
func validateProfile(rawData []byte, wantFunctionName string) error {
p, err := profile.ParseData(rawData)
if err != nil {
return fmt.Errorf("ParseData failed: %v", err)
}
if len(p.Sample) == 0 {
return fmt.Errorf("profile contains zero samples: %v", p)
}
if len(p.Location) == 0 {
return fmt.Errorf("profile contains zero locations: %v", p)
}
if len(p.Function) == 0 {
return fmt.Errorf("profile contains zero functions: %v", p)
}
for _, l := range p.Location {
if len(l.Line) > 0 && l.Line[0].Function != nil && strings.Contains(l.Line[0].Function.Name, wantFunctionName) {
return nil
}
}
return fmt.Errorf("wanted function name %s not found in the profile", wantFunctionName)
}
func TestDeltaMutexProfile(t *testing.T) {
oldMutexEnabled, oldMaxProcs := mutexEnabled, runtime.GOMAXPROCS(10)
defer func() {
mutexEnabled = oldMutexEnabled
runtime.GOMAXPROCS(oldMaxProcs)
}()
if mutexEnabled = enableMutexProfiling(); !mutexEnabled {
t.Skip("Go too old - mutex profiling not supported.")
}
hog(time.Second, mutexHog)
go func() {
hog(2*time.Second, backgroundHog)
}()
var prof bytes.Buffer
if err := deltaMutexProfile(context.Background(), time.Second, &prof); err != nil {
t.Fatalf("deltaMutexProfile() got error: %v", err)
}
p, err := profile.Parse(&prof)
if err != nil {
t.Fatalf("profile.Parse() got error: %v", err)
}
if s := sum(p, "mutexHog"); s != 0 {
t.Errorf("mutexHog found in the delta mutex profile (sum=%d):\n%s", s, p)
}
if s := sum(p, "backgroundHog"); s <= 0 {
t.Errorf("backgroundHog not in the delta mutex profile (sum=%d):\n%s", s, p)
}
}
// sum returns the sum of all mutex counts from the samples whose
// stacks include the specified function name.
func sum(p *profile.Profile, fname string) int64 {
locIDs := map[*profile.Location]bool{}
for _, loc := range p.Location {
for _, l := range loc.Line {
if strings.Contains(l.Function.Name, fname) {
locIDs[loc] = true
break
}
}
}
var s int64
for _, sample := range p.Sample {
for _, loc := range sample.Location {
if locIDs[loc] {
s += sample.Value[0]
break
}
}
}
return s
}
func mutexHog(mu1, mu2 *sync.Mutex, start time.Time, dt time.Duration) {
for time.Since(start) < dt {
mu1.Lock()
runtime.Gosched()
mu2.Lock()
mu1.Unlock()
mu2.Unlock()
}
}
// backgroundHog is identical to mutexHog. We keep them separate
// in order to distinguish them with function names in the stack trace.
func backgroundHog(mu1, mu2 *sync.Mutex, start time.Time, dt time.Duration) {
for time.Since(start) < dt {
mu1.Lock()
runtime.Gosched()
mu2.Lock()
mu1.Unlock()
mu2.Unlock()
}
}
func hog(dt time.Duration, hogger func(mu1, mu2 *sync.Mutex, start time.Time, dt time.Duration)) {
start := time.Now()
mu1 := new(sync.Mutex)
mu2 := new(sync.Mutex)
var wg sync.WaitGroup
wg.Add(10)
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
hogger(mu1, mu2, start, dt)
}()
}
wg.Wait()
}
func TestAgentWithServer(t *testing.T) {
oldDialGRPC, oldConfig := dialGRPC, config
defer func() {
dialGRPC, config = oldDialGRPC, oldConfig
}()
srv, err := testutil.NewServer()
if err != nil {
t.Fatalf("testutil.NewServer(): %v", err)
}
fakeServer := &fakeProfilerServer{gotProfiles: map[string][]byte{}, done: make(chan bool)}
pb.RegisterProfilerServiceServer(srv.Gsrv, fakeServer)
srv.Start()
dialGRPC = gtransport.DialInsecure
if err := Start(Config{
Service: testService,
ProjectID: testProjectID,
APIAddr: srv.Addr,
Instance: testInstance,
Zone: testZone,
}); err != nil {
t.Fatalf("Start(): %v", err)
}
quitProfilee := make(chan bool)
go profileeLoop(quitProfilee)
select {
case <-fakeServer.done:
case <-time.After(testServerTimeout):
t.Errorf("got timeout after %v, want fake server done", testServerTimeout)
}
quitProfilee <- true
for _, pType := range []string{"CPU", "HEAP"} {
if profile, ok := fakeServer.gotProfiles[pType]; !ok {
t.Errorf("fakeServer.gotProfiles[%s] got no profile, want profile", pType)
} else if err := validateProfile(profile, "profilee"); err != nil {
t.Errorf("validateProfile(%s) got error: %v", pType, err)
}
}
}