blob: 9980e811273379ef5607e88a4d35889915b963eb [file]
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package managedwriter
import (
"fmt"
"sort"
"sync"
"time"
)
type poolRouter interface {
// poolAttach is called once to signal a router that it is responsible for a given pool.
poolAttach(pool *connectionPool) error
// poolDetach is called as part of clean connectionPool shutdown.
// It provides an opportunity for the router to shut down internal state.
poolDetach() error
// writerAttach is a hook to notify the router that a new writer is being attached to the pool.
// It provides an opportunity for the router to allocate resources and update internal state.
writerAttach(writer *ManagedStream) error
// writerAttach signals the router that a given writer is being removed from the pool. The router
// does not have responsibility for closing the writer, but this is called as part of writer close.
writerDetach(writer *ManagedStream) error
// pickConnection is used to select a connection for a given pending write.
pickConnection(pw *pendingWrite) (*connection, error)
}
// simpleRouter is a primitive traffic router that routes all traffic to its single connection instance.
//
// This router is designed for our migration case, where an single ManagedStream writer has as 1:1 relationship
// with a connectionPool. You can multiplex with this router, but it will never scale beyond a single connection.
type simpleRouter struct {
mode connectionMode
pool *connectionPool
mu sync.RWMutex
conn *connection
writers map[string]struct{}
}
func (rtr *simpleRouter) poolAttach(pool *connectionPool) error {
if rtr.pool == nil {
rtr.pool = pool
return nil
}
return fmt.Errorf("router already attached to pool %q", rtr.pool.id)
}
func (rtr *simpleRouter) poolDetach() error {
rtr.mu.Lock()
defer rtr.mu.Unlock()
if rtr.conn != nil {
rtr.conn.close()
rtr.conn = nil
}
return nil
}
func (rtr *simpleRouter) writerAttach(writer *ManagedStream) error {
if writer.id == "" {
return fmt.Errorf("writer has no ID")
}
rtr.mu.Lock()
defer rtr.mu.Unlock()
rtr.writers[writer.id] = struct{}{}
if rtr.conn == nil {
rtr.conn = newConnection(rtr.pool, rtr.mode, nil)
}
return nil
}
func (rtr *simpleRouter) writerDetach(writer *ManagedStream) error {
if writer.id == "" {
return fmt.Errorf("writer has no ID")
}
rtr.mu.Lock()
defer rtr.mu.Unlock()
delete(rtr.writers, writer.id)
if len(rtr.writers) == 0 && rtr.conn != nil {
// no attached writers, cleanup and remove connection.
defer rtr.conn.close()
rtr.conn = nil
}
return nil
}
// Picking a connection is easy; there's only one.
func (rtr *simpleRouter) pickConnection(pw *pendingWrite) (*connection, error) {
rtr.mu.RLock()
defer rtr.mu.RUnlock()
if rtr.conn != nil {
return rtr.conn, nil
}
return nil, fmt.Errorf("no connection available")
}
func newSimpleRouter(mode connectionMode) *simpleRouter {
return &simpleRouter{
// We don't add a connection until writers attach.
mode: mode,
writers: make(map[string]struct{}),
}
}
// sharedRouter is a more comprehensive router for a connection pool.
//
// It maintains state for both exclusive and shared connections, but doesn't commingle the
// two. If the router is configured to allow multiplex, it also runs a watchdog goroutine
// that allows is to curate traffic there by reassigning writers to different connections.
//
// Multiplexing routing here is designed for connection sharing among more idle writers,
// and does NOT yet handle the use case where a single writer produces enough traffic to
// warrant fanout across multiple connections.
type sharedRouter struct {
pool *connectionPool
multiplex bool
maxConns int // multiplex limit.
close chan struct{} // for shutting down watchdog
// mu guards access to exclusive connections
mu sync.RWMutex
// keyed by writer ID
exclusiveConns map[string]*connection
// multiMu guards access to multiplex mappings.
multiMu sync.RWMutex
// keyed by writer ID
multiMap map[string]*connection
// keyed by connection ID
invertedMultiMap map[string][]*ManagedStream
multiConns []*connection
}
type connPair struct {
writer *ManagedStream
conn *connection
}
// attaches the router to the connection pool. The watchdog goroutine
// only curates multiplex connections, so we don't start it if the
// router isn't going to process that traffic.
func (sr *sharedRouter) poolAttach(pool *connectionPool) error {
if sr.pool == nil {
sr.pool = pool
sr.close = make(chan struct{})
if sr.multiplex {
go sr.watchdog()
}
return nil
}
return fmt.Errorf("router already attached to pool %q", sr.pool.id)
}
// poolDetach gives us an opportunity to cleanup connections during
// shutdown/close.
func (sr *sharedRouter) poolDetach() error {
sr.mu.Lock()
// cleanup explicit connections
for writerID, conn := range sr.exclusiveConns {
conn.close()
delete(sr.exclusiveConns, writerID)
}
sr.mu.Unlock()
// cleanup multiplex resources
sr.multiMu.Lock()
for _, co := range sr.multiConns {
co.close()
}
sr.multiMap = make(map[string]*connection)
sr.multiConns = nil
close(sr.close) // trigger watchdog shutdown
sr.multiMu.Unlock()
return nil
}
func (sr *sharedRouter) writerAttach(writer *ManagedStream) error {
if writer == nil {
return fmt.Errorf("invalid writer")
}
if writer.id == "" {
return fmt.Errorf("writer has empty ID")
}
if sr.multiplex && canMultiplex(writer.StreamName()) {
return sr.writerAttachMulti(writer)
}
// Handle non-multiplex writer.
sr.mu.Lock()
defer sr.mu.Unlock()
if pair := sr.exclusiveConns[writer.id]; pair != nil {
return fmt.Errorf("writer %q already attached", writer.id)
}
sr.exclusiveConns[writer.id] = newConnection(sr.pool, simplexConnectionMode, writer.streamSettings)
return nil
}
// multiAttach is the multiplex-specific logic for writerAttach.
// It should only be called from writerAttach. We use the same
// orderAndGrow as watchdog, and simply attach the new writer to
// the most idle connection.
func (sr *sharedRouter) writerAttachMulti(writer *ManagedStream) error {
sr.multiMu.Lock()
defer sr.multiMu.Unlock()
// order any existing connections
sr.orderAndGrowMultiConns()
conn := sr.multiConns[0]
sr.multiMap[writer.id] = conn
var writers []*ManagedStream
if w, ok := sr.invertedMultiMap[conn.id]; ok {
writers = append(w, writer)
} else {
// first connection
writers = []*ManagedStream{writer}
}
sr.invertedMultiMap[conn.id] = writers
return nil
}
// orderMultiConns orders the connection slice by current load, and will grow
// the connections if necessary.
//
// Should only be called with R/W lock.
func (sr *sharedRouter) orderAndGrowMultiConns() {
sort.SliceStable(sr.multiConns,
func(i, j int) bool {
return sr.multiConns[i].curLoad() < sr.multiConns[j].curLoad()
})
if len(sr.multiConns) == 0 {
sr.multiConns = []*connection{newConnection(sr.pool, multiplexConnectionMode, nil)}
} else if sr.multiConns[0].isLoaded() && len(sr.multiConns) < sr.maxConns {
sr.multiConns = append([]*connection{newConnection(sr.pool, multiplexConnectionMode, nil)}, sr.multiConns...)
}
}
var (
// Used by rebalanceWriters to avoid rebalancing if the load difference is within the threshold range.
connLoadDeltaThreshold = 1.2
watchDogInterval = 500 * time.Millisecond
)
// rebalanceWriters looks for opportunities to redistribute traffic load.
//
// This is run as part of a heartbeat, when the connections have been ordered
// by load.
//
// Should only be called with the multiplex mutex r/w lock.
func (sr *sharedRouter) rebalanceWriters() {
mostIdleIdx := 0
leastIdleIdx := len(sr.multiConns) - 1
mostIdleConn := sr.multiConns[0]
mostIdleLoad := mostIdleConn.curLoad()
if mostIdleConn.isLoaded() {
// Don't rebalance if all connections are loaded.
return
}
// only look for rebalance opportunies between different connections.
for mostIdleIdx != leastIdleIdx {
targetConn := sr.multiConns[leastIdleIdx]
if targetConn.curLoad() < mostIdleLoad*connLoadDeltaThreshold {
// the load delta isn't significant enough between most and least idle connections
// to warrant moving traffic. Done for this heartbeat.
return
}
candidates, ok := sr.invertedMultiMap[targetConn.id]
if !ok {
leastIdleIdx = leastIdleIdx - 1
continue
}
if len(candidates) == 1 {
leastIdleIdx = leastIdleIdx - 1
continue
}
// Multiple writers, relocate one.
candidate, remaining := candidates[0], candidates[1:]
// update the moved forward map
sr.multiMap[candidate.id] = mostIdleConn
// update the inverse map
sr.invertedMultiMap[targetConn.id] = remaining
idleWriters, ok := sr.invertedMultiMap[mostIdleConn.id]
if ok {
sr.invertedMultiMap[mostIdleConn.id] = append(idleWriters, candidate)
} else {
sr.invertedMultiMap[mostIdleConn.id] = []*ManagedStream{candidate}
}
return
}
}
func (sr *sharedRouter) writerDetach(writer *ManagedStream) error {
if writer == nil {
return fmt.Errorf("invalid writer")
}
if sr.multiplex && canMultiplex(writer.StreamName()) {
return sr.writerDetachMulti(writer)
}
// Handle non-multiplex writer.
sr.mu.Lock()
defer sr.mu.Unlock()
conn := sr.exclusiveConns[writer.id]
if conn == nil {
return fmt.Errorf("writer not currently attached")
}
conn.close()
delete(sr.exclusiveConns, writer.id)
return nil
}
// writerDetachMulti is the multiplex-specific logic for writerDetach.
// It should only be called from writerDetach.
func (sr *sharedRouter) writerDetachMulti(writer *ManagedStream) error {
sr.multiMu.Lock()
defer sr.multiMu.Unlock()
delete(sr.multiMap, writer.id)
// If the number of writers drops to zero, close all open connections.
if len(sr.multiMap) == 0 {
for _, co := range sr.multiConns {
co.close()
}
sr.multiConns = nil
}
return nil
}
// pickConnection either routes a write to a connection for explicit streams,
// or delegates too pickMultiplexConnection for the multiplex case.
func (sr *sharedRouter) pickConnection(pw *pendingWrite) (*connection, error) {
if pw.writer == nil {
return nil, fmt.Errorf("no writer present pending write")
}
if sr.multiplex && canMultiplex(pw.writer.StreamName()) {
return sr.pickMultiplexConnection(pw)
}
sr.mu.RLock()
defer sr.mu.RUnlock()
conn := sr.exclusiveConns[pw.writer.id]
if conn == nil {
return nil, fmt.Errorf("writer %q unknown", pw.writer.id)
}
return conn, nil
}
func (sr *sharedRouter) pickMultiplexConnection(pw *pendingWrite) (*connection, error) {
sr.multiMu.RLock()
defer sr.multiMu.RUnlock()
conn := sr.multiMap[pw.writer.id]
if conn == nil {
// TODO: update map
return nil, fmt.Errorf("no multiplex connection assigned")
}
return conn, nil
}
// watchdog is intended to run as a goroutine where multiplex features are enabled.
//
// Our goals during a heartbeat are simple:
// * ensure we have sufficient connections.
// * ensure traffic from writers is well distributed across connections.
//
// Our rebalancing strategy in this iteration is modest. We order the connections by
// current load, and then examine the busiest connection(s) looking for opportunities
// to redistribute traffic.
func (sr *sharedRouter) watchdog() {
for {
select {
case <-sr.close:
return
case <-time.After(watchDogInterval):
sr.watchdogPulse()
}
}
}
// an individual pulse of the watchdog loop.
func (sr *sharedRouter) watchdogPulse() {
sr.multiMu.Lock()
defer sr.multiMu.Unlock()
sr.orderAndGrowMultiConns()
sr.rebalanceWriters()
}
func newSharedRouter(multiplex bool, maxConns int) *sharedRouter {
return &sharedRouter{
multiplex: multiplex,
maxConns: maxConns,
exclusiveConns: make(map[string]*connection),
multiMap: make(map[string]*connection),
invertedMultiMap: make(map[string][]*ManagedStream),
}
}