blob: be36e7ff8e7c400c7388702f00aa680dcc744698 [file] [log] [blame]
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
package pscompat
import (
"context"
"sync"
"cloud.google.com/go/pubsub"
"cloud.google.com/go/pubsublite/internal/wire"
"golang.org/x/xerrors"
"google.golang.org/api/option"
"google.golang.org/api/support/bundler"
ipubsub "cloud.google.com/go/internal/pubsub"
pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
)
var (
// ErrOverflow is set for a PublishResult when publish buffers overflow.
ErrOverflow = bundler.ErrOverflow
// ErrOversizedMessage is set for a PublishResult when a published message
// exceeds MaxPublishRequestBytes.
ErrOversizedMessage = bundler.ErrOversizedItem
// ErrPublisherStopped is set for a PublishResult when a message cannot be
// published because the publisher client has stopped or is in the process of
// stopping. PublisherClient.Error() returns the error that caused the
// publisher client to terminate (if any).
ErrPublisherStopped = wire.ErrServiceStopped
)
// translateError transforms a subset of errors to what would be returned by the
// pubsub package.
func translateError(err error) error {
if xerrors.Is(err, wire.ErrOversizedMessage) {
return ErrOversizedMessage
}
if xerrors.Is(err, wire.ErrOverflow) {
return ErrOverflow
}
return err
}
// PublisherClient is a Pub/Sub Lite client to publish messages to a given
// topic. A PublisherClient is safe to use from multiple goroutines.
//
// See https://cloud.google.com/pubsub/lite/docs/publishing for more information
// about publishing.
type PublisherClient struct {
settings PublishSettings
wirePub wire.Publisher
// Fields below must be guarded with mutex.
mu sync.Mutex
err error
}
// NewPublisherClient creates a new Pub/Sub Lite client to publish messages to
// a given topic, using DefaultPublishSettings. A valid topic path has the
// format: "projects/PROJECT_ID/locations/ZONE/topics/TOPIC_ID".
func NewPublisherClient(ctx context.Context, topic string, opts ...option.ClientOption) (*PublisherClient, error) {
return NewPublisherClientWithSettings(ctx, topic, DefaultPublishSettings, opts...)
}
// NewPublisherClientWithSettings creates a new Pub/Sub Lite client to publish
// messages to a given topic, using the specified PublishSettings. A valid topic
// path has the format: "projects/PROJECT_ID/locations/ZONE/topics/TOPIC_ID".
func NewPublisherClientWithSettings(ctx context.Context, topic string, settings PublishSettings, opts ...option.ClientOption) (*PublisherClient, error) {
topicPath, err := wire.ParseTopicPath(topic)
if err != nil {
return nil, err
}
region, err := wire.ZoneToRegion(topicPath.Zone)
if err != nil {
return nil, err
}
// Note: ctx is not used to create the wire publisher, because if it is
// cancelled, the publisher will not be able to perform graceful shutdown
// (e.g. flush pending messages).
wirePub, err := wire.NewPublisher(context.Background(), settings.toWireSettings(), region, topic, opts...)
if err != nil {
return nil, err
}
wirePub.Start()
if err := wirePub.WaitStarted(); err != nil {
return nil, err
}
return &PublisherClient{settings: settings, wirePub: wirePub}, nil
}
// Publish publishes `msg` to the topic asynchronously. Messages are batched and
// sent according to the client's PublishSettings. Publish never blocks.
//
// Publish returns a non-nil PublishResult which will be ready when the
// message has been sent (or has failed to be sent) to the server. Once the
// first publish error occurs, the publisher will terminate and subsequent
// PublishResults will also have an error. A new publisher must be created to
// republish failed messages.
//
// Once Stop() has been called or the publisher has failed permanently due to an
// error, future calls to Publish will immediately return a PublishResult with
// error ErrPublisherStopped. Error() returns the error that caused the
// publisher to terminate.
func (p *PublisherClient) Publish(ctx context.Context, msg *pubsub.Message) *pubsub.PublishResult {
result := ipubsub.NewPublishResult()
msgpb := new(pb.PubSubMessage)
if err := p.transformMessage(msg, msgpb); err != nil {
ipubsub.SetPublishResult(result, "", err)
p.setError(err)
p.wirePub.Stop()
return result
}
p.wirePub.Publish(msgpb, func(metadata *wire.MessageMetadata, err error) {
err = translateError(err)
if metadata != nil {
ipubsub.SetPublishResult(result, metadata.String(), err)
} else {
ipubsub.SetPublishResult(result, "", err)
}
})
return result
}
// Stop sends all remaining published messages and closes publish streams.
// Returns once all outstanding messages have been sent or have failed to be
// sent. Stop should be called when the client is no longer required.
func (p *PublisherClient) Stop() {
p.wirePub.Stop()
p.wirePub.WaitStopped()
}
// Error returns the error that caused the publisher client to terminate. It
// may be nil if Stop() was called.
func (p *PublisherClient) Error() error {
p.mu.Lock()
defer p.mu.Unlock()
if p.err != nil {
return p.err
}
return p.wirePub.Error()
}
func (p *PublisherClient) setError(err error) {
p.mu.Lock()
defer p.mu.Unlock()
// Don't clobber original error.
if p.err == nil {
p.err = err
}
}
func (p *PublisherClient) transformMessage(from *pubsub.Message, to *pb.PubSubMessage) error {
if p.settings.MessageTransformer != nil {
return p.settings.MessageTransformer(from, to)
}
keyExtractor := p.settings.KeyExtractor
if keyExtractor == nil {
keyExtractor = extractOrderingKey
}
return transformPublishedMessage(from, to, keyExtractor)
}