blob: 5ed9f82b7b9b3ff262b80170b2a75f94eb459a37 [file] [log] [blame]
// Copyright 2018 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package firestore
import (
"context"
"sort"
"testing"
"time"
"cloud.google.com/go/internal/btree"
"github.com/golang/protobuf/proto"
gax "github.com/googleapis/gax-go/v2"
pb "google.golang.org/genproto/googleapis/firestore/v1"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
func TestWatchRecv(t *testing.T) {
ctx := context.Background()
c, srv, cleanup := newMock(t)
defer cleanup()
db := defaultBackoff
defaultBackoff = gax.Backoff{Initial: 1, Max: 1, Multiplier: 1}
defer func() { defaultBackoff = db }()
ws := newWatchStream(ctx, c, nil, &pb.Target{})
request := &pb.ListenRequest{
Database: "projects/projectID/databases/(default)",
TargetChange: &pb.ListenRequest_AddTarget{&pb.Target{}},
}
response := &pb.ListenResponse{ResponseType: &pb.ListenResponse_DocumentChange{&pb.DocumentChange{}}}
// Stream should retry on non-permanent errors, returning only the responses.
srv.addRPC(request, []interface{}{response, status.Error(codes.Unknown, "")})
srv.addRPC(request, []interface{}{response}) // stream will return io.EOF
srv.addRPC(request, []interface{}{response, status.Error(codes.DeadlineExceeded, "")})
srv.addRPC(request, []interface{}{status.Error(codes.ResourceExhausted, "")})
srv.addRPC(request, []interface{}{status.Error(codes.Internal, "")})
srv.addRPC(request, []interface{}{status.Error(codes.Unavailable, "")})
srv.addRPC(request, []interface{}{status.Error(codes.Unauthenticated, "")})
srv.addRPC(request, []interface{}{response})
for i := 0; i < 4; i++ {
res, err := ws.recv()
if err != nil {
t.Fatal(err)
}
if !proto.Equal(res, response) {
t.Fatalf("got %v, want %v", res, response)
}
}
// Stream should not retry on a permanent error.
srv.addRPC(request, []interface{}{status.Error(codes.AlreadyExists, "")})
_, err := ws.recv()
if got, want := status.Code(err), codes.AlreadyExists; got != want {
t.Fatalf("got %s, want %s", got, want)
}
}
func TestComputeSnapshot(t *testing.T) {
c := &Client{
projectID: "projID",
databaseID: "(database)",
}
ws := newWatchStream(context.Background(), c, nil, &pb.Target{})
tm := time.Now()
i := 0
doc := func(path, value string) *DocumentSnapshot {
i++
return &DocumentSnapshot{
Ref: c.Doc(path),
proto: &pb.Document{Fields: map[string]*pb.Value{"foo": strval(value)}},
UpdateTime: tm.Add(time.Duration(i) * time.Second), // need unique time for updates
}
}
val := func(d *DocumentSnapshot) string { return d.proto.Fields["foo"].GetStringValue() }
less := func(a, b *DocumentSnapshot) bool { return val(a) < val(b) }
type dmap map[string]*DocumentSnapshot
ds1 := doc("C/d1", "a")
ds2 := doc("C/d2", "b")
ds2c := doc("C/d2", "c")
docTree := btree.New(4, func(a, b interface{}) bool { return less(a.(*DocumentSnapshot), b.(*DocumentSnapshot)) })
var gotChanges []DocumentChange
docMap := dmap{}
// The following test cases are not independent; each builds on the output of the previous.
for _, test := range []struct {
desc string
changeMap dmap
wantDocs []*DocumentSnapshot
wantChanges []DocumentChange
}{
{
"no changes",
nil,
nil,
nil,
},
{
"add a doc",
dmap{ds1.Ref.Path: ds1},
[]*DocumentSnapshot{ds1},
[]DocumentChange{{Kind: DocumentAdded, Doc: ds1, OldIndex: -1, NewIndex: 0}},
},
{
"add, remove",
dmap{ds1.Ref.Path: nil, ds2.Ref.Path: ds2},
[]*DocumentSnapshot{ds2},
[]DocumentChange{
{Kind: DocumentRemoved, Doc: ds1, OldIndex: 0, NewIndex: -1},
{Kind: DocumentAdded, Doc: ds2, OldIndex: -1, NewIndex: 0},
},
},
{
"add back, modify",
dmap{ds1.Ref.Path: ds1, ds2c.Ref.Path: ds2c},
[]*DocumentSnapshot{ds1, ds2c},
[]DocumentChange{
{Kind: DocumentAdded, Doc: ds1, OldIndex: -1, NewIndex: 0},
{Kind: DocumentModified, Doc: ds2c, OldIndex: 1, NewIndex: 1},
},
},
} {
docTree, gotChanges = ws.computeSnapshot(docTree, docMap, test.changeMap, time.Time{})
gotDocs := treeDocs(docTree)
if diff := testDiff(gotDocs, test.wantDocs); diff != "" {
t.Fatalf("%s: %s", test.desc, diff)
}
mgot := mapDocs(docMap, less)
if diff := testDiff(gotDocs, mgot); diff != "" {
t.Fatalf("%s: docTree and docMap disagree: %s", test.desc, diff)
}
if diff := testDiff(gotChanges, test.wantChanges); diff != "" {
t.Fatalf("%s: %s", test.desc, diff)
}
}
// Verify that if there are no changes, the returned docTree is identical to the first arg.
// docTree already has ds2c.
got, _ := ws.computeSnapshot(docTree, docMap, dmap{ds2c.Ref.Path: ds2c}, time.Time{})
if got != docTree {
t.Error("returned docTree != arg docTree")
}
}
func treeDocs(bt *btree.BTree) []*DocumentSnapshot {
var ds []*DocumentSnapshot
it := bt.BeforeIndex(0)
for it.Next() {
ds = append(ds, it.Key.(*DocumentSnapshot))
}
return ds
}
func mapDocs(m map[string]*DocumentSnapshot, less func(a, b *DocumentSnapshot) bool) []*DocumentSnapshot {
var ds []*DocumentSnapshot
for _, d := range m {
ds = append(ds, d)
}
sort.Sort(byLess{ds, less})
return ds
}
func TestWatchCancel(t *testing.T) {
// Canceling the context of a watch should result in a codes.Canceled error from the next
// call to the iterator's Next method.
ctx := context.Background()
c, srv, cleanup := newMock(t)
defer cleanup()
q := Query{c: c, collectionID: "x"}
// Cancel before open.
ctx2, cancel := context.WithCancel(ctx)
ws, err := newWatchStreamForQuery(ctx2, q)
if err != nil {
t.Fatal(err)
}
cancel()
_, _, _, err = ws.nextSnapshot()
codeEq(t, "cancel before open", codes.Canceled, err)
request := &pb.ListenRequest{
Database: "projects/projectID/databases/(default)",
TargetChange: &pb.ListenRequest_AddTarget{ws.target},
}
current := &pb.ListenResponse{ResponseType: &pb.ListenResponse_TargetChange{&pb.TargetChange{
TargetChangeType: pb.TargetChange_CURRENT,
}}}
noChange := &pb.ListenResponse{ResponseType: &pb.ListenResponse_TargetChange{&pb.TargetChange{
TargetChangeType: pb.TargetChange_NO_CHANGE,
ReadTime: aTimestamp,
}}}
// Cancel from gax.Sleep. We should still see a gRPC error with codes.Canceled, not a
// context.Canceled error.
ctx2, cancel = context.WithCancel(ctx)
ws, err = newWatchStreamForQuery(ctx2, q)
if err != nil {
t.Fatal(err)
}
srv.addRPC(request, []interface{}{current, noChange})
_, _, _, _ = ws.nextSnapshot()
cancel()
// Because of how the mock works, the following results in an EOF on the stream, which
// is a non-permanent error that causes a retry. That retry ends up in gax.Sleep, which
// finds that the context is done and returns ctx.Err(), which is context.Canceled.
// Verify that we transform that context.Canceled into a gRPC Status with code Canceled.
_, _, _, err = ws.nextSnapshot()
codeEq(t, "cancel from gax.Sleep", codes.Canceled, err)
// TODO(jba): Test that we get codes.Canceled when canceling an RPC.
// We had a test for this in a21236af, but it was flaky for unclear reasons.
}