blob: 75cb78d08090673b534334b5ef4526ffc7431579 [file] [log] [blame] [edit]
// Copyright 2018 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pubsub
import (
"context"
"log"
"sync"
"cloud.google.com/go/pubsub/internal"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
otelcodes "go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/propagation"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"go.opentelemetry.io/otel/trace"
pb "google.golang.org/genproto/googleapis/pubsub/v1"
"google.golang.org/protobuf/proto"
)
// The following keys are used to tag requests with a specific topic/subscription ID.
var (
keyTopic = tag.MustNewKey("topic")
keySubscription = tag.MustNewKey("subscription")
)
// In the following, errors are used if status is not "OK".
var (
keyStatus = tag.MustNewKey("status")
keyError = tag.MustNewKey("error")
)
const statsPrefix = "cloud.google.com/go/pubsub/"
// The following are measures recorded in publish/subscribe flows.
var (
// PublishedMessages is a measure of the number of messages published, which may include errors.
// It is EXPERIMENTAL and subject to change or removal without notice.
PublishedMessages = stats.Int64(statsPrefix+"published_messages", "Number of PubSub message published", stats.UnitDimensionless)
// PublishLatency is a measure of the number of milliseconds it took to publish a bundle,
// which may consist of one or more messages.
// It is EXPERIMENTAL and subject to change or removal without notice.
PublishLatency = stats.Float64(statsPrefix+"publish_roundtrip_latency", "The latency in milliseconds per publish batch", stats.UnitMilliseconds)
// PullCount is a measure of the number of messages pulled.
// It is EXPERIMENTAL and subject to change or removal without notice.
PullCount = stats.Int64(statsPrefix+"pull_count", "Number of PubSub messages pulled", stats.UnitDimensionless)
// AckCount is a measure of the number of messages acked.
// It is EXPERIMENTAL and subject to change or removal without notice.
AckCount = stats.Int64(statsPrefix+"ack_count", "Number of PubSub messages acked", stats.UnitDimensionless)
// NackCount is a measure of the number of messages nacked.
// It is EXPERIMENTAL and subject to change or removal without notice.
NackCount = stats.Int64(statsPrefix+"nack_count", "Number of PubSub messages nacked", stats.UnitDimensionless)
// ModAckCount is a measure of the number of messages whose ack-deadline was modified.
// It is EXPERIMENTAL and subject to change or removal without notice.
ModAckCount = stats.Int64(statsPrefix+"mod_ack_count", "Number of ack-deadlines modified", stats.UnitDimensionless)
// ModAckTimeoutCount is a measure of the number ModifyAckDeadline RPCs that timed out.
// It is EXPERIMENTAL and subject to change or removal without notice.
ModAckTimeoutCount = stats.Int64(statsPrefix+"mod_ack_timeout_count", "Number of ModifyAckDeadline RPCs that timed out", stats.UnitDimensionless)
// StreamOpenCount is a measure of the number of times a streaming-pull stream was opened.
// It is EXPERIMENTAL and subject to change or removal without notice.
StreamOpenCount = stats.Int64(statsPrefix+"stream_open_count", "Number of calls opening a new streaming pull", stats.UnitDimensionless)
// StreamRetryCount is a measure of the number of times a streaming-pull operation was retried.
// It is EXPERIMENTAL and subject to change or removal without notice.
StreamRetryCount = stats.Int64(statsPrefix+"stream_retry_count", "Number of retries of a stream send or receive", stats.UnitDimensionless)
// StreamRequestCount is a measure of the number of requests sent on a streaming-pull stream.
// It is EXPERIMENTAL and subject to change or removal without notice.
StreamRequestCount = stats.Int64(statsPrefix+"stream_request_count", "Number gRPC StreamingPull request messages sent", stats.UnitDimensionless)
// StreamResponseCount is a measure of the number of responses received on a streaming-pull stream.
// It is EXPERIMENTAL and subject to change or removal without notice.
StreamResponseCount = stats.Int64(statsPrefix+"stream_response_count", "Number of gRPC StreamingPull response messages received", stats.UnitDimensionless)
// OutstandingMessages is a measure of the number of outstanding messages held by the client before they are processed.
// It is EXPERIMENTAL and subject to change or removal without notice.
OutstandingMessages = stats.Int64(statsPrefix+"outstanding_messages", "Number of outstanding Pub/Sub messages", stats.UnitDimensionless)
// OutstandingBytes is a measure of the number of bytes all outstanding messages held by the client take up.
// It is EXPERIMENTAL and subject to change or removal without notice.
OutstandingBytes = stats.Int64(statsPrefix+"outstanding_bytes", "Number of outstanding bytes", stats.UnitDimensionless)
// PublisherOutstandingMessages is a measure of the number of published outstanding messages held by the client before they are processed.
// It is EXPERIMENTAL and subject to change or removal without notice.
PublisherOutstandingMessages = stats.Int64(statsPrefix+"publisher_outstanding_messages", "Number of outstanding publish messages", stats.UnitDimensionless)
// PublisherOutstandingBytes is a measure of the number of bytes all outstanding publish messages held by the client take up.
// It is EXPERIMENTAL and subject to change or removal without notice.
PublisherOutstandingBytes = stats.Int64(statsPrefix+"publisher_outstanding_bytes", "Number of outstanding publish bytes", stats.UnitDimensionless)
)
var (
// PublishedMessagesView is a cumulative sum of PublishedMessages.
// It is EXPERIMENTAL and subject to change or removal without notice.
PublishedMessagesView *view.View
// PublishLatencyView is a distribution of PublishLatency.
// It is EXPERIMENTAL and subject to change or removal without notice.
PublishLatencyView *view.View
// PullCountView is a cumulative sum of PullCount.
// It is EXPERIMENTAL and subject to change or removal without notice.
PullCountView *view.View
// AckCountView is a cumulative sum of AckCount.
// It is EXPERIMENTAL and subject to change or removal without notice.
AckCountView *view.View
// NackCountView is a cumulative sum of NackCount.
// It is EXPERIMENTAL and subject to change or removal without notice.
NackCountView *view.View
// ModAckCountView is a cumulative sum of ModAckCount.
// It is EXPERIMENTAL and subject to change or removal without notice.
ModAckCountView *view.View
// ModAckTimeoutCountView is a cumulative sum of ModAckTimeoutCount.
// It is EXPERIMENTAL and subject to change or removal without notice.
ModAckTimeoutCountView *view.View
// StreamOpenCountView is a cumulative sum of StreamOpenCount.
// It is EXPERIMENTAL and subject to change or removal without notice.
StreamOpenCountView *view.View
// StreamRetryCountView is a cumulative sum of StreamRetryCount.
// It is EXPERIMENTAL and subject to change or removal without notice.
StreamRetryCountView *view.View
// StreamRequestCountView is a cumulative sum of StreamRequestCount.
// It is EXPERIMENTAL and subject to change or removal without notice.
StreamRequestCountView *view.View
// StreamResponseCountView is a cumulative sum of StreamResponseCount.
// It is EXPERIMENTAL and subject to change or removal without notice.
StreamResponseCountView *view.View
// OutstandingMessagesView is the last value of OutstandingMessages
// It is EXPERIMENTAL and subject to change or removal without notice.
OutstandingMessagesView *view.View
// OutstandingBytesView is the last value of OutstandingBytes
// It is EXPERIMENTAL and subject to change or removal without notice.
OutstandingBytesView *view.View
// PublisherOutstandingMessagesView is the last value of OutstandingMessages
// It is EXPERIMENTAL and subject to change or removal without notice.
PublisherOutstandingMessagesView *view.View
// PublisherOutstandingBytesView is the last value of OutstandingBytes
// It is EXPERIMENTAL and subject to change or removal without notice.
PublisherOutstandingBytesView *view.View
)
func init() {
PublishedMessagesView = createCountView(stats.Measure(PublishedMessages), keyTopic, keyStatus, keyError)
PublishLatencyView = createDistView(PublishLatency, keyTopic, keyStatus, keyError)
PublisherOutstandingMessagesView = createLastValueView(PublisherOutstandingMessages, keyTopic)
PublisherOutstandingBytesView = createLastValueView(PublisherOutstandingBytes, keyTopic)
PullCountView = createCountView(PullCount, keySubscription)
AckCountView = createCountView(AckCount, keySubscription)
NackCountView = createCountView(NackCount, keySubscription)
ModAckCountView = createCountView(ModAckCount, keySubscription)
ModAckTimeoutCountView = createCountView(ModAckTimeoutCount, keySubscription)
StreamOpenCountView = createCountView(StreamOpenCount, keySubscription)
StreamRetryCountView = createCountView(StreamRetryCount, keySubscription)
StreamRequestCountView = createCountView(StreamRequestCount, keySubscription)
StreamResponseCountView = createCountView(StreamResponseCount, keySubscription)
OutstandingMessagesView = createLastValueView(OutstandingMessages, keySubscription)
OutstandingBytesView = createLastValueView(OutstandingBytes, keySubscription)
DefaultPublishViews = []*view.View{
PublishedMessagesView,
PublishLatencyView,
PublisherOutstandingMessagesView,
PublisherOutstandingBytesView,
}
DefaultSubscribeViews = []*view.View{
PullCountView,
AckCountView,
NackCountView,
ModAckCountView,
ModAckTimeoutCountView,
StreamOpenCountView,
StreamRetryCountView,
StreamRequestCountView,
StreamResponseCountView,
OutstandingMessagesView,
OutstandingBytesView,
}
}
// These arrays hold the default OpenCensus views that keep track of publish/subscribe operations.
// It is EXPERIMENTAL and subject to change or removal without notice.
var (
DefaultPublishViews []*view.View
DefaultSubscribeViews []*view.View
)
func createCountView(m stats.Measure, keys ...tag.Key) *view.View {
return &view.View{
Name: m.Name(),
Description: m.Description(),
TagKeys: keys,
Measure: m,
Aggregation: view.Sum(),
}
}
func createDistView(m stats.Measure, keys ...tag.Key) *view.View {
return &view.View{
Name: m.Name(),
Description: m.Description(),
TagKeys: keys,
Measure: m,
Aggregation: view.Distribution(0, 25, 50, 75, 100, 200, 400, 600, 800, 1000, 2000, 4000, 6000),
}
}
func createLastValueView(m stats.Measure, keys ...tag.Key) *view.View {
return &view.View{
Name: m.Name(),
Description: m.Description(),
TagKeys: keys,
Measure: m,
Aggregation: view.LastValue(),
}
}
var logOnce sync.Once
// withSubscriptionKey returns a new context modified with the subscriptionKey tag map.
func withSubscriptionKey(ctx context.Context, subName string) context.Context {
ctx, err := tag.New(ctx, tag.Upsert(keySubscription, subName))
if err != nil {
logOnce.Do(func() {
log.Printf("pubsub: error creating tag map for 'subscribe' key: %v", err)
})
}
return ctx
}
func recordStat(ctx context.Context, m *stats.Int64Measure, n int64) {
stats.Record(ctx, m.M(n))
}
const defaultTracerName = "cloud.google.com/go/pubsub"
func tracer() trace.Tracer {
return otel.Tracer(defaultTracerName, trace.WithInstrumentationVersion(internal.Version))
}
var _ propagation.TextMapCarrier = (*PubsubMessageCarrier)(nil)
// PubsubMessageCarrier injects and extracts traces from a pubsub.Message.
type PubsubMessageCarrier struct {
msg *Message
}
// NewPubsubMessageCarrier creates a new PubsubMessageCarrier.
func NewPubsubMessageCarrier(msg *Message) PubsubMessageCarrier {
return PubsubMessageCarrier{msg: msg}
}
// Get retrieves a single value for a given key.
func (c PubsubMessageCarrier) Get(key string) string {
return c.msg.Attributes["googclient_"+key]
}
// Set sets an attribute.
func (c PubsubMessageCarrier) Set(key, val string) {
c.msg.Attributes["googclient_"+key] = val
}
// Keys returns a slice of all keys in the carrier.
func (c PubsubMessageCarrier) Keys() []string {
i := 0
out := make([]string, len(c.msg.Attributes))
for k := range c.msg.Attributes {
out[i] = k
i++
}
return out
}
const (
// span names
publisherSpanName = "send"
publishFlowControlSpanName = "publisher flow control"
publishSchedulerSpanName = "publish scheduler"
publishRPCSpanName = "publish"
subscriberSpanName = "receive"
subscriberFlowControlSpanName = "subscriber flow control"
processSpanName = "process"
subscribeSchedulerSpanName = "subscribe scheduler"
modAckSpanName = "modify ack deadline"
ackSpanName = "ack"
nackSpanName = "nack"
// custom pubsub specific attributes
numBatchedMessagesAttribute = "messaging.pubsub.num_messages_in_batch"
subscriptionAttribute = "messaging.pubsub.subscription"
orderingAttribute = "messaging.pubsub.ordering_key"
deliveryAttemptAttribute = "messaging.pubsub.delivery_attempt"
eosAttribute = "messaging.pubsub.exactly_once_delivery"
ackIDAttribute = "messaging.pubsub.ack_id"
ackAttribute = "messaging.pubsub.is_acked"
modackDeadlineSecondsAttribute = "messaging.pubsub.modack_deadline_seconds"
initialModackAttribute = "messaging.pubsub.is_initial_modack"
)
func getPublishSpanAttributes(topic string, msg *Message, opts ...attribute.KeyValue) []trace.SpanStartOption {
// TODO(hongalex): benchmark this to make sure no significant performance degradation
// when calculating proto.Size in receive paths.
// TODO(hongalex): find way to incorporate pubsub client library version in attribute.
msgSize := proto.Size(&pb.PubsubMessage{
Data: msg.Data,
Attributes: msg.Attributes,
OrderingKey: msg.OrderingKey,
})
ss := []trace.SpanStartOption{
trace.WithAttributes(
semconv.MessagingSystemKey.String("pubsub"),
semconv.MessagingDestinationKey.String(topic),
semconv.MessagingDestinationKindTopic,
semconv.MessagingMessageIDKey.String(msg.ID),
semconv.MessagingMessagePayloadSizeBytesKey.Int(msgSize),
attribute.String(orderingAttribute, msg.OrderingKey),
),
trace.WithAttributes(opts...),
trace.WithSpanKind(trace.SpanKindProducer),
}
return ss
}
func getSubSpanAttributes(sub string, msg *Message, opts ...attribute.KeyValue) []trace.SpanStartOption {
msgSize := proto.Size(&pb.PubsubMessage{
Data: msg.Data,
Attributes: msg.Attributes,
OrderingKey: msg.OrderingKey,
})
ss := []trace.SpanStartOption{
trace.WithAttributes(
semconv.MessagingSystemKey.String("pubsub"),
semconv.MessagingDestinationKindTopic,
semconv.MessagingMessageIDKey.String(msg.ID),
semconv.MessagingMessagePayloadSizeBytesKey.Int(msgSize),
attribute.String(subscriptionAttribute, sub),
attribute.String(orderingAttribute, msg.OrderingKey),
),
trace.WithAttributes(opts...),
trace.WithSpanKind(trace.SpanKindConsumer),
}
if msg.DeliveryAttempt != nil {
ss = append(ss, trace.WithAttributes(attribute.Int(deliveryAttemptAttribute, *msg.DeliveryAttempt)))
}
return ss
}
func spanRecordError(span trace.Span, err error) {
span.RecordError(err)
span.SetStatus(otelcodes.Error, err.Error())
span.End()
}