| // Copyright 2018 Google Inc. All Rights Reserved. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package firestore |
| |
| import ( |
| "io" |
| "time" |
| |
| gax "github.com/googleapis/gax-go" |
| "golang.org/x/net/context" |
| pb "google.golang.org/genproto/googleapis/firestore/v1beta1" |
| "google.golang.org/grpc/codes" |
| "google.golang.org/grpc/status" |
| ) |
| |
| // Implementation of realtime updates (a.k.a. watch). |
| // This code is closely based on the Node.js implementation, |
| // https://github.com/googleapis/nodejs-firestore/blob/master/src/watch.js. |
| |
| var defaultBackoff = gax.Backoff{ |
| // Values from https://github.com/googleapis/nodejs-firestore/blob/master/src/backoff.js. |
| Initial: 1 * time.Second, |
| Max: 60 * time.Second, |
| Multiplier: 1.5, |
| } |
| |
| type watchStream struct { |
| ctx context.Context |
| c *Client |
| target *pb.Target // document or query being watched |
| lc pb.Firestore_ListenClient |
| backoff gax.Backoff |
| } |
| |
| func newWatchStream(ctx context.Context, c *Client, target *pb.Target) *watchStream { |
| return &watchStream{ |
| ctx: ctx, |
| c: c, |
| target: target, |
| backoff: defaultBackoff, |
| } |
| } |
| |
| // recv receives the next message from the stream. It also handles opening the stream |
| // initially, and reopening it on non-permanent errors. |
| // recv doesn't have to be goroutine-safe. |
| func (s *watchStream) recv() (*pb.ListenResponse, error) { |
| var err error |
| for { |
| if s.lc == nil { |
| s.lc, err = s.open() |
| if err != nil { |
| // Do not retry if open fails. |
| return nil, err |
| } |
| } |
| res, err := s.lc.Recv() |
| if err == nil || isPermanentWatchError(err) { |
| return res, err |
| } |
| // Non-permanent error. Sleep and retry. |
| // TODO: from node: |
| // request.addTarget.resumeToken = resumeToken; |
| // changeMap.clear(); |
| dur := s.backoff.Pause() |
| // If we're out of quota, wait a long time before retrying. |
| if status.Code(err) == codes.ResourceExhausted { |
| dur = s.backoff.Max |
| } |
| if err := gax.Sleep(s.ctx, dur); err != nil { |
| return nil, err |
| } |
| s.lc = nil |
| } |
| } |
| |
| func (s *watchStream) open() (pb.Firestore_ListenClient, error) { |
| lc, err := s.c.c.Listen(s.ctx) |
| if err == nil { |
| err = lc.Send(&pb.ListenRequest{ |
| Database: s.c.path(), |
| TargetChange: &pb.ListenRequest_AddTarget{AddTarget: s.target}, |
| }) |
| } |
| if err != nil { |
| return nil, err |
| } |
| return lc, nil |
| } |
| |
| func isPermanentWatchError(err error) bool { |
| if err == io.EOF { |
| // Retry on normal end-of-stream. |
| return false |
| } |
| switch status.Code(err) { |
| case codes.Canceled, codes.Unknown, codes.DeadlineExceeded, codes.ResourceExhausted, |
| codes.Internal, codes.Unavailable, codes.Unauthenticated: |
| return false |
| default: |
| return true |
| } |
| } |