blob: e5dcc4151584d2ae00a74e7cd987d6cb3ed515c5 [file] [log] [blame]
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
package wire
import (
"context"
"errors"
"fmt"
"reflect"
"github.com/google/uuid"
"google.golang.org/grpc"
vkit "cloud.google.com/go/pubsublite/apiv1"
pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
)
// partitionSet is a set of partition numbers.
type partitionSet map[int]struct{}
func newPartitionSet(assignmentpb *pb.PartitionAssignment) partitionSet {
var void struct{}
partitions := make(map[int]struct{})
for _, p := range assignmentpb.GetPartitions() {
partitions[int(p)] = void
}
return partitionSet(partitions)
}
func (ps partitionSet) Ints() (partitions []int) {
for p := range ps {
partitions = append(partitions, p)
}
return
}
func (ps partitionSet) Contains(partition int) bool {
_, exists := ps[partition]
return exists
}
// A function that generates a 16-byte UUID.
type generateUUIDFunc func() (uuid.UUID, error)
// partitionAssignmentReceiver must enact the received partition assignment from
// the server, or otherwise return an error, which will break the stream. The
// receiver must not call the assigner, as this would result in a deadlock.
type partitionAssignmentReceiver func(partitionSet) error
// assigner wraps the partition assignment stream and notifies a receiver when
// the server sends a new set of partition assignments for a subscriber.
type assigner struct {
// Immutable after creation.
assignmentClient *vkit.PartitionAssignmentClient
subscription string
initialReq *pb.PartitionAssignmentRequest
receiveAssignment partitionAssignmentReceiver
metadata pubsubMetadata
// Fields below must be guarded with mu.
stream *retryableStream
abstractService
}
func newAssigner(ctx context.Context, assignmentClient *vkit.PartitionAssignmentClient, genUUID generateUUIDFunc, settings ReceiveSettings, subscriptionPath string, receiver partitionAssignmentReceiver) (*assigner, error) {
clientID, err := genUUID()
if err != nil {
return nil, fmt.Errorf("pubsublite: failed to generate client UUID: %v", err)
}
a := &assigner{
assignmentClient: assignmentClient,
subscription: subscriptionPath,
initialReq: &pb.PartitionAssignmentRequest{
Request: &pb.PartitionAssignmentRequest_Initial{
Initial: &pb.InitialPartitionAssignmentRequest{
Subscription: subscriptionPath,
ClientId: clientID[:],
},
},
},
receiveAssignment: receiver,
metadata: newPubsubMetadata(),
}
a.stream = newRetryableStream(ctx, a, settings.Timeout, reflect.TypeOf(pb.PartitionAssignment{}))
a.metadata.AddClientInfo(settings.Framework)
return a, nil
}
func (a *assigner) Start() {
a.mu.Lock()
defer a.mu.Unlock()
if a.unsafeUpdateStatus(serviceStarting, nil) {
a.stream.Start()
}
}
func (a *assigner) Stop() {
a.mu.Lock()
defer a.mu.Unlock()
a.unsafeInitiateShutdown(serviceTerminating, nil)
}
func (a *assigner) newStream(ctx context.Context) (grpc.ClientStream, error) {
return a.assignmentClient.AssignPartitions(a.metadata.AddToContext(ctx))
}
func (a *assigner) initialRequest() (interface{}, initialResponseRequired) {
return a.initialReq, initialResponseRequired(false)
}
func (a *assigner) validateInitialResponse(_ interface{}) error {
// Should not be called as initialResponseRequired=false above.
return errors.New("pubsublite: unexpected initial response")
}
func (a *assigner) onStreamStatusChange(status streamStatus) {
a.mu.Lock()
defer a.mu.Unlock()
switch status {
case streamConnected:
a.unsafeUpdateStatus(serviceActive, nil)
case streamTerminated:
a.unsafeInitiateShutdown(serviceTerminated, a.stream.Error())
}
}
func (a *assigner) onResponse(response interface{}) {
a.mu.Lock()
defer a.mu.Unlock()
if a.status >= serviceTerminating {
return
}
assignment, _ := response.(*pb.PartitionAssignment)
if err := a.handleAssignment(assignment); err != nil {
a.unsafeInitiateShutdown(serviceTerminated, err)
}
}
func (a *assigner) handleAssignment(assignment *pb.PartitionAssignment) error {
if err := a.receiveAssignment(newPartitionSet(assignment)); err != nil {
return err
}
a.stream.Send(&pb.PartitionAssignmentRequest{
Request: &pb.PartitionAssignmentRequest_Ack{
Ack: &pb.PartitionAssignmentAck{},
},
})
return nil
}
func (a *assigner) unsafeInitiateShutdown(targetStatus serviceStatus, err error) {
if !a.unsafeUpdateStatus(targetStatus, wrapError("assigner", a.subscription, err)) {
return
}
// No data to send. Immediately terminate the stream.
a.stream.Stop()
}