| // Copyright 2021 Google LLC |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // https://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package managedwriter |
| |
| import ( |
| "errors" |
| "io" |
| "math/rand" |
| "strings" |
| "sync" |
| "time" |
| |
| "github.com/googleapis/gax-go/v2" |
| "google.golang.org/grpc/codes" |
| "google.golang.org/grpc/status" |
| ) |
| |
| var ( |
| defaultRetryAttempts = 4 |
| ) |
| |
| // This retry predicate is used for higher level retries, enqueing appends onto to a bidi |
| // channel and evaluating whether an append should be retried (re-enqueued). |
| func retryPredicate(err error) (shouldRetry, aggressiveBackoff bool) { |
| if err == nil { |
| return |
| } |
| |
| s, ok := status.FromError(err) |
| // non-status based error conditions. |
| if !ok { |
| // EOF can happen in the case of connection close. |
| if errors.Is(err, io.EOF) { |
| shouldRetry = true |
| return |
| } |
| // All other non-status errors are treated as non-retryable (including context errors). |
| return |
| } |
| switch s.Code() { |
| case codes.Aborted, |
| codes.Canceled, |
| codes.DeadlineExceeded, |
| codes.FailedPrecondition, |
| codes.Internal, |
| codes.Unavailable: |
| shouldRetry = true |
| return |
| case codes.ResourceExhausted: |
| if strings.HasPrefix(s.Message(), "Exceeds 'AppendRows throughput' quota") { |
| // Note: internal b/246031522 opened to give this a structured error |
| // and avoid string parsing. Should be a QuotaFailure or similar. |
| shouldRetry = true |
| return |
| } |
| } |
| return |
| } |
| |
| // unaryRetryer is for retrying a unary-style operation, like (re)-opening the bidi connection. |
| type unaryRetryer struct { |
| bo gax.Backoff |
| } |
| |
| func (ur *unaryRetryer) Retry(err error) (time.Duration, bool) { |
| shouldRetry, _ := retryPredicate(err) |
| return ur.bo.Pause(), shouldRetry |
| } |
| |
| // statelessRetryer is used for backing off within a continuous process, like processing the responses |
| // from the receive side of the bidi stream. An individual item in that process has a notion of an attempt |
| // count, and we use maximum retries as a way of evicting bad items. |
| type statelessRetryer struct { |
| mu sync.Mutex // guards r |
| r *rand.Rand |
| |
| minBackoff time.Duration |
| jitter time.Duration |
| aggressiveFactor int |
| maxAttempts int |
| } |
| |
| func newStatelessRetryer() *statelessRetryer { |
| return &statelessRetryer{ |
| r: rand.New(rand.NewSource(time.Now().UnixNano())), |
| minBackoff: 50 * time.Millisecond, |
| jitter: time.Second, |
| maxAttempts: defaultRetryAttempts, |
| } |
| } |
| |
| func (sr *statelessRetryer) pause(aggressiveBackoff bool) time.Duration { |
| jitter := sr.jitter.Nanoseconds() |
| if jitter > 0 { |
| sr.mu.Lock() |
| jitter = sr.r.Int63n(jitter) |
| sr.mu.Unlock() |
| } |
| pause := sr.minBackoff.Nanoseconds() + jitter |
| if aggressiveBackoff { |
| pause = pause * int64(sr.aggressiveFactor) |
| } |
| return time.Duration(pause) |
| } |
| |
| func (sr *statelessRetryer) Retry(err error, attemptCount int) (time.Duration, bool) { |
| if attemptCount >= sr.maxAttempts { |
| return 0, false |
| } |
| shouldRetry, aggressive := retryPredicate(err) |
| if shouldRetry { |
| return sr.pause(aggressive), true |
| } |
| return 0, false |
| } |
| |
| // shouldReconnect is akin to a retry predicate, in that it evaluates whether we should force |
| // our bidi stream to close/reopen based on the responses error. Errors here signal that no |
| // further appends will succeed. |
| func shouldReconnect(err error) bool { |
| |
| // io.EOF is the typical not connected signal. |
| if errors.Is(err, io.EOF) { |
| return true |
| } |
| // Backend responses that trigger reconnection on send. |
| reconnectCodes := []codes.Code{ |
| codes.Aborted, |
| codes.Canceled, |
| codes.Unavailable, |
| codes.DeadlineExceeded, |
| } |
| if s, ok := status.FromError(err); ok { |
| for _, c := range reconnectCodes { |
| if s.Code() == c { |
| return true |
| } |
| } |
| } |
| return false |
| } |