feat(pubsub): add opencensus metrics for outstanding messages/bytes (#3690)

* feat(pubsub): add opencensus metrics for outstanding messages/bytes

* fix exported variable comment
diff --git a/pubsub/flow_controller.go b/pubsub/flow_controller.go
index 3f165a0..c3c0a9e 100644
--- a/pubsub/flow_controller.go
+++ b/pubsub/flow_controller.go
@@ -31,6 +31,8 @@
 	// small releases.
 	// Atomic.
 	countRemaining int64
+	// Number of outstanding bytes remaining. Atomic.
+	bytesRemaining int64
 }
 
 // newFlowController creates a new flowController that ensures no more than
@@ -72,7 +74,11 @@
 			return err
 		}
 	}
-	atomic.AddInt64(&f.countRemaining, 1)
+	outstandingMessages := atomic.AddInt64(&f.countRemaining, 1)
+	recordStat(ctx, OutstandingMessages, outstandingMessages)
+	outstandingBytes := atomic.AddInt64(&f.bytesRemaining, f.bound(size))
+	recordStat(ctx, OutstandingBytes, outstandingBytes)
+
 	return nil
 }
 
@@ -81,7 +87,7 @@
 //
 // tryAcquire allows large messages to proceed by treating a size greater than
 // maxSize as if it were equal to maxSize.
-func (f *flowController) tryAcquire(size int) bool {
+func (f *flowController) tryAcquire(ctx context.Context, size int) bool {
 	if f.semCount != nil {
 		if !f.semCount.TryAcquire(1) {
 			return false
@@ -95,13 +101,21 @@
 			return false
 		}
 	}
-	atomic.AddInt64(&f.countRemaining, 1)
+	outstandingMessages := atomic.AddInt64(&f.countRemaining, 1)
+	recordStat(ctx, OutstandingMessages, outstandingMessages)
+	outstandingBytes := atomic.AddInt64(&f.bytesRemaining, f.bound(size))
+	recordStat(ctx, OutstandingBytes, outstandingBytes)
+
 	return true
 }
 
 // release notes that one message of size bytes is no longer outstanding.
-func (f *flowController) release(size int) {
-	atomic.AddInt64(&f.countRemaining, -1)
+func (f *flowController) release(ctx context.Context, size int) {
+	outstandingMessages := atomic.AddInt64(&f.countRemaining, -1)
+	recordStat(ctx, OutstandingMessages, outstandingMessages)
+	outstandingBytes := atomic.AddInt64(&f.bytesRemaining, -1*f.bound(size))
+	recordStat(ctx, OutstandingBytes, outstandingBytes)
+
 	if f.semCount != nil {
 		f.semCount.Release(1)
 	}
diff --git a/pubsub/flow_controller_test.go b/pubsub/flow_controller_test.go
index 71c41cb..765577a 100644
--- a/pubsub/flow_controller_test.go
+++ b/pubsub/flow_controller_test.go
@@ -41,7 +41,7 @@
 	// Control: a context that is not done should always return nil.
 	go func() {
 		time.Sleep(5 * time.Millisecond)
-		fc.release(5)
+		fc.release(ctx, 5)
 	}()
 	if err := fc.acquire(context.Background(), 6); err != nil {
 		t.Errorf("got %v, expected nil", err)
@@ -79,7 +79,7 @@
 				case first <- 1:
 				default:
 				}
-				fc.release(1)
+				fc.release(ctx, 1)
 			}
 		}()
 	}
@@ -162,7 +162,7 @@
 					if atomic.AddInt64(&curSize, -int64(test.acquireSize)) < 0 {
 						return errors.New("negative size")
 					}
-					fc.release(test.acquireSize)
+					fc.release(ctx, test.acquireSize)
 				}
 				return success
 			})
@@ -177,19 +177,20 @@
 func TestFlowControllerTryAcquire(t *testing.T) {
 	t.Parallel()
 	fc := newFlowController(3, 10)
+	ctx := context.Background()
 
 	// Successfully tryAcquire 4 bytes.
-	if !fc.tryAcquire(4) {
+	if !fc.tryAcquire(ctx, 4) {
 		t.Error("got false, wanted true")
 	}
 
 	// Fail to tryAcquire 7 bytes.
-	if fc.tryAcquire(7) {
+	if fc.tryAcquire(ctx, 7) {
 		t.Error("got true, wanted false")
 	}
 
 	// Successfully tryAcquire 6 byte.
-	if !fc.tryAcquire(6) {
+	if !fc.tryAcquire(ctx, 6) {
 		t.Error("got false, wanted true")
 	}
 }
@@ -205,12 +206,12 @@
 	}
 
 	// Successfully tryAcquire 4 bytes.
-	if !fc.tryAcquire(4) {
+	if !fc.tryAcquire(ctx, 4) {
 		t.Error("got false, wanted true")
 	}
 
 	// Fail to tryAcquire 3 bytes.
-	if fc.tryAcquire(3) {
+	if fc.tryAcquire(ctx, 3) {
 		t.Error("got true, wanted false")
 	}
 }
@@ -223,9 +224,9 @@
 	if err := fc.acquire(ctx, 4); err != nil {
 		t.Errorf("got %v, wanted no error", err)
 	}
-	fc.release(1)
-	fc.release(1)
-	fc.release(1)
+	fc.release(ctx, 1)
+	fc.release(ctx, 1)
+	fc.release(ctx, 1)
 	wantCount := int64(-2)
 	c := int64(fc.count())
 	if c != wantCount {
@@ -244,12 +245,12 @@
 	}
 
 	// Successfully tryAcquire 4GB bytes.
-	if !fc.tryAcquire(4e9) {
+	if !fc.tryAcquire(ctx, 4e9) {
 		t.Error("got false, wanted true")
 	}
 
 	// Fail to tryAcquire a third message.
-	if fc.tryAcquire(3) {
+	if fc.tryAcquire(ctx, 3) {
 		t.Error("got true, wanted false")
 	}
 }
diff --git a/pubsub/subscription.go b/pubsub/subscription.go
index d1dee3c..0c79b31 100644
--- a/pubsub/subscription.go
+++ b/pubsub/subscription.go
@@ -920,7 +920,7 @@
 					old := ackh.doneFunc
 					msgLen := len(msg.Data)
 					ackh.doneFunc = func(ackID string, ack bool, receiveTime time.Time) {
-						defer fc.release(msgLen)
+						defer fc.release(ctx, msgLen)
 						old(ackID, ack, receiveTime)
 					}
 					wg.Add(1)
@@ -957,7 +957,7 @@
 }
 
 type pullOptions struct {
-	maxExtension       time.Duration // the maximum time to extend a message's ack deadline in tota
+	maxExtension       time.Duration // the maximum time to extend a message's ack deadline in total
 	maxExtensionPeriod time.Duration // the maximum time to extend a message's ack deadline per modack rpc
 	maxPrefetch        int32
 	// If true, use unary Pull instead of StreamingPull, and never pull more
diff --git a/pubsub/trace.go b/pubsub/trace.go
index cb33172..84cab3c 100644
--- a/pubsub/trace.go
+++ b/pubsub/trace.go
@@ -84,6 +84,14 @@
 	// StreamResponseCount is a measure of the number of responses received on a streaming-pull stream.
 	// It is EXPERIMENTAL and subject to change or removal without notice.
 	StreamResponseCount = stats.Int64(statsPrefix+"stream_response_count", "Number of gRPC StreamingPull response messages received", stats.UnitDimensionless)
+
+	// OutstandingMessages is a measure of the number of outstanding messages held by the client before they are processed.
+	// It is EXPERIMENTAL and subject to change or removal without notice.
+	OutstandingMessages = stats.Int64(statsPrefix+"outstanding_messages", "Number of outstanding Pub/Sub messages", stats.UnitDimensionless)
+
+	// OutstandingBytes is a measure of the number of bytes all outstanding messages held by the client take up.
+	// It is EXPERIMENTAL and subject to change or removal without notice.
+	OutstandingBytes = stats.Int64(statsPrefix+"outstanding_bytes", "Number of outstanding bytes", stats.UnitDimensionless)
 )
 
 var (
@@ -130,6 +138,14 @@
 	// StreamResponseCountView is a cumulative sum of StreamResponseCount.
 	// It is EXPERIMENTAL and subject to change or removal without notice.
 	StreamResponseCountView *view.View
+
+	// OutstandingMessagesView is the last value of OutstandingMessages
+	// It is EXPERIMENTAL and subject to change or removal without notice.
+	OutstandingMessagesView *view.View
+
+	// OutstandingBytesView is the last value of OutstandingBytes
+	// It is EXPERIMENTAL and subject to change or removal without notice.
+	OutstandingBytesView *view.View
 )
 
 func init() {
@@ -144,6 +160,8 @@
 	StreamRetryCountView = createCountView(StreamRetryCount, keySubscription)
 	StreamRequestCountView = createCountView(StreamRequestCount, keySubscription)
 	StreamResponseCountView = createCountView(StreamResponseCount, keySubscription)
+	OutstandingMessagesView = createLastValueView(OutstandingMessages, keySubscription)
+	OutstandingBytesView = createLastValueView(OutstandingBytes, keySubscription)
 
 	DefaultPublishViews = []*view.View{
 		PublishedMessagesView,
@@ -160,6 +178,8 @@
 		StreamRetryCountView,
 		StreamRequestCountView,
 		StreamResponseCountView,
+		OutstandingMessagesView,
+		OutstandingBytesView,
 	}
 }
 
@@ -190,6 +210,16 @@
 	}
 }
 
+func createLastValueView(m stats.Measure, keys ...tag.Key) *view.View {
+	return &view.View{
+		Name:        m.Name(),
+		Description: m.Description(),
+		TagKeys:     keys,
+		Measure:     m,
+		Aggregation: view.LastValue(),
+	}
+}
+
 var logOnce sync.Once
 
 // withSubscriptionKey returns a new context modified with the subscriptionKey tag map.