diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index 9b3f749c0..231f08c67 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -24,6 +24,7 @@ import ( "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" log "github.com/sirupsen/logrus" + "go.uber.org/atomic" ) // destinationQueue is a queue of events for a single destination. @@ -34,10 +35,10 @@ type destinationQueue struct { client *gomatrixserverlib.FederationClient origin gomatrixserverlib.ServerName destination gomatrixserverlib.ServerName - // The running mutex protects running, sentCounter, lastTransactionIDs and + running atomic.Bool + // The running mutex protects sentCounter, lastTransactionIDs and // pendingEvents, pendingEDUs. runningMutex sync.Mutex - running bool sentCounter int lastTransactionIDs []gomatrixserverlib.TransactionID pendingEvents []*gomatrixserverlib.HeaderedEvent @@ -52,39 +53,43 @@ func (oq *destinationQueue) sendEvent(ev *gomatrixserverlib.HeaderedEvent) { oq.runningMutex.Lock() defer oq.runningMutex.Unlock() oq.pendingEvents = append(oq.pendingEvents, ev) - if !oq.running { - oq.running = true + if !oq.running.Load() { go oq.backgroundSend() } } // sendEDU adds the EDU event to the pending queue for the destination. // If the queue is empty then it starts a background goroutine to -// start sending event to that destination. +// start sending events to that destination. func (oq *destinationQueue) sendEDU(e *gomatrixserverlib.EDU) { oq.runningMutex.Lock() defer oq.runningMutex.Unlock() oq.pendingEDUs = append(oq.pendingEDUs, e) - if !oq.running { - oq.running = true + if !oq.running.Load() { go oq.backgroundSend() } } +// sendInvite adds the invite event to the pending queue for the +// destination. If the queue is empty then it starts a background +// goroutine to start sending events to that destination. func (oq *destinationQueue) sendInvite(ev *gomatrixserverlib.InviteV2Request) { oq.runningMutex.Lock() defer oq.runningMutex.Unlock() oq.pendingInvites = append(oq.pendingInvites, ev) - if !oq.running { - oq.running = true + if !oq.running.Load() { go oq.backgroundSend() } } +// backgroundSend is the worker goroutine for sending events. func (oq *destinationQueue) backgroundSend() { + oq.running.Store(true) + defer oq.running.Store(false) + for { - t := oq.next() - if t == nil { + transaction, invites := oq.nextTransaction(), oq.nextInvites() + if !transaction && !invites { // If the queue is empty then stop processing for this destination. // TODO: Remove this destination from the queue map. return @@ -92,48 +97,18 @@ func (oq *destinationQueue) backgroundSend() { // TODO: handle retries. // TODO: blacklist uncooperative servers. - - util.GetLogger(context.TODO()).Infof("Sending transaction %q containing %d PDUs, %d EDUs", t.TransactionID, len(t.PDUs), len(t.EDUs)) - - _, err := oq.client.SendTransaction(context.TODO(), *t) - if err != nil { - log.WithFields(log.Fields{ - "destination": oq.destination, - log.ErrorKey: err, - }).Info("problem sending transaction") - } } } -// next creates a new transaction from the pending event queue -// and flushes the queue. +// nextTransaction creates a new transaction from the pending event +// queue and sends it. // Returns nil if the queue was empty. -func (oq *destinationQueue) next() *gomatrixserverlib.Transaction { +func (oq *destinationQueue) nextTransaction() bool { oq.runningMutex.Lock() defer oq.runningMutex.Unlock() - if len(oq.pendingEvents) == 0 && len(oq.pendingEDUs) == 0 && len(oq.pendingInvites) == 0 { - oq.running = false - return nil - } - - if len(oq.pendingInvites) > 0 { - for _, inviteReq := range oq.pendingInvites { - ev := inviteReq.Event() - - if _, err := oq.client.SendInviteV2( - context.TODO(), - oq.destination, - *inviteReq, - ); err != nil { - log.WithFields(log.Fields{ - "event_id": ev.EventID(), - "state_key": ev.StateKey(), - "destination": oq.destination, - }).WithError(err).Error("failed to send invite") - } - } - oq.pendingInvites = oq.pendingInvites[:0] + if len(oq.pendingEvents) == 0 && len(oq.pendingEDUs) == 0 { + return false } t := gomatrixserverlib.Transaction{ @@ -166,5 +141,44 @@ func (oq *destinationQueue) next() *gomatrixserverlib.Transaction { oq.pendingEDUs = nil oq.sentCounter += len(t.EDUs) - return &t + util.GetLogger(context.TODO()).Infof("Sending transaction %q containing %d PDUs, %d EDUs", t.TransactionID, len(t.PDUs), len(t.EDUs)) + + _, err := oq.client.SendTransaction(context.TODO(), t) + if err != nil { + log.WithFields(log.Fields{ + "destination": oq.destination, + log.ErrorKey: err, + }).Info("problem sending transaction") + } + + return true +} + +func (oq *destinationQueue) nextInvites() bool { + oq.runningMutex.Lock() + defer oq.runningMutex.Unlock() + + if len(oq.pendingInvites) == 0 { + return false + } + + for _, inviteReq := range oq.pendingInvites { + ev := inviteReq.Event() + + if _, err := oq.client.SendInviteV2( + context.TODO(), + oq.destination, + *inviteReq, + ); err != nil { + log.WithFields(log.Fields{ + "event_id": ev.EventID(), + "state_key": ev.StateKey(), + "destination": oq.destination, + }).WithError(err).Error("failed to send invite") + } + } + + oq.pendingInvites = nil + + return true }