blob: 088ed79cf952ed9640a6cad1888073a4aa9e1be9 [file] [log] [blame]
// Copyright 2016 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pubsub
import (
"sync"
"time"
"golang.org/x/net/context"
)
// ackBuffer stores the pending ack IDs and, once notifications have been
// enabled via SendNotifications, signals on the Dirty channel whenever the
// buffer goes from empty to non-empty.
type ackBuffer struct {
	// Dirty receives a value when the buffer becomes non-empty while
	// notifications are enabled. Each notification is delivered from its own
	// goroutine, so Dirty needs a receiver (or Done must be closed) for that
	// goroutine to finish.
	Dirty chan struct{}
	// Close Done when ackBuffer is no longer needed. Closing it releases any
	// goroutine that is still waiting to deliver a Dirty notification.
	Done chan struct{}

	mu      sync.Mutex // held by Add, RemoveAll and SendNotifications while touching the fields below
	pending []string   // ack IDs added since the last RemoveAll
	send    bool       // true once SendNotifications has been called
}
// Add appends ackID to the set of pending ack IDs.
//
// When notifications are enabled and this call is the one that takes the
// buffer from empty to non-empty, a Dirty notification is scheduled.
func (buf *ackBuffer) Add(ackID string) {
	buf.mu.Lock()
	defer buf.mu.Unlock()

	// Remember the state before appending so the empty -> non-empty
	// transition can be detected afterwards.
	wasEmpty := len(buf.pending) == 0
	buf.pending = append(buf.pending, ackID)

	if !buf.send || !wasEmpty {
		return
	}
	buf.notify()
}
// RemoveAll empties the buffer and hands back every ack ID that was pending.
// The returned slice is nil if nothing was buffered.
func (buf *ackBuffer) RemoveAll() []string {
	buf.mu.Lock()
	drained := buf.pending
	buf.pending = nil
	buf.mu.Unlock()

	return drained
}
// SendNotifications turns on dirty notifications for empty -> non-empty
// transitions of the buffer. If ack IDs are already pending at the time of the
// call, one notification is scheduled right away.
func (buf *ackBuffer) SendNotifications() {
	buf.mu.Lock()
	defer buf.mu.Unlock()

	buf.send = true
	if len(buf.pending) == 0 {
		return
	}
	// IDs were buffered before notifications were enabled; announce them now.
	buf.notify()
}
// notify schedules a single send on Dirty from a new goroutine, so the caller
// (which holds buf.mu in Add and SendNotifications) never blocks on it. The
// goroutine ends once the send has been received or a receive from Done
// succeeds, whichever happens first.
func (buf *ackBuffer) notify() {
	deliver := func() {
		select {
		case <-buf.Done:
		case buf.Dirty <- struct{}{}:
		}
	}
	go deliver()
}
// acker acks messages in batches.
//
// None of the methods in this file assign s, Ctx, Sub, AckTick or Notify;
// presumably the caller populates them before calling Start.
type acker struct {
	// s is used to split ack IDs into batches and to send acknowledge requests.
	s       service
	Ctx     context.Context  // The context to use when acknowledging messages.
	Sub     string           // The full name of the subscription.
	AckTick <-chan time.Time // AckTick supplies the frequency with which to make ack requests.
	// Notify is called with an ack ID after the message with that ack ID
	// has been processed. An ackID is considered to have been processed
	// if at least one attempt has been made to acknowledge it.
	Notify func(string)

	// ackBuffer holds the ack IDs waiting to be sent in the next batch.
	ackBuffer

	// wg tracks the goroutine launched by Start; Stop waits on it.
	wg sync.WaitGroup
	// done is created by Start and closed by Stop to end that goroutine.
	// Start also installs it as ackBuffer.Done.
	done chan struct{}
}
// Start initiates processing of ackIDs which are added via Ack (or via the
// embedded buffer's Add). Notify is called with each ackID once it has been
// processed.
//
// It allocates the done and Dirty channels and launches one goroutine that
// flushes the buffer on every AckTick and on every Dirty notification, until
// done is closed.
func (a *acker) Start() {
	a.done = make(chan struct{})
	a.ackBuffer.Done = a.done
	a.ackBuffer.Dirty = make(chan struct{})

	// flush acknowledges everything buffered so far.
	flush := func() { a.ack(a.ackBuffer.RemoveAll()) }

	a.wg.Add(1)
	go func() {
		defer a.wg.Done()
		for {
			select {
			case <-a.done:
				return
			case <-a.AckTick:
				flush()
			case <-a.ackBuffer.Dirty:
				flush()
			}
		}
	}()
}
// Ack queues ackID so that it is acknowledged as part of the next batch.
func (a *acker) Ack(ackID string) {
	buf := &a.ackBuffer
	buf.Add(ackID)
}
// FastMode switches acker into a mode which acks messages as they arrive,
// rather than waiting for a.AckTick. It does so by enabling the buffer's
// dirty notifications.
func (a *acker) FastMode() {
	buf := &a.ackBuffer
	buf.SendNotifications()
}
// Stop drops all pending messages, and releases resources before returning.
//
// It closes a.done, which makes the goroutine launched by Start return, and
// then blocks until that goroutine has finished. Ack IDs still sitting in the
// buffer at that point are not acknowledged.
//
// Stop must be called after Start and at most once: closing a nil or an
// already-closed channel panics.
func (a *acker) Stop() {
	// Signal shutdown first; the wait below would otherwise never return.
	close(a.done)
	a.wg.Wait()
}
// maxAckAttempts is the maximum number of acknowledge requests ack issues for
// a single batch of ack IDs before giving up on that batch.
const maxAckAttempts = 2
// ack acknowledges the supplied ackIDs.
//
// The IDs are sent in batches as carved out by the service's splitAckIDs, and
// each batch is attempted up to maxAckAttempts times. After the
// acknowledgement requests have completed (regardless of their success or
// failure), every id in ids is passed to a.Notify, in order.
func (a *acker) ack(ids []string) {
	remaining := ids
	for {
		batch, rest := a.s.splitAckIDs(remaining)
		if len(batch) == 0 {
			break
		}

		for attempt := 1; attempt <= maxAckAttempts; attempt++ {
			if err := a.s.acknowledge(a.Ctx, a.Sub, batch); err == nil {
				break
			}
		}
		// NOTE: if every attempt failed, the batch is simply dropped. The
		// messages will be redelivered and this is a documented behaviour
		// of the API.
		remaining = rest
	}

	for i := range ids {
		a.Notify(ids[i])
	}
}