refactor(pubsublite): remove async ack processing (#4189)

Process acks synchronously.
diff --git a/pubsublite/internal/wire/subscriber.go b/pubsublite/internal/wire/subscriber.go
index cad68b4..7dcb4f6 100644
--- a/pubsublite/internal/wire/subscriber.go
+++ b/pubsublite/internal/wire/subscriber.go
@@ -307,16 +307,11 @@
 }
 
 func (s *subscribeStream) onAck(ac *ackConsumer) {
-	// Don't block the user's goroutine with potentially expensive ack processing.
-	go s.onAckAsync(ac.MsgBytes)
-}
-
-func (s *subscribeStream) onAckAsync(msgBytes int64) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 
 	if s.status == serviceActive {
-		s.unsafeAllowFlow(flowControlTokens{Bytes: msgBytes, Messages: 1})
+		s.unsafeAllowFlow(flowControlTokens{Bytes: ac.MsgBytes, Messages: 1})
 	}
 }
 
diff --git a/pubsublite/internal/wire/subscriber_test.go b/pubsublite/internal/wire/subscriber_test.go
index a5b417c..495e7ed 100644
--- a/pubsublite/internal/wire/subscriber_test.go
+++ b/pubsublite/internal/wire/subscriber_test.go
@@ -339,8 +339,8 @@
 	}
 	sub.Receiver.ValidateMsg(msg1)
 	sub.Receiver.ValidateMsg(msg2)
-	sub.sub.onAckAsync(msg1.SizeBytes)
-	sub.sub.onAckAsync(msg2.SizeBytes)
+	sub.sub.onAck(&ackConsumer{MsgBytes: msg1.SizeBytes})
+	sub.sub.onAck(&ackConsumer{MsgBytes: msg2.SizeBytes})
 	sub.sub.sendBatchFlowControl()
 	if gotErr := sub.FinalError(); !test.ErrorEqual(gotErr, serverErr) {
 		t.Errorf("Final err: (%v), want: (%v)", gotErr, serverErr)
@@ -373,8 +373,8 @@
 	}
 	sub.Receiver.ValidateMsg(msg1)
 	sub.Receiver.ValidateMsg(msg2)
-	sub.sub.onAckAsync(msg1.SizeBytes)
-	sub.sub.onAckAsync(msg2.SizeBytes)
+	sub.sub.onAck(&ackConsumer{MsgBytes: msg1.SizeBytes})
+	sub.sub.onAck(&ackConsumer{MsgBytes: msg2.SizeBytes})
 	// Note: the ack for msg2 automatically triggers sending the flow control.
 	if gotErr := sub.FinalError(); !test.ErrorEqual(gotErr, serverErr) {
 		t.Errorf("Final err: (%v), want: (%v)", gotErr, serverErr)
@@ -419,7 +419,7 @@
 	barrier.ReleaseAfter(func() {
 		// While the stream is not connected, the pending flow control request
 		// should not be released and sent to the stream.
-		sub.sub.onAckAsync(msg.SizeBytes)
+		sub.sub.onAck(&ackConsumer{MsgBytes: msg.SizeBytes})
 		if sub.PendingFlowControlRequest() == nil {
 			t.Errorf("Pending flow control request should not be cleared")
 		}