blob: 7d70b922e487d6e53790f7f3c892f71afe55ca6b [file] [log] [blame]
// Copyright 2019 Google LLC
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package scheduler
import (
// PublishScheduler is a scheduler which is designed for Pub/Sub's Publish flow.
// It bundles items before handling them. All items in this PublishScheduler use
// the same handler.
// Each item is added with a given key. Items added to the empty string key are
// handled in random order. Items added to any other key are handled
// sequentially.
type PublishScheduler struct {
// Settings passed down to each bundler that gets created.
DelayThreshold time.Duration
BundleCountThreshold int
BundleByteThreshold int
BundleByteLimit int
BufferedByteLimit int
mu sync.Mutex
bundlers map[string]*bundler.Bundler
outstanding map[string]int
keysMu sync.RWMutex
// keysWithErrors tracks ordering keys that cannot accept new messages.
// A bundler might not accept new messages if publishing has failed
// for a specific ordering key, and can be resumed with topic.ResumePublish().
keysWithErrors map[string]struct{}
// workers is a channel that represents workers. Rather than a pool, where
// worker are "removed" until the pool is empty, the channel is more like a
// set of work desks, where workers are "added" until all the desks are full.
// workers does not restrict the amount of goroutines in the bundlers.
// Rather, it acts as the flow control for completion of bundler work.
workers chan struct{}
handle func(bundle interface{})
done chan struct{}
// NewPublishScheduler returns a new PublishScheduler.
// The workers arg is the number of workers that will operate on the queue of
// work. A reasonably large number of workers is highly recommended. If the
// workers arg is 0, then a healthy default of 10 workers is used.
// The scheduler does not use a parent context. If it did, canceling that
// context would immediately stop the scheduler without waiting for
// undelivered messages.
// The scheduler should be stopped only with FlushAndStop.
func NewPublishScheduler(workers int, handle func(bundle interface{})) *PublishScheduler {
if workers == 0 {
workers = 10
s := PublishScheduler{
bundlers: make(map[string]*bundler.Bundler),
outstanding: make(map[string]int),
keysWithErrors: make(map[string]struct{}),
workers: make(chan struct{}, workers),
handle: handle,
done: make(chan struct{}),
return &s
// Add adds an item to the scheduler at a given key.
// Add never blocks. Buffering happens in the scheduler's publishers. There is
// no flow control.
// Since ordered keys require only a single outstanding RPC at once, it is
// possible to send ordered key messages to Topic.Publish (and subsequently to
// PublishScheduler.Add) faster than the bundler can publish them to the
// Pub/Sub service, resulting in a backed up queue of Pub/Sub bundles. Each
// item in the bundler queue is a goroutine.
func (s *PublishScheduler) Add(key string, item interface{}, size int) error {
select {
case <-s.done:
return errors.New("draining")
b, ok := s.bundlers[key]
if !ok {
s.outstanding[key] = 1
b = bundler.NewBundler(item, func(bundle interface{}) {
s.workers <- struct{}{}
nlen := reflect.ValueOf(bundle).Len()
s.outstanding[key] -= nlen
if s.outstanding[key] == 0 {
delete(s.outstanding, key)
delete(s.bundlers, key)
b.DelayThreshold = s.DelayThreshold
b.BundleCountThreshold = s.BundleCountThreshold
b.BundleByteThreshold = s.BundleByteThreshold
b.BundleByteLimit = s.BundleByteLimit
b.BufferedByteLimit = s.BufferedByteLimit
if b.BufferedByteLimit == 0 {
b.BufferedByteLimit = 1e9
if key == "" {
// There's no way to express "unlimited" in the bundler, so we use
// some high number.
b.HandlerLimit = 1e9
} else {
// HandlerLimit=1 causes the bundler to act as a sequential queue.
b.HandlerLimit = 1
s.bundlers[key] = b
return b.Add(item, size)
// FlushAndStop begins flushing items from bundlers and from the scheduler. It
// blocks until all items have been flushed.
func (s *PublishScheduler) FlushAndStop() {
for _, b := range s.bundlers {
// IsPaused checks if the bundler associated with an ordering keys is
// paused.
func (s *PublishScheduler) IsPaused(orderingKey string) bool {
defer s.keysMu.RUnlock()
_, ok := s.keysWithErrors[orderingKey]
return ok
// Pause pauses the bundler associated with the provided ordering key,
// preventing it from accepting new messages. Any outstanding messages
// that haven't been published will error. If orderingKey is empty,
// this is a no-op.
func (s *PublishScheduler) Pause(orderingKey string) {
if orderingKey != "" {
defer s.keysMu.Unlock()
s.keysWithErrors[orderingKey] = struct{}{}
// Resume resumes accepting message with the provided ordering key.
func (s *PublishScheduler) Resume(orderingKey string) {
defer s.keysMu.Unlock()
delete(s.keysWithErrors, orderingKey)