feat(pubsub): add opencensus metrics for outstanding messages/bytes (#3690)
* feat(pubsub): add opencensus metrics for outstanding messages/bytes
* fix exported variable comment
diff --git a/pubsub/flow_controller.go b/pubsub/flow_controller.go
index 3f165a0..c3c0a9e 100644
--- a/pubsub/flow_controller.go
+++ b/pubsub/flow_controller.go
@@ -31,6 +31,8 @@
// small releases.
// Atomic.
countRemaining int64
+ // Number of outstanding bytes remaining. Atomic.
+ bytesRemaining int64
}
// newFlowController creates a new flowController that ensures no more than
@@ -72,7 +74,11 @@
return err
}
}
- atomic.AddInt64(&f.countRemaining, 1)
+ outstandingMessages := atomic.AddInt64(&f.countRemaining, 1)
+ recordStat(ctx, OutstandingMessages, outstandingMessages)
+ outstandingBytes := atomic.AddInt64(&f.bytesRemaining, f.bound(size))
+ recordStat(ctx, OutstandingBytes, outstandingBytes)
+
return nil
}
@@ -81,7 +87,7 @@
//
// tryAcquire allows large messages to proceed by treating a size greater than
// maxSize as if it were equal to maxSize.
-func (f *flowController) tryAcquire(size int) bool {
+func (f *flowController) tryAcquire(ctx context.Context, size int) bool {
if f.semCount != nil {
if !f.semCount.TryAcquire(1) {
return false
@@ -95,13 +101,21 @@
return false
}
}
- atomic.AddInt64(&f.countRemaining, 1)
+ outstandingMessages := atomic.AddInt64(&f.countRemaining, 1)
+ recordStat(ctx, OutstandingMessages, outstandingMessages)
+ outstandingBytes := atomic.AddInt64(&f.bytesRemaining, f.bound(size))
+ recordStat(ctx, OutstandingBytes, outstandingBytes)
+
return true
}
// release notes that one message of size bytes is no longer outstanding.
-func (f *flowController) release(size int) {
- atomic.AddInt64(&f.countRemaining, -1)
+func (f *flowController) release(ctx context.Context, size int) {
+ outstandingMessages := atomic.AddInt64(&f.countRemaining, -1)
+ recordStat(ctx, OutstandingMessages, outstandingMessages)
+ outstandingBytes := atomic.AddInt64(&f.bytesRemaining, -1*f.bound(size))
+ recordStat(ctx, OutstandingBytes, outstandingBytes)
+
if f.semCount != nil {
f.semCount.Release(1)
}
diff --git a/pubsub/flow_controller_test.go b/pubsub/flow_controller_test.go
index 71c41cb..765577a 100644
--- a/pubsub/flow_controller_test.go
+++ b/pubsub/flow_controller_test.go
@@ -41,7 +41,7 @@
// Control: a context that is not done should always return nil.
go func() {
time.Sleep(5 * time.Millisecond)
- fc.release(5)
+ fc.release(ctx, 5)
}()
if err := fc.acquire(context.Background(), 6); err != nil {
t.Errorf("got %v, expected nil", err)
@@ -79,7 +79,7 @@
case first <- 1:
default:
}
- fc.release(1)
+ fc.release(ctx, 1)
}
}()
}
@@ -162,7 +162,7 @@
if atomic.AddInt64(&curSize, -int64(test.acquireSize)) < 0 {
return errors.New("negative size")
}
- fc.release(test.acquireSize)
+ fc.release(ctx, test.acquireSize)
}
return success
})
@@ -177,19 +177,20 @@
func TestFlowControllerTryAcquire(t *testing.T) {
t.Parallel()
fc := newFlowController(3, 10)
+ ctx := context.Background()
// Successfully tryAcquire 4 bytes.
- if !fc.tryAcquire(4) {
+ if !fc.tryAcquire(ctx, 4) {
t.Error("got false, wanted true")
}
// Fail to tryAcquire 7 bytes.
- if fc.tryAcquire(7) {
+ if fc.tryAcquire(ctx, 7) {
t.Error("got true, wanted false")
}
// Successfully tryAcquire 6 byte.
- if !fc.tryAcquire(6) {
+ if !fc.tryAcquire(ctx, 6) {
t.Error("got false, wanted true")
}
}
@@ -205,12 +206,12 @@
}
// Successfully tryAcquire 4 bytes.
- if !fc.tryAcquire(4) {
+ if !fc.tryAcquire(ctx, 4) {
t.Error("got false, wanted true")
}
// Fail to tryAcquire 3 bytes.
- if fc.tryAcquire(3) {
+ if fc.tryAcquire(ctx, 3) {
t.Error("got true, wanted false")
}
}
@@ -223,9 +224,9 @@
if err := fc.acquire(ctx, 4); err != nil {
t.Errorf("got %v, wanted no error", err)
}
- fc.release(1)
- fc.release(1)
- fc.release(1)
+ fc.release(ctx, 1)
+ fc.release(ctx, 1)
+ fc.release(ctx, 1)
wantCount := int64(-2)
c := int64(fc.count())
if c != wantCount {
@@ -244,12 +245,12 @@
}
// Successfully tryAcquire 4GB bytes.
- if !fc.tryAcquire(4e9) {
+ if !fc.tryAcquire(ctx, 4e9) {
t.Error("got false, wanted true")
}
// Fail to tryAcquire a third message.
- if fc.tryAcquire(3) {
+ if fc.tryAcquire(ctx, 3) {
t.Error("got true, wanted false")
}
}
diff --git a/pubsub/subscription.go b/pubsub/subscription.go
index d1dee3c..0c79b31 100644
--- a/pubsub/subscription.go
+++ b/pubsub/subscription.go
@@ -920,7 +920,7 @@
old := ackh.doneFunc
msgLen := len(msg.Data)
ackh.doneFunc = func(ackID string, ack bool, receiveTime time.Time) {
- defer fc.release(msgLen)
+ defer fc.release(ctx, msgLen)
old(ackID, ack, receiveTime)
}
wg.Add(1)
@@ -957,7 +957,7 @@
}
type pullOptions struct {
- maxExtension time.Duration // the maximum time to extend a message's ack deadline in tota
+ maxExtension time.Duration // the maximum time to extend a message's ack deadline in total
maxExtensionPeriod time.Duration // the maximum time to extend a message's ack deadline per modack rpc
maxPrefetch int32
// If true, use unary Pull instead of StreamingPull, and never pull more
diff --git a/pubsub/trace.go b/pubsub/trace.go
index cb33172..84cab3c 100644
--- a/pubsub/trace.go
+++ b/pubsub/trace.go
@@ -84,6 +84,14 @@
// StreamResponseCount is a measure of the number of responses received on a streaming-pull stream.
// It is EXPERIMENTAL and subject to change or removal without notice.
StreamResponseCount = stats.Int64(statsPrefix+"stream_response_count", "Number of gRPC StreamingPull response messages received", stats.UnitDimensionless)
+
+ // OutstandingMessages is a measure of the number of outstanding messages held by the client before they are processed.
+ // It is EXPERIMENTAL and subject to change or removal without notice.
+ OutstandingMessages = stats.Int64(statsPrefix+"outstanding_messages", "Number of outstanding Pub/Sub messages", stats.UnitDimensionless)
+
+ // OutstandingBytes is a measure of the total size, in bytes, of all outstanding messages held by the client.
+ // It is EXPERIMENTAL and subject to change or removal without notice.
+ OutstandingBytes = stats.Int64(statsPrefix+"outstanding_bytes", "Number of outstanding bytes", stats.UnitDimensionless)
)
var (
@@ -130,6 +138,14 @@
// StreamResponseCountView is a cumulative sum of StreamResponseCount.
// It is EXPERIMENTAL and subject to change or removal without notice.
StreamResponseCountView *view.View
+
+ // OutstandingMessagesView is the last value of OutstandingMessages.
+ // It is EXPERIMENTAL and subject to change or removal without notice.
+ OutstandingMessagesView *view.View
+
+ // OutstandingBytesView is the last value of OutstandingBytes.
+ // It is EXPERIMENTAL and subject to change or removal without notice.
+ OutstandingBytesView *view.View
)
func init() {
@@ -144,6 +160,8 @@
StreamRetryCountView = createCountView(StreamRetryCount, keySubscription)
StreamRequestCountView = createCountView(StreamRequestCount, keySubscription)
StreamResponseCountView = createCountView(StreamResponseCount, keySubscription)
+ OutstandingMessagesView = createLastValueView(OutstandingMessages, keySubscription)
+ OutstandingBytesView = createLastValueView(OutstandingBytes, keySubscription)
DefaultPublishViews = []*view.View{
PublishedMessagesView,
@@ -160,6 +178,8 @@
StreamRetryCountView,
StreamRequestCountView,
StreamResponseCountView,
+ OutstandingMessagesView,
+ OutstandingBytesView,
}
}
@@ -190,6 +210,16 @@
}
}
+func createLastValueView(m stats.Measure, keys ...tag.Key) *view.View {
+ return &view.View{
+ Name: m.Name(),
+ Description: m.Description(),
+ TagKeys: keys,
+ Measure: m,
+ Aggregation: view.LastValue(),
+ }
+}
+
var logOnce sync.Once
// withSubscriptionKey returns a new context modified with the subscriptionKey tag map.