feat(pubsublite): support out of band seeks (#4208)

- Supports admin/out of band seeks pushed from the server.
- Sets the new InitialSubscribeRequest.initial_location field for subscribe streams.
diff --git a/pubsublite/internal/wire/committer.go b/pubsublite/internal/wire/committer.go
index 5344e45..2d2a324 100644
--- a/pubsublite/internal/wire/committer.go
+++ b/pubsublite/internal/wire/committer.go
@@ -123,6 +123,7 @@
 	c.mu.Lock()
 	defer c.mu.Unlock()
 
+	c.acks.Release() // Discard outstanding acks
 	for !c.cursorTracker.UpToDate() && c.status < serviceTerminating {
 		c.unsafeCommitOffsetToStream()
 		c.flushPending.Wait()
diff --git a/pubsublite/internal/wire/committer_test.go b/pubsublite/internal/wire/committer_test.go
index 304b676..018886f 100644
--- a/pubsublite/internal/wire/committer_test.go
+++ b/pubsublite/internal/wire/committer_test.go
@@ -328,8 +328,7 @@
 	verifiers := test.NewVerifiers(t)
 	stream := test.NewRPCVerifier(t)
 	stream.Push(initCommitReq(subscription), initCommitResp(), nil)
-	stream.Push(commitReq(34), commitResp(1), nil)
-	barrier := stream.PushWithBarrier(commitReq(56), commitResp(1), nil)
+	barrier := stream.PushWithBarrier(commitReq(34), commitResp(1), nil)
 	verifiers.AddCommitStream(subscription.Path, subscription.Partition, stream)
 
 	mockServer.OnTestStart(verifiers)
@@ -340,6 +339,8 @@
 		t.Errorf("Start() got err: (%v)", gotErr)
 	}
 
+	ack1.Ack()
+
 	complete := test.NewCondition("blocking reset complete")
 	go func() {
 		if err := cmt.BlockingReset(); err != nil {
@@ -350,14 +351,8 @@
 	}()
 	complete.VerifyNotDone(t)
 
-	ack1.Ack()
-	cmt.SendBatchCommit()
-	complete.VerifyNotDone(t)
-	ack2.Ack()
-	cmt.SendBatchCommit()
-
-	// Until the final commit response is received, committer.BlockingReset should
-	// not return.
+	// Until the commit response is received, committer.BlockingReset should not
+	// return.
 	barrier.ReleaseAfter(func() {
 		complete.VerifyNotDone(t)
 	})
@@ -371,6 +366,9 @@
 		t.Errorf("ackTracker.Empty() got %v, want %v", got, want)
 	}
 
+	// This ack should have been discarded.
+	ack2.Ack()
+
 	// Calling committer.BlockingReset again should immediately return.
 	if err := cmt.BlockingReset(); err != nil {
 		t.Errorf("BlockingReset() got err: (%v), want: <nil>", err)
@@ -390,7 +388,7 @@
 	verifiers := test.NewVerifiers(t)
 	stream := test.NewRPCVerifier(t)
 	stream.Push(initCommitReq(subscription), initCommitResp(), nil)
-	stream.Push(commitReq(34), commitResp(1), nil)
+	barrier := stream.PushWithBarrier(commitReq(34), commitResp(1), nil)
 	verifiers.AddCommitStream(subscription.Path, subscription.Partition, stream)
 
 	mockServer.OnTestStart(verifiers)
@@ -401,6 +399,8 @@
 		t.Errorf("Start() got err: (%v)", gotErr)
 	}
 
+	ack1.Ack()
+
 	complete := test.NewCondition("blocking reset complete")
 	go func() {
 		if got, want := cmt.BlockingReset(), ErrServiceStopped; !test.ErrorEqual(got, want) {
@@ -410,18 +410,11 @@
 	}()
 	complete.VerifyNotDone(t)
 
-	ack1.Ack()
-	cmt.SendBatchCommit()
-	complete.VerifyNotDone(t)
-
 	// committer.BlockingReset should return when the committer is stopped.
-	cmt.Stop()
-	complete.WaitUntilDone(t, serviceTestWaitTimeout)
-
-	// Ack tracker should not be reset.
-	if got, want := acks.Empty(), false; got != want {
-		t.Errorf("ackTracker.Empty() got %v, want %v", got, want)
-	}
+	barrier.ReleaseAfter(func() {
+		cmt.Stop()
+		complete.WaitUntilDone(t, serviceTestWaitTimeout)
+	})
 
 	cmt.Terminate()
 	if gotErr := cmt.FinalError(); gotErr != nil {
@@ -452,6 +445,8 @@
 		t.Errorf("Start() got err: (%v)", gotErr)
 	}
 
+	ack1.Ack()
+
 	complete := test.NewCondition("blocking reset complete")
 	go func() {
 		if got, want := cmt.BlockingReset(), ErrServiceStopped; !test.ErrorEqual(got, want) {
@@ -459,10 +454,6 @@
 		}
 		complete.SetDone()
 	}()
-	complete.VerifyNotDone(t)
-
-	ack1.Ack()
-	cmt.SendBatchCommit()
 
 	// committer.BlockingReset should return when the committer terminates due to
 	// fatal server error.
diff --git a/pubsublite/internal/wire/flow_control.go b/pubsublite/internal/wire/flow_control.go
index 7a04213..b7cb005 100644
--- a/pubsublite/internal/wire/flow_control.go
+++ b/pubsublite/internal/wire/flow_control.go
@@ -160,11 +160,14 @@
 }
 
 // RequestForRestart returns the seek request to send when a new subscribe
-// stream reconnects. Returns nil if the subscriber has just started, in which
-// case the server returns the offset of the last committed cursor.
+// stream reconnects.
 func (ot *subscriberOffsetTracker) RequestForRestart() *pb.SeekRequest {
 	if ot.minNextOffset <= 0 {
-		return nil
+		return &pb.SeekRequest{
+			Target: &pb.SeekRequest_NamedTarget_{
+				NamedTarget: pb.SeekRequest_COMMITTED_CURSOR,
+			},
+		}
 	}
 	return &pb.SeekRequest{
 		Target: &pb.SeekRequest_Cursor{
diff --git a/pubsublite/internal/wire/flow_control_test.go b/pubsublite/internal/wire/flow_control_test.go
index d070f1f..24f878c 100644
--- a/pubsublite/internal/wire/flow_control_test.go
+++ b/pubsublite/internal/wire/flow_control_test.go
@@ -253,7 +253,11 @@
 		{
 			desc:    "Uninitialized tracker",
 			tracker: subscriberOffsetTracker{},
-			want:    nil,
+			want: &pb.SeekRequest{
+				Target: &pb.SeekRequest_NamedTarget_{
+					NamedTarget: pb.SeekRequest_COMMITTED_CURSOR,
+				},
+			},
 		},
 		{
 			desc:    "Next offset positive",
diff --git a/pubsublite/internal/wire/requests_test.go b/pubsublite/internal/wire/requests_test.go
index 94944a9..d265f7b 100644
--- a/pubsublite/internal/wire/requests_test.go
+++ b/pubsublite/internal/wire/requests_test.go
@@ -134,12 +134,33 @@
 
 // SubscriberService
 
-func initSubReq(subscription subscriptionPartition) *pb.SubscribeRequest {
+func initSubReqCommit(subscription subscriptionPartition) *pb.SubscribeRequest {
 	return &pb.SubscribeRequest{
 		Request: &pb.SubscribeRequest_Initial{
 			Initial: &pb.InitialSubscribeRequest{
 				Subscription: subscription.Path,
 				Partition:    int64(subscription.Partition),
+				InitialLocation: &pb.SeekRequest{
+					Target: &pb.SeekRequest_NamedTarget_{
+						NamedTarget: pb.SeekRequest_COMMITTED_CURSOR,
+					},
+				},
+			},
+		},
+	}
+}
+
+func initSubReqCursor(subscription subscriptionPartition, offset int64) *pb.SubscribeRequest {
+	return &pb.SubscribeRequest{
+		Request: &pb.SubscribeRequest_Initial{
+			Initial: &pb.InitialSubscribeRequest{
+				Subscription: subscription.Path,
+				Partition:    int64(subscription.Partition),
+				InitialLocation: &pb.SeekRequest{
+					Target: &pb.SeekRequest_Cursor{
+						Cursor: &pb.Cursor{Offset: offset},
+					},
+				},
 			},
 		},
 	}
@@ -153,18 +174,6 @@
 	}
 }
 
-func seekReq(offset int64) *pb.SubscribeRequest {
-	return &pb.SubscribeRequest{
-		Request: &pb.SubscribeRequest_Seek{
-			Seek: &pb.SeekRequest{
-				Target: &pb.SeekRequest_Cursor{
-					Cursor: &pb.Cursor{Offset: offset},
-				},
-			},
-		},
-	}
-}
-
 func seekResp(offset int64) *pb.SubscribeResponse {
 	return &pb.SubscribeResponse{
 		Response: &pb.SubscribeResponse_Seek{
diff --git a/pubsublite/internal/wire/subscriber.go b/pubsublite/internal/wire/subscriber.go
index 125eeec..b6860a6 100644
--- a/pubsublite/internal/wire/subscriber.go
+++ b/pubsublite/internal/wire/subscriber.go
@@ -31,8 +31,7 @@
 var (
 	errServerNoMessages                = errors.New("pubsublite: server delivered no messages")
 	errInvalidInitialSubscribeResponse = errors.New("pubsublite: first response from server was not an initial response for subscribe")
-	errInvalidSubscribeResponse        = errors.New("pubsublite: received invalid subscribe response from server")
-	errNoInFlightSeek                  = errors.New("pubsublite: received seek response for no in-flight seek")
+	errInvalidSubscribeResponse        = errors.New("pubsublite: received unexpected subscribe response from server")
 )
 
 // ReceivedMessage stores a received Pub/Sub message and AckConsumer for
@@ -126,6 +125,10 @@
 // The frequency of sending batch flow control requests.
 const batchFlowControlPeriod = 100 * time.Millisecond
 
+// Handles subscriber reset actions that are external to the subscribeStream
+// (e.g. wait for the committer to flush commits).
+type subscriberResetHandler func() error
+
 // subscribeStream directly wraps the subscribe client stream. It passes
 // messages to the message receiver and manages flow control. Flow control
 // tokens are batched and sent to the stream via a periodic background task,
@@ -137,7 +140,7 @@
 	subClient    *vkit.SubscriberClient
 	settings     ReceiveSettings
 	subscription subscriptionPartition
-	initialReq   *pb.SubscribeRequest
+	handleReset  subscriberResetHandler
 	metadata     pubsubMetadata
 
 	// Fields below must be guarded with mu.
@@ -146,27 +149,20 @@
 	offsetTracker          subscriberOffsetTracker
 	flowControl            flowControlBatcher
 	pollFlowControl        *periodicTask
-	seekInFlight           bool
 	enableBatchFlowControl bool
 
 	abstractService
 }
 
 func newSubscribeStream(ctx context.Context, subClient *vkit.SubscriberClient, settings ReceiveSettings,
-	receiver MessageReceiverFunc, subscription subscriptionPartition, acks *ackTracker, disableTasks bool) *subscribeStream {
+	receiver MessageReceiverFunc, subscription subscriptionPartition, acks *ackTracker,
+	handleReset subscriberResetHandler, disableTasks bool) *subscribeStream {
 
 	s := &subscribeStream{
 		subClient:    subClient,
 		settings:     settings,
 		subscription: subscription,
-		initialReq: &pb.SubscribeRequest{
-			Request: &pb.SubscribeRequest_Initial{
-				Initial: &pb.InitialSubscribeRequest{
-					Subscription: subscription.Path,
-					Partition:    int64(subscription.Partition),
-				},
-			},
-		},
+		handleReset:  handleReset,
 		messageQueue: newMessageDeliveryQueue(acks, receiver, settings.MaxOutstandingMessages),
 		metadata:     newPubsubMetadata(),
 	}
@@ -212,7 +208,18 @@
 }
 
 func (s *subscribeStream) initialRequest() (interface{}, initialResponseRequired) {
-	return s.initialReq, initialResponseRequired(true)
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	initReq := &pb.SubscribeRequest{
+		Request: &pb.SubscribeRequest_Initial{
+			Initial: &pb.InitialSubscribeRequest{
+				Subscription:    s.subscription.Path,
+				Partition:       int64(s.subscription.Partition),
+				InitialLocation: s.offsetTracker.RequestForRestart(),
+			},
+		},
+	}
+	return initReq, initialResponseRequired(true)
 }
 
 func (s *subscribeStream) validateInitialResponse(response interface{}) error {
@@ -231,27 +238,43 @@
 	case streamConnected:
 		s.unsafeUpdateStatus(serviceActive, nil)
 
-		// Reinitialize the offset and flow control tokens when a new subscribe
-		// stream instance is connected.
-		if seekReq := s.offsetTracker.RequestForRestart(); seekReq != nil {
-			if !s.stream.Send(&pb.SubscribeRequest{
-				Request: &pb.SubscribeRequest_Seek{Seek: seekReq},
-			}) {
-				return
-			}
-			s.seekInFlight = true
-		}
+		// Reinitialize the flow control tokens when a new subscribe stream instance
+		// is connected.
 		s.unsafeSendFlowControl(s.flowControl.RequestForRestart())
 		s.enableBatchFlowControl = true
 		s.pollFlowControl.Start()
 
 	case streamReconnecting:
-		s.seekInFlight = false
 		// Ensure no batch flow control tokens are sent until the RequestForRestart
 		// is sent above when a new subscribe stream is initialized.
 		s.enableBatchFlowControl = false
 		s.pollFlowControl.Stop()
 
+	case streamResetState:
+		// Handle out-of-band seek notifications from the server. Committer and
+		// subscriber state are reset.
+
+		s.messageQueue.Stop()
+
+		// Wait for all message receiver callbacks to finish and the committer to
+		// flush pending commits and reset its state. Release the mutex while
+		// waiting.
+		s.mu.Unlock()
+		s.messageQueue.Wait()
+		err := s.handleReset()
+		s.mu.Lock()
+
+		if err != nil {
+			s.unsafeInitiateShutdown(serviceTerminating, nil)
+			return
+		}
+		s.messageQueue.Start()
+		s.offsetTracker.Reset()
+		s.flowControl.Reset(flowControlTokens{
+			Bytes:    int64(s.settings.MaxOutstandingBytes),
+			Messages: int64(s.settings.MaxOutstandingMessages),
+		})
+
 	case streamTerminated:
 		s.unsafeInitiateShutdown(serviceTerminated, s.stream.Error())
 	}
@@ -270,8 +293,6 @@
 	switch {
 	case subscribeResponse.GetMessages() != nil:
 		err = s.unsafeOnMessageResponse(subscribeResponse.GetMessages())
-	case subscribeResponse.GetSeek() != nil:
-		err = s.unsafeOnSeekResponse(subscribeResponse.GetSeek())
 	default:
 		err = errInvalidSubscribeResponse
 	}
@@ -280,14 +301,6 @@
 	}
 }
 
-func (s *subscribeStream) unsafeOnSeekResponse(response *pb.SeekResponse) error {
-	if !s.seekInFlight {
-		return errNoInFlightSeek
-	}
-	s.seekInFlight = false
-	return nil
-}
-
 func (s *subscribeStream) unsafeOnMessageResponse(response *pb.MessageResponse) error {
 	if len(response.Messages) == 0 {
 		return errServerNoMessages
@@ -388,7 +401,7 @@
 	subscription := subscriptionPartition{Path: f.subscriptionPath, Partition: partition}
 	acks := newAckTracker()
 	commit := newCommitter(f.ctx, f.cursorClient, f.settings, subscription, acks, f.disableTasks)
-	sub := newSubscribeStream(f.ctx, f.subClient, f.settings, f.receiver, subscription, acks, f.disableTasks)
+	sub := newSubscribeStream(f.ctx, f.subClient, f.settings, f.receiver, subscription, acks, commit.BlockingReset, f.disableTasks)
 	ps := &singlePartitionSubscriber{
 		subscriber: sub,
 		committer:  commit,
diff --git a/pubsublite/internal/wire/subscriber_test.go b/pubsublite/internal/wire/subscriber_test.go
index a45bb28..0c30fdb 100644
--- a/pubsublite/internal/wire/subscriber_test.go
+++ b/pubsublite/internal/wire/subscriber_test.go
@@ -16,6 +16,7 @@
 import (
 	"context"
 	"sort"
+	"sync"
 	"testing"
 	"time"
 
@@ -244,11 +245,14 @@
 type testSubscribeStream struct {
 	Receiver *testMessageReceiver
 	t        *testing.T
+	acks     *ackTracker
 	sub      *subscribeStream
+	mu       sync.Mutex
+	resetErr error
 	serviceTestProxy
 }
 
-func newTestSubscribeStream(t *testing.T, subscription subscriptionPartition, settings ReceiveSettings, acks *ackTracker) *testSubscribeStream {
+func newTestSubscribeStream(t *testing.T, subscription subscriptionPartition, settings ReceiveSettings) *testSubscribeStream {
 	ctx := context.Background()
 	subClient, err := newSubscriberClient(ctx, "ignored", testServer.ClientConn())
 	if err != nil {
@@ -258,8 +262,9 @@
 	ts := &testSubscribeStream{
 		Receiver: newTestMessageReceiver(t),
 		t:        t,
+		acks:     newAckTracker(),
 	}
-	ts.sub = newSubscribeStream(ctx, subClient, settings, ts.Receiver.onMessage, subscription, acks, true)
+	ts.sub = newSubscribeStream(ctx, subClient, settings, ts.Receiver.onMessage, subscription, ts.acks, ts.handleReset, true)
 	ts.initAndStart(t, ts.sub, "Subscriber", subClient)
 	return ts
 }
@@ -276,9 +281,20 @@
 	return ts.sub.flowControl.pendingTokens.ToFlowControlRequest()
 }
 
+func (ts *testSubscribeStream) SetResetErr(err error) {
+	ts.mu.Lock()
+	defer ts.mu.Unlock()
+	ts.resetErr = err
+}
+
+func (ts *testSubscribeStream) handleReset() error {
+	ts.mu.Lock()
+	defer ts.mu.Unlock()
+	return ts.resetErr
+}
+
 func TestSubscribeStreamReconnect(t *testing.T) {
 	subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0}
-	acks := newAckTracker()
 	msg1 := seqMsgWithOffsetAndSize(67, 200)
 	msg2 := seqMsgWithOffsetAndSize(68, 100)
 	permanentErr := status.Error(codes.FailedPrecondition, "permanent failure")
@@ -286,16 +302,15 @@
 	verifiers := test.NewVerifiers(t)
 
 	stream1 := test.NewRPCVerifier(t)
-	stream1.Push(initSubReq(subscription), initSubResp(), nil)
+	stream1.Push(initSubReqCommit(subscription), initSubResp(), nil)
 	stream1.Push(initFlowControlReq(), msgSubResp(msg1), nil)
 	stream1.Push(nil, nil, status.Error(codes.Unavailable, "server unavailable"))
 	verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, stream1)
 
-	// When reconnected, the subscribeStream should seek to msg2 and have
-	// subtracted flow control tokens.
+	// When reconnected, the subscribeStream should set initial cursor to msg2 and
+	// have subtracted flow control tokens.
 	stream2 := test.NewRPCVerifier(t)
-	stream2.Push(initSubReq(subscription), initSubResp(), nil)
-	stream2.Push(seekReq(68), seekResp(68), nil)
+	stream2.Push(initSubReqCursor(subscription, 68), initSubResp(), nil)
 	stream2.Push(flowControlSubReq(flowControlTokens{Bytes: 800, Messages: 9}), msgSubResp(msg2), nil)
 	// Subscriber should terminate on permanent error.
 	stream2.Push(nil, nil, permanentErr)
@@ -304,7 +319,7 @@
 	mockServer.OnTestStart(verifiers)
 	defer mockServer.OnTestEnd()
 
-	sub := newTestSubscribeStream(t, subscription, testSubscriberSettings(), acks)
+	sub := newTestSubscribeStream(t, subscription, testSubscriberSettings())
 	if gotErr := sub.StartError(); gotErr != nil {
 		t.Errorf("Start() got err: (%v)", gotErr)
 	}
@@ -317,14 +332,13 @@
 
 func TestSubscribeStreamFlowControlBatching(t *testing.T) {
 	subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0}
-	acks := newAckTracker()
 	msg1 := seqMsgWithOffsetAndSize(67, 200)
 	msg2 := seqMsgWithOffsetAndSize(68, 100)
 	serverErr := status.Error(codes.InvalidArgument, "verifies flow control received")
 
 	verifiers := test.NewVerifiers(t)
 	stream := test.NewRPCVerifier(t)
-	stream.Push(initSubReq(subscription), initSubResp(), nil)
+	stream.Push(initSubReqCommit(subscription), initSubResp(), nil)
 	stream.Push(initFlowControlReq(), msgSubResp(msg1, msg2), nil)
 	// Batch flow control request expected.
 	stream.Push(flowControlSubReq(flowControlTokens{Bytes: 300, Messages: 2}), nil, serverErr)
@@ -333,7 +347,7 @@
 	mockServer.OnTestStart(verifiers)
 	defer mockServer.OnTestEnd()
 
-	sub := newTestSubscribeStream(t, subscription, testSubscriberSettings(), acks)
+	sub := newTestSubscribeStream(t, subscription, testSubscriberSettings())
 	if gotErr := sub.StartError(); gotErr != nil {
 		t.Errorf("Start() got err: (%v)", gotErr)
 	}
@@ -349,7 +363,6 @@
 
 func TestSubscribeStreamExpediteFlowControl(t *testing.T) {
 	subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0}
-	acks := newAckTracker()
 	msg1 := seqMsgWithOffsetAndSize(67, 250)
 	// MaxOutstandingBytes = 1000, so msg2 pushes the pending flow control bytes
 	// over the expediteBatchRequestRatio=50% threshold in flowControlBatcher.
@@ -358,7 +371,7 @@
 
 	verifiers := test.NewVerifiers(t)
 	stream := test.NewRPCVerifier(t)
-	stream.Push(initSubReq(subscription), initSubResp(), nil)
+	stream.Push(initSubReqCommit(subscription), initSubResp(), nil)
 	stream.Push(initFlowControlReq(), msgSubResp(msg1, msg2), nil)
 	// Batch flow control request expected.
 	stream.Push(flowControlSubReq(flowControlTokens{Bytes: 501, Messages: 2}), nil, serverErr)
@@ -367,7 +380,7 @@
 	mockServer.OnTestStart(verifiers)
 	defer mockServer.OnTestEnd()
 
-	sub := newTestSubscribeStream(t, subscription, testSubscriberSettings(), acks)
+	sub := newTestSubscribeStream(t, subscription, testSubscriberSettings())
 	if gotErr := sub.StartError(); gotErr != nil {
 		t.Errorf("Start() got err: (%v)", gotErr)
 	}
@@ -383,7 +396,6 @@
 
 func TestSubscribeStreamDisableBatchFlowControl(t *testing.T) {
 	subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0}
-	acks := newAckTracker()
 	// MaxOutstandingBytes = 1000, so this pushes the pending flow control bytes
 	// over the expediteBatchRequestRatio=50% threshold in flowControlBatcher.
 	msg := seqMsgWithOffsetAndSize(67, 800)
@@ -393,7 +405,7 @@
 	verifiers := test.NewVerifiers(t)
 
 	stream1 := test.NewRPCVerifier(t)
-	stream1.Push(initSubReq(subscription), initSubResp(), nil)
+	stream1.Push(initSubReqCommit(subscription), initSubResp(), nil)
 	stream1.Push(initFlowControlReq(), msgSubResp(msg), nil)
 	// Break the stream immediately after sending the message.
 	stream1.Push(nil, nil, retryableErr)
@@ -401,8 +413,7 @@
 
 	stream2 := test.NewRPCVerifier(t)
 	// The barrier is used to pause in the middle of stream reconnection.
-	barrier := stream2.PushWithBarrier(initSubReq(subscription), initSubResp(), nil)
-	stream2.Push(seekReq(68), seekResp(68), nil)
+	barrier := stream2.PushWithBarrier(initSubReqCursor(subscription, 68), initSubResp(), nil)
 	// Full flow control tokens should be sent after stream has connected.
 	stream2.Push(initFlowControlReq(), nil, serverErr)
 	verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, stream2)
@@ -410,7 +421,7 @@
 	mockServer.OnTestStart(verifiers)
 	defer mockServer.OnTestEnd()
 
-	sub := newTestSubscribeStream(t, subscription, testSubscriberSettings(), acks)
+	sub := newTestSubscribeStream(t, subscription, testSubscriberSettings())
 	if gotErr := sub.StartError(); gotErr != nil {
 		t.Errorf("Start() got err: (%v)", gotErr)
 	}
@@ -432,17 +443,16 @@
 
 func TestSubscribeStreamInvalidInitialResponse(t *testing.T) {
 	subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0}
-	acks := newAckTracker()
 
 	verifiers := test.NewVerifiers(t)
 	stream := test.NewRPCVerifier(t)
-	stream.Push(initSubReq(subscription), seekResp(0), nil) // Seek instead of init response
+	stream.Push(initSubReqCommit(subscription), seekResp(0), nil) // Seek instead of init response
 	verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, stream)
 
 	mockServer.OnTestStart(verifiers)
 	defer mockServer.OnTestEnd()
 
-	sub := newTestSubscribeStream(t, subscription, testSubscriberSettings(), acks)
+	sub := newTestSubscribeStream(t, subscription, testSubscriberSettings())
 	if gotErr, wantErr := sub.StartError(), errInvalidInitialSubscribeResponse; !test.ErrorEqual(gotErr, wantErr) {
 		t.Errorf("Start got err: (%v), want: (%v)", gotErr, wantErr)
 	}
@@ -450,18 +460,17 @@
 
 func TestSubscribeStreamDuplicateInitialResponse(t *testing.T) {
 	subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0}
-	acks := newAckTracker()
 
 	verifiers := test.NewVerifiers(t)
 	stream := test.NewRPCVerifier(t)
-	stream.Push(initSubReq(subscription), initSubResp(), nil)
+	stream.Push(initSubReqCommit(subscription), initSubResp(), nil)
 	stream.Push(initFlowControlReq(), initSubResp(), nil) // Second initial response
 	verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, stream)
 
 	mockServer.OnTestStart(verifiers)
 	defer mockServer.OnTestEnd()
 
-	sub := newTestSubscribeStream(t, subscription, testSubscriberSettings(), acks)
+	sub := newTestSubscribeStream(t, subscription, testSubscriberSettings())
 	if gotErr, wantErr := sub.FinalError(), errInvalidSubscribeResponse; !test.ErrorEqual(gotErr, wantErr) {
 		t.Errorf("Final err: (%v), want: (%v)", gotErr, wantErr)
 	}
@@ -469,37 +478,35 @@
 
 func TestSubscribeStreamSpuriousSeekResponse(t *testing.T) {
 	subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0}
-	acks := newAckTracker()
 
 	verifiers := test.NewVerifiers(t)
 	stream := test.NewRPCVerifier(t)
-	stream.Push(initSubReq(subscription), initSubResp(), nil)
+	stream.Push(initSubReqCommit(subscription), initSubResp(), nil)
 	stream.Push(initFlowControlReq(), seekResp(1), nil) // Seek response with no seek request
 	verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, stream)
 
 	mockServer.OnTestStart(verifiers)
 	defer mockServer.OnTestEnd()
 
-	sub := newTestSubscribeStream(t, subscription, testSubscriberSettings(), acks)
-	if gotErr, wantErr := sub.FinalError(), errNoInFlightSeek; !test.ErrorEqual(gotErr, wantErr) {
+	sub := newTestSubscribeStream(t, subscription, testSubscriberSettings())
+	if gotErr, wantErr := sub.FinalError(), errInvalidSubscribeResponse; !test.ErrorEqual(gotErr, wantErr) {
 		t.Errorf("Final err: (%v), want: (%v)", gotErr, wantErr)
 	}
 }
 
 func TestSubscribeStreamNoMessages(t *testing.T) {
 	subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0}
-	acks := newAckTracker()
 
 	verifiers := test.NewVerifiers(t)
 	stream := test.NewRPCVerifier(t)
-	stream.Push(initSubReq(subscription), initSubResp(), nil)
+	stream.Push(initSubReqCommit(subscription), initSubResp(), nil)
 	stream.Push(initFlowControlReq(), msgSubResp(), nil) // No messages in response
 	verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, stream)
 
 	mockServer.OnTestStart(verifiers)
 	defer mockServer.OnTestEnd()
 
-	sub := newTestSubscribeStream(t, subscription, testSubscriberSettings(), acks)
+	sub := newTestSubscribeStream(t, subscription, testSubscriberSettings())
 	if gotErr, wantErr := sub.FinalError(), errServerNoMessages; !test.ErrorEqual(gotErr, wantErr) {
 		t.Errorf("Final err: (%v), want: (%v)", gotErr, wantErr)
 	}
@@ -507,13 +514,12 @@
 
 func TestSubscribeStreamMessagesOutOfOrder(t *testing.T) {
 	subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0}
-	acks := newAckTracker()
 	msg1 := seqMsgWithOffsetAndSize(56, 100)
 	msg2 := seqMsgWithOffsetAndSize(55, 100) // Offset before msg1
 
 	verifiers := test.NewVerifiers(t)
 	stream := test.NewRPCVerifier(t)
-	stream.Push(initSubReq(subscription), initSubResp(), nil)
+	stream.Push(initSubReqCommit(subscription), initSubResp(), nil)
 	stream.Push(initFlowControlReq(), msgSubResp(msg1), nil)
 	stream.Push(nil, msgSubResp(msg2), nil)
 	verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, stream)
@@ -521,7 +527,7 @@
 	mockServer.OnTestStart(verifiers)
 	defer mockServer.OnTestEnd()
 
-	sub := newTestSubscribeStream(t, subscription, testSubscriberSettings(), acks)
+	sub := newTestSubscribeStream(t, subscription, testSubscriberSettings())
 	sub.Receiver.ValidateMsg(msg1)
 	if gotErr, msg := sub.FinalError(), "start offset = 55, expected >= 57"; !test.ErrorHasMsg(gotErr, msg) {
 		t.Errorf("Final err: (%v), want msg: %q", gotErr, msg)
@@ -530,13 +536,12 @@
 
 func TestSubscribeStreamFlowControlOverflow(t *testing.T) {
 	subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0}
-	acks := newAckTracker()
 	msg1 := seqMsgWithOffsetAndSize(56, 900)
 	msg2 := seqMsgWithOffsetAndSize(57, 101) // Overflows ReceiveSettings.MaxOutstandingBytes = 1000
 
 	verifiers := test.NewVerifiers(t)
 	stream := test.NewRPCVerifier(t)
-	stream.Push(initSubReq(subscription), initSubResp(), nil)
+	stream.Push(initSubReqCommit(subscription), initSubResp(), nil)
 	stream.Push(initFlowControlReq(), msgSubResp(msg1), nil)
 	stream.Push(nil, msgSubResp(msg2), nil)
 	verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, stream)
@@ -544,13 +549,40 @@
 	mockServer.OnTestStart(verifiers)
 	defer mockServer.OnTestEnd()
 
-	sub := newTestSubscribeStream(t, subscription, testSubscriberSettings(), acks)
+	sub := newTestSubscribeStream(t, subscription, testSubscriberSettings())
 	sub.Receiver.ValidateMsg(msg1)
 	if gotErr, wantErr := sub.FinalError(), errTokenCounterBytesNegative; !test.ErrorEqual(gotErr, wantErr) {
 		t.Errorf("Final err: (%v), want: (%v)", gotErr, wantErr)
 	}
 }
 
+func TestSubscribeStreamHandleResetError(t *testing.T) {
+	subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0}
+	msg := seqMsgWithOffsetAndSize(67, 100)
+
+	verifiers := test.NewVerifiers(t)
+	stream := test.NewRPCVerifier(t)
+	stream.Push(initSubReqCommit(subscription), initSubResp(), nil)
+	stream.Push(initFlowControlReq(), msgSubResp(msg), nil)
+	barrier := stream.PushWithBarrier(nil, nil, makeStreamResetSignal())
+	verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, stream)
+	// No reconnect expected because the reset handler func will fail.
+
+	mockServer.OnTestStart(verifiers)
+	defer mockServer.OnTestEnd()
+
+	sub := newTestSubscribeStream(t, subscription, testSubscriberSettings())
+	sub.SetResetErr(status.Error(codes.FailedPrecondition, "reset handler failed"))
+	if gotErr := sub.StartError(); gotErr != nil {
+		t.Errorf("Start() got err: (%v)", gotErr)
+	}
+	sub.Receiver.ValidateMsg(msg)
+	barrier.Release()
+	if gotErr := sub.FinalError(); gotErr != nil {
+		t.Errorf("Final err: (%v), want: <nil>", gotErr)
+	}
+}
+
 type testSinglePartitionSubscriber singlePartitionSubscriber
 
 func (t *testSinglePartitionSubscriber) WaitStopped() error {
@@ -595,7 +627,7 @@
 	// Verifies the behavior of the subscribeStream and committer when they are
 	// stopped before any messages are received.
 	subStream := test.NewRPCVerifier(t)
-	subStream.Push(initSubReq(subscription), initSubResp(), nil)
+	subStream.Push(initSubReqCommit(subscription), initSubResp(), nil)
 	barrier := subStream.PushWithBarrier(initFlowControlReq(), nil, nil)
 	verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, subStream)
 
@@ -626,7 +658,7 @@
 	verifiers := test.NewVerifiers(t)
 
 	subStream := test.NewRPCVerifier(t)
-	subStream.Push(initSubReq(subscription), initSubResp(), nil)
+	subStream.Push(initSubReqCommit(subscription), initSubResp(), nil)
 	subStream.Push(initFlowControlReq(), msgSubResp(msg1, msg2), nil)
 	verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, subStream)
 
@@ -661,17 +693,16 @@
 	verifiers := test.NewVerifiers(t)
 
 	subStream1 := test.NewRPCVerifier(t)
-	subStream1.Push(initSubReq(subscription), initSubResp(), nil)
+	subStream1.Push(initSubReqCommit(subscription), initSubResp(), nil)
 	subStream1.Push(initFlowControlReq(), msgSubResp(msg1), nil)
 	subStream1.Push(nil, msgSubResp(msg2), nil)
 	subStream1.Push(nil, nil, retryableErr)
 	verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, subStream1)
 
-	// When reconnected, the subscribeStream should seek to msg3 and have
-	// subtracted flow control tokens for msg1 and msg2.
+	// When reconnected, the subscribeStream should set initial cursor to msg3 and
+	// have subtracted flow control tokens for msg1 and msg2.
 	subStream2 := test.NewRPCVerifier(t)
-	subStream2.Push(initSubReq(subscription), initSubResp(), nil)
-	subStream2.Push(seekReq(3), nil, nil)
+	subStream2.Push(initSubReqCursor(subscription, 3), initSubResp(), nil)
 	subStream2.Push(flowControlSubReq(flowControlTokens{Bytes: 800, Messages: 8}), msgSubResp(msg3), nil)
 	verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, subStream2)
 
@@ -719,7 +750,7 @@
 	verifiers := test.NewVerifiers(t)
 
 	subStream := test.NewRPCVerifier(t)
-	subStream.Push(initSubReq(subscription), initSubResp(), nil)
+	subStream.Push(initSubReqCommit(subscription), initSubResp(), nil)
 	subStream.Push(initFlowControlReq(), msgSubResp(msg1, msg2), nil)
 	verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, subStream)
 
@@ -748,6 +779,156 @@
 	receiver.VerifyNoMsgs() // msg2 should not be received
 }
 
+func TestSinglePartitionSubscriberAdminSeekWhileConnected(t *testing.T) {
+	subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0}
+	receiver := newTestMessageReceiver(t)
+	msg1 := seqMsgWithOffsetAndSize(1, 100)
+	msg2 := seqMsgWithOffsetAndSize(2, 100)
+	msg3 := seqMsgWithOffsetAndSize(3, 100)
+
+	verifiers := test.NewVerifiers(t)
+
+	subStream1 := test.NewRPCVerifier(t)
+	subStream1.Push(initSubReqCommit(subscription), initSubResp(), nil)
+	subStream1.Push(initFlowControlReq(), msgSubResp(msg1, msg2, msg3), nil)
+	// Server disconnects the stream with the RESET signal.
+	barrier := subStream1.PushWithBarrier(nil, nil, makeStreamResetSignal())
+	verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, subStream1)
+
+	subStream2 := test.NewRPCVerifier(t)
+	// Reconnected stream reads from commit cursor.
+	subStream2.Push(initSubReqCommit(subscription), initSubResp(), nil)
+	// Ensure that the subscriber resets state and can handle seeking back to
+	// msg1.
+	subStream2.Push(initFlowControlReq(), msgSubResp(msg1), nil)
+	verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, subStream2)
+
+	cmtStream := test.NewRPCVerifier(t)
+	cmtStream.Push(initCommitReq(subscription), initCommitResp(), nil)
+	cmtStream.Push(commitReq(4), commitResp(1), nil)
+	cmtStream.Push(commitReq(2), commitResp(1), nil)
+	verifiers.AddCommitStream(subscription.Path, subscription.Partition, cmtStream)
+
+	mockServer.OnTestStart(verifiers)
+	defer mockServer.OnTestEnd()
+
+	sub := newTestSinglePartitionSubscriber(t, receiver.onMessage, subscription)
+	if gotErr := sub.WaitStarted(); gotErr != nil {
+		t.Errorf("Start() got err: (%v)", gotErr)
+	}
+
+	receiver.ValidateMsg(msg1).Ack()
+	receiver.ValidateMsg(msg2).Ack()
+	receiver.ValidateMsg(msg3).Ack()
+	barrier.Release()
+	receiver.ValidateMsg(msg1).Ack()
+
+	sub.Stop()
+	if gotErr := sub.WaitStopped(); gotErr != nil {
+		t.Errorf("Stop() got err: (%v)", gotErr)
+	}
+}
+
+func TestSinglePartitionSubscriberAdminSeekWhileReconnecting(t *testing.T) {
+	subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0}
+	receiver := newTestMessageReceiver(t)
+	msg1 := seqMsgWithOffsetAndSize(1, 100)
+	msg2 := seqMsgWithOffsetAndSize(2, 100)
+	msg3 := seqMsgWithOffsetAndSize(3, 100)
+
+	verifiers := test.NewVerifiers(t)
+
+	subStream1 := test.NewRPCVerifier(t)
+	subStream1.Push(initSubReqCommit(subscription), initSubResp(), nil)
+	subStream1.Push(initFlowControlReq(), msgSubResp(msg1, msg2, msg3), nil)
+	// Normal stream breakage.
+	barrier := subStream1.PushWithBarrier(nil, nil, status.Error(codes.DeadlineExceeded, ""))
+	verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, subStream1)
+
+	subStream2 := test.NewRPCVerifier(t)
+	// The server sends the RESET signal during stream initialization.
+	subStream2.Push(initSubReqCursor(subscription, 4), nil, makeStreamResetSignal())
+	verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, subStream2)
+
+	subStream3 := test.NewRPCVerifier(t)
+	// Reconnected stream reads from commit cursor.
+	subStream3.Push(initSubReqCommit(subscription), initSubResp(), nil)
+	// Ensure that the subscriber resets state and can handle seeking back to
+	// msg1.
+	subStream3.Push(initFlowControlReq(), msgSubResp(msg1), nil)
+	verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, subStream3)
+
+	cmtStream := test.NewRPCVerifier(t)
+	cmtStream.Push(initCommitReq(subscription), initCommitResp(), nil)
+	cmtStream.Push(commitReq(3), commitResp(1), nil)
+	cmtStream.Push(commitReq(2), commitResp(1), nil)
+	verifiers.AddCommitStream(subscription.Path, subscription.Partition, cmtStream)
+
+	mockServer.OnTestStart(verifiers)
+	defer mockServer.OnTestEnd()
+
+	sub := newTestSinglePartitionSubscriber(t, receiver.onMessage, subscription)
+	if gotErr := sub.WaitStarted(); gotErr != nil {
+		t.Errorf("Start() got err: (%v)", gotErr)
+	}
+
+	receiver.ValidateMsg(msg1).Ack()
+	receiver.ValidateMsg(msg2).Ack()
+	ack := receiver.ValidateMsg(msg3) // Unacked message discarded
+	barrier.Release()
+	receiver.ValidateMsg(msg1).Ack()
+	ack.Ack() // Should be ignored
+
+	sub.Stop()
+	if gotErr := sub.WaitStopped(); gotErr != nil {
+		t.Errorf("Stop() got err: (%v)", gotErr)
+	}
+}
+
+func TestSinglePartitionSubscriberStopDuringAdminSeek(t *testing.T) {
+	subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0}
+	receiver := newTestMessageReceiver(t)
+	msg1 := seqMsgWithOffsetAndSize(1, 100)
+	msg2 := seqMsgWithOffsetAndSize(2, 100)
+
+	verifiers := test.NewVerifiers(t)
+
+	subStream := test.NewRPCVerifier(t)
+	subStream.Push(initSubReqCommit(subscription), initSubResp(), nil)
+	subStream.Push(initFlowControlReq(), msgSubResp(msg1, msg2), nil)
+	// Server disconnects the stream with the RESET signal.
+	subBarrier := subStream.PushWithBarrier(nil, nil, makeStreamResetSignal())
+	verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, subStream)
+
+	cmtStream := test.NewRPCVerifier(t)
+	cmtStream.Push(initCommitReq(subscription), initCommitResp(), nil)
+	cmtBarrier := cmtStream.PushWithBarrier(commitReq(3), commitResp(1), nil)
+	verifiers.AddCommitStream(subscription.Path, subscription.Partition, cmtStream)
+
+	mockServer.OnTestStart(verifiers)
+	defer mockServer.OnTestEnd()
+
+	sub := newTestSinglePartitionSubscriber(t, receiver.onMessage, subscription)
+	if gotErr := sub.WaitStarted(); gotErr != nil {
+		t.Errorf("Start() got err: (%v)", gotErr)
+	}
+
+	receiver.ValidateMsg(msg1).Ack()
+	receiver.ValidateMsg(msg2).Ack()
+	subBarrier.Release()
+
+	// Ensure that the user is able to call Stop while a reset is in progress.
+	// Verifies that the subscribeStream is not holding mutexes while waiting and
+	// that the subscribe stream is not reconnected.
+	cmtBarrier.ReleaseAfter(func() {
+		sub.Stop()
+	})
+
+	if gotErr := sub.WaitStopped(); gotErr != nil {
+		t.Errorf("Stop() got err: (%v)", gotErr)
+	}
+}
+
 func newTestMultiPartitionSubscriber(t *testing.T, receiverFunc MessageReceiverFunc, subscriptionPath string, partitions []int) *multiPartitionSubscriber {
 	ctx := context.Background()
 	subClient, err := newSubscriberClient(ctx, "ignored", testServer.ClientConn())
@@ -787,7 +968,7 @@
 
 	// Partition 1
 	subStream1 := test.NewRPCVerifier(t)
-	subStream1.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 1}), initSubResp(), nil)
+	subStream1.Push(initSubReqCommit(subscriptionPartition{Path: subscription, Partition: 1}), initSubResp(), nil)
 	subStream1.Push(initFlowControlReq(), msgSubResp(msg1), nil)
 	subStream1.Push(nil, msgSubResp(msg2), nil)
 	verifiers.AddSubscribeStream(subscription, 1, subStream1)
@@ -799,7 +980,7 @@
 
 	// Partition 2
 	subStream2 := test.NewRPCVerifier(t)
-	subStream2.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 2}), initSubResp(), nil)
+	subStream2.Push(initSubReqCommit(subscriptionPartition{Path: subscription, Partition: 2}), initSubResp(), nil)
 	subStream2.Push(initFlowControlReq(), msgSubResp(msg3), nil)
 	subStream2.Push(nil, msgSubResp(msg4), nil)
 	verifiers.AddSubscribeStream(subscription, 2, subStream2)
@@ -835,7 +1016,7 @@
 
 	// Partition 1
 	subStream1 := test.NewRPCVerifier(t)
-	subStream1.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 1}), initSubResp(), nil)
+	subStream1.Push(initSubReqCommit(subscriptionPartition{Path: subscription, Partition: 1}), initSubResp(), nil)
 	subStream1.Push(initFlowControlReq(), msgSubResp(msg1), nil)
 	msg2Barrier := subStream1.PushWithBarrier(nil, msgSubResp(msg2), nil)
 	verifiers.AddSubscribeStream(subscription, 1, subStream1)
@@ -847,7 +1028,7 @@
 
 	// Partition 2
 	subStream2 := test.NewRPCVerifier(t)
-	subStream2.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 2}), initSubResp(), nil)
+	subStream2.Push(initSubReqCommit(subscriptionPartition{Path: subscription, Partition: 2}), initSubResp(), nil)
 	subStream2.Push(initFlowControlReq(), msgSubResp(msg3), nil)
 	errorBarrier := subStream2.PushWithBarrier(nil, nil, serverErr)
 	verifiers.AddSubscribeStream(subscription, 2, subStream2)
@@ -960,7 +1141,7 @@
 
 	// Partition 3
 	subStream3 := test.NewRPCVerifier(t)
-	subStream3.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 3}), initSubResp(), nil)
+	subStream3.Push(initSubReqCommit(subscriptionPartition{Path: subscription, Partition: 3}), initSubResp(), nil)
 	subStream3.Push(initFlowControlReq(), msgSubResp(msg1), nil)
 	msg2Barrier := subStream3.PushWithBarrier(nil, msgSubResp(msg2), nil)
 	verifiers.AddSubscribeStream(subscription, 3, subStream3)
@@ -973,7 +1154,7 @@
 
 	// Partition 6
 	subStream6 := test.NewRPCVerifier(t)
-	subStream6.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 6}), initSubResp(), nil)
+	subStream6.Push(initSubReqCommit(subscriptionPartition{Path: subscription, Partition: 6}), initSubResp(), nil)
 	subStream6.Push(initFlowControlReq(), msgSubResp(msg3), nil)
 	// msg4 should not be received.
 	msg4Barrier := subStream6.PushWithBarrier(nil, msgSubResp(msg4), nil)
@@ -986,7 +1167,7 @@
 
 	// Partition 8
 	subStream8 := test.NewRPCVerifier(t)
-	subStream8.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 8}), initSubResp(), nil)
+	subStream8.Push(initSubReqCommit(subscriptionPartition{Path: subscription, Partition: 8}), initSubResp(), nil)
 	subStream8.Push(initFlowControlReq(), msgSubResp(msg5), nil)
 	verifiers.AddSubscribeStream(subscription, 8, subStream8)
 
@@ -1047,7 +1228,7 @@
 
 	// Partition 1
 	subStream1 := test.NewRPCVerifier(t)
-	subStream1.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 1}), initSubResp(), nil)
+	subStream1.Push(initSubReqCommit(subscriptionPartition{Path: subscription, Partition: 1}), initSubResp(), nil)
 	subStream1.Push(initFlowControlReq(), msgSubResp(msg1), nil)
 	verifiers.AddSubscribeStream(subscription, 1, subStream1)
 
@@ -1058,7 +1239,7 @@
 
 	// Partition 2
 	subStream2 := test.NewRPCVerifier(t)
-	subStream2.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 2}), initSubResp(), nil)
+	subStream2.Push(initSubReqCommit(subscriptionPartition{Path: subscription, Partition: 2}), initSubResp(), nil)
 	subStream2.Push(initFlowControlReq(), msgSubResp(msg2), nil)
 	verifiers.AddSubscribeStream(subscription, 2, subStream2)
 
@@ -1101,7 +1282,7 @@
 
 	// Partition 1
 	subStream := test.NewRPCVerifier(t)
-	subStream.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 1}), initSubResp(), nil)
+	subStream.Push(initSubReqCommit(subscriptionPartition{Path: subscription, Partition: 1}), initSubResp(), nil)
 	subStream.Push(initFlowControlReq(), msgSubResp(msg1, msg2), nil)
 	verifiers.AddSubscribeStream(subscription, 1, subStream)