refactor(pubsublite): add context to error messages for leaf services (#3649)
To disambiguate error messages from leaf services when they are propagated to the top-level publisher and subscriber clients.
diff --git a/pubsublite/internal/test/util.go b/pubsublite/internal/test/util.go
index 979c8cc..6a35095 100644
--- a/pubsublite/internal/test/util.go
+++ b/pubsublite/internal/test/util.go
@@ -18,13 +18,14 @@
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
+ "golang.org/x/xerrors"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// ErrorEqual compares two errors for equivalence.
func ErrorEqual(got, want error) bool {
- if got == want {
+ if xerrors.Is(got, want) {
return true
}
return cmp.Equal(got, want, cmpopts.EquateErrors())
diff --git a/pubsublite/internal/wire/assigner.go b/pubsublite/internal/wire/assigner.go
index 14e71f6..e5dcc41 100644
--- a/pubsublite/internal/wire/assigner.go
+++ b/pubsublite/internal/wire/assigner.go
@@ -63,6 +63,7 @@
type assigner struct {
// Immutable after creation.
assignmentClient *vkit.PartitionAssignmentClient
+ subscription string
initialReq *pb.PartitionAssignmentRequest
receiveAssignment partitionAssignmentReceiver
metadata pubsubMetadata
@@ -81,6 +82,7 @@
a := &assigner{
assignmentClient: assignmentClient,
+ subscription: subscriptionPath,
initialReq: &pb.PartitionAssignmentRequest{
Request: &pb.PartitionAssignmentRequest_Initial{
Initial: &pb.InitialPartitionAssignmentRequest{
@@ -164,7 +166,7 @@
}
func (a *assigner) unsafeInitiateShutdown(targetStatus serviceStatus, err error) {
- if !a.unsafeUpdateStatus(targetStatus, err) {
+ if !a.unsafeUpdateStatus(targetStatus, wrapError("assigner", a.subscription, err)) {
return
}
// No data to send. Immediately terminate the stream.
diff --git a/pubsublite/internal/wire/committer.go b/pubsublite/internal/wire/committer.go
index c830f87..d9d24c0 100644
--- a/pubsublite/internal/wire/committer.go
+++ b/pubsublite/internal/wire/committer.go
@@ -42,6 +42,7 @@
type committer struct {
// Immutable after creation.
cursorClient *vkit.CursorClient
+ subscription subscriptionPartition
initialReq *pb.StreamingCommitCursorRequest
metadata pubsubMetadata
@@ -59,6 +60,7 @@
c := &committer{
cursorClient: cursor,
+ subscription: subscription,
initialReq: &pb.StreamingCommitCursorRequest{
Request: &pb.StreamingCommitCursorRequest_Initial{
Initial: &pb.InitialCommitCursorRequest{
@@ -201,7 +203,7 @@
}
func (c *committer) unsafeInitiateShutdown(targetStatus serviceStatus, err error) {
- if !c.unsafeUpdateStatus(targetStatus, err) {
+ if !c.unsafeUpdateStatus(targetStatus, wrapError("committer", c.subscription.String(), err)) {
return
}
diff --git a/pubsublite/internal/wire/errors.go b/pubsublite/internal/wire/errors.go
index 2953dca..55299e0 100644
--- a/pubsublite/internal/wire/errors.go
+++ b/pubsublite/internal/wire/errors.go
@@ -16,6 +16,8 @@
import (
"errors"
"fmt"
+
+ "golang.org/x/xerrors"
)
// Errors exported from this package.
@@ -41,3 +43,10 @@
// stopping.
ErrServiceStopped = errors.New("pubsublite: service has stopped or is stopping")
)
+
+func wrapError(context, resource string, err error) error {
+ if err != nil {
+ return xerrors.Errorf("%s(%s): %w", context, resource, err)
+ }
+ return err
+}
diff --git a/pubsublite/internal/wire/publisher.go b/pubsublite/internal/wire/publisher.go
index 19a5a02..f48e114 100644
--- a/pubsublite/internal/wire/publisher.go
+++ b/pubsublite/internal/wire/publisher.go
@@ -236,7 +236,7 @@
//
// Expected to be called with singlePartitionPublisher.mu held.
func (pp *singlePartitionPublisher) unsafeInitiateShutdown(targetStatus serviceStatus, err error) {
- if !pp.unsafeUpdateStatus(targetStatus, err) {
+ if !pp.unsafeUpdateStatus(targetStatus, wrapError("publisher", pp.topic.String(), err)) {
return
}
diff --git a/pubsublite/internal/wire/resources.go b/pubsublite/internal/wire/resources.go
index 5df9e81..5fa01a7 100644
--- a/pubsublite/internal/wire/resources.go
+++ b/pubsublite/internal/wire/resources.go
@@ -149,7 +149,15 @@
Partition int
}
+func (tp topicPartition) String() string {
+ return fmt.Sprintf("%s/partitions/%d", tp.Path, tp.Partition)
+}
+
type subscriptionPartition struct {
Path string
Partition int
}
+
+func (sp subscriptionPartition) String() string {
+ return fmt.Sprintf("%s/partitions/%d", sp.Path, sp.Partition)
+}
diff --git a/pubsublite/internal/wire/subscriber.go b/pubsublite/internal/wire/subscriber.go
index 054747a..18fc516 100644
--- a/pubsublite/internal/wire/subscriber.go
+++ b/pubsublite/internal/wire/subscriber.go
@@ -325,7 +325,7 @@
}
func (s *subscribeStream) unsafeInitiateShutdown(targetStatus serviceStatus, err error) {
- if !s.unsafeUpdateStatus(targetStatus, err) {
+ if !s.unsafeUpdateStatus(targetStatus, wrapError("subscriber", s.subscription.String(), err)) {
return
}