fix(pubsub): fix iterator distribution bound calculations (#6125)
* fix iterator distribution bounds
* add comments to test
* run gofmt
diff --git a/pubsub/iterator.go b/pubsub/iterator.go
index 012c9f4..c8c2f80 100644
--- a/pubsub/iterator.go
+++ b/pubsub/iterator.go
@@ -161,8 +161,8 @@
// min/maxDurationPerLeaseExtension.
func (it *messageIterator) addToDistribution(receiveTime time.Time) {
d := time.Since(receiveTime)
- d = minDuration(d, minDurationPerLeaseExtension)
- d = maxDuration(d, maxDurationPerLeaseExtension)
+ d = maxDuration(d, minDurationPerLeaseExtension)
+ d = minDuration(d, maxDurationPerLeaseExtension)
it.ackTimeDist.Record(int(d / time.Second))
}
diff --git a/pubsub/iterator_test.go b/pubsub/iterator_test.go
index 09cd185..9f2aec1 100644
--- a/pubsub/iterator_test.go
+++ b/pubsub/iterator_test.go
@@ -496,3 +496,44 @@
})
}
}
+
+func TestAddToDistribution(t *testing.T) {
+ srv := pstest.NewServer()
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ srv.Publish(fullyQualifiedTopicName, []byte("creating a topic"), nil)
+
+ _, client, err := initConn(ctx, srv.Addr)
+ if err != nil {
+ t.Fatal(err)
+ }
+ iter := newMessageIterator(client.subc, fullyQualifiedTopicName, &pullOptions{})
+
+ // Start with a datapoint that's too small that should be bounded to 10s.
+ receiveTime := time.Now().Add(time.Duration(-1) * time.Second)
+ iter.addToDistribution(receiveTime)
+ deadline := iter.ackTimeDist.Percentile(.99)
+ want := 10
+ if deadline != want {
+ t.Errorf("99th percentile ack distribution got: %v, want %v", deadline, want)
+ }
+
+ // The next datapoint should not be bounded.
+ receiveTime = time.Now().Add(time.Duration(-300) * time.Second)
+ iter.addToDistribution(receiveTime)
+ deadline = iter.ackTimeDist.Percentile(.99)
+ want = 300
+ if deadline != want {
+ t.Errorf("99th percentile ack distribution got: %v, want %v", deadline, want)
+ }
+
+ // Lastly, add a datapoint that should be bounded to 600s
+ receiveTime = time.Now().Add(time.Duration(-1000) * time.Second)
+ iter.addToDistribution(receiveTime)
+ deadline = iter.ackTimeDist.Percentile(.99)
+ want = 600
+ if deadline != want {
+ t.Errorf("99th percentile ack distribution got: %v, want %v", deadline, want)
+ }
+}