// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and

package wire

import (
	"context"
	"errors"
	"sort"
	"testing"
	"time"

	"cloud.google.com/go/internal/testutil"
	"cloud.google.com/go/pubsublite/internal/test"
	"github.com/google/uuid"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
)

func TestPartitionSet(t *testing.T) {
	partitions := newPartitionSet(&pb.PartitionAssignment{
		Partitions: []int64{8, 5, 8, 1},
	})

	wantPartitions := []int{1, 5, 8}
	for _, partition := range wantPartitions {
		if !partitions.Contains(partition) {
			t.Errorf("Contains(%d) got false, want true", partition)
		}
	}
	for _, partition := range []int{2, 3, 4, 6, 7} {
		if partitions.Contains(partition) {
			t.Errorf("Contains(%d) got true, want false", partition)
		}
	}

	gotPartitions := partitions.Ints()
	sort.Ints(gotPartitions)
	if !testutil.Equal(gotPartitions, wantPartitions) {
		t.Errorf("Ints() got %v, want %v", gotPartitions, wantPartitions)
	}
}

var fakeUUID = [16]byte{'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', '1', '2', '3', '4', '5'}

func fakeGenerateUUID() (uuid.UUID, error) {
	return fakeUUID, nil
}

// testAssigner wraps an assigner for ease of testing.
type testAssigner struct {
	// Fake error to simulate receiver unable to handle assignment.
	recvError  error
	t          *testing.T
	asn        *assigner
	partitions chan []int

	serviceTestProxy
}

func newTestAssigner(t *testing.T, subscription string, recvErr error) *testAssigner {
	ctx := context.Background()
	assignmentClient, err := newPartitionAssignmentClient(ctx, "ignored", testServer.ClientConn())
	if err != nil {
		t.Fatal(err)
	}

	ta := &testAssigner{
		recvError:  recvErr,
		t:          t,
		partitions: make(chan []int, 1),
	}
	asn, err := newAssigner(ctx, assignmentClient, fakeGenerateUUID, testReceiveSettings(), subscription, ta.receiveAssignment)
	if err != nil {
		t.Fatal(err)
	}
	ta.asn = asn
	ta.initAndStart(t, ta.asn, "Assigner", assignmentClient)
	return ta
}

func (ta *testAssigner) receiveAssignment(partitions partitionSet) error {
	p := partitions.Ints()
	sort.Ints(p)
	ta.partitions <- p

	if ta.recvError != nil {
		return ta.recvError
	}
	return nil
}

func (ta *testAssigner) NextPartitions() []int {
	select {
	case <-time.After(serviceTestWaitTimeout):
		ta.t.Errorf("%s partitions not received within %v", ta.name, serviceTestWaitTimeout)
		return nil
	case p := <-ta.partitions:
		return p
	}
}

func TestAssignerNoInitialResponse(t *testing.T) {
	subscription := "projects/123456/locations/us-central1-b/subscriptions/my-subs"

	verifiers := test.NewVerifiers(t)
	stream := test.NewRPCVerifier(t)
	barrier := stream.PushWithBarrier(initAssignmentReq(subscription, fakeUUID[:]), nil, nil)
	verifiers.AddAssignmentStream(subscription, stream)

	mockServer.OnTestStart(verifiers)
	defer mockServer.OnTestEnd()

	asn := newTestAssigner(t, subscription, nil)

	// Assigner starts even though no initial response was received from the
	// server.
	if gotErr := asn.StartError(); gotErr != nil {
		t.Errorf("Start() got err: (%v)", gotErr)
	}
	// To ensure test is deterministic, i.e. server must receive initial request
	// before stopping the client.
	barrier.Release()
	asn.StopVerifyNoError()
}

func TestAssignerReconnect(t *testing.T) {
	subscription := "projects/123456/locations/us-central1-b/subscriptions/my-subs"
	permanentErr := status.Error(codes.FailedPrecondition, "failed")

	verifiers := test.NewVerifiers(t)

	// Simulate a transient error that results in a reconnect.
	stream1 := test.NewRPCVerifier(t)
	stream1.Push(initAssignmentReq(subscription, fakeUUID[:]), nil, status.Error(codes.Unavailable, "server unavailable"))
	verifiers.AddAssignmentStream(subscription, stream1)

	// Send 2 partition assignments before terminating with permanent error.
	stream2 := test.NewRPCVerifier(t)
	stream2.Push(initAssignmentReq(subscription, fakeUUID[:]), assignmentResp([]int64{3, 2, 4}), nil)
	stream2.Push(assignmentAckReq(), assignmentResp([]int64{0, 3, 3}), nil)
	stream2.Push(assignmentAckReq(), nil, permanentErr)
	verifiers.AddAssignmentStream(subscription, stream2)

	mockServer.OnTestStart(verifiers)
	defer mockServer.OnTestEnd()

	asn := newTestAssigner(t, subscription, nil)

	if gotErr := asn.StartError(); gotErr != nil {
		t.Errorf("Start() got err: (%v)", gotErr)
	}
	if got, want := asn.NextPartitions(), []int{2, 3, 4}; !testutil.Equal(got, want) {
		t.Errorf("Partition assignment #1: got %v, want %v", got, want)
	}
	if got, want := asn.NextPartitions(), []int{0, 3}; !testutil.Equal(got, want) {
		t.Errorf("Partition assignment #2: got %v, want %v", got, want)
	}
	if gotErr, wantErr := asn.FinalError(), permanentErr; !test.ErrorEqual(gotErr, wantErr) {
		t.Errorf("Final err: (%v), want: (%v)", gotErr, wantErr)
	}
}

func TestAssignerHandlePartitionFailure(t *testing.T) {
	subscription := "projects/123456/locations/us-central1-b/subscriptions/my-subs"

	verifiers := test.NewVerifiers(t)
	stream := test.NewRPCVerifier(t)
	stream.Push(initAssignmentReq(subscription, fakeUUID[:]), assignmentResp([]int64{1, 2}), nil)
	verifiers.AddAssignmentStream(subscription, stream)

	mockServer.OnTestStart(verifiers)
	defer mockServer.OnTestEnd()

	// Simulates the assigningSubscriber discarding assignments.
	wantErr := errors.New("subscriber shutting down")
	asn := newTestAssigner(t, subscription, wantErr)

	if gotErr := asn.FinalError(); !test.ErrorEqual(gotErr, wantErr) {
		t.Errorf("Final err: (%v), want: (%v)", gotErr, wantErr)
	}
	if got, want := asn.NextPartitions(), []int{1, 2}; !testutil.Equal(got, want) {
		t.Errorf("Partition assignments: got %v, want %v", got, want)
	}
}
