pubsub: instrument Acknowledge and ModifyAckDeadline RPCs with statsd metrics

Change-Id: I84c37f34b41099efad6407034f4882e175229731
diff --git a/pubsub/service.go b/pubsub/service.go
index 328fe48..0b5cccd 100644
--- a/pubsub/service.go
+++ b/pubsub/service.go
@@ -17,6 +17,7 @@
 import (
 	"fmt"
 	"io"
+	"log"
 	"math"
 	"sync"
 	"time"
@@ -24,6 +25,7 @@
 	"cloud.google.com/go/iam"
 	"cloud.google.com/go/internal/version"
 	vkit "cloud.google.com/go/pubsub/apiv1"
+	"github.com/DataDog/datadog-go/statsd"
 	"golang.org/x/net/context"
 	"google.golang.org/api/option"
 	pb "google.golang.org/genproto/googleapis/pubsub/v1"
@@ -31,6 +33,8 @@
 	"google.golang.org/grpc/codes"
 )
 
+const sampleRate = 1.0
+
 type nextStringFunc func() (string, error)
 
 // service provides an internal abstraction to isolate the generated
@@ -72,8 +76,9 @@
 }
 
 type apiService struct {
-	pubc *vkit.PublisherClient
-	subc *vkit.SubscriberClient
+	pubc  *vkit.PublisherClient
+	subc  *vkit.SubscriberClient
+	statc *statsd.Client
 }
 
 func newPubSubService(ctx context.Context, opts []option.ClientOption) (*apiService, error) {
@@ -88,7 +93,14 @@
 	}
 	pubc.SetGoogleClientInfo("gccl", version.Repo)
 	subc.SetGoogleClientInfo("gccl", version.Repo)
-	return &apiService{pubc: pubc, subc: subc}, nil
+	statc, err := statsd.NewBuffered("127.0.0.1:8125", 1000)
+	if err != nil {
+		_ = pubc.Close()
+		_ = subc.Close()
+		return nil, err
+	}
+	statc.Namespace = "gopubsub"
+	return &apiService{pubc: pubc, subc: subc, statc: statc}, nil
 }
 
 func (s *apiService) close() error {
@@ -205,11 +217,25 @@
 }
 
 func (s *apiService) modifyAckDeadline(ctx context.Context, subName string, deadline time.Duration, ackIDs []string) error {
-	return s.subc.ModifyAckDeadline(ctx, &pb.ModifyAckDeadlineRequest{
+	start := time.Now()
+	err := s.subc.ModifyAckDeadline(ctx, &pb.ModifyAckDeadlineRequest{
 		Subscription:       subName,
 		AckIds:             ackIDs,
 		AckDeadlineSeconds: trunc32(int64(deadline.Seconds())),
 	})
+	elapsed := time.Since(start)
+	s.count("rpc.mod_ack_deadline.count", 1)
+	if deadline == 0 {
+		s.count("rpc.mod_ack_deadline.zeroes", 1)
+	}
+	s.histogram("rpc.mod_ack_deadline.time", elapsed.Seconds())
+	if err != nil {
+		s.count("rpc.mod_ack_deadline.errs", 1)
+	} else {
+		s.count("rpc.mod_ack_deadline.ids", len(ackIDs))
+	}
+	return err
+
 }
 
 // maxPayload is the maximum number of bytes to devote to actual ids in
@@ -242,10 +268,39 @@
 }
 
+// acknowledge sends one Acknowledge RPC for ackIDs on subName and returns its
+// error unchanged, recording call count, latency and outcome metrics.
 func (s *apiService) acknowledge(ctx context.Context, subName string, ackIDs []string) error {
-	return s.subc.Acknowledge(ctx, &pb.AcknowledgeRequest{
+	start := time.Now()
+	err := s.subc.Acknowledge(ctx, &pb.AcknowledgeRequest{
 		Subscription: subName,
 		AckIds:       ackIDs,
 	})
+	elapsed := time.Since(start)
+	s.count("rpc.ack.count", 1)
+	s.histogram("rpc.ack.time", elapsed.Seconds())
+	if err != nil {
+		s.count("rpc.ack.errs", 1)
+	} else {
+		s.count("rpc.ack.ids", len(ackIDs))
+	}
+	return err
+}
+
+// count adds val to the statsd counter name. Failures are only logged, not
+// returned, so a metrics error never surfaces to the RPC caller.
+func (s *apiService) count(name string, val int) {
+	// Client.Count takes an int64 value; Go does not convert int implicitly.
+	if err := s.statc.Count(name, int64(val), nil, sampleRate); err != nil {
+		log.Printf("statsd error: %v", err)
+	}
+}
+
+// histogram records val as one sample of the statsd histogram name. Failures
+// are only logged, not returned, as in count.
+func (s *apiService) histogram(name string, val float64) {
+	if err := s.statc.Histogram(name, val, nil, sampleRate); err != nil {
+		log.Printf("statsd error: %v", err)
+	}
 }
 
 func (s *apiService) fetchMessages(ctx context.Context, subName string, maxMessages int32) ([]*Message, error) {