refactor(pubsublite): add context to error messages for leaf services (#3649)

To disambiguate error messages from leaf services when they are propagated to the top-level publisher and subscriber clients.
diff --git a/pubsublite/internal/test/util.go b/pubsublite/internal/test/util.go
index 979c8cc..6a35095 100644
--- a/pubsublite/internal/test/util.go
+++ b/pubsublite/internal/test/util.go
@@ -18,13 +18,14 @@
 
 	"github.com/google/go-cmp/cmp"
 	"github.com/google/go-cmp/cmp/cmpopts"
+	"golang.org/x/xerrors"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 )
 
 // ErrorEqual compares two errors for equivalence.
 func ErrorEqual(got, want error) bool {
-	if got == want {
+	if xerrors.Is(got, want) {
 		return true
 	}
 	return cmp.Equal(got, want, cmpopts.EquateErrors())
diff --git a/pubsublite/internal/wire/assigner.go b/pubsublite/internal/wire/assigner.go
index 14e71f6..e5dcc41 100644
--- a/pubsublite/internal/wire/assigner.go
+++ b/pubsublite/internal/wire/assigner.go
@@ -63,6 +63,7 @@
 type assigner struct {
 	// Immutable after creation.
 	assignmentClient  *vkit.PartitionAssignmentClient
+	subscription      string
 	initialReq        *pb.PartitionAssignmentRequest
 	receiveAssignment partitionAssignmentReceiver
 	metadata          pubsubMetadata
@@ -81,6 +82,7 @@
 
 	a := &assigner{
 		assignmentClient: assignmentClient,
+		subscription:     subscriptionPath,
 		initialReq: &pb.PartitionAssignmentRequest{
 			Request: &pb.PartitionAssignmentRequest_Initial{
 				Initial: &pb.InitialPartitionAssignmentRequest{
@@ -164,7 +166,7 @@
 }
 
 func (a *assigner) unsafeInitiateShutdown(targetStatus serviceStatus, err error) {
-	if !a.unsafeUpdateStatus(targetStatus, err) {
+	if !a.unsafeUpdateStatus(targetStatus, wrapError("assigner", a.subscription, err)) {
 		return
 	}
 	// No data to send. Immediately terminate the stream.
diff --git a/pubsublite/internal/wire/committer.go b/pubsublite/internal/wire/committer.go
index c830f87..d9d24c0 100644
--- a/pubsublite/internal/wire/committer.go
+++ b/pubsublite/internal/wire/committer.go
@@ -42,6 +42,7 @@
 type committer struct {
 	// Immutable after creation.
 	cursorClient *vkit.CursorClient
+	subscription subscriptionPartition
 	initialReq   *pb.StreamingCommitCursorRequest
 	metadata     pubsubMetadata
 
@@ -59,6 +60,7 @@
 
 	c := &committer{
 		cursorClient: cursor,
+		subscription: subscription,
 		initialReq: &pb.StreamingCommitCursorRequest{
 			Request: &pb.StreamingCommitCursorRequest_Initial{
 				Initial: &pb.InitialCommitCursorRequest{
@@ -201,7 +203,7 @@
 }
 
 func (c *committer) unsafeInitiateShutdown(targetStatus serviceStatus, err error) {
-	if !c.unsafeUpdateStatus(targetStatus, err) {
+	if !c.unsafeUpdateStatus(targetStatus, wrapError("committer", c.subscription.String(), err)) {
 		return
 	}
 
diff --git a/pubsublite/internal/wire/errors.go b/pubsublite/internal/wire/errors.go
index 2953dca..55299e0 100644
--- a/pubsublite/internal/wire/errors.go
+++ b/pubsublite/internal/wire/errors.go
@@ -16,6 +16,8 @@
 import (
 	"errors"
 	"fmt"
+
+	"golang.org/x/xerrors"
 )
 
 // Errors exported from this package.
@@ -41,3 +43,10 @@
 	// stopping.
 	ErrServiceStopped = errors.New("pubsublite: service has stopped or is stopping")
 )
+
+func wrapError(context, resource string, err error) error {
+	if err != nil {
+		return xerrors.Errorf("%s(%s): %w", context, resource, err)
+	}
+	return err
+}
diff --git a/pubsublite/internal/wire/publisher.go b/pubsublite/internal/wire/publisher.go
index 19a5a02..f48e114 100644
--- a/pubsublite/internal/wire/publisher.go
+++ b/pubsublite/internal/wire/publisher.go
@@ -236,7 +236,7 @@
 //
 // Expected to be called with singlePartitionPublisher.mu held.
 func (pp *singlePartitionPublisher) unsafeInitiateShutdown(targetStatus serviceStatus, err error) {
-	if !pp.unsafeUpdateStatus(targetStatus, err) {
+	if !pp.unsafeUpdateStatus(targetStatus, wrapError("publisher", pp.topic.String(), err)) {
 		return
 	}
 
diff --git a/pubsublite/internal/wire/resources.go b/pubsublite/internal/wire/resources.go
index 5df9e81..5fa01a7 100644
--- a/pubsublite/internal/wire/resources.go
+++ b/pubsublite/internal/wire/resources.go
@@ -149,7 +149,15 @@
 	Partition int
 }
 
+func (tp topicPartition) String() string {
+	return fmt.Sprintf("%s/partitions/%d", tp.Path, tp.Partition)
+}
+
 type subscriptionPartition struct {
 	Path      string
 	Partition int
 }
+
+func (sp subscriptionPartition) String() string {
+	return fmt.Sprintf("%s/partitions/%d", sp.Path, sp.Partition)
+}
diff --git a/pubsublite/internal/wire/subscriber.go b/pubsublite/internal/wire/subscriber.go
index 054747a..18fc516 100644
--- a/pubsublite/internal/wire/subscriber.go
+++ b/pubsublite/internal/wire/subscriber.go
@@ -325,7 +325,7 @@
 }
 
 func (s *subscribeStream) unsafeInitiateShutdown(targetStatus serviceStatus, err error) {
-	if !s.unsafeUpdateStatus(targetStatus, err) {
+	if !s.unsafeUpdateStatus(targetStatus, wrapError("subscriber", s.subscription.String(), err)) {
 		return
 	}