add e2e test for subscribe spans
diff --git a/pubsub/trace_test.go b/pubsub/trace_test.go index c8e33f0..731857a 100644 --- a/pubsub/trace_test.go +++ b/pubsub/trace_test.go
@@ -16,7 +16,6 @@ import ( "context" - "fmt" "testing" "cloud.google.com/go/internal/testutil" @@ -70,11 +69,11 @@ attributes: []attribute.KeyValue{}, }, { - spanName: "publish batching", + spanName: "publish scheduler", attributes: []attribute.KeyValue{}, }, { - spanName: "publish RPC", + spanName: "publish", attributes: []attribute.KeyValue{ attribute.Int(numBatchedMessagesAttribute, 1), }, @@ -104,7 +103,6 @@ t.Fatalf("got mismatched attribute lengths for span(%d), got: %d, want: %d", i, gotLength, wantLength) } for j, kv := range span.Attributes() { - // got := kv.Value if diff := testutil.Diff(kv.Key, want[i].attributes[j].Key); diff != "" { t.Errorf("span(%d): +got,-want: %s", i, diff) } @@ -118,20 +116,112 @@ defer c.Close() defer srv.Close() - topic := c.Topic("t") - r := topic.Publish(ctx, &Message{ - Data: []byte("test"), + topic, err := c.CreateTopic(ctx, "t") + if err != nil { + t.Fatalf("c.CreateTopic() err: %v", err) + } + + enableExactlyOnce := false + sub, err := c.CreateSubscription(ctx, "s", SubscriptionConfig{ + Topic: topic, + EnableExactlyOnceDelivery: enableExactlyOnce, }) - r.Get(ctx) + if err != nil { + t.Fatalf("c.CreateSubscription() err: %v", err) + } + + m := &Message{ + Data: []byte("test"), + } + r := topic.Publish(ctx, m) + id, err := r.Get(ctx) + if err != nil { + t.Fatalf("r.Get() err: %v", err) + } defer topic.Stop() spanRecorder := tracetest.NewSpanRecorder() provider := trace.NewTracerProvider(trace.WithSpanProcessor(spanRecorder)) otel.SetTracerProvider(provider) + ctx, cancel := context.WithCancel(ctx) + sub.Receive(ctx, func(ctx context.Context, m *Message) { + m.Ack() + cancel() + }) + spans := spanRecorder.Ended() + + msgSize := proto.Size(&pb.PubsubMessage{ + Data: m.Data, + Attributes: m.Attributes, + OrderingKey: m.OrderingKey, + }) + + want := []struct { + spanName string + attributes []attribute.KeyValue + }{ + { + spanName: "subscriber flow control", + attributes: []attribute.KeyValue{}, + }, + { + spanName: "subscribe scheduler", + attributes: []attribute.KeyValue{}, + }, + { + spanName: "projects/P/subscriptions/s receive", + attributes: []attribute.KeyValue{ + semconv.MessagingSystemKey.String("pubsub"), + semconv.MessagingDestinationKindTopic, + semconv.MessagingMessageIDKey.String(id), + semconv.MessagingMessagePayloadSizeBytesKey.Int(msgSize), + attribute.String(subscriptionAttribute, sub.name), + attribute.String(orderingAttribute, m.OrderingKey), + semconv.MessagingOperationReceive, + attribute.Bool(eosAttribute, false), + attribute.String(ackIDAttribute, ""), + attribute.Int(numBatchedMessagesAttribute, 1), + attribute.Bool(ackAttribute, true), + }, + }, + { + spanName: "projects/P/subscriptions/s process", + attributes: []attribute.KeyValue{ + attribute.Bool(ackAttribute, true), + }, + }, + { + spanName: "modify ack deadline", + attributes: []attribute.KeyValue{ + attribute.Int(modackDeadlineSecondsAttribute, 10), + attribute.Bool(initialModackAttribute, true), + attribute.Int(numBatchedMessagesAttribute, 1), + }, + }, + { + spanName: "ack", + attributes: []attribute.KeyValue{}, + }, + } + for i, span := range spans { - // TODO(hongalex): test subscribe path spans - fmt.Printf("got span(%d): %+v\n", i, span) + if !span.SpanContext().IsValid() { + t.Fatalf("span(%d) is invalid: %v", i, span) + } + if span.Name() != want[i].spanName { + t.Errorf("span(%d) got name: %s, want: %s", i, span.Name(), want[i].spanName) + } + gotLength := len(span.Attributes()) + wantLength := len(want[i].attributes) + if gotLength != wantLength { + t.Fatalf("got mismatched attribute lengths for span(%d), got: %d, want: %d", i, gotLength, wantLength) + } + for j, kv := range span.Attributes() { + if diff := testutil.Diff(kv.Key, want[i].attributes[j].Key); diff != "" { + t.Errorf("span(%d): +got,-want: %s", i, diff) + } + } } }