From cbf5f6ee89bd8dfedde5aac960629eb1a36db666 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 17 Feb 2021 14:44:02 +0000 Subject: [PATCH] Clean up periodically, we hit a race condition otherwise --- federationsender/queue/destinationqueue.go | 4 ++++ federationsender/queue/queue.go | 18 +++++++++++++++++- 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index be63290c6..b6c189f61 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -159,6 +159,10 @@ func (oq *destinationQueue) sendEDU(event *gomatrixserverlib.EDU, receipt *share // then we will interrupt the backoff, causing any federation // requests to retry. func (oq *destinationQueue) wakeQueueIfNeeded() { + // If the destination is blacklisted then do nothing. + if _, blacklisted := oq.statistics.BackoffInfo(); blacklisted { + return + } // If we are backing off then interrupt the backoff. if oq.backingOff.CAS(true, false) { oq.interruptBackoff <- true diff --git a/federationsender/queue/queue.go b/federationsender/queue/queue.go index 620bb2e82..87d73f5e7 100644 --- a/federationsender/queue/queue.go +++ b/federationsender/queue/queue.go @@ -101,6 +101,7 @@ func NewOutgoingQueues( signing: signing, queues: map[gomatrixserverlib.ServerName]*destinationQueue{}, } + queues.clearQueues() // Look up which servers we have pending items for and then rehydrate those queues. if !disabled { time.AfterFunc(time.Second*5, func() { @@ -167,14 +168,29 @@ func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *d signing: oqs.signing, } oqs.queues[destination] = oq - oq.wakeQueueIfNeeded() } return oq } +func (oqs *OutgoingQueues) clearQueues() { + oqs.queuesMutex.Lock() + queues := oqs.queues + oqs.queuesMutex.Unlock() + for _, q := range queues { + oqs.clearQueue(q) + } + time.AfterFunc(time.Minute, oqs.clearQueues) +} + func (oqs *OutgoingQueues) clearQueue(oq *destinationQueue) { oqs.queuesMutex.Lock() defer oqs.queuesMutex.Unlock() + switch { + case oq.running.Load(): + return + case oq.backingOff.Load(): + return + } close(oq.notify) close(oq.interruptBackoff)