| // Copyright 2020 Google LLC |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // https://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package pubsub |
| |
| import ( |
| "context" |
| "time" |
| ) |
| |
| // AckHandler implements ack/nack handling. |
| type AckHandler interface { |
| // OnAck processes a message ack. |
| OnAck() |
| |
| // OnNack processes a message nack. |
| OnNack() |
| |
| // OnAckWithResult processes a message ack and returns |
| // a result that shows if it succeeded. |
| OnAckWithResult() *AckResult |
| |
| // OnNackWithResult processes a message nack and returns |
| // a result that shows if it succeeded. |
| OnNackWithResult() *AckResult |
| } |
| |
| // Message represents a Pub/Sub message. |
| type Message struct { |
| // ID identifies this message. This ID is assigned by the server and is |
| // populated for Messages obtained from a subscription. |
| // |
| // This field is read-only. |
| ID string |
| |
| // Data is the actual data in the message. |
| Data []byte |
| |
| // Attributes represents the key-value pairs the current message is |
| // labelled with. |
| Attributes map[string]string |
| |
| // PublishTime is the time at which the message was published. This is |
| // populated by the server for Messages obtained from a subscription. |
| // |
| // This field is read-only. |
| PublishTime time.Time |
| |
| // DeliveryAttempt is the number of times a message has been delivered. |
| // This is part of the dead lettering feature that forwards messages that |
| // fail to be processed (from nack/ack deadline timeout) to a dead letter topic. |
| // If dead lettering is enabled, this will be set on all attempts, starting |
| // with value 1. Otherwise, the value will be nil. |
| // This field is read-only. |
| DeliveryAttempt *int |
| |
| // OrderingKey identifies related messages for which publish order should |
| // be respected. If empty string is used, message will be sent unordered. |
| OrderingKey string |
| |
| // ackh handles Ack() or Nack(). |
| ackh AckHandler |
| } |
| |
| // Ack indicates successful processing of a Message passed to the Subscriber.Receive callback. |
| // It should not be called on any other Message value. |
| // If message acknowledgement fails, the Message will be redelivered. |
| // Client code must call Ack or Nack when finished for each received Message. |
| // Calls to Ack or Nack have no effect after the first call. |
| func (m *Message) Ack() { |
| if m.ackh != nil { |
| m.ackh.OnAck() |
| } |
| } |
| |
| // Nack indicates that the client will not or cannot process a Message passed to the Subscriber.Receive callback. |
| // It should not be called on any other Message value. |
| // Nack will result in the Message being redelivered more quickly than if it were allowed to expire. |
| // Client code must call Ack or Nack when finished for each received Message. |
| // Calls to Ack or Nack have no effect after the first call. |
| func (m *Message) Nack() { |
| if m.ackh != nil { |
| m.ackh.OnNack() |
| } |
| } |
| |
// AcknowledgeStatus is the outcome of an Ack or Nack request.
type AcknowledgeStatus int

// The possible AcknowledgeStatus values. AcknowledgeStatusSuccess is the
// zero value of the type.
const (
	// AcknowledgeStatusSuccess means the request succeeded.
	AcknowledgeStatusSuccess AcknowledgeStatus = iota
	// AcknowledgeStatusPermissionDenied means the caller lacks sufficient
	// permissions.
	AcknowledgeStatusPermissionDenied
	// AcknowledgeStatusFailedPrecondition means the request hit a
	// FailedPrecondition error.
	AcknowledgeStatusFailedPrecondition
	// AcknowledgeStatusInvalidAckID means one or more of the ack IDs sent
	// were invalid.
	AcknowledgeStatusInvalidAckID
	// AcknowledgeStatusOther means some other, unknown error was returned.
	AcknowledgeStatusOther
)
| |
| // AckResult holds the result from a call to Ack or Nack. |
| type AckResult struct { |
| ready chan struct{} |
| res AcknowledgeStatus |
| err error |
| } |
| |
| // Ready returns a channel that is closed when the result is ready. |
| // When the Ready channel is closed, Get is guaranteed not to block. |
| func (r *AckResult) Ready() <-chan struct{} { return r.ready } |
| |
| // Get returns the status and/or error result of a Ack, Nack, or Modack call. |
| // Get blocks until the Ack/Nack completes or the context is done. |
| func (r *AckResult) Get(ctx context.Context) (res AcknowledgeStatus, err error) { |
| // If the result is already ready, return it even if the context is done. |
| select { |
| case <-r.Ready(): |
| return r.res, r.err |
| default: |
| } |
| select { |
| case <-ctx.Done(): |
| // Explicitly return AcknowledgeStatusOther for context cancelled cases, |
| // since the default is success. |
| return AcknowledgeStatusOther, ctx.Err() |
| case <-r.Ready(): |
| return r.res, r.err |
| } |
| } |
| |
| // NewAckResult creates a AckResult. |
| func NewAckResult() *AckResult { |
| return &AckResult{ |
| ready: make(chan struct{}), |
| } |
| } |
| |
| // SetAckResult sets the ack response and error for a ack result and closes |
| // the Ready channel. Any call after the first for the same AckResult |
| // is a no-op. |
| func SetAckResult(r *AckResult, res AcknowledgeStatus, err error) { |
| select { |
| case <-r.Ready(): |
| return |
| default: |
| r.res = res |
| r.err = err |
| close(r.ready) |
| } |
| } |
| |
| // AckWithResult acknowledges a message in Pub/Sub and it will not be |
| // delivered to this subscription again. |
| // |
| // You should avoid acknowledging messages until you have |
| // *finished* processing them, so that in the event of a failure, |
| // you receive the message again. |
| // |
| // If exactly-once delivery is enabled on the subscription, the |
| // AckResult returned by this method tracks the state of acknowledgement |
| // operation. If the operation completes successfully, the message is |
| // guaranteed NOT to be re-delivered. Otherwise, the result will |
| // contain an error with more details about the failure and the |
| // message may be re-delivered. |
| // |
| // If exactly-once delivery is NOT enabled on the subscription, or |
| // if using Pub/Sub Lite, AckResult readies immediately with a AcknowledgeStatus.Success. |
| // Since acks in Cloud Pub/Sub are best effort when exactly-once |
| // delivery is disabled, the message may be re-delivered. Because |
| // re-deliveries are possible, you should ensure that your processing |
| // code is idempotent, as you may receive any given message more than |
| // once. |
| func (m *Message) AckWithResult() *AckResult { |
| if m.ackh != nil { |
| return m.ackh.OnAckWithResult() |
| } |
| // When the message was constructed directly rather passed in the callback in `sub.Receive`, |
| // ready the message with success so calling `AckResult.Get` doesn't panic. |
| return newSuccessAckResult() |
| } |
| |
| // NackWithResult declines to acknowledge the message which indicates that |
| // the client will not or cannot process a Message. This will cause the message |
| // to be re-delivered to subscribers. Re-deliveries may take place immediately |
| // or after a delay. |
| // |
| // If exactly-once delivery is enabled on the subscription, the |
| // AckResult returned by this method tracks the state of nack |
| // operation. If the operation completes successfully, the result will |
| // contain AckResponse.Success. Otherwise, the result will contain an error |
| // with more details about the failure. |
| // |
| // If exactly-once delivery is NOT enabled on the subscription, or |
| // if using Pub/Sub Lite, AckResult readies immediately with a AcknowledgeStatus.Success. |
| func (m *Message) NackWithResult() *AckResult { |
| if m.ackh != nil { |
| return m.ackh.OnNackWithResult() |
| } |
| // When the message was constructed directly rather passed in the callback in `sub.Receive`, |
| // ready the message with success so calling `AckResult.Get` doesn't panic. |
| return newSuccessAckResult() |
| } |
| |
| // NewMessage creates a message with an AckHandler implementation, which should |
| // not be nil. |
| func NewMessage(ackh AckHandler) *Message { |
| return &Message{ackh: ackh} |
| } |
| |
| // MessageAckHandler provides access to the internal field Message.ackh. |
| func MessageAckHandler(m *Message) AckHandler { |
| return m.ackh |
| } |
| |
| func newSuccessAckResult() *AckResult { |
| ar := NewAckResult() |
| SetAckResult(ar, AcknowledgeStatusSuccess, nil) |
| return ar |
| } |