blob: 2d2a3244658c4acc0208f729df20a6864289cbff [file] [log] [blame]
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
package wire
import (
"context"
"errors"
"fmt"
"reflect"
"sync"
"time"
"google.golang.org/grpc"
vkit "cloud.google.com/go/pubsublite/apiv1"
pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
)
var (
errInvalidInitialCommitResponse = errors.New("pubsublite: first response from server was not an initial response for streaming commit")
errInvalidCommitResponse = errors.New("pubsublite: received invalid commit response from server")
)
// The frequency of batched cursor commits.
const commitCursorPeriod = 50 * time.Millisecond
// committer wraps a commit cursor stream for a subscription and partition.
// A background task periodically effectively reads the latest desired cursor
// offset from the `ackTracker` and sends a commit request to the stream if the
// cursor needs to be updated. The `commitCursorTracker` is used to manage
// in-flight commit requests.
type committer struct {
// Immutable after creation.
cursorClient *vkit.CursorClient
subscription subscriptionPartition
initialReq *pb.StreamingCommitCursorRequest
metadata pubsubMetadata
// Fields below must be guarded with mu.
stream *retryableStream
acks *ackTracker
cursorTracker *commitCursorTracker
pollCommits *periodicTask
flushPending *sync.Cond
enableCommits bool
abstractService
}
func newCommitter(ctx context.Context, cursor *vkit.CursorClient, settings ReceiveSettings,
subscription subscriptionPartition, acks *ackTracker, disableTasks bool) *committer {
c := &committer{
cursorClient: cursor,
subscription: subscription,
initialReq: &pb.StreamingCommitCursorRequest{
Request: &pb.StreamingCommitCursorRequest_Initial{
Initial: &pb.InitialCommitCursorRequest{
Subscription: subscription.Path,
Partition: int64(subscription.Partition),
},
},
},
metadata: newPubsubMetadata(),
acks: acks,
cursorTracker: newCommitCursorTracker(acks),
}
c.stream = newRetryableStream(ctx, c, settings.Timeout, reflect.TypeOf(pb.StreamingCommitCursorResponse{}))
c.metadata.AddClientInfo(settings.Framework)
backgroundTask := c.commitOffsetToStream
if disableTasks {
backgroundTask = func() {}
}
c.pollCommits = newPeriodicTask(commitCursorPeriod, backgroundTask)
c.flushPending = sync.NewCond(&c.mu)
return c
}
// Start attempts to establish a streaming commit cursor connection.
func (c *committer) Start() {
c.mu.Lock()
defer c.mu.Unlock()
if c.unsafeUpdateStatus(serviceStarting, nil) {
c.stream.Start()
c.pollCommits.Start()
}
}
// Stop initiates shutdown of the committer. It will wait for outstanding acks
// and send the final commit offset to the server.
func (c *committer) Stop() {
c.mu.Lock()
defer c.mu.Unlock()
c.unsafeInitiateShutdown(serviceTerminating, nil)
}
// Terminate will discard outstanding acks and send the final commit offset to
// the server.
func (c *committer) Terminate() {
c.mu.Lock()
defer c.mu.Unlock()
c.acks.Release()
c.unsafeInitiateShutdown(serviceTerminating, nil)
}
// BlockingReset flushes any pending commits to the server, waits until the
// server has confirmed the commit, and resets the state of the committer.
func (c *committer) BlockingReset() error {
c.mu.Lock()
defer c.mu.Unlock()
c.acks.Release() // Discard outstanding acks
for !c.cursorTracker.UpToDate() && c.status < serviceTerminating {
c.unsafeCommitOffsetToStream()
c.flushPending.Wait()
}
if c.status >= serviceTerminating {
return ErrServiceStopped
}
c.cursorTracker.Reset()
return nil
}
func (c *committer) newStream(ctx context.Context) (grpc.ClientStream, error) {
return c.cursorClient.StreamingCommitCursor(c.metadata.AddToContext(ctx))
}
func (c *committer) initialRequest() (interface{}, initialResponseRequired) {
return c.initialReq, initialResponseRequired(true)
}
func (c *committer) validateInitialResponse(response interface{}) error {
commitResponse, _ := response.(*pb.StreamingCommitCursorResponse)
if commitResponse.GetInitial() == nil {
return errInvalidInitialCommitResponse
}
return nil
}
func (c *committer) onStreamStatusChange(status streamStatus) {
c.mu.Lock()
defer c.mu.Unlock()
switch status {
case streamConnected:
c.enableCommits = true
c.unsafeUpdateStatus(serviceActive, nil)
// Once the stream connects, clear unconfirmed commits and immediately send
// the latest desired commit offset.
c.cursorTracker.ClearPending()
c.unsafeCommitOffsetToStream()
c.pollCommits.Start()
case streamReconnecting:
// Ensure there are no commits until streamConnected has been handled above.
c.enableCommits = false
c.pollCommits.Stop()
case streamTerminated:
c.unsafeInitiateShutdown(serviceTerminated, c.stream.Error())
}
}
func (c *committer) onResponse(response interface{}) {
c.mu.Lock()
defer c.mu.Unlock()
// If an inconsistency is detected in the server's responses, immediately
// terminate the committer, as correct processing of commits cannot be
// guaranteed.
processResponse := func() error {
commitResponse, _ := response.(*pb.StreamingCommitCursorResponse)
if commitResponse.GetCommit() == nil {
return errInvalidCommitResponse
}
numAcked := commitResponse.GetCommit().GetAcknowledgedCommits()
if numAcked <= 0 {
return fmt.Errorf("pubsublite: server acknowledged an invalid commit count: %d", numAcked)
}
if err := c.cursorTracker.ConfirmOffsets(numAcked); err != nil {
return err
}
c.unsafeCheckDone()
return nil
}
if err := processResponse(); err != nil {
c.unsafeInitiateShutdown(serviceTerminated, err)
}
}
// commitOffsetToStream is called by the periodic background task.
func (c *committer) commitOffsetToStream() {
c.mu.Lock()
defer c.mu.Unlock()
c.unsafeCommitOffsetToStream()
}
func (c *committer) unsafeCommitOffsetToStream() {
if !c.enableCommits {
return
}
nextOffset := c.cursorTracker.NextOffset()
if nextOffset == nilCursorOffset {
return
}
req := &pb.StreamingCommitCursorRequest{
Request: &pb.StreamingCommitCursorRequest_Commit{
Commit: &pb.SequencedCommitCursorRequest{
Cursor: &pb.Cursor{Offset: nextOffset},
},
},
}
if c.stream.Send(req) {
c.cursorTracker.AddPending(nextOffset)
}
}
func (c *committer) unsafeInitiateShutdown(targetStatus serviceStatus, err error) {
if !c.unsafeUpdateStatus(targetStatus, wrapError("committer", c.subscription.String(), err)) {
c.unsafeCheckDone()
return
}
// Notify any waiting threads of the termination.
c.flushPending.Broadcast()
// If it's a graceful shutdown, expedite sending final commits to the stream.
if targetStatus == serviceTerminating {
c.unsafeCommitOffsetToStream()
c.unsafeCheckDone()
return
}
// Otherwise discard outstanding acks and immediately terminate the stream.
c.acks.Release()
c.unsafeOnTerminated()
}
// Performs actions when the cursor tracker is up to date.
func (c *committer) unsafeCheckDone() {
if !c.cursorTracker.UpToDate() {
return
}
// Notify any waiting threads that flushing pending commits is complete.
c.flushPending.Broadcast()
// The commit stream can be closed once the final commit offset has been
// confirmed and there are no outstanding acks.
if c.status == serviceTerminating {
c.unsafeOnTerminated()
}
}
func (c *committer) unsafeOnTerminated() {
c.pollCommits.Stop()
c.stream.Stop()
}