blob: 60c5c347d1e25f74f2b7a6d779a749791ac9900b [file] [edit]
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package managedwriter
import (
"errors"
"io"
"math/rand"
"strings"
"sync"
"time"
"github.com/googleapis/gax-go/v2"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// defaultRetryAttempts is the attempt budget newStatelessRetryer assigns to
// maxAttempts; statelessRetryer.Retry stops retrying an item once its attempt
// count reaches this value.
var defaultRetryAttempts = 4
// retryPredicate drives higher-level retries: it decides whether an append that
// failed with err should be re-enqueued onto the bidi channel.
//
// A nil error is never retried. Errors that do not carry a gRPC status are
// retried only when they wrap io.EOF (seen when the connection closes); all
// other non-status errors, including context errors, are final. Status errors
// are retried for Aborted, Canceled, DeadlineExceeded, FailedPrecondition,
// Internal and Unavailable, and for ResourceExhausted only when it reports the
// AppendRows throughput quota. aggressiveBackoff is currently always false.
func retryPredicate(err error) (shouldRetry, aggressiveBackoff bool) {
	if err == nil {
		return false, false
	}
	st, isStatus := status.FromError(err)
	if !isStatus {
		return errors.Is(err, io.EOF), false
	}
	switch st.Code() {
	case codes.Aborted, codes.Canceled, codes.DeadlineExceeded,
		codes.FailedPrecondition, codes.Internal, codes.Unavailable:
		return true, false
	case codes.ResourceExhausted:
		// Note: internal b/246031522 opened to give this a structured error
		// and avoid string parsing. Should be a QuotaFailure or similar.
		return strings.HasPrefix(st.Message(), "Exceeds 'AppendRows throughput' quota"), false
	default:
		return false, false
	}
}
// unaryRetryer is for retrying a unary-style operation, like (re)-opening the bidi connection.
// Retry decisions come from retryPredicate; delays come from the gax.Backoff.
type unaryRetryer struct {
	bo gax.Backoff
}

// Retry reports how long to wait before the next attempt and whether that
// attempt should be made, based on retryPredicate.
//
// The backoff is only advanced when a retry will actually happen. When err is
// not retryable it returns (0, false), the same shape statelessRetryer.Retry
// uses, rather than consuming a backoff step for a pause nobody will take.
func (ur *unaryRetryer) Retry(err error) (time.Duration, bool) {
	shouldRetry, _ := retryPredicate(err)
	if !shouldRetry {
		return 0, false
	}
	return ur.bo.Pause(), true
}
// statelessRetryer backs off within a continuous process, such as handling the
// responses read from the receive side of the bidi stream. Each item in that
// process carries its own attempt count, and maxAttempts is used to evict items
// that keep failing.
type statelessRetryer struct {
	// mu guards r; a *rand.Rand built from rand.NewSource must not be used
	// concurrently without external locking.
	mu sync.Mutex
	r  *rand.Rand

	// minBackoff is the base delay applied to every retry.
	minBackoff time.Duration
	// jitter is the exclusive upper bound of the random delay added to minBackoff.
	jitter time.Duration
	// aggressiveFactor scales the pause when aggressive backoff is requested.
	aggressiveFactor int
	// maxAttempts is the attempt count at which an item is no longer retried.
	maxAttempts int
}
// newStatelessRetryer returns a statelessRetryer with a 50ms base backoff, up to
// one second of random jitter, and defaultRetryAttempts attempts per item. Its
// random source is seeded from the current time. aggressiveFactor is left at
// its zero value.
func newStatelessRetryer() *statelessRetryer {
	seed := time.Now().UnixNano()
	sr := &statelessRetryer{
		minBackoff:  50 * time.Millisecond,
		jitter:      time.Second,
		maxAttempts: defaultRetryAttempts,
	}
	sr.r = rand.New(rand.NewSource(seed))
	return sr
}
// pause computes a randomized backoff: minBackoff plus a random jitter in
// [0, jitter). When aggressiveBackoff is true the result is scaled by
// aggressiveFactor.
//
// A factor below 1 is treated as 1. This includes the zero value, which
// newStatelessRetryer does not set. Without this guard an "aggressive" backoff
// would collapse to a zero (or negative) pause instead of a longer one.
func (sr *statelessRetryer) pause(aggressiveBackoff bool) time.Duration {
	pause := sr.minBackoff
	if j := sr.jitter.Nanoseconds(); j > 0 {
		// r is shared across callers; mu serializes access to it.
		sr.mu.Lock()
		pause += time.Duration(sr.r.Int63n(j))
		sr.mu.Unlock()
	}
	if aggressiveBackoff && sr.aggressiveFactor > 1 {
		pause *= time.Duration(sr.aggressiveFactor)
	}
	return pause
}
// Retry decides whether an item that failed with err on attempt attemptCount
// should be retried, and if so how long to wait first. Once attemptCount
// reaches maxAttempts the item is never retried, whatever err is. Otherwise
// retryPredicate decides, and its aggressive hint is forwarded to pause.
func (sr *statelessRetryer) Retry(err error, attemptCount int) (time.Duration, bool) {
	if attemptCount < sr.maxAttempts {
		if retry, aggressive := retryPredicate(err); retry {
			return sr.pause(aggressive), true
		}
	}
	return 0, false
}
// shouldReconnect is akin to a retry predicate: it reports whether the
// response's error means the bidi stream must be closed and reopened, because
// no further appends on it will succeed.
//
// io.EOF (the typical not-connected signal, possibly wrapped) qualifies. So do
// gRPC status errors with code Aborted, Canceled, Unavailable or
// DeadlineExceeded. Everything else does not.
func shouldReconnect(err error) bool {
	if errors.Is(err, io.EOF) {
		return true
	}
	st, isStatus := status.FromError(err)
	if !isStatus {
		return false
	}
	// Backend responses that trigger reconnection on send.
	switch st.Code() {
	case codes.Aborted, codes.Canceled, codes.Unavailable, codes.DeadlineExceeded:
		return true
	default:
		return false
	}
}