refactor(pubsublite): remove async ack processing (#4189)
Process acks synchronously.
diff --git a/pubsublite/internal/wire/subscriber.go b/pubsublite/internal/wire/subscriber.go
index cad68b4..7dcb4f6 100644
--- a/pubsublite/internal/wire/subscriber.go
+++ b/pubsublite/internal/wire/subscriber.go
@@ -307,16 +307,11 @@
}
func (s *subscribeStream) onAck(ac *ackConsumer) {
- // Don't block the user's goroutine with potentially expensive ack processing.
- go s.onAckAsync(ac.MsgBytes)
-}
-
-func (s *subscribeStream) onAckAsync(msgBytes int64) {
s.mu.Lock()
defer s.mu.Unlock()
if s.status == serviceActive {
- s.unsafeAllowFlow(flowControlTokens{Bytes: msgBytes, Messages: 1})
+ s.unsafeAllowFlow(flowControlTokens{Bytes: ac.MsgBytes, Messages: 1})
}
}
diff --git a/pubsublite/internal/wire/subscriber_test.go b/pubsublite/internal/wire/subscriber_test.go
index a5b417c..495e7ed 100644
--- a/pubsublite/internal/wire/subscriber_test.go
+++ b/pubsublite/internal/wire/subscriber_test.go
@@ -339,8 +339,8 @@
}
sub.Receiver.ValidateMsg(msg1)
sub.Receiver.ValidateMsg(msg2)
- sub.sub.onAckAsync(msg1.SizeBytes)
- sub.sub.onAckAsync(msg2.SizeBytes)
+ sub.sub.onAck(&ackConsumer{MsgBytes: msg1.SizeBytes})
+ sub.sub.onAck(&ackConsumer{MsgBytes: msg2.SizeBytes})
sub.sub.sendBatchFlowControl()
if gotErr := sub.FinalError(); !test.ErrorEqual(gotErr, serverErr) {
t.Errorf("Final err: (%v), want: (%v)", gotErr, serverErr)
@@ -373,8 +373,8 @@
}
sub.Receiver.ValidateMsg(msg1)
sub.Receiver.ValidateMsg(msg2)
- sub.sub.onAckAsync(msg1.SizeBytes)
- sub.sub.onAckAsync(msg2.SizeBytes)
+ sub.sub.onAck(&ackConsumer{MsgBytes: msg1.SizeBytes})
+ sub.sub.onAck(&ackConsumer{MsgBytes: msg2.SizeBytes})
// Note: the ack for msg2 automatically triggers sending the flow control.
if gotErr := sub.FinalError(); !test.ErrorEqual(gotErr, serverErr) {
t.Errorf("Final err: (%v), want: (%v)", gotErr, serverErr)
@@ -419,7 +419,7 @@
barrier.ReleaseAfter(func() {
// While the stream is not connected, the pending flow control request
// should not be released and sent to the stream.
- sub.sub.onAckAsync(msg.SizeBytes)
+ sub.sub.onAck(&ackConsumer{MsgBytes: msg.SizeBytes})
if sub.PendingFlowControlRequest() == nil {
t.Errorf("Pending flow control request should not be cleared")
}