| // Copyright 2016 Google LLC |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package pubsub |
| |
| import ( |
| "context" |
| "errors" |
| "io" |
| "log" |
| "strings" |
| "sync" |
| "time" |
| |
| ipubsub "cloud.google.com/go/internal/pubsub" |
| vkit "cloud.google.com/go/pubsub/apiv1" |
| pb "cloud.google.com/go/pubsub/apiv1/pubsubpb" |
| "cloud.google.com/go/pubsub/internal/distribution" |
| gax "github.com/googleapis/gax-go/v2" |
| "github.com/googleapis/gax-go/v2/apierror" |
| "google.golang.org/grpc" |
| "google.golang.org/grpc/codes" |
| "google.golang.org/grpc/status" |
| "google.golang.org/protobuf/encoding/protowire" |
| ) |
| |
// Between message receipt and ack (that is, while a message is being processed)
// we want to periodically extend the message deadline by way of modack. However,
// we don't want to wait until the deadline expires to extend it; instead, we
// extend it a little ahead of time. gracePeriod is that amount of lead time
// before the actual deadline.
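//
// For example, with a 30s ack deadline, the sender schedules the next lease
// extension 30s-5s = 25s after the previous tick (see the kaTick case in sender).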
| const gracePeriod = 5 * time.Second |
| |
// ackIDBatchSize is the maximum number of ACK IDs to send in a single Ack/ModAck RPC.
// The backend imposes a maximum request size limit of 524288 bytes (512 KiB) per
// acknowledge / modifyAckDeadline request. ACK IDs have a maximum size of 176
// bytes, so we cannot send more than 524288/176 ~= 2979 ACK IDs in a single
// Ack/ModAck request.
// Accounting for some overhead, we should thus only send a maximum of 2500 ACK
// IDs at a time.
const ackIDBatchSize int = 2500
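
// For example, acking 3000 messages results in two Acknowledge RPCs, one with
// 2500 ack IDs and one with 500; see splitRequestIDs below:
//
//	toSend, ackIDs = splitRequestIDs(ackIDs, ackIDBatchSize)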
| |
| // These are vars so tests can change them. |
| var ( |
| maxDurationPerLeaseExtension = 10 * time.Minute |
| minDurationPerLeaseExtension = 10 * time.Second |
| minDurationPerLeaseExtensionExactlyOnce = 1 * time.Minute |
| |
	// The total amount of time to retry acks/modacks for subscriptions with exactly once delivery enabled.
| exactlyOnceDeliveryRetryDeadline = 600 * time.Second |
| ) |
| |
| type messageIterator struct { |
| ctx context.Context |
| cancel func() // the function that will cancel ctx; called in stop |
| po *pullOptions |
| ps *pullStream |
| subc *vkit.SubscriberClient |
| subName string |
| kaTick <-chan time.Time // keep-alive (deadline extensions) |
| ackTicker *time.Ticker // message acks |
| nackTicker *time.Ticker // message nacks |
| pingTicker *time.Ticker // sends to the stream to keep it open |
| failed chan struct{} // closed on stream error |
| drained chan struct{} // closed when stopped && no more pending messages |
| wg sync.WaitGroup |
| |
| mu sync.Mutex |
| ackTimeDist *distribution.D // dist uses seconds |
| |
| // keepAliveDeadlines is a map of id to expiration time. This map is used in conjunction with |
| // subscription.ReceiveSettings.MaxExtension to record the maximum amount of time (the |
| // deadline, more specifically) we're willing to extend a message's ack deadline. As each |
| // message arrives, we'll record now+MaxExtension in this table; whenever we have a chance |
| // to update ack deadlines (via modack), we'll consult this table and only include IDs |
| // that are not beyond their deadline. |
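	// For example, a message received at time T under a MaxExtension of 60 minutes
	// gets an entry of T+60m here; once now passes that deadline, the ID is
	// dropped from modacks and the message is allowed to expire.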
| keepAliveDeadlines map[string]time.Time |
| pendingAcks map[string]*AckResult |
| pendingNacks map[string]*AckResult |
| // ack IDs whose ack deadline is to be modified |
	// ModAcks don't have AckResults, but this allows reuse of the sendModAck function.
| pendingModAcks map[string]*AckResult |
| err error // error from stream failure |
| |
| eoMu sync.RWMutex |
| enableExactlyOnceDelivery bool |
| sendNewAckDeadline bool |
| |
| orderingMu sync.RWMutex |
| // enableOrdering determines if messages should be processed in order. This is populated |
| // by the response in StreamingPull and can change mid Receive. Must be accessed |
| // with the lock held. |
| enableOrdering bool |
| } |
| |
| // newMessageIterator starts and returns a new messageIterator. |
| // subName is the full name of the subscription to pull messages from. |
| // Stop must be called on the messageIterator when it is no longer needed. |
| // The iterator always uses the background context for acking messages and extending message deadlines. |
| func newMessageIterator(subc *vkit.SubscriberClient, subName string, po *pullOptions) *messageIterator { |
| var ps *pullStream |
| if !po.synchronous { |
| maxMessages := po.maxOutstandingMessages |
| maxBytes := po.maxOutstandingBytes |
| if po.useLegacyFlowControl { |
| maxMessages = 0 |
| maxBytes = 0 |
| } |
| ps = newPullStream(context.Background(), subc.StreamingPull, subName, maxMessages, maxBytes, po.maxExtensionPeriod) |
| } |
| // The period will update each tick based on the distribution of acks. We'll start by arbitrarily sending |
| // the first keepAlive halfway towards the minimum ack deadline. |
| keepAlivePeriod := minDurationPerLeaseExtension / 2 |
| |
| // Ack promptly so users don't lose work if client crashes. |
| ackTicker := time.NewTicker(100 * time.Millisecond) |
| nackTicker := time.NewTicker(100 * time.Millisecond) |
| pingTicker := time.NewTicker(30 * time.Second) |
| cctx, cancel := context.WithCancel(context.Background()) |
| cctx = withSubscriptionKey(cctx, subName) |
| it := &messageIterator{ |
| ctx: cctx, |
| cancel: cancel, |
| ps: ps, |
| po: po, |
| subc: subc, |
| subName: subName, |
| kaTick: time.After(keepAlivePeriod), |
| ackTicker: ackTicker, |
| nackTicker: nackTicker, |
| pingTicker: pingTicker, |
| failed: make(chan struct{}), |
| drained: make(chan struct{}), |
| ackTimeDist: distribution.New(int(maxDurationPerLeaseExtension/time.Second) + 1), |
| keepAliveDeadlines: map[string]time.Time{}, |
| pendingAcks: map[string]*AckResult{}, |
| pendingNacks: map[string]*AckResult{}, |
| pendingModAcks: map[string]*AckResult{}, |
| } |
| it.wg.Add(1) |
| go it.sender() |
| return it |
| } |
| |
// Subscription.receive will call stop on its messageIterator when finished with it.
// Stop will block until Done has been called on all Messages that have been
// returned by the iterator, or until the context with which the messageIterator was
// created is cancelled or exceeds its deadline.
| func (it *messageIterator) stop() { |
| it.cancel() |
| it.mu.Lock() |
| it.checkDrained() |
| it.mu.Unlock() |
| it.wg.Wait() |
| } |
| |
| // checkDrained closes the drained channel if the iterator has been stopped and all |
| // pending messages have either been n/acked or expired. |
| // |
| // Called with the lock held. |
| func (it *messageIterator) checkDrained() { |
| select { |
| case <-it.drained: |
| return |
| default: |
| } |
| select { |
| case <-it.ctx.Done(): |
| if len(it.keepAliveDeadlines) == 0 { |
| close(it.drained) |
| } |
| default: |
| } |
| } |
| |
| // Given a receiveTime, add the elapsed time to the iterator's ack distribution. |
| // These values are bounded by the ModifyAckDeadline limits, which are |
| // min/maxDurationPerLeaseExtension. |
| func (it *messageIterator) addToDistribution(receiveTime time.Time) { |
| d := time.Since(receiveTime) |
| d = maxDuration(d, minDurationPerLeaseExtension) |
| d = minDuration(d, maxDurationPerLeaseExtension) |
| it.ackTimeDist.Record(int(d / time.Second)) |
| } |
| |
| // Called when a message is acked/nacked. |
| func (it *messageIterator) done(ackID string, ack bool, r *AckResult, receiveTime time.Time) { |
| it.addToDistribution(receiveTime) |
| it.mu.Lock() |
| defer it.mu.Unlock() |
| delete(it.keepAliveDeadlines, ackID) |
| if ack { |
| it.pendingAcks[ackID] = r |
| } else { |
| it.pendingNacks[ackID] = r |
| } |
| it.checkDrained() |
| } |
| |
| // fail is called when a stream method returns a permanent error. |
| // fail returns it.err. This may be err, or it may be the error |
| // set by an earlier call to fail. |
| func (it *messageIterator) fail(err error) error { |
| it.mu.Lock() |
| defer it.mu.Unlock() |
| if it.err == nil { |
| it.err = err |
| close(it.failed) |
| } |
| return it.err |
| } |
| |
| // receive makes a call to the stream's Recv method, or the Pull RPC, and returns |
| // its messages. |
| // maxToPull is the maximum number of messages for the Pull RPC. |
| func (it *messageIterator) receive(maxToPull int32) ([]*Message, error) { |
| it.mu.Lock() |
| ierr := it.err |
| it.mu.Unlock() |
| if ierr != nil { |
| return nil, ierr |
| } |
| |
| // Stop retrieving messages if the iterator's Stop method was called. |
| select { |
| case <-it.ctx.Done(): |
| it.wg.Wait() |
| return nil, io.EOF |
| default: |
| } |
| |
| var rmsgs []*pb.ReceivedMessage |
| var err error |
| if it.po.synchronous { |
| rmsgs, err = it.pullMessages(maxToPull) |
| } else { |
| rmsgs, err = it.recvMessages() |
| } |
| // Any error here is fatal. |
| if err != nil { |
| return nil, it.fail(err) |
| } |
| recordStat(it.ctx, PullCount, int64(len(rmsgs))) |
| now := time.Now() |
| msgs, err := convertMessages(rmsgs, now, it.done) |
| if err != nil { |
| return nil, it.fail(err) |
| } |
| // We received some messages. Remember them so we can keep them alive. Also, |
| // do a receipt mod-ack when streaming. |
| maxExt := time.Now().Add(it.po.maxExtension) |
| ackIDs := map[string]*AckResult{} |
| it.eoMu.RLock() |
| exactlyOnceDelivery := it.enableExactlyOnceDelivery |
| it.eoMu.RUnlock() |
| it.mu.Lock() |
| |
| // pendingMessages maps ackID -> message, and is used |
| // only when exactly once delivery is enabled. |
| // At first, all messages are pending, and they |
| // are removed if the modack call fails. All other |
| // messages are returned to the client for processing. |
| pendingMessages := make(map[string]*ipubsub.Message) |
| for _, m := range msgs { |
| ackID := msgAckID(m) |
| addRecv(m.ID, ackID, now) |
| it.keepAliveDeadlines[ackID] = maxExt |
| // Don't change the mod-ack if the message is going to be nacked. This is |
| // possible if there are retries. |
| if _, ok := it.pendingNacks[ackID]; !ok { |
| // Don't use the message's AckResult here since these are only for receipt modacks. |
| // modack results are transparent to the user so these can automatically succeed unless |
| // exactly once is enabled. |
| // We can't use an empty AckResult here either since SetAckResult will try to |
| // close the channel without checking if it exists. |
| if !exactlyOnceDelivery { |
| ackIDs[ackID] = newSuccessAckResult() |
| } else { |
| ackIDs[ackID] = ipubsub.NewAckResult() |
| pendingMessages[ackID] = m |
| } |
| } |
| } |
| deadline := it.ackDeadline() |
| it.mu.Unlock() |
| |
| if len(ackIDs) > 0 { |
| // When exactly once delivery is not enabled, modacks are fire and forget. |
| if !exactlyOnceDelivery { |
| go func() { |
| it.sendModAck(ackIDs, deadline, false) |
| }() |
| return msgs, nil |
| } |
| |
		// If exactly once delivery is enabled, wait for the modack responses to
		// succeed before handing messages to the client for processing.
| it.sendModAck(ackIDs, deadline, false) |
| for ackID, ar := range ackIDs { |
| ctx := context.Background() |
| _, err := ar.Get(ctx) |
| if err != nil { |
| delete(pendingMessages, ackID) |
| it.mu.Lock() |
| // Remove the message from lease management if modack fails here. |
| delete(it.keepAliveDeadlines, ackID) |
| it.mu.Unlock() |
| } |
| } |
| // Only return for processing messages that were successfully modack'ed. |
| // Iterate over the original messages slice for ordering. |
| v := make([]*ipubsub.Message, 0, len(pendingMessages)) |
| for _, m := range msgs { |
| ackID := msgAckID(m) |
| if _, ok := pendingMessages[ackID]; ok { |
| v = append(v, m) |
| } |
| } |
| return v, nil |
| } |
| return nil, nil |
| } |
| |
| // Get messages using the Pull RPC. |
// This may block indefinitely. It may also return zero messages after waiting for some time.
| func (it *messageIterator) pullMessages(maxToPull int32) ([]*pb.ReceivedMessage, error) { |
| // Use it.ctx as the RPC context, so that if the iterator is stopped, the call |
| // will return immediately. |
| res, err := it.subc.Pull(it.ctx, &pb.PullRequest{ |
| Subscription: it.subName, |
| MaxMessages: maxToPull, |
| }, gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(maxSendRecvBytes))) |
| switch { |
| case err == context.Canceled: |
| return nil, nil |
| case status.Code(err) == codes.Canceled: |
| return nil, nil |
| case err != nil: |
| return nil, err |
| default: |
| return res.ReceivedMessages, nil |
| } |
| } |
| |
| func (it *messageIterator) recvMessages() ([]*pb.ReceivedMessage, error) { |
| res, err := it.ps.Recv() |
| if err != nil { |
| return nil, err |
| } |
| |
	// If the exactly once delivery setting in the response differs from the current setting, update it.
| it.eoMu.RLock() |
| enableEOD := it.enableExactlyOnceDelivery |
| it.eoMu.RUnlock() |
| |
| subProp := res.GetSubscriptionProperties() |
| if got := subProp.GetExactlyOnceDeliveryEnabled(); got != enableEOD { |
| it.eoMu.Lock() |
| it.sendNewAckDeadline = true |
| it.enableExactlyOnceDelivery = got |
| it.eoMu.Unlock() |
| } |
| |
| // Also update the subscriber's ordering setting if stale. |
| it.orderingMu.RLock() |
| enableOrdering := it.enableOrdering |
| it.orderingMu.RUnlock() |
| |
| if got := subProp.GetMessageOrderingEnabled(); got != enableOrdering { |
| it.orderingMu.Lock() |
| it.enableOrdering = got |
| it.orderingMu.Unlock() |
| } |
| return res.ReceivedMessages, nil |
| } |
| |
| // sender runs in a goroutine and handles all sends to the stream. |
| func (it *messageIterator) sender() { |
| defer it.wg.Done() |
| defer it.ackTicker.Stop() |
| defer it.nackTicker.Stop() |
| defer it.pingTicker.Stop() |
| defer func() { |
| if it.ps != nil { |
| it.ps.CloseSend() |
| } |
| }() |
| |
| done := false |
| for !done { |
| sendAcks := false |
| sendNacks := false |
| sendModAcks := false |
| sendPing := false |
| |
| dl := it.ackDeadline() |
| |
| select { |
| case <-it.failed: |
| // Stream failed: nothing to do, so stop immediately. |
| return |
| |
| case <-it.drained: |
| // All outstanding messages have been marked done: |
| // nothing left to do except make the final calls. |
| it.mu.Lock() |
| sendAcks = (len(it.pendingAcks) > 0) |
| sendNacks = (len(it.pendingNacks) > 0) |
| // No point in sending modacks. |
| done = true |
| |
| case <-it.kaTick: |
| it.mu.Lock() |
| it.handleKeepAlives() |
| sendModAcks = (len(it.pendingModAcks) > 0) |
| |
| nextTick := dl - gracePeriod |
| if nextTick <= 0 { |
| // If the deadline is <= gracePeriod, let's tick again halfway to |
| // the deadline. |
| nextTick = dl / 2 |
| } |
| it.kaTick = time.After(nextTick) |
| |
| case <-it.nackTicker.C: |
| it.mu.Lock() |
| sendNacks = (len(it.pendingNacks) > 0) |
| |
| case <-it.ackTicker.C: |
| it.mu.Lock() |
| sendAcks = (len(it.pendingAcks) > 0) |
| |
| case <-it.pingTicker.C: |
| it.mu.Lock() |
| // Ping only if we are processing messages via streaming. |
| sendPing = !it.po.synchronous |
| } |
| // Lock is held here. |
| var acks, nacks, modAcks map[string]*AckResult |
| if sendAcks { |
| acks = it.pendingAcks |
| it.pendingAcks = map[string]*AckResult{} |
| } |
| if sendNacks { |
| nacks = it.pendingNacks |
| it.pendingNacks = map[string]*AckResult{} |
| } |
| if sendModAcks { |
| modAcks = it.pendingModAcks |
| it.pendingModAcks = map[string]*AckResult{} |
| } |
| it.mu.Unlock() |
| // Make Ack and ModAck RPCs. |
| if sendAcks { |
| it.sendAck(acks) |
| } |
| if sendNacks { |
			// A nack is sent by modifying the ack deadline to zero.
| it.sendModAck(nacks, 0, false) |
| } |
| if sendModAcks { |
| it.sendModAck(modAcks, dl, true) |
| } |
| if sendPing { |
| it.pingStream() |
| } |
| } |
| } |
| |
| // handleKeepAlives modifies the pending request to include deadline extensions |
| // for live messages. It also purges expired messages. |
| // |
| // Called with the lock held. |
| func (it *messageIterator) handleKeepAlives() { |
| now := time.Now() |
| for id, expiry := range it.keepAliveDeadlines { |
| if expiry.Before(now) { |
| // This delete will not result in skipping any map items, as implied by |
| // the spec at https://golang.org/ref/spec#For_statements, "For |
| // statements with range clause", note 3, and stated explicitly at |
| // https://groups.google.com/forum/#!msg/golang-nuts/UciASUb03Js/pzSq5iVFAQAJ. |
| delete(it.keepAliveDeadlines, id) |
| } else { |
| // Use a success AckResult since we don't propagate ModAcks back to the user. |
| it.pendingModAcks[id] = newSuccessAckResult() |
| } |
| } |
| it.checkDrained() |
| } |
| |
| // sendAck is used to confirm acknowledgement of a message. If exactly once delivery is |
| // enabled, we'll retry these messages for a short duration in a goroutine. |
| func (it *messageIterator) sendAck(m map[string]*AckResult) { |
| ackIDs := make([]string, 0, len(m)) |
| for k := range m { |
| ackIDs = append(ackIDs, k) |
| } |
| it.eoMu.RLock() |
| exactlyOnceDelivery := it.enableExactlyOnceDelivery |
| it.eoMu.RUnlock() |
| |
| var toSend []string |
| for len(ackIDs) > 0 { |
| toSend, ackIDs = splitRequestIDs(ackIDs, ackIDBatchSize) |
| |
| recordStat(it.ctx, AckCount, int64(len(toSend))) |
| addAcks(toSend) |
| // Use context.Background() as the call's context, not it.ctx. We don't |
| // want to cancel this RPC when the iterator is stopped. |
		// Cancel immediately after the RPC returns rather than deferring, so
		// contexts don't accumulate across loop iterations.
		cctx2, cancel2 := context.WithTimeout(context.Background(), 60*time.Second)
		err := it.subc.Acknowledge(cctx2, &pb.AcknowledgeRequest{
			Subscription: it.subName,
			AckIds:       toSend,
		})
		cancel2()
| if exactlyOnceDelivery { |
| resultsByAckID := make(map[string]*AckResult) |
| for _, ackID := range toSend { |
| resultsByAckID[ackID] = m[ackID] |
| } |
| st, md := extractMetadata(err) |
| _, toRetry := processResults(st, resultsByAckID, md) |
| if len(toRetry) > 0 { |
| // Retry acks in a separate goroutine. |
| go func() { |
| it.retryAcks(toRetry) |
| }() |
| } |
| } |
| } |
| } |
| |
| // sendModAck is used to extend the lease of messages or nack them. |
| // The receipt mod-ack amount is derived from a percentile distribution based |
// on the time it takes to process messages. The percentile chosen is the 99th
// percentile in order to capture the highest amount of time necessary without
// considering 1% outliers. If the ModAck RPC fails and exactly once delivery is
| // enabled, we retry it in a separate goroutine for a short duration. |
| func (it *messageIterator) sendModAck(m map[string]*AckResult, deadline time.Duration, logOnInvalid bool) { |
| deadlineSec := int32(deadline / time.Second) |
| ackIDs := make([]string, 0, len(m)) |
| for k := range m { |
| ackIDs = append(ackIDs, k) |
| } |
| it.eoMu.RLock() |
| exactlyOnceDelivery := it.enableExactlyOnceDelivery |
| it.eoMu.RUnlock() |
| var toSend []string |
| for len(ackIDs) > 0 { |
| toSend, ackIDs = splitRequestIDs(ackIDs, ackIDBatchSize) |
| if deadline == 0 { |
| recordStat(it.ctx, NackCount, int64(len(toSend))) |
| } else { |
| recordStat(it.ctx, ModAckCount, int64(len(toSend))) |
| } |
| addModAcks(toSend, deadlineSec) |
| // Use context.Background() as the call's context, not it.ctx. We don't |
| // want to cancel this RPC when the iterator is stopped. |
		// Cancel immediately after the RPC returns rather than deferring, so
		// contexts don't accumulate across loop iterations.
		cctx, cancel2 := context.WithTimeout(context.Background(), 60*time.Second)
		err := it.subc.ModifyAckDeadline(cctx, &pb.ModifyAckDeadlineRequest{
			Subscription:       it.subName,
			AckDeadlineSeconds: deadlineSec,
			AckIds:             toSend,
		})
		cancel2()
| if exactlyOnceDelivery { |
| resultsByAckID := make(map[string]*AckResult) |
| for _, ackID := range toSend { |
| resultsByAckID[ackID] = m[ackID] |
| } |
| |
| st, md := extractMetadata(err) |
| _, toRetry := processResults(st, resultsByAckID, md) |
| if len(toRetry) > 0 { |
| // Retry modacks/nacks in a separate goroutine. |
| go func() { |
| it.retryModAcks(toRetry, deadlineSec, logOnInvalid) |
| }() |
| } |
| } |
| } |
| } |
| |
| // retryAcks retries the ack RPC with backoff. This must be called in a goroutine |
| // in it.sendAck(), with a max of 2500 ackIDs. |
| func (it *messageIterator) retryAcks(m map[string]*AckResult) { |
| ctx, cancel := context.WithTimeout(context.Background(), exactlyOnceDeliveryRetryDeadline) |
| defer cancel() |
| bo := newExactlyOnceBackoff() |
| for { |
| if ctx.Err() != nil { |
| for _, r := range m { |
| ipubsub.SetAckResult(r, AcknowledgeStatusOther, ctx.Err()) |
| } |
| return |
| } |
| // Don't need to split map since this is the retry function and |
| // there is already a max of 2500 ackIDs here. |
| ackIDs := make([]string, 0, len(m)) |
| for k := range m { |
| ackIDs = append(ackIDs, k) |
| } |
		cctx2, cancel2 := context.WithTimeout(ctx, 60*time.Second)
		err := it.subc.Acknowledge(cctx2, &pb.AcknowledgeRequest{
			Subscription: it.subName,
			AckIds:       ackIDs,
		})
		// Cancel immediately rather than deferring, so contexts don't
		// accumulate across retry iterations.
		cancel2()
| st, md := extractMetadata(err) |
| _, toRetry := processResults(st, m, md) |
| if len(toRetry) == 0 { |
| return |
| } |
| time.Sleep(bo.Pause()) |
| m = toRetry |
| } |
| } |
| |
// retryModAcks retries the modack RPC with backoff. This must be called in a goroutine
// in it.sendModAck(), with a max of 2500 ackIDs. Modacks are retried up to 3 times,
// since beyond that the message will have expired. Nacks are retried until the
// default deadline of 10 minutes.
| func (it *messageIterator) retryModAcks(m map[string]*AckResult, deadlineSec int32, logOnInvalid bool) { |
| bo := newExactlyOnceBackoff() |
| retryCount := 0 |
| ctx, cancel := context.WithTimeout(context.Background(), exactlyOnceDeliveryRetryDeadline) |
| defer cancel() |
| for { |
| // If context is done, complete all AckResults with errors. |
| if ctx.Err() != nil { |
| for _, r := range m { |
| ipubsub.SetAckResult(r, AcknowledgeStatusOther, ctx.Err()) |
| } |
| return |
| } |
| // Only retry modack requests up to 3 times. |
| if deadlineSec != 0 && retryCount > 3 { |
| ackIDs := make([]string, 0, len(m)) |
| for k, ar := range m { |
| ackIDs = append(ackIDs, k) |
| ipubsub.SetAckResult(ar, AcknowledgeStatusOther, errors.New("modack retry failed")) |
| } |
| if logOnInvalid { |
| log.Printf("automatic lease modack retry failed for following IDs: %v", ackIDs) |
| } |
| return |
| } |
| // Don't need to split map since this is the retry function and |
| // there is already a max of 2500 ackIDs here. |
| ackIDs := make([]string, 0, len(m)) |
| for k := range m { |
| ackIDs = append(ackIDs, k) |
| } |
		cctx2, cancel2 := context.WithTimeout(ctx, 60*time.Second)
		err := it.subc.ModifyAckDeadline(cctx2, &pb.ModifyAckDeadlineRequest{
			Subscription:       it.subName,
			AckIds:             ackIDs,
			AckDeadlineSeconds: deadlineSec,
		})
		// Cancel immediately rather than deferring, so contexts don't
		// accumulate across retry iterations.
		cancel2()
| st, md := extractMetadata(err) |
| _, toRetry := processResults(st, m, md) |
| if len(toRetry) == 0 { |
| return |
| } |
| time.Sleep(bo.Pause()) |
| m = toRetry |
| retryCount++ |
| } |
| } |
| |
| // Send a message to the stream to keep it open. The stream will close if there's no |
| // traffic on it for a while. By keeping it open, we delay the start of the |
| // expiration timer on messages that are buffered by gRPC or elsewhere in the |
| // network. This matters if it takes a long time to process messages relative to the |
| // default ack deadline, and if the messages are small enough so that many can fit |
| // into the buffer. |
| func (it *messageIterator) pingStream() { |
| spr := &pb.StreamingPullRequest{} |
	// ackDeadline acquires eoMu.RLock itself, so compute it before taking the
	// lock here (recursively read-locking eoMu can deadlock if a writer is
	// waiting). sendNewAckDeadline is written below, so the write lock is needed.
	dl := it.ackDeadline()
	it.eoMu.Lock()
	if it.sendNewAckDeadline {
		// StreamAckDeadlineSeconds is expressed in seconds; it.ackDeadline()
		// returns a time.Duration in nanoseconds, so convert before sending.
		spr.StreamAckDeadlineSeconds = int32(dl / time.Second)
		it.sendNewAckDeadline = false
	}
	it.eoMu.Unlock()
| it.ps.Send(spr) |
| } |
| |
| // calcFieldSizeString returns the number of bytes string fields |
| // will take up in an encoded proto message. |
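//
// For example, assuming a single-byte field tag (field number below 16), a
// 26-byte string contributes 1 (tag) + 1 (length varint) + 26 (payload) bytes:
//
//	calcFieldSizeString("projects/p/subscriptions/s") // 28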
| func calcFieldSizeString(fields ...string) int { |
| overhead := 0 |
| for _, field := range fields { |
| overhead += 1 + len(field) + protowire.SizeVarint(uint64(len(field))) |
| } |
| return overhead |
| } |
| |
| // calcFieldSizeInt returns the number of bytes int fields |
| // will take up in an encoded proto message. |
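//
// For example, the value 600 requires a two-byte varint, so with a single-byte
// tag it contributes 1 + 2 = 3 bytes:
//
//	calcFieldSizeInt(600) // 3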
| func calcFieldSizeInt(fields ...int) int { |
| overhead := 0 |
| for _, field := range fields { |
| overhead += 1 + protowire.SizeVarint(uint64(field)) |
| } |
| return overhead |
| } |
| |
// splitRequestIDs takes a slice of ackIDs and returns two slices such that the first
// ackID slice can be used in a request where the number of ackIDs does not exceed maxBatchSize.
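//
// For example:
//
//	first, rest := splitRequestIDs([]string{"a", "b", "c"}, 2)
//	// first == []string{"a", "b"}, rest == []string{"c"}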
| func splitRequestIDs(ids []string, maxBatchSize int) (prefix, remainder []string) { |
| if len(ids) < maxBatchSize { |
| return ids, []string{} |
| } |
| return ids[:maxBatchSize], ids[maxBatchSize:] |
| } |
| |
// The deadline to ack is derived from a percentile distribution based
// on the time it takes to process messages. The percentile chosen is the 99th
// percentile - that is, processing times up to the 99th percentile of the
// longest processing times should be safe. The highest 1% may expire. This
// number was chosen as a way to cover most users' use cases without losing the
// value of expiration.
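//
// For example, if 99% of messages historically are acked within 20s of
// receipt, the deadline evaluates to 20s before being clamped by
// boundedDuration below.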
| func (it *messageIterator) ackDeadline() time.Duration { |
| pt := time.Duration(it.ackTimeDist.Percentile(.99)) * time.Second |
| it.eoMu.RLock() |
| enableExactlyOnce := it.enableExactlyOnceDelivery |
| it.eoMu.RUnlock() |
| return boundedDuration(pt, it.po.minExtensionPeriod, it.po.maxExtensionPeriod, enableExactlyOnce) |
| } |
| |
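// boundedDuration clamps ackDeadline to the configured extension bounds. A
// couple of illustrative calls, with no explicit extension periods configured:
//
//	boundedDuration(2*time.Second, 0, 0, false) // 10s (minDurationPerLeaseExtension)
//	boundedDuration(2*time.Second, 0, 0, true)  // 1m (minDurationPerLeaseExtensionExactlyOnce)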
| func boundedDuration(ackDeadline, minExtension, maxExtension time.Duration, exactlyOnce bool) time.Duration { |
| // If the user explicitly sets a maxExtensionPeriod, respect it. |
| if maxExtension > 0 { |
| ackDeadline = minDuration(ackDeadline, maxExtension) |
| } |
| |
| // If the user explicitly sets a minExtensionPeriod, respect it. |
| if minExtension > 0 { |
| ackDeadline = maxDuration(ackDeadline, minExtension) |
| } else if exactlyOnce { |
| // Higher minimum ack_deadline for subscriptions with |
| // exactly-once delivery enabled. |
| ackDeadline = maxDuration(ackDeadline, minDurationPerLeaseExtensionExactlyOnce) |
| } else if ackDeadline < minDurationPerLeaseExtension { |
| // Otherwise, lower bound is min ack extension. This is normally bounded |
| // when adding datapoints to the distribution, but this is needed for |
| // the initial few calls to ackDeadline. |
| ackDeadline = minDurationPerLeaseExtension |
| } |
| |
| return ackDeadline |
| } |
| |
| func minDuration(x, y time.Duration) time.Duration { |
| if x < y { |
| return x |
| } |
| return y |
| } |
| |
| func maxDuration(x, y time.Duration) time.Duration { |
| if x > y { |
| return x |
| } |
| return y |
| } |
| |
| const ( |
| transientErrStringPrefix = "TRANSIENT_" |
| permanentInvalidAckErrString = "PERMANENT_FAILURE_INVALID_ACK_ID" |
| ) |
| |
// extractMetadata extracts the gRPC status and ErrorInfo metadata from an API
// error, for exactly once delivery's ack/modack error responses.
| func extractMetadata(err error) (*status.Status, map[string]string) { |
| apiErr, ok := apierror.FromError(err) |
| if ok { |
| return apiErr.GRPCStatus(), apiErr.Metadata() |
| } |
| return nil, nil |
| } |
| |
| // processResults processes AckResults by referring to errorStatus and errorsByAckID. |
| // The errors returned by the server in `errorStatus` or in `errorsByAckID` |
| // are used to complete the AckResults in `ackResMap` (with a success |
| // or error) or to return requests for further retries. |
| // This function returns two maps of ackID to ack results, one for completed results and the other for ones to retry. |
| // Logic is derived from python-pubsub: https://github.com/googleapis/python-pubsub/blob/main/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py#L161-L220 |
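// For example, an ack ID whose errorsByAckID value begins with "TRANSIENT_" is
// placed in the retry map, while the value "PERMANENT_FAILURE_INVALID_ACK_ID"
// completes its AckResult with AcknowledgeStatusInvalidAckID.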
| func processResults(errorStatus *status.Status, ackResMap map[string]*AckResult, errorsByAckID map[string]string) (map[string]*AckResult, map[string]*AckResult) { |
| completedResults := make(map[string]*AckResult) |
| retryResults := make(map[string]*AckResult) |
| for ackID, ar := range ackResMap { |
| // Handle special errors returned for ack/modack RPCs via the ErrorInfo |
| // sidecar metadata when exactly-once delivery is enabled. |
| if errAckID, ok := errorsByAckID[ackID]; ok { |
| if strings.HasPrefix(errAckID, transientErrStringPrefix) { |
| retryResults[ackID] = ar |
| } else { |
| if errAckID == permanentInvalidAckErrString { |
| ipubsub.SetAckResult(ar, AcknowledgeStatusInvalidAckID, errors.New(errAckID)) |
| } else { |
| ipubsub.SetAckResult(ar, AcknowledgeStatusOther, errors.New(errAckID)) |
| } |
| completedResults[ackID] = ar |
| } |
| } else if errorStatus != nil && contains(errorStatus.Code(), exactlyOnceDeliveryTemporaryRetryErrors) { |
| retryResults[ackID] = ar |
| } else if errorStatus != nil { |
| // Other gRPC errors are not retried. |
| switch errorStatus.Code() { |
| case codes.PermissionDenied: |
| ipubsub.SetAckResult(ar, AcknowledgeStatusPermissionDenied, errorStatus.Err()) |
| case codes.FailedPrecondition: |
| ipubsub.SetAckResult(ar, AcknowledgeStatusFailedPrecondition, errorStatus.Err()) |
| default: |
| ipubsub.SetAckResult(ar, AcknowledgeStatusOther, errorStatus.Err()) |
| } |
| completedResults[ackID] = ar |
| } else if ar != nil { |
| // Since no error occurred, requests with AckResults are completed successfully. |
| ipubsub.SetAckResult(ar, AcknowledgeStatusSuccess, nil) |
| completedResults[ackID] = ar |
| } else { |
| // All other requests are considered completed. |
| completedResults[ackID] = ar |
| } |
| } |
| return completedResults, retryResults |
| } |