blob: 740be64911b828a28bb3722b7c9bedb3ae5d1317 [file] [log] [blame]
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package wire
import (
"context"
"errors"
"fmt"
"math/rand"
"reflect"
"time"
"google.golang.org/api/option"
"google.golang.org/grpc"
vkit "cloud.google.com/go/pubsublite/apiv1"
pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
)
// Sentinel errors for publish stream responses that do not have the expected
// shape.
var (
// errInvalidInitialPubResponse is returned by validateInitialResponse() when
// the response does not contain an initial publish response.
errInvalidInitialPubResponse = errors.New("pubsublite: first response from server was not an initial response for publish")
// errInvalidMsgPubResponse is used by onResponse() to terminate the publisher
// when a response does not contain a message publish response.
errInvalidMsgPubResponse = errors.New("pubsublite: received invalid publish response from server")
)
// singlePartitionPublisher publishes messages to a single topic partition.
//
// Life of a successfully published message:
// - Publish() receives the message from the user.
// - It is added to `batcher.msgBundler`, which performs batching in accordance
//   with user-configured PublishSettings.
// - onNewBatch() receives new message batches from the bundler. The batch is
//   added to `batcher.publishQueue` (in-flight batches) and sent to the publish
//   stream, if connected. If the stream is currently reconnecting, the entire
//   queue is resent to the stream immediately after it has reconnected, in
//   onStreamStatusChange().
// - onResponse() receives the first cursor offset for the first batch in
//   `batcher.publishQueue`. It assigns the cursor offsets for each message and
//   releases the publish results to the user.
//
// See comments for unsafeInitiateShutdown() for error scenarios.
type singlePartitionPublisher struct {
// Immutable after creation.
// pubClient is the API client used by newStream() to open the publish stream.
pubClient *vkit.PublisherClient
// topic identifies the topic path and partition this publisher writes to.
topic topicPartition
// initialReq is the request returned by initialRequest().
initialReq *pb.PublishRequest
// metadata is attached to the context of each new stream in newStream().
metadata pubsubMetadata
// Fields below must be guarded with mu.
// stream is the underlying publish stream.
stream *retryableStream
// batcher holds messages pending batching and the in-flight batches.
batcher *publishMessageBatcher
// enableSendToStream gates onNewBatch() from sending to the stream. It is set
// to false when the stream is reconnecting or the publisher is shut down
// immediately, and to true once all in-flight batches have been resent after
// the stream (re)connects.
enableSendToStream bool
// abstractService is embedded; the methods of this type access mu and status
// through it.
abstractService
}
// singlePartitionPublisherFactory creates instances of singlePartitionPublisher
// for given partition numbers. All publishers created by one factory share the
// same context, publisher client, settings and topic path.
type singlePartitionPublisherFactory struct {
// ctx is passed to the retryable stream of each created publisher.
ctx context.Context
// pubClient is the API client shared by all created publishers.
pubClient *vkit.PublisherClient
// settings configures batching and the stream timeout of created publishers.
settings PublishSettings
// topicPath is the path of the topic that created publishers publish to.
topicPath string
}
// New builds a singlePartitionPublisher for the given partition of the
// factory's topic. New itself does not call Start() on the publisher.
func (f *singlePartitionPublisherFactory) New(partition int) *singlePartitionPublisher {
	// The initial request identifies the topic partition to the server.
	initial := &pb.InitialPublishRequest{
		Topic:     f.topicPath,
		Partition: int64(partition),
	}
	pp := &singlePartitionPublisher{
		pubClient: f.pubClient,
		topic:     topicPartition{Path: f.topicPath, Partition: partition},
		initialReq: &pb.PublishRequest{
			RequestType: &pb.PublishRequest_InitialRequest{InitialRequest: initial},
		},
		metadata: newPubsubMetadata(),
	}

	// The batcher and stream call back into the publisher, so they can only be
	// wired up once pp exists.
	pp.batcher = newPublishMessageBatcher(&f.settings, partition, pp.onNewBatch)
	responseType := reflect.TypeOf(pb.PublishResponse{})
	pp.stream = newRetryableStream(f.ctx, pp, f.settings.Timeout, responseType)

	pp.metadata.AddTopicRoutingMetadata(pp.topic)
	pp.metadata.AddClientInfo(f.settings.Framework)
	return pp
}
// Start moves the publisher to the starting state and, if that status update
// was accepted, starts the underlying publish stream.
func (pp *singlePartitionPublisher) Start() {
	pp.mu.Lock()
	defer pp.mu.Unlock()

	if updated := pp.unsafeUpdateStatus(serviceStarting, nil); !updated {
		return
	}
	pp.stream.Start()
}
// Stop begins a graceful shutdown of the publisher: it requests the
// serviceTerminating state with no error, which flushes pending messages
// before the stream is closed.
func (pp *singlePartitionPublisher) Stop() {
	pp.mu.Lock()
	defer pp.mu.Unlock()

	pp.unsafeInitiateShutdown(serviceTerminating, nil)
}
// Publish submits a pub/sub message for publishing to this partition.
//
// Messages are accepted while the service is starting up or active. During
// startup, messages are queued in the batcher and will be published once the
// stream connects. If startup fails, the error will be set for the queued
// messages.
//
// If the message is rejected, onResult is invoked with the error and the
// publisher begins shutting down.
func (pp *singlePartitionPublisher) Publish(msg *pb.PubSubMessage, onResult PublishResultFunc) {
	pp.mu.Lock()
	defer pp.mu.Unlock()

	var rejectErr error
	switch {
	case pp.status == serviceUninitialized:
		rejectErr = ErrServiceUninitialized
	case pp.status >= serviceTerminating:
		rejectErr = ErrServiceStopped
	default:
		rejectErr = pp.batcher.AddMessage(msg, onResult)
	}
	if rejectErr == nil {
		return
	}

	// The new message cannot be published: flush pending messages and then
	// terminate the stream once results are received.
	// NOTE(review): onResult runs while pp.mu is held.
	pp.unsafeInitiateShutdown(serviceTerminating, rejectErr)
	onResult(nil, rejectErr)
}
// newStream opens a new publish stream, using ctx augmented with the
// publisher's metadata.
func (pp *singlePartitionPublisher) newStream(ctx context.Context) (grpc.ClientStream, error) {
	streamCtx := pp.metadata.AddToContext(ctx)
	return pp.pubClient.Publish(streamCtx)
}
// initialRequest returns the publisher's initial publish request, and reports
// that an initial response is required.
func (pp *singlePartitionPublisher) initialRequest() (interface{}, initialResponseRequired) {
	required := initialResponseRequired(true)
	return pp.initialReq, required
}
// validateInitialResponse returns errInvalidInitialPubResponse unless response
// is a publish response that carries an initial response.
func (pp *singlePartitionPublisher) validateInitialResponse(response interface{}) error {
	// The assertion result is deliberately not checked: on a type mismatch
	// pubResponse is nil, and the generated getter is expected to return nil
	// for a nil receiver, which is reported as an invalid response below.
	pubResponse, _ := response.(*pb.PublishResponse)
	if pubResponse.GetInitialResponse() != nil {
		return nil
	}
	return errInvalidInitialPubResponse
}
// onStreamStatusChange reacts to a status change of the publish stream:
//   - streamReconnecting: stops new batches from being sent to the stream.
//   - streamConnected: marks the service active and resends all in-flight
//     batches before allowing new batches to be sent.
//   - streamTerminated: shuts down the publisher immediately with the stream's
//     error.
func (pp *singlePartitionPublisher) onStreamStatusChange(status streamStatus) {
	pp.mu.Lock()
	defer pp.mu.Unlock()

	switch status {
	case streamReconnecting:
		// Prevent onNewBatch() from sending any new batches to the stream.
		pp.enableSendToStream = false

	case streamConnected:
		pp.unsafeUpdateStatus(serviceActive, nil)

		// To preserve message order, in-flight batches are resent immediately
		// after reconnecting, ahead of any new batches. If a send fails,
		// sending stays disabled.
		for _, inFlight := range pp.batcher.InFlightBatches() {
			if sent := pp.stream.Send(inFlight.ToPublishRequest()); !sent {
				return
			}
		}
		pp.enableSendToStream = true

	case streamTerminated:
		pp.unsafeInitiateShutdown(serviceTerminated, pp.stream.Error())
	}
}
// onNewBatch records a new batch as in-flight and, when sending is enabled,
// forwards it to the publish stream.
func (pp *singlePartitionPublisher) onNewBatch(batch *publishBatch) {
	pp.mu.Lock()
	defer pp.mu.Unlock()

	pp.batcher.AddBatch(batch)
	if !pp.enableSendToStream {
		return
	}

	// The result of Send() is intentionally ignored: if the underlying stream is
	// reconnecting or the send fails, all in-flight batches are sent to the
	// stream once the connection has been re-established.
	pp.stream.Send(batch.ToPublishRequest())
}
// onResponse handles a response received from the publish stream. A valid
// message response supplies the start cursor offset that is passed to the
// batcher. An invalid response, or an error from the batcher, terminates the
// publisher immediately.
func (pp *singlePartitionPublisher) onResponse(response interface{}) {
	pp.mu.Lock()
	defer pp.mu.Unlock()

	// A failed assertion leaves pubResponse nil, which is reported below as an
	// invalid response.
	pubResponse, _ := response.(*pb.PublishResponse)
	msgResponse := pubResponse.GetMessageResponse()

	var respErr error
	if msgResponse == nil {
		respErr = errInvalidMsgPubResponse
	} else {
		startOffset := msgResponse.GetStartCursor().GetOffset()
		respErr = pp.batcher.OnPublishResponse(startOffset)
	}

	if respErr != nil {
		pp.unsafeInitiateShutdown(serviceTerminated, respErr)
		return
	}
	pp.unsafeCheckDone()
}
// unsafeInitiateShutdown must be provided a target serviceStatus, which must be
// one of:
// * serviceTerminating: attempts to successfully publish all pending messages
//   before terminating the publisher. Occurs when:
//   - The user calls Stop().
//   - A new message fails preconditions. This should block the publish of
//     subsequent messages to ensure ordering, but all pending messages should
//     be flushed.
// * serviceTerminated: immediately terminates the publisher and errors all
//   in-flight batches and pending messages in the bundler. Occurs when:
//   - The publish stream terminates with a non-retryable error.
//   - An inconsistency is detected in the server's publish responses. Assume
//     there is a bug on the server and terminate the publisher, as correct
//     processing of messages cannot be guaranteed.
//
// If the status update is rejected by unsafeUpdateStatus(), this is a no-op.
//
// Expected to be called with singlePartitionPublisher.mu held. The mutex is
// released while the batcher is flushed and re-acquired afterwards, so callers
// must not assume that guarded state is unchanged when this returns.
func (pp *singlePartitionPublisher) unsafeInitiateShutdown(targetStatus serviceStatus, err error) {
// The service status records the error wrapped with the publisher's topic
// partition.
if !pp.unsafeUpdateStatus(targetStatus, wrapError("publisher", pp.topic.String(), err)) {
return
}
// Close the stream if this is an immediate shutdown. Otherwise leave it open
// to send pending messages.
if targetStatus == serviceTerminated {
pp.enableSendToStream = false
pp.stream.Stop()
}
// Bundler.Flush() blocks and invokes onNewBatch(), which acquires the mutex,
// so it cannot be held here.
// Updating the publisher status above prevents any new messages from being
// added to the Bundler after flush.
pp.mu.Unlock()
pp.batcher.Flush()
pp.mu.Lock()
// If flushing pending messages, close the stream if there's nothing left to
// publish.
if targetStatus == serviceTerminating {
pp.unsafeCheckDone()
return
}
// For immediate shutdown set the error message for all pending messages.
// Note that the original err is passed here, not the wrapped error.
pp.batcher.OnPermanentError(err)
}
// unsafeCheckDone stops the stream when the publisher is terminating and no
// in-flight batches remain. In any other state it does nothing.
func (pp *singlePartitionPublisher) unsafeCheckDone() {
	if pp.status != serviceTerminating {
		return
	}
	if pp.batcher.InFlightBatchesEmpty() {
		pp.stream.Stop()
	}
}
// routingPublisher publishes messages to multiple topic partitions, each
// managed by a singlePartitionPublisher. It supports increasing topic partition
// count, but not decreasing.
type routingPublisher struct {
// Immutable after creation.
// msgRouterFactory creates a new message router whenever the partition count
// increases.
msgRouterFactory *messageRouterFactory
// pubFactory creates the publisher for each newly discovered partition.
pubFactory *singlePartitionPublisherFactory
// partitionWatcher reports partition counts to onPartitionCountChanged().
partitionWatcher *partitionCountWatcher
// Fields below must be guarded with mu.
// msgRouter selects a partition for each message. It is nil until the first
// partition count has been received.
msgRouter messageRouter
// publishers is indexed by partition number.
publishers []*singlePartitionPublisher
// apiClientService is embedded; it holds the API clients passed to
// newRoutingPublisher().
apiClientService
}
// newRoutingPublisher assembles a routingPublisher from its dependencies and
// registers a partition count watcher as a child service. No partition
// publishers exist yet on return; they are created in
// onPartitionCountChanged().
func newRoutingPublisher(allClients apiClients, adminClient *vkit.AdminClient, msgRouterFactory *messageRouterFactory, pubFactory *singlePartitionPublisherFactory) *routingPublisher {
	rp := &routingPublisher{
		msgRouterFactory: msgRouterFactory,
		pubFactory:       pubFactory,
		apiClientService: apiClientService{clients: allClients},
	}
	rp.init()

	watcher := newPartitionCountWatcher(
		pubFactory.ctx,
		adminClient,
		pubFactory.settings,
		pubFactory.topicPath,
		rp.onPartitionCountChanged,
	)
	rp.partitionWatcher = watcher
	rp.unsafeAddServices(watcher)
	return rp
}
// onPartitionCountChanged is the callback given to the partition count
// watcher. When the count has grown, it creates and registers a publisher for
// each new partition and replaces the message router with one sized for the
// new count. Updates are ignored once the service is terminating, and when the
// count is unchanged or has decreased.
func (rp *routingPublisher) onPartitionCountChanged(partitionCount int) {
	rp.mu.Lock()
	defer rp.mu.Unlock()

	if rp.status >= serviceTerminating {
		return
	}

	known := len(rp.publishers)
	if partitionCount <= known {
		// Equal: nothing to do. Fewer: decreasing is not supported.
		// TODO: Log the decrease in partition count.
		return
	}

	for partition := known; partition < partitionCount; partition++ {
		newPub := rp.pubFactory.New(partition)
		rp.publishers = append(rp.publishers, newPub)
		rp.unsafeAddServices(newPub)
	}
	rp.msgRouter = rp.msgRouterFactory.New(partitionCount)
}
// Publish routes msg to the publisher of a partition and hands it over for
// publishing. If routing fails, onResult is invoked with the error and the
// message is not published.
func (rp *routingPublisher) Publish(msg *pb.PubSubMessage, onResult PublishResultFunc) {
	target, routeErr := rp.routeToPublisher(msg)
	if routeErr != nil {
		onResult(nil, routeErr)
		return
	}
	// rp.mu is released by routeToPublisher() before this call.
	target.Publish(msg, onResult)
}
// routeToPublisher selects the partition publisher that should handle msg,
// based on the message key.
//
// It returns an error if the service status check fails, if no message router
// has been created yet, or if the router returns a partition outside the range
// of known publishers. The latter two cases are not expected to occur and
// initiate shutdown of the routing publisher.
func (rp *routingPublisher) routeToPublisher(msg *pb.PubSubMessage) (*singlePartitionPublisher, error) {
	rp.mu.Lock()
	defer rp.mu.Unlock()

	if err := rp.unsafeCheckServiceStatus(); err != nil {
		return nil, err
	}
	if rp.msgRouter == nil {
		// Should not occur.
		rp.unsafeInitiateShutdown(serviceTerminating, ErrServiceUninitialized)
		return nil, ErrServiceUninitialized
	}

	partition := rp.msgRouter.Route(msg.GetKey())
	// Check both bounds: a negative index would panic on the slice access below
	// rather than being reported as an error.
	if partition < 0 || partition >= len(rp.publishers) {
		// Should not occur.
		err := fmt.Errorf("pubsublite: publisher not found for partition %d", partition)
		rp.unsafeInitiateShutdown(serviceTerminating, err)
		return nil, err
	}
	return rp.publishers[partition], nil
}
// Publisher is the client interface exported from this package for publishing
// messages. NewPublisher() returns an implementation of it.
type Publisher interface {
// Publish submits a message for publishing. The PublishResultFunc receives
// the outcome; on failure it is invoked with a nil result and the error.
Publish(*pb.PubSubMessage, PublishResultFunc)
// Service lifecycle methods. Their implementations are not in this part of
// the file. NOTE(review): presumably WaitStarted/WaitStopped block until the
// respective state is reached and Error reports the terminal error - confirm
// against the service implementation.
Start()
WaitStarted() error
Stop()
WaitStopped() error
Error() error
}
// NewPublisher creates a new client for publishing messages to the topic at
// topicPath in the given region.
//
// It validates the region and settings, creates the publisher and admin API
// clients with opts, and returns a routing publisher that owns both clients.
// An error is returned if validation fails or a client cannot be created; the
// publisher client is closed if creating the admin client fails.
func NewPublisher(ctx context.Context, settings PublishSettings, region, topicPath string, opts ...option.ClientOption) (Publisher, error) {
	if err := ValidateRegion(region); err != nil {
		return nil, err
	}
	if err := validatePublishSettings(settings); err != nil {
		return nil, err
	}

	pubClient, err := newPublisherClient(ctx, region, opts...)
	if err != nil {
		return nil, err
	}
	// Track created clients so they can be closed together.
	allClients := apiClients{pubClient}

	adminClient, err := NewAdminClient(ctx, region, opts...)
	if err != nil {
		allClients.Close()
		return nil, err
	}
	allClients = append(allClients, adminClient)

	// The message router factory is given a time-seeded random source.
	seed := time.Now().UnixNano()
	routerFactory := newMessageRouterFactory(rand.New(rand.NewSource(seed)))
	partitionPubFactory := &singlePartitionPublisherFactory{
		ctx:       ctx,
		pubClient: pubClient,
		settings:  settings,
		topicPath: topicPath,
	}
	return newRoutingPublisher(allClients, adminClient, routerFactory, partitionPubFactory), nil
}