instrument pubsub ack and mod-ack-deadline calls
Change-Id: I84c37f34b41099efad6407034f4882e175229731
diff --git a/pubsub/service.go b/pubsub/service.go
index 328fe48..0b5cccd 100644
--- a/pubsub/service.go
+++ b/pubsub/service.go
@@ -17,6 +17,7 @@
import (
"fmt"
"io"
+ "log"
"math"
"sync"
"time"
@@ -24,6 +25,7 @@
"cloud.google.com/go/iam"
"cloud.google.com/go/internal/version"
vkit "cloud.google.com/go/pubsub/apiv1"
+ "github.com/DataDog/datadog-go/statsd"
"golang.org/x/net/context"
"google.golang.org/api/option"
pb "google.golang.org/genproto/googleapis/pubsub/v1"
@@ -31,6 +33,8 @@
"google.golang.org/grpc/codes"
)
+// sampleRate is the rate argument passed to every statsd Count and
+// Histogram call made by the count and histogram helpers.
+// NOTE(review): 1.0 presumably means every data point is sent (no
+// sampling) — confirm against the statsd client documentation.
+const sampleRate = 1.0
+
type nextStringFunc func() (string, error)
// service provides an internal abstraction to isolate the generated
@@ -72,8 +76,9 @@
}
+// apiService bundles the generated Pub/Sub publisher and subscriber
+// clients with the statsd client used to report RPC metrics.
type apiService struct {
- pubc *vkit.PublisherClient
- subc *vkit.SubscriberClient
+ pubc *vkit.PublisherClient
+ subc *vkit.SubscriberClient
+ // statc receives the counters and histograms emitted by the count and
+ // histogram helpers.
+ statc *statsd.Client
}
func newPubSubService(ctx context.Context, opts []option.ClientOption) (*apiService, error) {
@@ -88,7 +93,14 @@
}
pubc.SetGoogleClientInfo("gccl", version.Repo)
subc.SetGoogleClientInfo("gccl", version.Repo)
- return &apiService{pubc: pubc, subc: subc}, nil
+ // RPC metrics are sent through a buffered statsd client to the agent
+ // address hard-coded below.
+ statc, err := statsd.NewBuffered("127.0.0.1:8125", 1000)
+ if err != nil {
+ // No service is returned on failure, so release both clients here.
+ // Their Close errors are deliberately dropped in favor of err.
+ _ = pubc.Close()
+ _ = subc.Close()
+ return nil, err
+ }
+ // The statsd client prepends Namespace verbatim to every metric name,
+ // so it must end with a dot to yield "gopubsub.rpc.ack.count" rather
+ // than "gopubsubrpc.ack.count".
+ statc.Namespace = "gopubsub."
+ return &apiService{pubc: pubc, subc: subc, statc: statc}, nil
}
func (s *apiService) close() error {
@@ -205,11 +217,25 @@
}
+// modifyAckDeadline sends a ModifyAckDeadline RPC that sets the ack
+// deadline of ackIDs on subscription subName to deadline, converted to
+// whole seconds and passed through trunc32.
+//
+// Every call reports statsd metrics: a call count, a separate count of
+// calls whose deadline is exactly zero, and the elapsed time of the RPC in
+// seconds. A failed call also increments an error count; a successful one
+// adds len(ackIDs) to the ids counter. The RPC's error is returned
+// unchanged.
func (s *apiService) modifyAckDeadline(ctx context.Context, subName string, deadline time.Duration, ackIDs []string) error {
- return s.subc.ModifyAckDeadline(ctx, &pb.ModifyAckDeadlineRequest{
+ start := time.Now()
+ err := s.subc.ModifyAckDeadline(ctx, &pb.ModifyAckDeadlineRequest{
Subscription: subName,
AckIds: ackIDs,
AckDeadlineSeconds: trunc32(int64(deadline.Seconds())),
})
+ // Capture the latency before emitting any metric so that metric
+ // overhead is not included in the measurement.
+ elapsed := time.Since(start)
+ s.count("rpc.mod_ack_deadline.count", 1)
+ if deadline == 0 {
+ s.count("rpc.mod_ack_deadline.zeroes", 1)
+ }
+ s.histogram("rpc.mod_ack_deadline.time", elapsed.Seconds())
+ if err != nil {
+ s.count("rpc.mod_ack_deadline.errs", 1)
+ return err
+ }
+ s.count("rpc.mod_ack_deadline.ids", len(ackIDs))
+ return nil
}
// maxPayload is the maximum number of bytes to devote to actual ids in
@@ -242,10 +268,32 @@
}
+// acknowledge sends an Acknowledge RPC for ackIDs on subscription subName
+// and returns the RPC's error unchanged.
+//
+// Each call reports a call count and the RPC's elapsed time in seconds to
+// statsd, followed by an error count on failure or the number of
+// acknowledged IDs on success.
func (s *apiService) acknowledge(ctx context.Context, subName string, ackIDs []string) error {
- return s.subc.Acknowledge(ctx, &pb.AcknowledgeRequest{
+ began := time.Now()
+ rpcErr := s.subc.Acknowledge(ctx, &pb.AcknowledgeRequest{
Subscription: subName,
AckIds: ackIDs,
})
+ took := time.Since(began)
+ s.count("rpc.ack.count", 1)
+ s.histogram("rpc.ack.time", took.Seconds())
+ if rpcErr != nil {
+ s.count("rpc.ack.errs", 1)
+ return rpcErr
+ }
+ s.count("rpc.ack.ids", len(ackIDs))
+ return nil
+}
+
+// count adds val to the statsd counter called name, using no tags and the
+// package-wide sampleRate. A failure to emit the metric is logged and
+// otherwise ignored, so metrics problems never affect the caller's result.
+func (s *apiService) count(name string, val int) {
+ // The statsd client's Count takes an int64 value, so convert explicitly.
+ if err := s.statc.Count(name, int64(val), nil, sampleRate); err != nil {
+ log.Printf("statsd error: %v", err)
+ }
+}
+
+// histogram records val as a sample of the statsd histogram called name,
+// using no tags and the package-wide sampleRate. A failure to emit the
+// metric is logged and otherwise ignored.
+func (s *apiService) histogram(name string, val float64) {
+ err := s.statc.Histogram(name, val, nil, sampleRate)
+ if err == nil {
+ return
+ }
+ log.Printf("statsd error: %v", err)
}
func (s *apiService) fetchMessages(ctx context.Context, subName string, maxMessages int32) ([]*Message, error) {