feat(pubsublite): support out of band seeks (#4208)
- Supports admin/out of band seeks pushed from the server.
- Sets the new InitialSubscribeRequest.initial_location field for subscribe streams.
diff --git a/pubsublite/internal/wire/committer.go b/pubsublite/internal/wire/committer.go
index 5344e45..2d2a324 100644
--- a/pubsublite/internal/wire/committer.go
+++ b/pubsublite/internal/wire/committer.go
@@ -123,6 +123,7 @@
c.mu.Lock()
defer c.mu.Unlock()
+ c.acks.Release() // Discard outstanding acks
for !c.cursorTracker.UpToDate() && c.status < serviceTerminating {
c.unsafeCommitOffsetToStream()
c.flushPending.Wait()
diff --git a/pubsublite/internal/wire/committer_test.go b/pubsublite/internal/wire/committer_test.go
index 304b676..018886f 100644
--- a/pubsublite/internal/wire/committer_test.go
+++ b/pubsublite/internal/wire/committer_test.go
@@ -328,8 +328,7 @@
verifiers := test.NewVerifiers(t)
stream := test.NewRPCVerifier(t)
stream.Push(initCommitReq(subscription), initCommitResp(), nil)
- stream.Push(commitReq(34), commitResp(1), nil)
- barrier := stream.PushWithBarrier(commitReq(56), commitResp(1), nil)
+ barrier := stream.PushWithBarrier(commitReq(34), commitResp(1), nil)
verifiers.AddCommitStream(subscription.Path, subscription.Partition, stream)
mockServer.OnTestStart(verifiers)
@@ -340,6 +339,8 @@
t.Errorf("Start() got err: (%v)", gotErr)
}
+ ack1.Ack()
+
complete := test.NewCondition("blocking reset complete")
go func() {
if err := cmt.BlockingReset(); err != nil {
@@ -350,14 +351,8 @@
}()
complete.VerifyNotDone(t)
- ack1.Ack()
- cmt.SendBatchCommit()
- complete.VerifyNotDone(t)
- ack2.Ack()
- cmt.SendBatchCommit()
-
- // Until the final commit response is received, committer.BlockingReset should
- // not return.
+ // Until the commit response is received, committer.BlockingReset should not
+ // return.
barrier.ReleaseAfter(func() {
complete.VerifyNotDone(t)
})
@@ -371,6 +366,9 @@
t.Errorf("ackTracker.Empty() got %v, want %v", got, want)
}
+ // This ack should have been discarded.
+ ack2.Ack()
+
// Calling committer.BlockingReset again should immediately return.
if err := cmt.BlockingReset(); err != nil {
t.Errorf("BlockingReset() got err: (%v), want: <nil>", err)
@@ -390,7 +388,7 @@
verifiers := test.NewVerifiers(t)
stream := test.NewRPCVerifier(t)
stream.Push(initCommitReq(subscription), initCommitResp(), nil)
- stream.Push(commitReq(34), commitResp(1), nil)
+ barrier := stream.PushWithBarrier(commitReq(34), commitResp(1), nil)
verifiers.AddCommitStream(subscription.Path, subscription.Partition, stream)
mockServer.OnTestStart(verifiers)
@@ -401,6 +399,8 @@
t.Errorf("Start() got err: (%v)", gotErr)
}
+ ack1.Ack()
+
complete := test.NewCondition("blocking reset complete")
go func() {
if got, want := cmt.BlockingReset(), ErrServiceStopped; !test.ErrorEqual(got, want) {
@@ -410,18 +410,11 @@
}()
complete.VerifyNotDone(t)
- ack1.Ack()
- cmt.SendBatchCommit()
- complete.VerifyNotDone(t)
-
// committer.BlockingReset should return when the committer is stopped.
- cmt.Stop()
- complete.WaitUntilDone(t, serviceTestWaitTimeout)
-
- // Ack tracker should not be reset.
- if got, want := acks.Empty(), false; got != want {
- t.Errorf("ackTracker.Empty() got %v, want %v", got, want)
- }
+ barrier.ReleaseAfter(func() {
+ cmt.Stop()
+ complete.WaitUntilDone(t, serviceTestWaitTimeout)
+ })
cmt.Terminate()
if gotErr := cmt.FinalError(); gotErr != nil {
@@ -452,6 +445,8 @@
t.Errorf("Start() got err: (%v)", gotErr)
}
+ ack1.Ack()
+
complete := test.NewCondition("blocking reset complete")
go func() {
if got, want := cmt.BlockingReset(), ErrServiceStopped; !test.ErrorEqual(got, want) {
@@ -459,10 +454,6 @@
}
complete.SetDone()
}()
- complete.VerifyNotDone(t)
-
- ack1.Ack()
- cmt.SendBatchCommit()
// committer.BlockingReset should return when the committer terminates due to
// fatal server error.
diff --git a/pubsublite/internal/wire/flow_control.go b/pubsublite/internal/wire/flow_control.go
index 7a04213..b7cb005 100644
--- a/pubsublite/internal/wire/flow_control.go
+++ b/pubsublite/internal/wire/flow_control.go
@@ -160,11 +160,14 @@
}
// RequestForRestart returns the seek request to send when a new subscribe
-// stream reconnects. Returns nil if the subscriber has just started, in which
-// case the server returns the offset of the last committed cursor.
+// stream reconnects.
func (ot *subscriberOffsetTracker) RequestForRestart() *pb.SeekRequest {
if ot.minNextOffset <= 0 {
- return nil
+ return &pb.SeekRequest{
+ Target: &pb.SeekRequest_NamedTarget_{
+ NamedTarget: pb.SeekRequest_COMMITTED_CURSOR,
+ },
+ }
}
return &pb.SeekRequest{
Target: &pb.SeekRequest_Cursor{
diff --git a/pubsublite/internal/wire/flow_control_test.go b/pubsublite/internal/wire/flow_control_test.go
index d070f1f..24f878c 100644
--- a/pubsublite/internal/wire/flow_control_test.go
+++ b/pubsublite/internal/wire/flow_control_test.go
@@ -253,7 +253,11 @@
{
desc: "Uninitialized tracker",
tracker: subscriberOffsetTracker{},
- want: nil,
+ want: &pb.SeekRequest{
+ Target: &pb.SeekRequest_NamedTarget_{
+ NamedTarget: pb.SeekRequest_COMMITTED_CURSOR,
+ },
+ },
},
{
desc: "Next offset positive",
diff --git a/pubsublite/internal/wire/requests_test.go b/pubsublite/internal/wire/requests_test.go
index 94944a9..d265f7b 100644
--- a/pubsublite/internal/wire/requests_test.go
+++ b/pubsublite/internal/wire/requests_test.go
@@ -134,12 +134,33 @@
// SubscriberService
-func initSubReq(subscription subscriptionPartition) *pb.SubscribeRequest {
+func initSubReqCommit(subscription subscriptionPartition) *pb.SubscribeRequest {
return &pb.SubscribeRequest{
Request: &pb.SubscribeRequest_Initial{
Initial: &pb.InitialSubscribeRequest{
Subscription: subscription.Path,
Partition: int64(subscription.Partition),
+ InitialLocation: &pb.SeekRequest{
+ Target: &pb.SeekRequest_NamedTarget_{
+ NamedTarget: pb.SeekRequest_COMMITTED_CURSOR,
+ },
+ },
+ },
+ },
+ }
+}
+
+func initSubReqCursor(subscription subscriptionPartition, offset int64) *pb.SubscribeRequest {
+ return &pb.SubscribeRequest{
+ Request: &pb.SubscribeRequest_Initial{
+ Initial: &pb.InitialSubscribeRequest{
+ Subscription: subscription.Path,
+ Partition: int64(subscription.Partition),
+ InitialLocation: &pb.SeekRequest{
+ Target: &pb.SeekRequest_Cursor{
+ Cursor: &pb.Cursor{Offset: offset},
+ },
+ },
},
},
}
@@ -153,18 +174,6 @@
}
}
-func seekReq(offset int64) *pb.SubscribeRequest {
- return &pb.SubscribeRequest{
- Request: &pb.SubscribeRequest_Seek{
- Seek: &pb.SeekRequest{
- Target: &pb.SeekRequest_Cursor{
- Cursor: &pb.Cursor{Offset: offset},
- },
- },
- },
- }
-}
-
func seekResp(offset int64) *pb.SubscribeResponse {
return &pb.SubscribeResponse{
Response: &pb.SubscribeResponse_Seek{
diff --git a/pubsublite/internal/wire/subscriber.go b/pubsublite/internal/wire/subscriber.go
index 125eeec..b6860a6 100644
--- a/pubsublite/internal/wire/subscriber.go
+++ b/pubsublite/internal/wire/subscriber.go
@@ -31,8 +31,7 @@
var (
errServerNoMessages = errors.New("pubsublite: server delivered no messages")
errInvalidInitialSubscribeResponse = errors.New("pubsublite: first response from server was not an initial response for subscribe")
- errInvalidSubscribeResponse = errors.New("pubsublite: received invalid subscribe response from server")
- errNoInFlightSeek = errors.New("pubsublite: received seek response for no in-flight seek")
+ errInvalidSubscribeResponse = errors.New("pubsublite: received unexpected subscribe response from server")
)
// ReceivedMessage stores a received Pub/Sub message and AckConsumer for
@@ -126,6 +125,10 @@
// The frequency of sending batch flow control requests.
const batchFlowControlPeriod = 100 * time.Millisecond
+// Handles subscriber reset actions that are external to the subscribeStream
+// (e.g. wait for the committer to flush commits).
+type subscriberResetHandler func() error
+
// subscribeStream directly wraps the subscribe client stream. It passes
// messages to the message receiver and manages flow control. Flow control
// tokens are batched and sent to the stream via a periodic background task,
@@ -137,7 +140,7 @@
subClient *vkit.SubscriberClient
settings ReceiveSettings
subscription subscriptionPartition
- initialReq *pb.SubscribeRequest
+ handleReset subscriberResetHandler
metadata pubsubMetadata
// Fields below must be guarded with mu.
@@ -146,27 +149,20 @@
offsetTracker subscriberOffsetTracker
flowControl flowControlBatcher
pollFlowControl *periodicTask
- seekInFlight bool
enableBatchFlowControl bool
abstractService
}
func newSubscribeStream(ctx context.Context, subClient *vkit.SubscriberClient, settings ReceiveSettings,
- receiver MessageReceiverFunc, subscription subscriptionPartition, acks *ackTracker, disableTasks bool) *subscribeStream {
+ receiver MessageReceiverFunc, subscription subscriptionPartition, acks *ackTracker,
+ handleReset subscriberResetHandler, disableTasks bool) *subscribeStream {
s := &subscribeStream{
subClient: subClient,
settings: settings,
subscription: subscription,
- initialReq: &pb.SubscribeRequest{
- Request: &pb.SubscribeRequest_Initial{
- Initial: &pb.InitialSubscribeRequest{
- Subscription: subscription.Path,
- Partition: int64(subscription.Partition),
- },
- },
- },
+ handleReset: handleReset,
messageQueue: newMessageDeliveryQueue(acks, receiver, settings.MaxOutstandingMessages),
metadata: newPubsubMetadata(),
}
@@ -212,7 +208,18 @@
}
func (s *subscribeStream) initialRequest() (interface{}, initialResponseRequired) {
- return s.initialReq, initialResponseRequired(true)
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ initReq := &pb.SubscribeRequest{
+ Request: &pb.SubscribeRequest_Initial{
+ Initial: &pb.InitialSubscribeRequest{
+ Subscription: s.subscription.Path,
+ Partition: int64(s.subscription.Partition),
+ InitialLocation: s.offsetTracker.RequestForRestart(),
+ },
+ },
+ }
+ return initReq, initialResponseRequired(true)
}
func (s *subscribeStream) validateInitialResponse(response interface{}) error {
@@ -231,27 +238,43 @@
case streamConnected:
s.unsafeUpdateStatus(serviceActive, nil)
- // Reinitialize the offset and flow control tokens when a new subscribe
- // stream instance is connected.
- if seekReq := s.offsetTracker.RequestForRestart(); seekReq != nil {
- if !s.stream.Send(&pb.SubscribeRequest{
- Request: &pb.SubscribeRequest_Seek{Seek: seekReq},
- }) {
- return
- }
- s.seekInFlight = true
- }
+ // Reinitialize the flow control tokens when a new subscribe stream instance
+ // is connected.
s.unsafeSendFlowControl(s.flowControl.RequestForRestart())
s.enableBatchFlowControl = true
s.pollFlowControl.Start()
case streamReconnecting:
- s.seekInFlight = false
// Ensure no batch flow control tokens are sent until the RequestForRestart
// is sent above when a new subscribe stream is initialized.
s.enableBatchFlowControl = false
s.pollFlowControl.Stop()
+ case streamResetState:
+ // Handle out-of-band seek notifications from the server. Committer and
+ // subscriber state are reset.
+
+ s.messageQueue.Stop()
+
+ // Wait for all message receiver callbacks to finish and the committer to
+ // flush pending commits and reset its state. Release the mutex while
+ // waiting.
+ s.mu.Unlock()
+ s.messageQueue.Wait()
+ err := s.handleReset()
+ s.mu.Lock()
+
+ if err != nil {
+ s.unsafeInitiateShutdown(serviceTerminating, nil)
+ return
+ }
+ s.messageQueue.Start()
+ s.offsetTracker.Reset()
+ s.flowControl.Reset(flowControlTokens{
+ Bytes: int64(s.settings.MaxOutstandingBytes),
+ Messages: int64(s.settings.MaxOutstandingMessages),
+ })
+
case streamTerminated:
s.unsafeInitiateShutdown(serviceTerminated, s.stream.Error())
}
@@ -270,8 +293,6 @@
switch {
case subscribeResponse.GetMessages() != nil:
err = s.unsafeOnMessageResponse(subscribeResponse.GetMessages())
- case subscribeResponse.GetSeek() != nil:
- err = s.unsafeOnSeekResponse(subscribeResponse.GetSeek())
default:
err = errInvalidSubscribeResponse
}
@@ -280,14 +301,6 @@
}
}
-func (s *subscribeStream) unsafeOnSeekResponse(response *pb.SeekResponse) error {
- if !s.seekInFlight {
- return errNoInFlightSeek
- }
- s.seekInFlight = false
- return nil
-}
-
func (s *subscribeStream) unsafeOnMessageResponse(response *pb.MessageResponse) error {
if len(response.Messages) == 0 {
return errServerNoMessages
@@ -388,7 +401,7 @@
subscription := subscriptionPartition{Path: f.subscriptionPath, Partition: partition}
acks := newAckTracker()
commit := newCommitter(f.ctx, f.cursorClient, f.settings, subscription, acks, f.disableTasks)
- sub := newSubscribeStream(f.ctx, f.subClient, f.settings, f.receiver, subscription, acks, f.disableTasks)
+ sub := newSubscribeStream(f.ctx, f.subClient, f.settings, f.receiver, subscription, acks, commit.BlockingReset, f.disableTasks)
ps := &singlePartitionSubscriber{
subscriber: sub,
committer: commit,
diff --git a/pubsublite/internal/wire/subscriber_test.go b/pubsublite/internal/wire/subscriber_test.go
index a45bb28..0c30fdb 100644
--- a/pubsublite/internal/wire/subscriber_test.go
+++ b/pubsublite/internal/wire/subscriber_test.go
@@ -16,6 +16,7 @@
import (
"context"
"sort"
+ "sync"
"testing"
"time"
@@ -244,11 +245,14 @@
type testSubscribeStream struct {
Receiver *testMessageReceiver
t *testing.T
+ acks *ackTracker
sub *subscribeStream
+ mu sync.Mutex
+ resetErr error
serviceTestProxy
}
-func newTestSubscribeStream(t *testing.T, subscription subscriptionPartition, settings ReceiveSettings, acks *ackTracker) *testSubscribeStream {
+func newTestSubscribeStream(t *testing.T, subscription subscriptionPartition, settings ReceiveSettings) *testSubscribeStream {
ctx := context.Background()
subClient, err := newSubscriberClient(ctx, "ignored", testServer.ClientConn())
if err != nil {
@@ -258,8 +262,9 @@
ts := &testSubscribeStream{
Receiver: newTestMessageReceiver(t),
t: t,
+ acks: newAckTracker(),
}
- ts.sub = newSubscribeStream(ctx, subClient, settings, ts.Receiver.onMessage, subscription, acks, true)
+ ts.sub = newSubscribeStream(ctx, subClient, settings, ts.Receiver.onMessage, subscription, ts.acks, ts.handleReset, true)
ts.initAndStart(t, ts.sub, "Subscriber", subClient)
return ts
}
@@ -276,9 +281,20 @@
return ts.sub.flowControl.pendingTokens.ToFlowControlRequest()
}
+func (ts *testSubscribeStream) SetResetErr(err error) {
+ ts.mu.Lock()
+ defer ts.mu.Unlock()
+ ts.resetErr = err
+}
+
+func (ts *testSubscribeStream) handleReset() error {
+ ts.mu.Lock()
+ defer ts.mu.Unlock()
+ return ts.resetErr
+}
+
func TestSubscribeStreamReconnect(t *testing.T) {
subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0}
- acks := newAckTracker()
msg1 := seqMsgWithOffsetAndSize(67, 200)
msg2 := seqMsgWithOffsetAndSize(68, 100)
permanentErr := status.Error(codes.FailedPrecondition, "permanent failure")
@@ -286,16 +302,15 @@
verifiers := test.NewVerifiers(t)
stream1 := test.NewRPCVerifier(t)
- stream1.Push(initSubReq(subscription), initSubResp(), nil)
+ stream1.Push(initSubReqCommit(subscription), initSubResp(), nil)
stream1.Push(initFlowControlReq(), msgSubResp(msg1), nil)
stream1.Push(nil, nil, status.Error(codes.Unavailable, "server unavailable"))
verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, stream1)
- // When reconnected, the subscribeStream should seek to msg2 and have
- // subtracted flow control tokens.
+ // When reconnected, the subscribeStream should set initial cursor to msg2 and
+ // have subtracted flow control tokens.
stream2 := test.NewRPCVerifier(t)
- stream2.Push(initSubReq(subscription), initSubResp(), nil)
- stream2.Push(seekReq(68), seekResp(68), nil)
+ stream2.Push(initSubReqCursor(subscription, 68), initSubResp(), nil)
stream2.Push(flowControlSubReq(flowControlTokens{Bytes: 800, Messages: 9}), msgSubResp(msg2), nil)
// Subscriber should terminate on permanent error.
stream2.Push(nil, nil, permanentErr)
@@ -304,7 +319,7 @@
mockServer.OnTestStart(verifiers)
defer mockServer.OnTestEnd()
- sub := newTestSubscribeStream(t, subscription, testSubscriberSettings(), acks)
+ sub := newTestSubscribeStream(t, subscription, testSubscriberSettings())
if gotErr := sub.StartError(); gotErr != nil {
t.Errorf("Start() got err: (%v)", gotErr)
}
@@ -317,14 +332,13 @@
func TestSubscribeStreamFlowControlBatching(t *testing.T) {
subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0}
- acks := newAckTracker()
msg1 := seqMsgWithOffsetAndSize(67, 200)
msg2 := seqMsgWithOffsetAndSize(68, 100)
serverErr := status.Error(codes.InvalidArgument, "verifies flow control received")
verifiers := test.NewVerifiers(t)
stream := test.NewRPCVerifier(t)
- stream.Push(initSubReq(subscription), initSubResp(), nil)
+ stream.Push(initSubReqCommit(subscription), initSubResp(), nil)
stream.Push(initFlowControlReq(), msgSubResp(msg1, msg2), nil)
// Batch flow control request expected.
stream.Push(flowControlSubReq(flowControlTokens{Bytes: 300, Messages: 2}), nil, serverErr)
@@ -333,7 +347,7 @@
mockServer.OnTestStart(verifiers)
defer mockServer.OnTestEnd()
- sub := newTestSubscribeStream(t, subscription, testSubscriberSettings(), acks)
+ sub := newTestSubscribeStream(t, subscription, testSubscriberSettings())
if gotErr := sub.StartError(); gotErr != nil {
t.Errorf("Start() got err: (%v)", gotErr)
}
@@ -349,7 +363,6 @@
func TestSubscribeStreamExpediteFlowControl(t *testing.T) {
subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0}
- acks := newAckTracker()
msg1 := seqMsgWithOffsetAndSize(67, 250)
// MaxOutstandingBytes = 1000, so msg2 pushes the pending flow control bytes
// over the expediteBatchRequestRatio=50% threshold in flowControlBatcher.
@@ -358,7 +371,7 @@
verifiers := test.NewVerifiers(t)
stream := test.NewRPCVerifier(t)
- stream.Push(initSubReq(subscription), initSubResp(), nil)
+ stream.Push(initSubReqCommit(subscription), initSubResp(), nil)
stream.Push(initFlowControlReq(), msgSubResp(msg1, msg2), nil)
// Batch flow control request expected.
stream.Push(flowControlSubReq(flowControlTokens{Bytes: 501, Messages: 2}), nil, serverErr)
@@ -367,7 +380,7 @@
mockServer.OnTestStart(verifiers)
defer mockServer.OnTestEnd()
- sub := newTestSubscribeStream(t, subscription, testSubscriberSettings(), acks)
+ sub := newTestSubscribeStream(t, subscription, testSubscriberSettings())
if gotErr := sub.StartError(); gotErr != nil {
t.Errorf("Start() got err: (%v)", gotErr)
}
@@ -383,7 +396,6 @@
func TestSubscribeStreamDisableBatchFlowControl(t *testing.T) {
subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0}
- acks := newAckTracker()
// MaxOutstandingBytes = 1000, so this pushes the pending flow control bytes
// over the expediteBatchRequestRatio=50% threshold in flowControlBatcher.
msg := seqMsgWithOffsetAndSize(67, 800)
@@ -393,7 +405,7 @@
verifiers := test.NewVerifiers(t)
stream1 := test.NewRPCVerifier(t)
- stream1.Push(initSubReq(subscription), initSubResp(), nil)
+ stream1.Push(initSubReqCommit(subscription), initSubResp(), nil)
stream1.Push(initFlowControlReq(), msgSubResp(msg), nil)
// Break the stream immediately after sending the message.
stream1.Push(nil, nil, retryableErr)
@@ -401,8 +413,7 @@
stream2 := test.NewRPCVerifier(t)
// The barrier is used to pause in the middle of stream reconnection.
- barrier := stream2.PushWithBarrier(initSubReq(subscription), initSubResp(), nil)
- stream2.Push(seekReq(68), seekResp(68), nil)
+ barrier := stream2.PushWithBarrier(initSubReqCursor(subscription, 68), initSubResp(), nil)
// Full flow control tokens should be sent after stream has connected.
stream2.Push(initFlowControlReq(), nil, serverErr)
verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, stream2)
@@ -410,7 +421,7 @@
mockServer.OnTestStart(verifiers)
defer mockServer.OnTestEnd()
- sub := newTestSubscribeStream(t, subscription, testSubscriberSettings(), acks)
+ sub := newTestSubscribeStream(t, subscription, testSubscriberSettings())
if gotErr := sub.StartError(); gotErr != nil {
t.Errorf("Start() got err: (%v)", gotErr)
}
@@ -432,17 +443,16 @@
func TestSubscribeStreamInvalidInitialResponse(t *testing.T) {
subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0}
- acks := newAckTracker()
verifiers := test.NewVerifiers(t)
stream := test.NewRPCVerifier(t)
- stream.Push(initSubReq(subscription), seekResp(0), nil) // Seek instead of init response
+ stream.Push(initSubReqCommit(subscription), seekResp(0), nil) // Seek instead of init response
verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, stream)
mockServer.OnTestStart(verifiers)
defer mockServer.OnTestEnd()
- sub := newTestSubscribeStream(t, subscription, testSubscriberSettings(), acks)
+ sub := newTestSubscribeStream(t, subscription, testSubscriberSettings())
if gotErr, wantErr := sub.StartError(), errInvalidInitialSubscribeResponse; !test.ErrorEqual(gotErr, wantErr) {
t.Errorf("Start got err: (%v), want: (%v)", gotErr, wantErr)
}
@@ -450,18 +460,17 @@
func TestSubscribeStreamDuplicateInitialResponse(t *testing.T) {
subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0}
- acks := newAckTracker()
verifiers := test.NewVerifiers(t)
stream := test.NewRPCVerifier(t)
- stream.Push(initSubReq(subscription), initSubResp(), nil)
+ stream.Push(initSubReqCommit(subscription), initSubResp(), nil)
stream.Push(initFlowControlReq(), initSubResp(), nil) // Second initial response
verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, stream)
mockServer.OnTestStart(verifiers)
defer mockServer.OnTestEnd()
- sub := newTestSubscribeStream(t, subscription, testSubscriberSettings(), acks)
+ sub := newTestSubscribeStream(t, subscription, testSubscriberSettings())
if gotErr, wantErr := sub.FinalError(), errInvalidSubscribeResponse; !test.ErrorEqual(gotErr, wantErr) {
t.Errorf("Final err: (%v), want: (%v)", gotErr, wantErr)
}
@@ -469,37 +478,35 @@
func TestSubscribeStreamSpuriousSeekResponse(t *testing.T) {
subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0}
- acks := newAckTracker()
verifiers := test.NewVerifiers(t)
stream := test.NewRPCVerifier(t)
- stream.Push(initSubReq(subscription), initSubResp(), nil)
+ stream.Push(initSubReqCommit(subscription), initSubResp(), nil)
stream.Push(initFlowControlReq(), seekResp(1), nil) // Seek response with no seek request
verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, stream)
mockServer.OnTestStart(verifiers)
defer mockServer.OnTestEnd()
- sub := newTestSubscribeStream(t, subscription, testSubscriberSettings(), acks)
- if gotErr, wantErr := sub.FinalError(), errNoInFlightSeek; !test.ErrorEqual(gotErr, wantErr) {
+ sub := newTestSubscribeStream(t, subscription, testSubscriberSettings())
+ if gotErr, wantErr := sub.FinalError(), errInvalidSubscribeResponse; !test.ErrorEqual(gotErr, wantErr) {
t.Errorf("Final err: (%v), want: (%v)", gotErr, wantErr)
}
}
func TestSubscribeStreamNoMessages(t *testing.T) {
subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0}
- acks := newAckTracker()
verifiers := test.NewVerifiers(t)
stream := test.NewRPCVerifier(t)
- stream.Push(initSubReq(subscription), initSubResp(), nil)
+ stream.Push(initSubReqCommit(subscription), initSubResp(), nil)
stream.Push(initFlowControlReq(), msgSubResp(), nil) // No messages in response
verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, stream)
mockServer.OnTestStart(verifiers)
defer mockServer.OnTestEnd()
- sub := newTestSubscribeStream(t, subscription, testSubscriberSettings(), acks)
+ sub := newTestSubscribeStream(t, subscription, testSubscriberSettings())
if gotErr, wantErr := sub.FinalError(), errServerNoMessages; !test.ErrorEqual(gotErr, wantErr) {
t.Errorf("Final err: (%v), want: (%v)", gotErr, wantErr)
}
@@ -507,13 +514,12 @@
func TestSubscribeStreamMessagesOutOfOrder(t *testing.T) {
subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0}
- acks := newAckTracker()
msg1 := seqMsgWithOffsetAndSize(56, 100)
msg2 := seqMsgWithOffsetAndSize(55, 100) // Offset before msg1
verifiers := test.NewVerifiers(t)
stream := test.NewRPCVerifier(t)
- stream.Push(initSubReq(subscription), initSubResp(), nil)
+ stream.Push(initSubReqCommit(subscription), initSubResp(), nil)
stream.Push(initFlowControlReq(), msgSubResp(msg1), nil)
stream.Push(nil, msgSubResp(msg2), nil)
verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, stream)
@@ -521,7 +527,7 @@
mockServer.OnTestStart(verifiers)
defer mockServer.OnTestEnd()
- sub := newTestSubscribeStream(t, subscription, testSubscriberSettings(), acks)
+ sub := newTestSubscribeStream(t, subscription, testSubscriberSettings())
sub.Receiver.ValidateMsg(msg1)
if gotErr, msg := sub.FinalError(), "start offset = 55, expected >= 57"; !test.ErrorHasMsg(gotErr, msg) {
t.Errorf("Final err: (%v), want msg: %q", gotErr, msg)
@@ -530,13 +536,12 @@
func TestSubscribeStreamFlowControlOverflow(t *testing.T) {
subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0}
- acks := newAckTracker()
msg1 := seqMsgWithOffsetAndSize(56, 900)
msg2 := seqMsgWithOffsetAndSize(57, 101) // Overflows ReceiveSettings.MaxOutstandingBytes = 1000
verifiers := test.NewVerifiers(t)
stream := test.NewRPCVerifier(t)
- stream.Push(initSubReq(subscription), initSubResp(), nil)
+ stream.Push(initSubReqCommit(subscription), initSubResp(), nil)
stream.Push(initFlowControlReq(), msgSubResp(msg1), nil)
stream.Push(nil, msgSubResp(msg2), nil)
verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, stream)
@@ -544,13 +549,40 @@
mockServer.OnTestStart(verifiers)
defer mockServer.OnTestEnd()
- sub := newTestSubscribeStream(t, subscription, testSubscriberSettings(), acks)
+ sub := newTestSubscribeStream(t, subscription, testSubscriberSettings())
sub.Receiver.ValidateMsg(msg1)
if gotErr, wantErr := sub.FinalError(), errTokenCounterBytesNegative; !test.ErrorEqual(gotErr, wantErr) {
t.Errorf("Final err: (%v), want: (%v)", gotErr, wantErr)
}
}
+func TestSubscribeStreamHandleResetError(t *testing.T) {
+ subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0}
+ msg := seqMsgWithOffsetAndSize(67, 100)
+
+ verifiers := test.NewVerifiers(t)
+ stream := test.NewRPCVerifier(t)
+ stream.Push(initSubReqCommit(subscription), initSubResp(), nil)
+ stream.Push(initFlowControlReq(), msgSubResp(msg), nil)
+ barrier := stream.PushWithBarrier(nil, nil, makeStreamResetSignal())
+ verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, stream)
+ // No reconnect expected because the reset handler func will fail.
+
+ mockServer.OnTestStart(verifiers)
+ defer mockServer.OnTestEnd()
+
+ sub := newTestSubscribeStream(t, subscription, testSubscriberSettings())
+ sub.SetResetErr(status.Error(codes.FailedPrecondition, "reset handler failed"))
+ if gotErr := sub.StartError(); gotErr != nil {
+ t.Errorf("Start() got err: (%v)", gotErr)
+ }
+ sub.Receiver.ValidateMsg(msg)
+ barrier.Release()
+ if gotErr := sub.FinalError(); gotErr != nil {
+ t.Errorf("Final err: (%v), want: <nil>", gotErr)
+ }
+}
+
type testSinglePartitionSubscriber singlePartitionSubscriber
func (t *testSinglePartitionSubscriber) WaitStopped() error {
@@ -595,7 +627,7 @@
// Verifies the behavior of the subscribeStream and committer when they are
// stopped before any messages are received.
subStream := test.NewRPCVerifier(t)
- subStream.Push(initSubReq(subscription), initSubResp(), nil)
+ subStream.Push(initSubReqCommit(subscription), initSubResp(), nil)
barrier := subStream.PushWithBarrier(initFlowControlReq(), nil, nil)
verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, subStream)
@@ -626,7 +658,7 @@
verifiers := test.NewVerifiers(t)
subStream := test.NewRPCVerifier(t)
- subStream.Push(initSubReq(subscription), initSubResp(), nil)
+ subStream.Push(initSubReqCommit(subscription), initSubResp(), nil)
subStream.Push(initFlowControlReq(), msgSubResp(msg1, msg2), nil)
verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, subStream)
@@ -661,17 +693,16 @@
verifiers := test.NewVerifiers(t)
subStream1 := test.NewRPCVerifier(t)
- subStream1.Push(initSubReq(subscription), initSubResp(), nil)
+ subStream1.Push(initSubReqCommit(subscription), initSubResp(), nil)
subStream1.Push(initFlowControlReq(), msgSubResp(msg1), nil)
subStream1.Push(nil, msgSubResp(msg2), nil)
subStream1.Push(nil, nil, retryableErr)
verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, subStream1)
- // When reconnected, the subscribeStream should seek to msg3 and have
- // subtracted flow control tokens for msg1 and msg2.
+ // When reconnected, the subscribeStream should set initial cursor to msg3 and
+ // have subtracted flow control tokens for msg1 and msg2.
subStream2 := test.NewRPCVerifier(t)
- subStream2.Push(initSubReq(subscription), initSubResp(), nil)
- subStream2.Push(seekReq(3), nil, nil)
+ subStream2.Push(initSubReqCursor(subscription, 3), initSubResp(), nil)
subStream2.Push(flowControlSubReq(flowControlTokens{Bytes: 800, Messages: 8}), msgSubResp(msg3), nil)
verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, subStream2)
@@ -719,7 +750,7 @@
verifiers := test.NewVerifiers(t)
subStream := test.NewRPCVerifier(t)
- subStream.Push(initSubReq(subscription), initSubResp(), nil)
+ subStream.Push(initSubReqCommit(subscription), initSubResp(), nil)
subStream.Push(initFlowControlReq(), msgSubResp(msg1, msg2), nil)
verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, subStream)
@@ -748,6 +779,156 @@
receiver.VerifyNoMsgs() // msg2 should not be received
}
+func TestSinglePartitionSubscriberAdminSeekWhileConnected(t *testing.T) {
+ subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0}
+ receiver := newTestMessageReceiver(t)
+ msg1 := seqMsgWithOffsetAndSize(1, 100)
+ msg2 := seqMsgWithOffsetAndSize(2, 100)
+ msg3 := seqMsgWithOffsetAndSize(3, 100)
+
+ verifiers := test.NewVerifiers(t)
+
+ subStream1 := test.NewRPCVerifier(t)
+ subStream1.Push(initSubReqCommit(subscription), initSubResp(), nil)
+ subStream1.Push(initFlowControlReq(), msgSubResp(msg1, msg2, msg3), nil)
+ // Server disconnects the stream with the RESET signal.
+ barrier := subStream1.PushWithBarrier(nil, nil, makeStreamResetSignal())
+ verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, subStream1)
+
+ subStream2 := test.NewRPCVerifier(t)
+ // Reconnected stream reads from commit cursor.
+ subStream2.Push(initSubReqCommit(subscription), initSubResp(), nil)
+ // Ensure that the subscriber resets state and can handle seeking back to
+ // msg1.
+ subStream2.Push(initFlowControlReq(), msgSubResp(msg1), nil)
+ verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, subStream2)
+
+ cmtStream := test.NewRPCVerifier(t)
+ cmtStream.Push(initCommitReq(subscription), initCommitResp(), nil)
+ cmtStream.Push(commitReq(4), commitResp(1), nil)
+ cmtStream.Push(commitReq(2), commitResp(1), nil)
+ verifiers.AddCommitStream(subscription.Path, subscription.Partition, cmtStream)
+
+ mockServer.OnTestStart(verifiers)
+ defer mockServer.OnTestEnd()
+
+ sub := newTestSinglePartitionSubscriber(t, receiver.onMessage, subscription)
+ if gotErr := sub.WaitStarted(); gotErr != nil {
+ t.Errorf("Start() got err: (%v)", gotErr)
+ }
+
+ receiver.ValidateMsg(msg1).Ack()
+ receiver.ValidateMsg(msg2).Ack()
+ receiver.ValidateMsg(msg3).Ack()
+ barrier.Release()
+ receiver.ValidateMsg(msg1).Ack()
+
+ sub.Stop()
+ if gotErr := sub.WaitStopped(); gotErr != nil {
+ t.Errorf("Stop() got err: (%v)", gotErr)
+ }
+}
+
+func TestSinglePartitionSubscriberAdminSeekWhileReconnecting(t *testing.T) {
+ subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0}
+ receiver := newTestMessageReceiver(t)
+ msg1 := seqMsgWithOffsetAndSize(1, 100)
+ msg2 := seqMsgWithOffsetAndSize(2, 100)
+ msg3 := seqMsgWithOffsetAndSize(3, 100)
+
+ verifiers := test.NewVerifiers(t)
+
+ subStream1 := test.NewRPCVerifier(t)
+ subStream1.Push(initSubReqCommit(subscription), initSubResp(), nil)
+ subStream1.Push(initFlowControlReq(), msgSubResp(msg1, msg2, msg3), nil)
+ // Normal stream breakage.
+ barrier := subStream1.PushWithBarrier(nil, nil, status.Error(codes.DeadlineExceeded, ""))
+ verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, subStream1)
+
+ subStream2 := test.NewRPCVerifier(t)
+ // The server sends the RESET signal during stream initialization.
+ subStream2.Push(initSubReqCursor(subscription, 4), nil, makeStreamResetSignal())
+ verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, subStream2)
+
+ subStream3 := test.NewRPCVerifier(t)
+ // Reconnected stream reads from commit cursor.
+ subStream3.Push(initSubReqCommit(subscription), initSubResp(), nil)
+ // Ensure that the subscriber resets state and can handle seeking back to
+ // msg1.
+ subStream3.Push(initFlowControlReq(), msgSubResp(msg1), nil)
+ verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, subStream3)
+
+ cmtStream := test.NewRPCVerifier(t)
+ cmtStream.Push(initCommitReq(subscription), initCommitResp(), nil)
+ cmtStream.Push(commitReq(3), commitResp(1), nil)
+ cmtStream.Push(commitReq(2), commitResp(1), nil)
+ verifiers.AddCommitStream(subscription.Path, subscription.Partition, cmtStream)
+
+ mockServer.OnTestStart(verifiers)
+ defer mockServer.OnTestEnd()
+
+ sub := newTestSinglePartitionSubscriber(t, receiver.onMessage, subscription)
+ if gotErr := sub.WaitStarted(); gotErr != nil {
+ t.Errorf("Start() got err: (%v)", gotErr)
+ }
+
+ receiver.ValidateMsg(msg1).Ack()
+ receiver.ValidateMsg(msg2).Ack()
+ ack := receiver.ValidateMsg(msg3) // Unacked message discarded
+ barrier.Release()
+ receiver.ValidateMsg(msg1).Ack()
+ ack.Ack() // Should be ignored
+
+ sub.Stop()
+ if gotErr := sub.WaitStopped(); gotErr != nil {
+ t.Errorf("Stop() got err: (%v)", gotErr)
+ }
+}
+
+func TestSinglePartitionSubscriberStopDuringAdminSeek(t *testing.T) {
+ subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0}
+ receiver := newTestMessageReceiver(t)
+ msg1 := seqMsgWithOffsetAndSize(1, 100)
+ msg2 := seqMsgWithOffsetAndSize(2, 100)
+
+ verifiers := test.NewVerifiers(t)
+
+ subStream := test.NewRPCVerifier(t)
+ subStream.Push(initSubReqCommit(subscription), initSubResp(), nil)
+ subStream.Push(initFlowControlReq(), msgSubResp(msg1, msg2), nil)
+ // Server disconnects the stream with the RESET signal.
+ subBarrier := subStream.PushWithBarrier(nil, nil, makeStreamResetSignal())
+ verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, subStream)
+
+ cmtStream := test.NewRPCVerifier(t)
+ cmtStream.Push(initCommitReq(subscription), initCommitResp(), nil)
+ cmtBarrier := cmtStream.PushWithBarrier(commitReq(3), commitResp(1), nil)
+ verifiers.AddCommitStream(subscription.Path, subscription.Partition, cmtStream)
+
+ mockServer.OnTestStart(verifiers)
+ defer mockServer.OnTestEnd()
+
+ sub := newTestSinglePartitionSubscriber(t, receiver.onMessage, subscription)
+ if gotErr := sub.WaitStarted(); gotErr != nil {
+ t.Errorf("Start() got err: (%v)", gotErr)
+ }
+
+ receiver.ValidateMsg(msg1).Ack()
+ receiver.ValidateMsg(msg2).Ack()
+ subBarrier.Release()
+
+ // Ensure that the user is able to call Stop while a reset is in progress.
+ // Verifies that the subscribeStream is not holding mutexes while waiting and
+ // that the subscribe stream is not reconnected.
+ cmtBarrier.ReleaseAfter(func() {
+ sub.Stop()
+ })
+
+ if gotErr := sub.WaitStopped(); gotErr != nil {
+ t.Errorf("Stop() got err: (%v)", gotErr)
+ }
+}
+
func newTestMultiPartitionSubscriber(t *testing.T, receiverFunc MessageReceiverFunc, subscriptionPath string, partitions []int) *multiPartitionSubscriber {
ctx := context.Background()
subClient, err := newSubscriberClient(ctx, "ignored", testServer.ClientConn())
@@ -787,7 +968,7 @@
// Partition 1
subStream1 := test.NewRPCVerifier(t)
- subStream1.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 1}), initSubResp(), nil)
+ subStream1.Push(initSubReqCommit(subscriptionPartition{Path: subscription, Partition: 1}), initSubResp(), nil)
subStream1.Push(initFlowControlReq(), msgSubResp(msg1), nil)
subStream1.Push(nil, msgSubResp(msg2), nil)
verifiers.AddSubscribeStream(subscription, 1, subStream1)
@@ -799,7 +980,7 @@
// Partition 2
subStream2 := test.NewRPCVerifier(t)
- subStream2.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 2}), initSubResp(), nil)
+ subStream2.Push(initSubReqCommit(subscriptionPartition{Path: subscription, Partition: 2}), initSubResp(), nil)
subStream2.Push(initFlowControlReq(), msgSubResp(msg3), nil)
subStream2.Push(nil, msgSubResp(msg4), nil)
verifiers.AddSubscribeStream(subscription, 2, subStream2)
@@ -835,7 +1016,7 @@
// Partition 1
subStream1 := test.NewRPCVerifier(t)
- subStream1.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 1}), initSubResp(), nil)
+ subStream1.Push(initSubReqCommit(subscriptionPartition{Path: subscription, Partition: 1}), initSubResp(), nil)
subStream1.Push(initFlowControlReq(), msgSubResp(msg1), nil)
msg2Barrier := subStream1.PushWithBarrier(nil, msgSubResp(msg2), nil)
verifiers.AddSubscribeStream(subscription, 1, subStream1)
@@ -847,7 +1028,7 @@
// Partition 2
subStream2 := test.NewRPCVerifier(t)
- subStream2.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 2}), initSubResp(), nil)
+ subStream2.Push(initSubReqCommit(subscriptionPartition{Path: subscription, Partition: 2}), initSubResp(), nil)
subStream2.Push(initFlowControlReq(), msgSubResp(msg3), nil)
errorBarrier := subStream2.PushWithBarrier(nil, nil, serverErr)
verifiers.AddSubscribeStream(subscription, 2, subStream2)
@@ -960,7 +1141,7 @@
// Partition 3
subStream3 := test.NewRPCVerifier(t)
- subStream3.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 3}), initSubResp(), nil)
+ subStream3.Push(initSubReqCommit(subscriptionPartition{Path: subscription, Partition: 3}), initSubResp(), nil)
subStream3.Push(initFlowControlReq(), msgSubResp(msg1), nil)
msg2Barrier := subStream3.PushWithBarrier(nil, msgSubResp(msg2), nil)
verifiers.AddSubscribeStream(subscription, 3, subStream3)
@@ -973,7 +1154,7 @@
// Partition 6
subStream6 := test.NewRPCVerifier(t)
- subStream6.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 6}), initSubResp(), nil)
+ subStream6.Push(initSubReqCommit(subscriptionPartition{Path: subscription, Partition: 6}), initSubResp(), nil)
subStream6.Push(initFlowControlReq(), msgSubResp(msg3), nil)
// msg4 should not be received.
msg4Barrier := subStream6.PushWithBarrier(nil, msgSubResp(msg4), nil)
@@ -986,7 +1167,7 @@
// Partition 8
subStream8 := test.NewRPCVerifier(t)
- subStream8.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 8}), initSubResp(), nil)
+ subStream8.Push(initSubReqCommit(subscriptionPartition{Path: subscription, Partition: 8}), initSubResp(), nil)
subStream8.Push(initFlowControlReq(), msgSubResp(msg5), nil)
verifiers.AddSubscribeStream(subscription, 8, subStream8)
@@ -1047,7 +1228,7 @@
// Partition 1
subStream1 := test.NewRPCVerifier(t)
- subStream1.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 1}), initSubResp(), nil)
+ subStream1.Push(initSubReqCommit(subscriptionPartition{Path: subscription, Partition: 1}), initSubResp(), nil)
subStream1.Push(initFlowControlReq(), msgSubResp(msg1), nil)
verifiers.AddSubscribeStream(subscription, 1, subStream1)
@@ -1058,7 +1239,7 @@
// Partition 2
subStream2 := test.NewRPCVerifier(t)
- subStream2.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 2}), initSubResp(), nil)
+ subStream2.Push(initSubReqCommit(subscriptionPartition{Path: subscription, Partition: 2}), initSubResp(), nil)
subStream2.Push(initFlowControlReq(), msgSubResp(msg2), nil)
verifiers.AddSubscribeStream(subscription, 2, subStream2)
@@ -1101,7 +1282,7 @@
// Partition 1
subStream := test.NewRPCVerifier(t)
- subStream.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 1}), initSubResp(), nil)
+ subStream.Push(initSubReqCommit(subscriptionPartition{Path: subscription, Partition: 1}), initSubResp(), nil)
subStream.Push(initFlowControlReq(), msgSubResp(msg1, msg2), nil)
verifiers.AddSubscribeStream(subscription, 1, subStream)