blob: 12d1f0752bd8cebc91dbecd39a4334c085472d08 [file] [log] [blame]
// Copyright 2016 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pubsub
import (
"sync"
"time"
"golang.org/x/net/context"
pb "google.golang.org/genproto/googleapis/pubsub/v1"
)
// newMessageIterator starts a new streamingMessageIterator. Stop must be called on the messageIterator
// when it is no longer needed.
// subName is the full name of the subscription to pull messages from.
// ctx is the context to use for acking messages and extending message deadlines.
func newMessageIterator(ctx context.Context, s service, subName string, po *pullOptions) *streamingMessageIterator {
sp := s.newStreamingPuller(ctx, subName, int32(po.ackDeadline.Seconds()))
_ = sp.open() // error stored in sp
return newStreamingMessageIterator(ctx, sp, po)
}
type streamingMessageIterator struct {
ctx context.Context
po *pullOptions
sp *streamingPuller
kaTicker *time.Ticker // keep-alive (deadline extensions)
ackTicker *time.Ticker // message acks
nackTicker *time.Ticker // message nacks (more frequent than acks)
failed chan struct{} // closed on stream error
stopped chan struct{} // closed when Stop is called
drained chan struct{} // closed when stopped && no more pending messages
wg sync.WaitGroup
mu sync.Mutex
keepAliveDeadlines map[string]time.Time
pendingReq *pb.StreamingPullRequest
err error // error from stream failure
}
func newStreamingMessageIterator(ctx context.Context, sp *streamingPuller, po *pullOptions) *streamingMessageIterator {
// TODO: make kaTicker frequency more configurable. (ackDeadline - 5s) is a
// reasonable default for now, because the minimum ack period is 10s. This
// gives us 5s grace.
keepAlivePeriod := po.ackDeadline - 5*time.Second
kaTicker := time.NewTicker(keepAlivePeriod)
// Ack promptly so users don't lose work if client crashes.
ackTicker := time.NewTicker(100 * time.Millisecond)
nackTicker := time.NewTicker(100 * time.Millisecond)
it := &streamingMessageIterator{
ctx: ctx,
sp: sp,
po: po,
kaTicker: kaTicker,
ackTicker: ackTicker,
nackTicker: nackTicker,
failed: make(chan struct{}),
stopped: make(chan struct{}),
drained: make(chan struct{}),
keepAliveDeadlines: map[string]time.Time{},
pendingReq: &pb.StreamingPullRequest{},
}
it.wg.Add(1)
go it.sender()
return it
}
// Subscription.receive will call stop on its messageIterator when finished with it.
// Stop will block until Done has been called on all Messages that have been
// returned by Next, or until the context with which the messageIterator was created
// is cancelled or exceeds its deadline.
func (it *streamingMessageIterator) stop() {
it.mu.Lock()
select {
case <-it.stopped:
default:
close(it.stopped)
}
it.checkDrained()
it.mu.Unlock()
it.wg.Wait()
}
// checkDrained closes the drained channel if the iterator has been stopped and all
// pending messages have either been n/acked or expired.
//
// Called with the lock held.
func (it *streamingMessageIterator) checkDrained() {
select {
case <-it.drained:
return
default:
}
select {
case <-it.stopped:
if len(it.keepAliveDeadlines) == 0 {
close(it.drained)
}
default:
}
}
// Called when a message is acked/nacked.
func (it *streamingMessageIterator) done(ackID string, ack bool) {
it.mu.Lock()
defer it.mu.Unlock()
delete(it.keepAliveDeadlines, ackID)
if ack {
it.pendingReq.AckIds = append(it.pendingReq.AckIds, ackID)
} else {
it.addDeadlineMod(ackID, 0) // Nack indicated by modifying the deadline to zero.
}
it.checkDrained()
}
// addDeadlineMod adds the ack ID to the pending request with the given deadline.
//
// Called with the lock held.
func (it *streamingMessageIterator) addDeadlineMod(ackID string, deadlineSecs int32) {
pr := it.pendingReq
pr.ModifyDeadlineAckIds = append(pr.ModifyDeadlineAckIds, ackID)
pr.ModifyDeadlineSeconds = append(pr.ModifyDeadlineSeconds, deadlineSecs)
}
// fail is called when a stream method returns a permanent error.
func (it *streamingMessageIterator) fail(err error) {
it.mu.Lock()
if it.err == nil {
it.err = err
close(it.failed)
}
it.mu.Unlock()
}
// receive makes a call to the stream's Recv method and returns
// its messages.
func (it *streamingMessageIterator) receive() ([]*Message, error) {
// Stop retrieving messages if the context is done, the stream
// failed, or the iterator's Stop method was called.
select {
case <-it.ctx.Done():
return nil, it.ctx.Err()
default:
}
it.mu.Lock()
err := it.err
it.mu.Unlock()
if err != nil {
return nil, err
}
// Receive messages from stream. This may block indefinitely.
msgs, err := it.sp.fetchMessages()
// The streamingPuller handles retries, so any error here
// is fatal.
if err != nil {
it.fail(err)
return nil, err
}
// We received some messages. Remember them so we can
// keep them alive.
deadline := time.Now().Add(it.po.maxExtension)
it.mu.Lock()
for _, m := range msgs {
m.doneFunc = it.done
it.keepAliveDeadlines[m.ackID] = deadline
}
it.mu.Unlock()
return msgs, nil
}
// sender runs in a goroutine and handles all sends to the stream.
func (it *streamingMessageIterator) sender() {
defer it.wg.Done()
defer it.kaTicker.Stop()
defer it.ackTicker.Stop()
defer it.nackTicker.Stop()
defer it.sp.closeSend()
done := false
for !done {
send := false
select {
case <-it.ctx.Done():
// Context canceled or timed out: stop immediately, without
// another RPC.
return
case <-it.failed:
// Stream failed: nothing to do, so stop immediately.
return
case <-it.drained:
// All outstanding messages have been marked done:
// nothing left to do except send the final request.
it.mu.Lock()
send = (len(it.pendingReq.AckIds) > 0 || len(it.pendingReq.ModifyDeadlineAckIds) > 0)
done = true
case <-it.kaTicker.C:
it.mu.Lock()
send = it.handleKeepAlives()
case <-it.nackTicker.C:
it.mu.Lock()
send = (len(it.pendingReq.ModifyDeadlineAckIds) > 0)
case <-it.ackTicker.C:
it.mu.Lock()
send = (len(it.pendingReq.AckIds) > 0)
}
// Lock is held here.
if send {
req := it.pendingReq
it.pendingReq = &pb.StreamingPullRequest{}
it.mu.Unlock()
err := it.sp.send(req)
if err != nil {
// The streamingPuller handles retries, so any error here
// is fatal to the iterator.
it.fail(err)
return
}
} else {
it.mu.Unlock()
}
}
}
// handleKeepAlives modifies the pending request to include deadline extensions
// for live messages. It also purges expired messages. It reports whether
// there were any live messages.
//
// Called with the lock held.
func (it *streamingMessageIterator) handleKeepAlives() bool {
live, expired := getKeepAliveAckIDs(it.keepAliveDeadlines)
for _, e := range expired {
delete(it.keepAliveDeadlines, e)
}
dl := trunc32(int64(it.po.ackDeadline.Seconds()))
for _, m := range live {
it.addDeadlineMod(m, dl)
}
it.checkDrained()
return len(live) > 0
}
func getKeepAliveAckIDs(items map[string]time.Time) (live, expired []string) {
now := time.Now()
for id, expiry := range items {
if expiry.Before(now) {
expired = append(expired, id)
} else {
live = append(live, id)
}
}
return live, expired
}