// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and

package wire

import (
	"context"
	"errors"
	"reflect"
	"time"

	"github.com/google/uuid"
	"google.golang.org/api/option"
	"google.golang.org/grpc"

	vkit "cloud.google.com/go/pubsublite/apiv1"
	pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
)

var (
	errServerNoMessages                = errors.New("pubsublite: server delivered no messages")
	errInvalidInitialSubscribeResponse = errors.New("pubsublite: first response from server was not an initial response for subscribe")
	errInvalidSubscribeResponse        = errors.New("pubsublite: received invalid subscribe response from server")
	errNoInFlightSeek                  = errors.New("pubsublite: received seek response for no in-flight seek")
)

// ReceivedMessage stores a received Pub/Sub message and AckConsumer for
// acknowledging the message.
type ReceivedMessage struct {
	Msg       *pb.SequencedMessage
	Ack       AckConsumer
	Partition int
}

// MessageReceiverFunc receives a Pub/Sub message from a topic partition.
type MessageReceiverFunc func(*ReceivedMessage)

// messageDeliveryQueue delivers received messages to the client-provided
// MessageReceiverFunc sequentially.
type messageDeliveryQueue struct {
	receiver  MessageReceiverFunc
	messagesC chan *ReceivedMessage
	stopC     chan struct{}
	acks      *ackTracker
	status    serviceStatus
}

func newMessageDeliveryQueue(acks *ackTracker, receiver MessageReceiverFunc, bufferSize int) *messageDeliveryQueue {
	return &messageDeliveryQueue{
		acks:      acks,
		receiver:  receiver,
		messagesC: make(chan *ReceivedMessage, bufferSize),
		stopC:     make(chan struct{}),
	}
}

func (mq *messageDeliveryQueue) Start() {
	if mq.status == serviceUninitialized {
		go mq.deliverMessages()
		mq.status = serviceActive
	}
}

func (mq *messageDeliveryQueue) Stop() {
	if mq.status < serviceTerminated {
		close(mq.stopC)
		mq.status = serviceTerminated
	}
}

func (mq *messageDeliveryQueue) Add(msg *ReceivedMessage) {
	if mq.status == serviceActive {
		mq.messagesC <- msg
	}
}

func (mq *messageDeliveryQueue) deliverMessages() {
	for {
		// stopC has higher priority.
		select {
		case <-mq.stopC:
			return // Ends the goroutine.
		default:
		}

		select {
		case <-mq.stopC:
			return // Ends the goroutine.
		case msg := <-mq.messagesC:
			// Register outstanding acks, which are primarily handled by the
			// `committer`.
			mq.acks.Push(msg.Ack.(*ackConsumer))
			mq.receiver(msg)
		}
	}
}

// The frequency of sending batch flow control requests.
const batchFlowControlPeriod = 100 * time.Millisecond

// subscribeStream directly wraps the subscribe client stream. It passes
// messages to the message receiver and manages flow control. Flow control
// tokens are batched and sent to the stream via a periodic background task,
// although it can be expedited if the user is rapidly acking messages.
//
// Client-initiated seek unsupported.
type subscribeStream struct {
	// Immutable after creation.
	subClient    *vkit.SubscriberClient
	settings     ReceiveSettings
	subscription subscriptionPartition
	initialReq   *pb.SubscribeRequest
	metadata     pubsubMetadata

	// Fields below must be guarded with mu.
	messageQueue    *messageDeliveryQueue
	stream          *retryableStream
	offsetTracker   subscriberOffsetTracker
	flowControl     flowControlBatcher
	pollFlowControl *periodicTask
	seekInFlight    bool

	abstractService
}

func newSubscribeStream(ctx context.Context, subClient *vkit.SubscriberClient, settings ReceiveSettings,
	receiver MessageReceiverFunc, subscription subscriptionPartition, acks *ackTracker, disableTasks bool) *subscribeStream {

	s := &subscribeStream{
		subClient:    subClient,
		settings:     settings,
		subscription: subscription,
		initialReq: &pb.SubscribeRequest{
			Request: &pb.SubscribeRequest_Initial{
				Initial: &pb.InitialSubscribeRequest{
					Subscription: subscription.Path,
					Partition:    int64(subscription.Partition),
				},
			},
		},
		messageQueue: newMessageDeliveryQueue(acks, receiver, settings.MaxOutstandingMessages),
		metadata:     newPubsubMetadata(),
	}
	s.stream = newRetryableStream(ctx, s, settings.Timeout, reflect.TypeOf(pb.SubscribeResponse{}))
	s.metadata.AddSubscriptionRoutingMetadata(s.subscription)
	s.metadata.AddClientInfo(settings.Framework)

	backgroundTask := s.sendBatchFlowControl
	if disableTasks {
		backgroundTask = func() {}
	}
	s.pollFlowControl = newPeriodicTask(batchFlowControlPeriod, backgroundTask)
	return s
}

// Start establishes a subscribe stream connection and initializes flow control
// tokens from ReceiveSettings.
func (s *subscribeStream) Start() {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.unsafeUpdateStatus(serviceStarting, nil) {
		s.stream.Start()
		s.pollFlowControl.Start()
		s.messageQueue.Start()

		s.flowControl.OnClientFlow(flowControlTokens{
			Bytes:    int64(s.settings.MaxOutstandingBytes),
			Messages: int64(s.settings.MaxOutstandingMessages),
		})
	}
}

// Stop immediately terminates the subscribe stream.
func (s *subscribeStream) Stop() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.unsafeInitiateShutdown(serviceTerminating, nil)
}

func (s *subscribeStream) newStream(ctx context.Context) (grpc.ClientStream, error) {
	return s.subClient.Subscribe(s.metadata.AddToContext(ctx))
}

func (s *subscribeStream) initialRequest() (interface{}, initialResponseRequired) {
	return s.initialReq, initialResponseRequired(true)
}

func (s *subscribeStream) validateInitialResponse(response interface{}) error {
	subscribeResponse, _ := response.(*pb.SubscribeResponse)
	if subscribeResponse.GetInitial() == nil {
		return errInvalidInitialSubscribeResponse
	}
	return nil
}

func (s *subscribeStream) onStreamStatusChange(status streamStatus) {
	s.mu.Lock()
	defer s.mu.Unlock()

	switch status {
	case streamConnected:
		s.unsafeUpdateStatus(serviceActive, nil)

		// Reinitialize the offset and flow control tokens when a new subscribe
		// stream instance is connected.
		if seekReq := s.offsetTracker.RequestForRestart(); seekReq != nil {
			if !s.stream.Send(&pb.SubscribeRequest{
				Request: &pb.SubscribeRequest_Seek{Seek: seekReq},
			}) {
				return
			}
			s.seekInFlight = true
		}
		s.unsafeSendFlowControl(s.flowControl.RequestForRestart())
		s.pollFlowControl.Start()

	case streamReconnecting:
		s.seekInFlight = false
		s.pollFlowControl.Stop()

	case streamTerminated:
		s.unsafeInitiateShutdown(serviceTerminated, s.stream.Error())
	}
}

func (s *subscribeStream) onResponse(response interface{}) {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.status >= serviceTerminating {
		return
	}

	var err error
	subscribeResponse, _ := response.(*pb.SubscribeResponse)
	switch {
	case subscribeResponse.GetMessages() != nil:
		err = s.unsafeOnMessageResponse(subscribeResponse.GetMessages())
	case subscribeResponse.GetSeek() != nil:
		err = s.unsafeOnSeekResponse(subscribeResponse.GetSeek())
	default:
		err = errInvalidSubscribeResponse
	}
	if err != nil {
		s.unsafeInitiateShutdown(serviceTerminated, err)
	}
}

func (s *subscribeStream) unsafeOnSeekResponse(response *pb.SeekResponse) error {
	if !s.seekInFlight {
		return errNoInFlightSeek
	}
	s.seekInFlight = false
	return nil
}

func (s *subscribeStream) unsafeOnMessageResponse(response *pb.MessageResponse) error {
	if len(response.Messages) == 0 {
		return errServerNoMessages
	}
	if err := s.offsetTracker.OnMessages(response.Messages); err != nil {
		return err
	}
	if err := s.flowControl.OnMessages(response.Messages); err != nil {
		return err
	}

	for _, msg := range response.Messages {
		ack := newAckConsumer(msg.GetCursor().GetOffset(), msg.GetSizeBytes(), s.onAck)
		s.messageQueue.Add(&ReceivedMessage{Msg: msg, Ack: ack, Partition: s.subscription.Partition})
	}
	return nil
}

func (s *subscribeStream) onAck(ac *ackConsumer) {
	// Don't block the user's goroutine with potentially expensive ack processing.
	go s.onAckAsync(ac.MsgBytes)
}

func (s *subscribeStream) onAckAsync(msgBytes int64) {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.status == serviceActive {
		s.unsafeAllowFlow(flowControlTokens{Bytes: msgBytes, Messages: 1})
	}
}

// sendBatchFlowControl is called by the periodic background task.
func (s *subscribeStream) sendBatchFlowControl() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.unsafeSendFlowControl(s.flowControl.ReleasePendingRequest())
}

func (s *subscribeStream) unsafeAllowFlow(allow flowControlTokens) {
	s.flowControl.OnClientFlow(allow)
	if s.flowControl.ShouldExpediteBatchRequest() {
		s.unsafeSendFlowControl(s.flowControl.ReleasePendingRequest())
	}
}

func (s *subscribeStream) unsafeSendFlowControl(req *pb.FlowControlRequest) {
	if req == nil {
		return
	}

	// Note: If Send() returns false, the stream will be reconnected and
	// flowControlBatcher.RequestForRestart() will be sent when the stream
	// reconnects. So its return value is ignored.
	s.stream.Send(&pb.SubscribeRequest{
		Request: &pb.SubscribeRequest_FlowControl{FlowControl: req},
	})
}

func (s *subscribeStream) unsafeInitiateShutdown(targetStatus serviceStatus, err error) {
	if !s.unsafeUpdateStatus(targetStatus, wrapError("subscriber", s.subscription.String(), err)) {
		return
	}

	// No data to send. Immediately terminate the stream.
	s.messageQueue.Stop()
	s.pollFlowControl.Stop()
	s.stream.Stop()
}

// singlePartitionSubscriber receives messages from a single topic partition.
// It requires 2 child services:
// - subscribeStream to receive messages from the subscribe stream.
// - committer to commit cursor offsets to the streaming commit cursor stream.
type singlePartitionSubscriber struct {
	subscriber *subscribeStream
	committer  *committer

	compositeService
}

// Terminate shuts down the singlePartitionSubscriber without waiting for
// outstanding acks. Alternatively, Stop() will wait for outstanding acks.
func (s *singlePartitionSubscriber) Terminate() {
	s.subscriber.Stop()
	s.committer.Terminate()
}

type singlePartitionSubscriberFactory struct {
	ctx              context.Context
	subClient        *vkit.SubscriberClient
	cursorClient     *vkit.CursorClient
	settings         ReceiveSettings
	subscriptionPath string
	receiver         MessageReceiverFunc
	disableTasks     bool
}

func (f *singlePartitionSubscriberFactory) New(partition int) *singlePartitionSubscriber {
	subscription := subscriptionPartition{Path: f.subscriptionPath, Partition: partition}
	acks := newAckTracker()
	commit := newCommitter(f.ctx, f.cursorClient, f.settings, subscription, acks, f.disableTasks)
	sub := newSubscribeStream(f.ctx, f.subClient, f.settings, f.receiver, subscription, acks, f.disableTasks)
	ps := &singlePartitionSubscriber{
		subscriber: sub,
		committer:  commit,
	}
	ps.init()
	ps.unsafeAddServices(sub, commit)
	return ps
}

// multiPartitionSubscriber receives messages from a fixed set of topic
// partitions.
type multiPartitionSubscriber struct {
	// Immutable after creation.
	clients     apiClients
	subscribers []*singlePartitionSubscriber

	compositeService
}

func newMultiPartitionSubscriber(allClients apiClients, subFactory *singlePartitionSubscriberFactory) *multiPartitionSubscriber {
	ms := &multiPartitionSubscriber{
		clients: allClients,
	}
	ms.init()

	for _, partition := range subFactory.settings.Partitions {
		subscriber := subFactory.New(partition)
		ms.unsafeAddServices(subscriber)
		ms.subscribers = append(ms.subscribers, subscriber)
	}
	return ms
}

// Terminate shuts down all singlePartitionSubscribers without waiting for
// outstanding acks. Alternatively, Stop() will wait for outstanding acks.
func (ms *multiPartitionSubscriber) Terminate() {
	ms.mu.Lock()
	defer ms.mu.Unlock()

	for _, sub := range ms.subscribers {
		sub.Terminate()
	}
}

func (ms *multiPartitionSubscriber) WaitStopped() error {
	err := ms.compositeService.WaitStopped()
	ms.clients.Close()
	return err
}

// assigningSubscriber uses the Pub/Sub Lite partition assignment service to
// listen to its assigned partition numbers and dynamically add/remove
// singlePartitionSubscribers.
type assigningSubscriber struct {
	// Immutable after creation.
	clients    apiClients
	subFactory *singlePartitionSubscriberFactory
	assigner   *assigner

	// Fields below must be guarded with mu.
	// Subscribers keyed by partition number. Updated as assignments change.
	subscribers map[int]*singlePartitionSubscriber

	compositeService
}

func newAssigningSubscriber(allClients apiClients, assignmentClient *vkit.PartitionAssignmentClient, genUUID generateUUIDFunc, subFactory *singlePartitionSubscriberFactory) (*assigningSubscriber, error) {
	as := &assigningSubscriber{
		clients:     allClients,
		subFactory:  subFactory,
		subscribers: make(map[int]*singlePartitionSubscriber),
	}
	as.init()

	assigner, err := newAssigner(subFactory.ctx, assignmentClient, genUUID, subFactory.settings, subFactory.subscriptionPath, as.handleAssignment)
	if err != nil {
		return nil, err
	}
	as.assigner = assigner
	as.unsafeAddServices(assigner)
	return as, nil
}

func (as *assigningSubscriber) handleAssignment(partitions partitionSet) error {
	as.mu.Lock()
	defer as.mu.Unlock()

	// Handle new partitions.
	for _, partition := range partitions.Ints() {
		if _, exists := as.subscribers[partition]; !exists {
			subscriber := as.subFactory.New(partition)
			if err := as.unsafeAddServices(subscriber); err != nil {
				// Occurs when the assigningSubscriber is stopping/stopped.
				return err
			}
			as.subscribers[partition] = subscriber
		}
	}

	// Handle removed partitions.
	for partition, subscriber := range as.subscribers {
		if !partitions.Contains(partition) {
			// Ignore unacked messages from this point on to avoid conflicting with
			// the commits of the new subscriber that will be assigned this partition.
			subscriber.Terminate()

			as.unsafeRemoveService(subscriber)
			// Safe to delete map entry during range loop:
			// https://golang.org/ref/spec#For_statements
			delete(as.subscribers, partition)
		}
	}
	return nil
}

// Terminate shuts down all singlePartitionSubscribers without waiting for
// outstanding acks. Alternatively, Stop() will wait for outstanding acks.
func (as *assigningSubscriber) Terminate() {
	as.mu.Lock()
	defer as.mu.Unlock()

	for _, sub := range as.subscribers {
		sub.Terminate()
	}
}

func (as *assigningSubscriber) WaitStopped() error {
	err := as.compositeService.WaitStopped()
	as.clients.Close()
	return err
}

// Subscriber is the client interface exported from this package for receiving
// messages.
type Subscriber interface {
	Start()
	WaitStarted() error
	Stop()
	WaitStopped() error
	Terminate()
}

// NewSubscriber creates a new client for receiving messages.
func NewSubscriber(ctx context.Context, settings ReceiveSettings, receiver MessageReceiverFunc, region, subscriptionPath string, opts ...option.ClientOption) (Subscriber, error) {
	if err := ValidateRegion(region); err != nil {
		return nil, err
	}
	if err := validateReceiveSettings(settings); err != nil {
		return nil, err
	}
	subClient, err := newSubscriberClient(ctx, region, opts...)
	if err != nil {
		return nil, err
	}
	cursorClient, err := newCursorClient(ctx, region, opts...)
	if err != nil {
		return nil, err
	}
	allClients := apiClients{subClient, cursorClient}

	subFactory := &singlePartitionSubscriberFactory{
		ctx:              ctx,
		subClient:        subClient,
		cursorClient:     cursorClient,
		settings:         settings,
		subscriptionPath: subscriptionPath,
		receiver:         receiver,
	}

	if len(settings.Partitions) > 0 {
		return newMultiPartitionSubscriber(allClients, subFactory), nil
	}
	partitionClient, err := newPartitionAssignmentClient(ctx, region, opts...)
	if err != nil {
		return nil, err
	}
	allClients = append(allClients, partitionClient)
	return newAssigningSubscriber(allClients, partitionClient, uuid.NewRandom, subFactory)
}
