// Copyright 2017 Vector Creations Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package queue

import (
	"context"
	"encoding/json"
	"fmt"
	"sync"
	"time"

	"github.com/matrix-org/dendrite/federationsender/statistics"
	"github.com/matrix-org/dendrite/federationsender/storage"
	"github.com/matrix-org/dendrite/federationsender/storage/shared"
	"github.com/matrix-org/dendrite/roomserver/api"
	"github.com/matrix-org/gomatrix"
	"github.com/matrix-org/gomatrixserverlib"
	"github.com/sirupsen/logrus"
	log "github.com/sirupsen/logrus"
	"go.uber.org/atomic"
)

const (
	maxPDUsPerTransaction = 50
	maxEDUsPerTransaction = 50
	queueIdleTimeout      = time.Second * 30
)

// destinationQueue is a queue of events for a single destination.
// It is responsible for sending the events to the destination and
// ensures that only one request is in flight to a given destination
// at a time.
type destinationQueue struct {
	db                 storage.Database
	signing            *SigningInfo
	rsAPI              api.RoomserverInternalAPI
	client             *gomatrixserverlib.FederationClient // federation client
	origin             gomatrixserverlib.ServerName        // origin of requests
	destination        gomatrixserverlib.ServerName        // destination of requests
	running            atomic.Bool                         // is the queue worker running?
	backingOff         atomic.Bool                         // true if we're backing off
	statistics         *statistics.ServerStatistics        // statistics about this remote server
	transactionIDMutex sync.Mutex                          // protects transactionID
	transactionID      gomatrixserverlib.TransactionID     // last transaction ID
	transactionCount   atomic.Int32                        // how many events in this transaction so far
	notifyPDUs         chan bool                           // interrupts idle wait for PDUs
	notifyEDUs         chan bool                           // interrupts idle wait for EDUs
	interruptBackoff   chan bool                           // interrupts backoff
}

// Send event adds the event to the pending queue for the destination.
// If the queue is empty then it starts a background goroutine to
// start sending events to that destination.
func (oq *destinationQueue) sendEvent(receipt *shared.Receipt) {
	// Create a transaction ID. We'll either do this if we don't have
	// one made up yet, or if we've exceeded the number of maximum
	// events allowed in a single tranaction. We'll reset the counter
	// when we do.
	oq.transactionIDMutex.Lock()
	if oq.transactionID == "" || oq.transactionCount.Load() >= maxPDUsPerTransaction {
		now := gomatrixserverlib.AsTimestamp(time.Now())
		oq.transactionID = gomatrixserverlib.TransactionID(fmt.Sprintf("%d-%d", now, oq.statistics.SuccessCount()))
		oq.transactionCount.Store(0)
	}
	oq.transactionIDMutex.Unlock()
	// Create a database entry that associates the given PDU NID with
	// this destination queue. We'll then be able to retrieve the PDU
	// later.
	if err := oq.db.AssociatePDUWithDestination(
		context.TODO(),
		oq.transactionID, // the current transaction ID
		oq.destination,   // the destination server name
		receipt,          // NIDs from federationsender_queue_json table
	); err != nil {
		log.WithError(err).Errorf("failed to associate PDU receipt %q with destination %q", receipt.String(), oq.destination)
		return
	}
	// We've successfully added a PDU to the transaction so increase
	// the counter.
	oq.transactionCount.Add(1)
	// Check if the destination is blacklisted. If it isn't then wake
	// up the queue.
	if !oq.statistics.Blacklisted() {
		// Wake up the queue if it's asleep.
		oq.wakeQueueIfNeeded()
		// If we're blocking on waiting PDUs then tell the queue that we
		// have work to do.
		select {
		case oq.notifyPDUs <- true:
		default:
		}
	}
}

// sendEDU adds the EDU event to the pending queue for the destination.
// If the queue is empty then it starts a background goroutine to
// start sending events to that destination.
func (oq *destinationQueue) sendEDU(receipt *shared.Receipt) {
	// Create a database entry that associates the given PDU NID with
	// this destination queue. We'll then be able to retrieve the PDU
	// later.
	if err := oq.db.AssociateEDUWithDestination(
		context.TODO(),
		oq.destination, // the destination server name
		receipt,        // NIDs from federationsender_queue_json table
	); err != nil {
		log.WithError(err).Errorf("failed to associate EDU receipt %q with destination %q", receipt.String(), oq.destination)
		return
	}
	// We've successfully added an EDU to the transaction so increase
	// the counter.
	oq.transactionCount.Add(1)
	// Check if the destination is blacklisted. If it isn't then wake
	// up the queue.
	if !oq.statistics.Blacklisted() {
		// Wake up the queue if it's asleep.
		oq.wakeQueueIfNeeded()
		// If we're blocking on waiting EDUs then tell the queue that we
		// have work to do.
		select {
		case oq.notifyEDUs <- true:
		default:
		}
	}
}

// wakeQueueIfNeeded will wake up the destination queue if it is
// not already running. If it is running but it is backing off
// then we will interrupt the backoff, causing any federation
// requests to retry.
func (oq *destinationQueue) wakeQueueIfNeeded() {
	// If we are backing off then interrupt the backoff.
	if oq.backingOff.CAS(true, false) {
		oq.interruptBackoff <- true
	}
	// If we aren't running then wake up the queue.
	if !oq.running.Load() {
		// Start the queue.
		go oq.backgroundSend()
	}
}

// waitForPDUs returns a channel for pending PDUs, which will be
// used in backgroundSend select. It returns a closed channel if
// there is something pending right now, or an open channel if
// we're waiting for something.
func (oq *destinationQueue) waitForPDUs() chan bool {
	pendingPDUs, err := oq.db.GetPendingPDUCount(context.TODO(), oq.destination)
	if err != nil {
		log.WithError(err).Errorf("Failed to get pending PDU count on queue %q", oq.destination)
	}
	// If there are PDUs pending right now then we'll return a closed
	// channel. This will mean that the backgroundSend will not block.
	if pendingPDUs > 0 {
		ch := make(chan bool, 1)
		close(ch)
		return ch
	}
	// If there are no PDUs pending right now then instead we'll return
	// the notify channel, so that backgroundSend can pick up normal
	// notifications from sendEvent.
	return oq.notifyPDUs
}

// waitForEDUs returns a channel for pending EDUs, which will be
// used in backgroundSend select. It returns a closed channel if
// there is something pending right now, or an open channel if
// we're waiting for something.
func (oq *destinationQueue) waitForEDUs() chan bool {
	pendingEDUs, err := oq.db.GetPendingEDUCount(context.TODO(), oq.destination)
	if err != nil {
		log.WithError(err).Errorf("Failed to get pending EDU count on queue %q", oq.destination)
	}
	// If there are EDUs pending right now then we'll return a closed
	// channel. This will mean that the backgroundSend will not block.
	if pendingEDUs > 0 {
		ch := make(chan bool, 1)
		close(ch)
		return ch
	}
	// If there are no EDUs pending right now then instead we'll return
	// the notify channel, so that backgroundSend can pick up normal
	// notifications from sendEvent.
	return oq.notifyEDUs
}

// backgroundSend is the worker goroutine for sending events.
// nolint:gocyclo
func (oq *destinationQueue) backgroundSend() {
	// Check if a worker is already running, and if it isn't, then
	// mark it as started.
	if !oq.running.CAS(false, true) {
		return
	}
	defer oq.running.Store(false)

	for {
		pendingPDUs, pendingEDUs := false, false

		// If we have nothing to do then wait either for incoming events, or
		// until we hit an idle timeout.
		select {
		case <-oq.waitForPDUs():
			// We were woken up because there are new PDUs waiting in the
			// database.
			pendingPDUs = true
		case <-oq.waitForEDUs():
			// We were woken up because there are new PDUs waiting in the
			// database.
			pendingEDUs = true
		case <-time.After(queueIdleTimeout):
			// The worker is idle so stop the goroutine. It'll get
			// restarted automatically the next time we have an event to
			// send.
			log.Tracef("Queue %q has been idle for %s, going to sleep", oq.destination, queueIdleTimeout)
			return
		}

		// If we are backing off this server then wait for the
		// backoff duration to complete first, or until explicitly
		// told to retry.
		until, blacklisted := oq.statistics.BackoffInfo()
		if blacklisted {
			// It's been suggested that we should give up because the backoff
			// has exceeded a maximum allowable value. Clean up the in-memory
			// buffers at this point. The PDU clean-up is already on a defer.
			log.Warnf("Blacklisting %q due to exceeding backoff threshold", oq.destination)
			return
		}
		if until != nil && until.After(time.Now()) {
			// We haven't backed off yet, so wait for the suggested amount of
			// time.
			duration := time.Until(*until)
			log.Warnf("Backing off %q for %s", oq.destination, duration)
			select {
			case <-time.After(duration):
			case <-oq.interruptBackoff:
			}
		}

		// If we have pending PDUs or EDUs then construct a transaction.
		if pendingPDUs || pendingEDUs {
			// Try sending the next transaction and see what happens.
			transaction, terr := oq.nextTransaction()
			if terr != nil {
				// We failed to send the transaction. Mark it as a failure.
				oq.statistics.Failure()
			} else if transaction {
				// If we successfully sent the transaction then clear out
				// the pending events and EDUs, and wipe our transaction ID.
				oq.statistics.Success()
			}
		}
	}
}

// nextTransaction creates a new transaction from the pending event
// queue and sends it. Returns true if a transaction was sent or
// false otherwise.
// nolint:gocyclo
func (oq *destinationQueue) nextTransaction() (bool, error) {
	// Before we do anything, we need to roll over the transaction
	// ID that is being used to coalesce events into the next TX.
	// Otherwise it's possible that we'll pick up an incomplete
	// transaction and end up nuking the rest of the events at the
	// cleanup stage.
	oq.transactionIDMutex.Lock()
	oq.transactionID = ""
	oq.transactionIDMutex.Unlock()
	oq.transactionCount.Store(0)

	// Create the transaction.
	t := gomatrixserverlib.Transaction{
		PDUs: []json.RawMessage{},
		EDUs: []gomatrixserverlib.EDU{},
	}
	t.Origin = oq.origin
	t.Destination = oq.destination
	t.OriginServerTS = gomatrixserverlib.AsTimestamp(time.Now())

	// Ask the database for any pending PDUs from the next transaction.
	// maxPDUsPerTransaction is an upper limit but we probably won't
	// actually retrieve that many events.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()
	txid, pdus, pduReceipt, err := oq.db.GetNextTransactionPDUs(
		ctx,                   // context
		oq.destination,        // server name
		maxPDUsPerTransaction, // max events to retrieve
	)
	if err != nil {
		log.WithError(err).Errorf("failed to get next transaction PDUs for server %q", oq.destination)
		return false, fmt.Errorf("oq.db.GetNextTransactionPDUs: %w", err)
	}

	edus, eduReceipt, err := oq.db.GetNextTransactionEDUs(
		ctx,                   // context
		oq.destination,        // server name
		maxEDUsPerTransaction, // max events to retrieve
	)
	if err != nil {
		log.WithError(err).Errorf("failed to get next transaction EDUs for server %q", oq.destination)
		return false, fmt.Errorf("oq.db.GetNextTransactionEDUs: %w", err)
	}

	// If we didn't get anything from the database and there are no
	// pending EDUs then there's nothing to do - stop here.
	if len(pdus) == 0 && len(edus) == 0 {
		return false, nil
	}

	// Pick out the transaction ID from the database. If we didn't
	// get a transaction ID (i.e. because there are no PDUs but only
	// EDUs) then generate a transaction ID.
	t.TransactionID = txid
	if t.TransactionID == "" {
		now := gomatrixserverlib.AsTimestamp(time.Now())
		t.TransactionID = gomatrixserverlib.TransactionID(fmt.Sprintf("%d-%d", now, oq.statistics.SuccessCount()))
	}

	// Go through PDUs that we retrieved from the database, if any,
	// and add them into the transaction.
	for _, pdu := range pdus {
		// Append the JSON of the event, since this is a json.RawMessage type in the
		// gomatrixserverlib.Transaction struct
		t.PDUs = append(t.PDUs, (*pdu).JSON())
	}

	// Do the same for pending EDUS in the queue.
	for _, edu := range edus {
		t.EDUs = append(t.EDUs, *edu)
	}

	logrus.WithField("server_name", oq.destination).Debugf("Sending transaction %q containing %d PDUs, %d EDUs", t.TransactionID, len(t.PDUs), len(t.EDUs))

	// Try to send the transaction to the destination server.
	// TODO: we should check for 500-ish fails vs 400-ish here,
	// since we shouldn't queue things indefinitely in response
	// to a 400-ish error
	ctx, cancel = context.WithTimeout(context.Background(), time.Minute*5)
	defer cancel()
	_, err = oq.client.SendTransaction(ctx, t)
	switch err.(type) {
	case nil:
		// Clean up the transaction in the database.
		if pduReceipt != nil {
			//logrus.Infof("Cleaning PDUs %q", pduReceipt.String())
			if err = oq.db.CleanPDUs(context.Background(), oq.destination, pduReceipt); err != nil {
				log.WithError(err).Errorf("failed to clean PDUs %q for server %q", pduReceipt.String(), t.Destination)
			}
		}
		if eduReceipt != nil {
			//logrus.Infof("Cleaning EDUs %q", eduReceipt.String())
			if err = oq.db.CleanEDUs(context.Background(), oq.destination, eduReceipt); err != nil {
				log.WithError(err).Errorf("failed to clean EDUs %q for server %q", eduReceipt.String(), t.Destination)
			}
		}
		return true, nil
	case gomatrix.HTTPError:
		// Report that we failed to send the transaction and we
		// will retry again, subject to backoff.
		return false, err
	default:
		log.WithFields(log.Fields{
			"destination": oq.destination,
			log.ErrorKey:  err,
		}).Info("problem sending transaction")
		return false, err
	}
}