blob: 5dc25666a0727ef1f0c08d72b1e71ab37d6d2e6d [file] [log] [blame]
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
package pubsub
import (
"context"
"time"
)
// AckHandler implements ack/nack handling.
type AckHandler interface {
// OnAck processes a message ack.
OnAck()
// OnNack processes a message nack.
OnNack()
// OnAckWithResult processes a message ack and returns
// a result that shows if it succeeded.
OnAckWithResult() *AckResult
// OnNackWithResult processes a message nack and returns
// a result that shows if it succeeded.
OnNackWithResult() *AckResult
}
// Message represents a Pub/Sub message.
type Message struct {
// ID identifies this message. This ID is assigned by the server and is
// populated for Messages obtained from a subscription.
//
// This field is read-only.
ID string
// Data is the actual data in the message.
Data []byte
// Attributes represents the key-value pairs the current message is
// labelled with.
Attributes map[string]string
// PublishTime is the time at which the message was published. This is
// populated by the server for Messages obtained from a subscription.
//
// This field is read-only.
PublishTime time.Time
// DeliveryAttempt is the number of times a message has been delivered.
// This is part of the dead lettering feature that forwards messages that
// fail to be processed (from nack/ack deadline timeout) to a dead letter topic.
// If dead lettering is enabled, this will be set on all attempts, starting
// with value 1. Otherwise, the value will be nil.
// This field is read-only.
DeliveryAttempt *int
// OrderingKey identifies related messages for which publish order should
// be respected. If empty string is used, message will be sent unordered.
OrderingKey string
// ackh handles Ack() or Nack().
ackh AckHandler
}
// Ack indicates successful processing of a Message passed to the Subscriber.Receive callback.
// It should not be called on any other Message value.
// If message acknowledgement fails, the Message will be redelivered.
// Client code must call Ack or Nack when finished for each received Message.
// Calls to Ack or Nack have no effect after the first call.
func (m *Message) Ack() {
if m.ackh != nil {
m.ackh.OnAck()
}
}
// Nack indicates that the client will not or cannot process a Message passed to the Subscriber.Receive callback.
// It should not be called on any other Message value.
// Nack will result in the Message being redelivered more quickly than if it were allowed to expire.
// Client code must call Ack or Nack when finished for each received Message.
// Calls to Ack or Nack have no effect after the first call.
func (m *Message) Nack() {
if m.ackh != nil {
m.ackh.OnNack()
}
}
// AcknowledgeStatus represents the status of an Ack or Nack request.
type AcknowledgeStatus int
const (
// AcknowledgeStatusSuccess indicates the request was a success.
AcknowledgeStatusSuccess AcknowledgeStatus = iota
// AcknowledgeStatusPermissionDenied indicates the caller does not have sufficient permissions.
AcknowledgeStatusPermissionDenied
// AcknowledgeStatusFailedPrecondition indicates the request encountered a FailedPrecondition error.
AcknowledgeStatusFailedPrecondition
// AcknowledgeStatusInvalidAckID indicates one or more of the ack IDs sent were invalid.
AcknowledgeStatusInvalidAckID
// AcknowledgeStatusOther indicates another unknown error was returned.
AcknowledgeStatusOther
)
// AckResult holds the result from a call to Ack or Nack.
type AckResult struct {
ready chan struct{}
res AcknowledgeStatus
err error
}
// Ready returns a channel that is closed when the result is ready.
// When the Ready channel is closed, Get is guaranteed not to block.
func (r *AckResult) Ready() <-chan struct{} { return r.ready }
// Get returns the status and/or error result of a Ack, Nack, or Modack call.
// Get blocks until the Ack/Nack completes or the context is done.
func (r *AckResult) Get(ctx context.Context) (res AcknowledgeStatus, err error) {
// If the result is already ready, return it even if the context is done.
select {
case <-r.Ready():
return r.res, r.err
default:
}
select {
case <-ctx.Done():
// Explicitly return AcknowledgeStatusOther for context cancelled cases,
// since the default is success.
return AcknowledgeStatusOther, ctx.Err()
case <-r.Ready():
return r.res, r.err
}
}
// NewAckResult creates a AckResult.
func NewAckResult() *AckResult {
return &AckResult{
ready: make(chan struct{}),
}
}
// SetAckResult sets the ack response and error for a ack result and closes
// the Ready channel. Any call after the first for the same AckResult
// is a no-op.
func SetAckResult(r *AckResult, res AcknowledgeStatus, err error) {
select {
case <-r.Ready():
return
default:
r.res = res
r.err = err
close(r.ready)
}
}
// AckWithResult acknowledges a message in Pub/Sub and it will not be
// delivered to this subscription again.
//
// You should avoid acknowledging messages until you have
// *finished* processing them, so that in the event of a failure,
// you receive the message again.
//
// If exactly-once delivery is enabled on the subscription, the
// AckResult returned by this method tracks the state of acknowledgement
// operation. If the operation completes successfully, the message is
// guaranteed NOT to be re-delivered. Otherwise, the result will
// contain an error with more details about the failure and the
// message may be re-delivered.
//
// If exactly-once delivery is NOT enabled on the subscription, or
// if using Pub/Sub Lite, AckResult readies immediately with a AcknowledgeStatus.Success.
// Since acks in Cloud Pub/Sub are best effort when exactly-once
// delivery is disabled, the message may be re-delivered. Because
// re-deliveries are possible, you should ensure that your processing
// code is idempotent, as you may receive any given message more than
// once.
func (m *Message) AckWithResult() *AckResult {
if m.ackh != nil {
return m.ackh.OnAckWithResult()
}
// When the message was constructed directly rather passed in the callback in `sub.Receive`,
// ready the message with success so calling `AckResult.Get` doesn't panic.
return newSuccessAckResult()
}
// NackWithResult declines to acknowledge the message which indicates that
// the client will not or cannot process a Message. This will cause the message
// to be re-delivered to subscribers. Re-deliveries may take place immediately
// or after a delay.
//
// If exactly-once delivery is enabled on the subscription, the
// AckResult returned by this method tracks the state of nack
// operation. If the operation completes successfully, the result will
// contain AckResponse.Success. Otherwise, the result will contain an error
// with more details about the failure.
//
// If exactly-once delivery is NOT enabled on the subscription, or
// if using Pub/Sub Lite, AckResult readies immediately with a AcknowledgeStatus.Success.
func (m *Message) NackWithResult() *AckResult {
if m.ackh != nil {
return m.ackh.OnNackWithResult()
}
// When the message was constructed directly rather passed in the callback in `sub.Receive`,
// ready the message with success so calling `AckResult.Get` doesn't panic.
return newSuccessAckResult()
}
// NewMessage creates a message with an AckHandler implementation, which should
// not be nil.
func NewMessage(ackh AckHandler) *Message {
return &Message{ackh: ackh}
}
// MessageAckHandler provides access to the internal field Message.ackh.
func MessageAckHandler(m *Message) AckHandler {
return m.ackh
}
func newSuccessAckResult() *AckResult {
ar := NewAckResult()
SetAckResult(ar, AcknowledgeStatusSuccess, nil)
return ar
}