fix(pubsublite): make SubscriberClient.Receive identical to pubsub (#4281)
- Ensures that pscompat.SubscriberClient.Receive and pubsub.Subscription.Receive have identical interfaces.
- Adds examples for how to declare common interfaces for pubsublite and pubsub.
diff --git a/pubsublite/doc.go b/pubsublite/doc.go
index 07abbea..3ec736d 100644
--- a/pubsublite/doc.go
+++ b/pubsublite/doc.go
@@ -29,14 +29,30 @@
More information about Pub/Sub Lite is available at
https://cloud.google.com/pubsub/lite.
+See https://pkg.go.dev/cloud.google.com/go for authentication, timeouts,
+connection pooling and similar aspects of this package.
+
Note: This library is in BETA. Backwards-incompatible changes may be made before
stable v1.0.0 is released.
Introduction
-See https://pkg.go.dev/cloud.google.com/go for authentication, timeouts,
-connection pooling and similar aspects of this package.
+Examples can be found at
+https://pkg.go.dev/cloud.google.com/go/pubsublite#pkg-examples
+and
+https://pkg.go.dev/cloud.google.com/go/pubsublite/pscompat#pkg-examples.
+
+Complete sample programs can be found at
+https://github.com/GoogleCloudPlatform/golang-samples/tree/master/pubsublite.
+
+The cloud.google.com/go/pubsublite/pscompat subpackage contains clients for
+publishing and receiving messages, which have similar interfaces to their
+pubsub.Topic and pubsub.Subscription counterparts in cloud.google.com/go/pubsub.
+The following examples demonstrate how to declare common interfaces:
+https://pkg.go.dev/cloud.google.com/go/pubsublite/pscompat#example-NewPublisherClient-Interface
+and
+https://pkg.go.dev/cloud.google.com/go/pubsublite/pscompat#example-NewSubscriberClient-Interface.
The following imports are required for code snippets below:
@@ -46,11 +62,6 @@
"cloud.google.com/go/pubsublite/pscompat"
)
-More complete examples can be found at
-https://pkg.go.dev/cloud.google.com/go/pubsublite#pkg-examples
-and
-https://pkg.go.dev/cloud.google.com/go/pubsublite/pscompat#pkg-examples.
-
Creating Topics
@@ -83,11 +94,6 @@
Publishing
-The pubsublite/pscompat subpackage contains clients for publishing and receiving
-messages, which have similar interfaces to their pubsub.Topic and
-pubsub.Subscription counterparts in the Cloud Pub/Sub library:
-https://pkg.go.dev/cloud.google.com/go/pubsub.
-
Pub/Sub Lite uses gRPC streams extensively for high throughput. For more
differences, see https://pkg.go.dev/cloud.google.com/go/pubsublite/pscompat.
@@ -118,7 +124,7 @@
Once you've finishing publishing all messages, call Stop to flush all messages
to the service and close gRPC streams. The PublisherClient can no longer be used
-after it has been stopped or has terminated due to a permanent service error.
+after it has been stopped or has terminated due to a permanent error.
publisher.Stop()
@@ -167,8 +173,8 @@
// TODO: Handle error.
}
-Receive blocks until either the context is canceled or a fatal service error
-occurs. To terminate a call to Receive, cancel its context:
+Receive blocks until either the context is canceled or a permanent error occurs.
+To terminate a call to Receive, cancel its context:
cancel()
diff --git a/pubsublite/internal/wire/README.md b/pubsublite/internal/wire/README.md
index 133e1fe..bd17748 100644
--- a/pubsublite/internal/wire/README.md
+++ b/pubsublite/internal/wire/README.md
@@ -1,7 +1,6 @@
# Wire
-This directory contains internal implementation details for Cloud Pub/Sub Lite.
-Its exported interface can change at any time.
+This directory contains internal implementation details for Pub/Sub Lite.
## Conventions
diff --git a/pubsublite/pscompat/doc.go b/pubsublite/pscompat/doc.go
index 87b68b1..deb6f15 100644
--- a/pubsublite/pscompat/doc.go
+++ b/pubsublite/pscompat/doc.go
@@ -19,7 +19,10 @@
https://pkg.go.dev/cloud.google.com/go/pubsub. If interfaces are defined by the
client application, PublisherClient and SubscriberClient can be used as
substitutions for pubsub.Topic.Publish() and pubsub.Subscription.Receive(),
-respectively, from the pubsub package.
+respectively, from the pubsub package. See the following examples:
+https://pkg.go.dev/cloud.google.com/go/pubsublite/pscompat#example-NewPublisherClient-Interface
+and
+https://pkg.go.dev/cloud.google.com/go/pubsublite/pscompat#example-NewSubscriberClient-Interface.
The Cloud Pub/Sub and Pub/Sub Lite services have some differences:
- Pub/Sub Lite does not support NACK for messages. By default, this will
@@ -42,6 +45,9 @@
Information about choosing between Cloud Pub/Sub vs Pub/Sub Lite is available at
https://cloud.google.com/pubsub/docs/choosing-pubsub-or-lite.
+Complete sample programs can be found at
+https://github.com/GoogleCloudPlatform/golang-samples/tree/master/pubsublite.
+
See https://pkg.go.dev/cloud.google.com/go for authentication, timeouts,
connection pooling and similar aspects of this package.
*/
diff --git a/pubsublite/pscompat/example_test.go b/pubsublite/pscompat/example_test.go
index 7ca12e6..7dc32ca 100644
--- a/pubsublite/pscompat/example_test.go
+++ b/pubsublite/pscompat/example_test.go
@@ -292,6 +292,70 @@
cancel()
}
+// This example illustrates how to declare a common interface for publisher
+// clients from Cloud Pub/Sub (cloud.google.com/go/pubsub) and Pub/Sub Lite
+// (cloud.google.com/go/pubsublite/pscompat).
+func ExampleNewPublisherClient_interface() {
+ // publisherInterface is implemented by both pscompat.PublisherClient and
+ // pubsub.Topic.
+ type publisherInterface interface {
+ Publish(context.Context, *pubsub.Message) *pubsub.PublishResult
+ Stop()
+ }
+
+ publish := func(publisher publisherInterface) {
+ defer publisher.Stop()
+ // TODO: Publish messages.
+ }
+
+ // Create a Pub/Sub Lite publisher client.
+ ctx := context.Background()
+ publisher, err := pscompat.NewPublisherClient(ctx, "projects/my-project/locations/zone/topics/my-topic")
+ if err != nil {
+ // TODO: Handle error.
+ }
+ publish(publisher)
+
+ // Create a Cloud Pub/Sub topic to publish.
+ client, err := pubsub.NewClient(ctx, "my-project")
+ if err != nil {
+ // TODO: Handle error.
+ }
+ topic := client.Topic("my-topic")
+ publish(topic)
+}
+
+// This example illustrates how to declare a common interface for subscriber
+// clients from Cloud Pub/Sub (cloud.google.com/go/pubsub) and Pub/Sub Lite
+// (cloud.google.com/go/pubsublite/pscompat).
+func ExampleNewSubscriberClient_interface() {
+ // subscriberInterface is implemented by both pscompat.SubscriberClient and
+ // pubsub.Subscription.
+ type subscriberInterface interface {
+ Receive(context.Context, func(context.Context, *pubsub.Message)) error
+ }
+
+ receive := func(subscriber subscriberInterface) {
+ // TODO: Receive messages.
+ }
+
+ // Create a Pub/Sub Lite subscriber client.
+ ctx := context.Background()
+ subscriber, err := pscompat.NewSubscriberClient(ctx, "projects/my-project/locations/zone/subscriptions/my-subscription")
+ if err != nil {
+ // TODO: Handle error.
+ }
+ receive(subscriber)
+
+ // Create a Cloud Pub/Sub subscription to receive.
+ client, err := pubsub.NewClient(ctx, "my-project")
+ if err != nil {
+ // TODO: Handle error.
+ }
+ subscription := client.Subscription("my-subscription")
+ receive(subscription)
+}
+
func ExampleParseMessageMetadata_publisher() {
ctx := context.Background()
const topic = "projects/my-project/locations/zone/topics/my-topic"
diff --git a/pubsublite/pscompat/subscriber.go b/pubsublite/pscompat/subscriber.go
index acfb356..dafb25e 100644
--- a/pubsublite/pscompat/subscriber.go
+++ b/pubsublite/pscompat/subscriber.go
@@ -86,11 +86,13 @@
return wire.NewSubscriber(context.Background(), f.settings, receiver, f.region, f.subscription.String(), f.options...)
}
+type messageReceiverFunc = func(context.Context, *pubsub.Message)
+
// subscriberInstance wraps an instance of a wire.Subscriber. A new instance is
// created for each invocation of SubscriberClient.Receive().
type subscriberInstance struct {
settings ReceiveSettings
- receiver MessageReceiverFunc
+ receiver messageReceiverFunc
recvCtx context.Context // Context passed to the receiver
recvCancel context.CancelFunc // Corresponding cancel func for recvCtx
wireSub wire.Subscriber
@@ -101,7 +103,7 @@
err error
}
-func newSubscriberInstance(ctx context.Context, factory wireSubscriberFactory, settings ReceiveSettings, receiver MessageReceiverFunc) (*subscriberInstance, error) {
+func newSubscriberInstance(ctx context.Context, factory wireSubscriberFactory, settings ReceiveSettings, receiver messageReceiverFunc) (*subscriberInstance, error) {
recvCtx, recvCancel := context.WithCancel(ctx)
subInstance := &subscriberInstance{
settings: settings,
@@ -221,17 +223,6 @@
return err
}
-// MessageReceiverFunc handles messages sent by the Pub/Sub Lite service.
-//
-// The implementation must arrange for pubsub.Message.Ack() or
-// pubsub.Message.Nack() to be called after processing the message.
-//
-// The receiver func will be called from multiple goroutines if the subscriber
-// is connected to multiple partitions. Only one call from any connected
-// partition will be outstanding at a time, and blocking in this receiver
-// callback will block the delivery of subsequent messages for the partition.
-type MessageReceiverFunc func(context.Context, *pubsub.Message)
-
// SubscriberClient is a Pub/Sub Lite client to receive messages for a given
// subscription.
//
@@ -292,18 +283,20 @@
// If there is a fatal service error, Receive returns that error after all of
// the outstanding calls to f have returned. If ctx is done, Receive returns nil
// after all of the outstanding calls to f have returned and all messages have
-// been acknowledged.
+// been acknowledged. The context passed to f will be canceled when ctx is Done
+// or there is a fatal service error.
//
// Receive calls f concurrently from multiple goroutines if the SubscriberClient
-// is connected to multiple partitions. All messages received by f must be ACKed
-// or NACKed. Failure to do so can prevent Receive from returning.
+// is connected to multiple partitions. Only one call from any connected
+// partition will be outstanding at a time, and blocking in the receiver
+// callback f will block the delivery of subsequent messages for the partition.
//
-// The context passed to f will be canceled when ctx is Done or there is a fatal
-// service error.
+// All messages received by f must be ACKed or NACKed. Failure to do so can
+// prevent Receive from returning.
//
// Each SubscriberClient may have only one invocation of Receive active at a
// time.
-func (s *SubscriberClient) Receive(ctx context.Context, f MessageReceiverFunc) error {
+func (s *SubscriberClient) Receive(ctx context.Context, f func(context.Context, *pubsub.Message)) error {
if err := s.setReceiveActive(true); err != nil {
return err
}
diff --git a/pubsublite/pscompat/subscriber_test.go b/pubsublite/pscompat/subscriber_test.go
index a614a74..429c9f5 100644
--- a/pubsublite/pscompat/subscriber_test.go
+++ b/pubsublite/pscompat/subscriber_test.go
@@ -121,7 +121,7 @@
}, nil
}
-func newTestSubscriberInstance(ctx context.Context, settings ReceiveSettings, receiver MessageReceiverFunc) *subscriberInstance {
+func newTestSubscriberInstance(ctx context.Context, settings ReceiveSettings, receiver messageReceiverFunc) *subscriberInstance {
sub, _ := newSubscriberInstance(ctx, new(mockWireSubscriberFactory), settings, receiver)
return sub
}