// Copyright 2018 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package firestore

import (
	"context"
	"sort"
	"testing"
	"time"

	"cloud.google.com/go/internal/btree"
	"github.com/golang/protobuf/proto"
	gax "github.com/googleapis/gax-go/v2"
	pb "google.golang.org/genproto/googleapis/firestore/v1"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

func TestWatchRecv(t *testing.T) {
	ctx := context.Background()
	c, srv, cleanup := newMock(t)
	defer cleanup()

	db := defaultBackoff
	defaultBackoff = gax.Backoff{Initial: 1, Max: 1, Multiplier: 1}
	defer func() { defaultBackoff = db }()

	ws := newWatchStream(ctx, c, nil, &pb.Target{})
	request := &pb.ListenRequest{
		Database:     "projects/projectID/databases/(default)",
		TargetChange: &pb.ListenRequest_AddTarget{&pb.Target{}},
	}
	response := &pb.ListenResponse{ResponseType: &pb.ListenResponse_DocumentChange{&pb.DocumentChange{}}}
	// Stream should retry on non-permanent errors, returning only the responses.
	srv.addRPC(request, []interface{}{response, status.Error(codes.Unknown, "")})
	srv.addRPC(request, []interface{}{response}) // stream will return io.EOF
	srv.addRPC(request, []interface{}{response, status.Error(codes.DeadlineExceeded, "")})
	srv.addRPC(request, []interface{}{status.Error(codes.ResourceExhausted, "")})
	srv.addRPC(request, []interface{}{status.Error(codes.Internal, "")})
	srv.addRPC(request, []interface{}{status.Error(codes.Unavailable, "")})
	srv.addRPC(request, []interface{}{status.Error(codes.Unauthenticated, "")})
	srv.addRPC(request, []interface{}{response})
	for i := 0; i < 4; i++ {
		res, err := ws.recv()
		if err != nil {
			t.Fatal(err)
		}
		if !proto.Equal(res, response) {
			t.Fatalf("got %v, want %v", res, response)
		}
	}

	// Stream should not retry on a permanent error.
	srv.addRPC(request, []interface{}{status.Error(codes.AlreadyExists, "")})
	_, err := ws.recv()
	if got, want := status.Code(err), codes.AlreadyExists; got != want {
		t.Fatalf("got %s, want %s", got, want)
	}
}

func TestComputeSnapshot(t *testing.T) {
	c := &Client{
		projectID:  "projID",
		databaseID: "(database)",
	}
	ws := newWatchStream(context.Background(), c, nil, &pb.Target{})
	tm := time.Now()
	i := 0
	doc := func(path, value string) *DocumentSnapshot {
		i++
		return &DocumentSnapshot{
			Ref:        c.Doc(path),
			proto:      &pb.Document{Fields: map[string]*pb.Value{"foo": strval(value)}},
			UpdateTime: tm.Add(time.Duration(i) * time.Second), // need unique time for updates
		}
	}
	val := func(d *DocumentSnapshot) string { return d.proto.Fields["foo"].GetStringValue() }
	less := func(a, b *DocumentSnapshot) bool { return val(a) < val(b) }

	type dmap map[string]*DocumentSnapshot

	ds1 := doc("C/d1", "a")
	ds2 := doc("C/d2", "b")
	ds2c := doc("C/d2", "c")
	docTree := btree.New(4, func(a, b interface{}) bool { return less(a.(*DocumentSnapshot), b.(*DocumentSnapshot)) })
	var gotChanges []DocumentChange
	docMap := dmap{}
	// The following test cases are not independent; each builds on the output of the previous.
	for _, test := range []struct {
		desc        string
		changeMap   dmap
		wantDocs    []*DocumentSnapshot
		wantChanges []DocumentChange
	}{
		{
			"no changes",
			nil,
			nil,
			nil,
		},
		{
			"add a doc",
			dmap{ds1.Ref.Path: ds1},
			[]*DocumentSnapshot{ds1},
			[]DocumentChange{{Kind: DocumentAdded, Doc: ds1, OldIndex: -1, NewIndex: 0}},
		},
		{
			"add, remove",
			dmap{ds1.Ref.Path: nil, ds2.Ref.Path: ds2},
			[]*DocumentSnapshot{ds2},
			[]DocumentChange{
				{Kind: DocumentRemoved, Doc: ds1, OldIndex: 0, NewIndex: -1},
				{Kind: DocumentAdded, Doc: ds2, OldIndex: -1, NewIndex: 0},
			},
		},
		{
			"add back, modify",
			dmap{ds1.Ref.Path: ds1, ds2c.Ref.Path: ds2c},
			[]*DocumentSnapshot{ds1, ds2c},
			[]DocumentChange{
				{Kind: DocumentAdded, Doc: ds1, OldIndex: -1, NewIndex: 0},
				{Kind: DocumentModified, Doc: ds2c, OldIndex: 1, NewIndex: 1},
			},
		},
	} {
		docTree, gotChanges = ws.computeSnapshot(docTree, docMap, test.changeMap, time.Time{})
		gotDocs := treeDocs(docTree)
		if diff := testDiff(gotDocs, test.wantDocs); diff != "" {
			t.Fatalf("%s: %s", test.desc, diff)
		}
		mgot := mapDocs(docMap, less)
		if diff := testDiff(gotDocs, mgot); diff != "" {
			t.Fatalf("%s: docTree and docMap disagree: %s", test.desc, diff)
		}
		if diff := testDiff(gotChanges, test.wantChanges); diff != "" {
			t.Fatalf("%s: %s", test.desc, diff)
		}
	}

	// Verify that if there are no changes, the returned docTree is identical to the first arg.
	// docTree already has ds2c.
	got, _ := ws.computeSnapshot(docTree, docMap, dmap{ds2c.Ref.Path: ds2c}, time.Time{})
	if got != docTree {
		t.Error("returned docTree != arg docTree")
	}
}

func treeDocs(bt *btree.BTree) []*DocumentSnapshot {
	var ds []*DocumentSnapshot
	it := bt.BeforeIndex(0)
	for it.Next() {
		ds = append(ds, it.Key.(*DocumentSnapshot))
	}
	return ds
}

func mapDocs(m map[string]*DocumentSnapshot, less func(a, b *DocumentSnapshot) bool) []*DocumentSnapshot {
	var ds []*DocumentSnapshot
	for _, d := range m {
		ds = append(ds, d)
	}
	sort.Sort(byLess{ds, less})
	return ds
}

func TestWatchCancel(t *testing.T) {
	// Canceling the context of a watch should result in a codes.Canceled error from the next
	// call to the iterator's Next method.
	ctx := context.Background()
	c, srv, cleanup := newMock(t)
	defer cleanup()

	q := Query{c: c, collectionID: "x"}

	// Cancel before open.
	ctx2, cancel := context.WithCancel(ctx)
	ws, err := newWatchStreamForQuery(ctx2, q)
	if err != nil {
		t.Fatal(err)
	}
	cancel()
	_, _, _, err = ws.nextSnapshot()
	codeEq(t, "cancel before open", codes.Canceled, err)

	request := &pb.ListenRequest{
		Database:     "projects/projectID/databases/(default)",
		TargetChange: &pb.ListenRequest_AddTarget{ws.target},
	}
	current := &pb.ListenResponse{ResponseType: &pb.ListenResponse_TargetChange{&pb.TargetChange{
		TargetChangeType: pb.TargetChange_CURRENT,
	}}}
	noChange := &pb.ListenResponse{ResponseType: &pb.ListenResponse_TargetChange{&pb.TargetChange{
		TargetChangeType: pb.TargetChange_NO_CHANGE,
		ReadTime:         aTimestamp,
	}}}

	// Cancel from gax.Sleep. We should still see a gRPC error with codes.Canceled, not a
	// context.Canceled error.
	ctx2, cancel = context.WithCancel(ctx)
	ws, err = newWatchStreamForQuery(ctx2, q)
	if err != nil {
		t.Fatal(err)
	}
	srv.addRPC(request, []interface{}{current, noChange})
	_, _, _, _ = ws.nextSnapshot()
	cancel()
	// Because of how the mock works, the following results in an EOF on the stream, which
	// is a non-permanent error that causes a retry. That retry ends up in gax.Sleep, which
	// finds that the context is done and returns ctx.Err(), which is context.Canceled.
	// Verify that we transform that context.Canceled into a gRPC Status with code Canceled.
	_, _, _, err = ws.nextSnapshot()
	codeEq(t, "cancel from gax.Sleep", codes.Canceled, err)

	// TODO(jba): Test that we get codes.Canceled when canceling an RPC.
	// We had a test for this in a21236af, but it was flaky for unclear reasons.
}
