blob: 6f4e9415a822a29b954751485e00a840dbfe5ad2 [file] [log] [blame]
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
package pscompat
import (
"context"
"sync"
"cloud.google.com/go/pubsub"
"cloud.google.com/go/pubsublite/internal/wire"
"google.golang.org/api/option"
ipubsub "cloud.google.com/go/internal/pubsub"
pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
)
var (
// ErrOverflow is set for a PublishResult when publish buffers overflow.
// Use errors.Is for comparing errors.
ErrOverflow = wire.ErrOverflow
// ErrOversizedMessage is set for a PublishResult when a published message
// exceeds MaxPublishRequestBytes. Use errors.Is for comparing errors.
ErrOversizedMessage = wire.ErrOversizedMessage
// ErrPublisherStopped is set for a PublishResult when a message cannot be
// published because the publisher client has stopped or is in the process of
// stopping. PublisherClient.Error() returns the error that caused the
// publisher client to terminate (if any). Use errors.Is for comparing errors.
ErrPublisherStopped = wire.ErrServiceStopped
// ErrBackendUnavailable indicates that the backend service has been
// unavailable for a period of time. The timeout can be configured using
// PublishSettings.Timeout or ReceiveSettings.Timeout. Use errors.Is for
// comparing errors.
ErrBackendUnavailable = wire.ErrBackendUnavailable
)
// PublisherClient is a Pub/Sub Lite client to publish messages to a given
// topic. A PublisherClient is safe to use from multiple goroutines.
//
// See https://cloud.google.com/pubsub/lite/docs/publishing for more information
// about publishing.
type PublisherClient struct {
settings PublishSettings
wirePub wire.Publisher
// Fields below must be guarded with mutex.
mu sync.Mutex
err error
}
// NewPublisherClient creates a new Pub/Sub Lite publisher client to publish
// messages to a given topic, using DefaultPublishSettings. A valid topic path
// has the format: "projects/PROJECT_ID/locations/ZONE/topics/TOPIC_ID".
func NewPublisherClient(ctx context.Context, topic string, opts ...option.ClientOption) (*PublisherClient, error) {
return NewPublisherClientWithSettings(ctx, topic, DefaultPublishSettings, opts...)
}
// NewPublisherClientWithSettings creates a new Pub/Sub Lite publisher client to
// publish messages to a given topic, using the specified PublishSettings. A
// valid topic path has the format:
// "projects/PROJECT_ID/locations/ZONE/topics/TOPIC_ID".
func NewPublisherClientWithSettings(ctx context.Context, topic string, settings PublishSettings, opts ...option.ClientOption) (*PublisherClient, error) {
topicPath, err := wire.ParseTopicPath(topic)
if err != nil {
return nil, err
}
region, err := wire.ZoneToRegion(topicPath.Zone)
if err != nil {
return nil, err
}
// Note: ctx is not used to create the wire publisher, because if it is
// cancelled, the publisher will not be able to perform graceful shutdown
// (e.g. flush pending messages).
wirePub, err := wire.NewPublisher(context.Background(), settings.toWireSettings(), region, topic, opts...)
if err != nil {
return nil, err
}
wirePub.Start()
if err := wirePub.WaitStarted(); err != nil {
return nil, err
}
return &PublisherClient{settings: settings, wirePub: wirePub}, nil
}
// Publish publishes `msg` to the topic asynchronously. Messages are batched and
// sent according to the client's PublishSettings. Publish never blocks.
//
// Publish returns a non-nil PublishResult which will be ready when the
// message has been sent (or has failed to be sent) to the server. Retryable
// errors are automatically handled. If a PublishResult returns an error, this
// indicates that the publisher client encountered a fatal error and has
// permanently terminated. After the fatal error has been resolved, a new
// publisher client instance must be created to republish failed messages.
//
// Once Stop() has been called or the publisher client has failed permanently
// due to an error, future calls to Publish will immediately return a
// PublishResult with error ErrPublisherStopped.
//
// Error() returns the error that caused the publisher client to terminate and
// may contain more context than the error returned by PublishResult.
func (p *PublisherClient) Publish(ctx context.Context, msg *pubsub.Message) *pubsub.PublishResult {
result := ipubsub.NewPublishResult()
msgpb := new(pb.PubSubMessage)
if err := p.transformMessage(msg, msgpb); err != nil {
ipubsub.SetPublishResult(result, "", err)
p.setError(err)
p.wirePub.Stop()
return result
}
p.wirePub.Publish(msgpb, func(metadata *wire.MessageMetadata, err error) {
if metadata != nil {
ipubsub.SetPublishResult(result, metadata.String(), err)
} else {
ipubsub.SetPublishResult(result, "", err)
}
})
return result
}
// Stop sends all remaining published messages and closes publish streams.
// Returns once all outstanding messages have been sent or have failed to be
// sent. Stop should be called when the client is no longer required.
func (p *PublisherClient) Stop() {
p.wirePub.Stop()
p.wirePub.WaitStopped()
}
// Error returns the error that caused the publisher client to terminate. The
// error returned here may contain more context than PublishResult errors. The
// return value may be nil if Stop() was called.
func (p *PublisherClient) Error() error {
p.mu.Lock()
defer p.mu.Unlock()
if p.err != nil {
return p.err
}
return p.wirePub.Error()
}
func (p *PublisherClient) setError(err error) {
p.mu.Lock()
defer p.mu.Unlock()
// Don't clobber original error.
if p.err == nil {
p.err = err
}
}
func (p *PublisherClient) transformMessage(from *pubsub.Message, to *pb.PubSubMessage) error {
if p.settings.MessageTransformer != nil {
return p.settings.MessageTransformer(from, to)
}
keyExtractor := p.settings.KeyExtractor
if keyExtractor == nil {
keyExtractor = extractOrderingKey
}
return transformPublishedMessage(from, to, keyExtractor)
}