diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index b6c189f61..be63290c6 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -159,10 +159,6 @@ func (oq *destinationQueue) sendEDU(event *gomatrixserverlib.EDU, receipt *share // then we will interrupt the backoff, causing any federation // requests to retry. func (oq *destinationQueue) wakeQueueIfNeeded() { - // If the destination is blacklisted then do nothing. - if _, blacklisted := oq.statistics.BackoffInfo(); blacklisted { - return - } // If we are backing off then interrupt the backoff. if oq.backingOff.CAS(true, false) { oq.interruptBackoff <- true diff --git a/federationsender/queue/queue.go b/federationsender/queue/queue.go index e77f79cfd..d178f10ce 100644 --- a/federationsender/queue/queue.go +++ b/federationsender/queue/queue.go @@ -101,7 +101,6 @@ func NewOutgoingQueues( signing: signing, queues: map[gomatrixserverlib.ServerName]*destinationQueue{}, } - queues.clearQueues() // Look up which servers we have pending items for and then rehydrate those queues. if !disabled { time.AfterFunc(time.Second*5, func() { @@ -121,7 +120,7 @@ func NewOutgoingQueues( log.WithError(err).Error("Failed to get EDU server names for destination queue hydration") } for serverName := range serverNames { - if queue := queues.getQueue(serverName); !queue.statistics.Blacklisted() { + if queue := queues.getQueue(serverName); queue != nil { queue.wakeQueueIfNeeded() } } @@ -149,6 +148,9 @@ type queuedEDU struct { } func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *destinationQueue { + if _, blacklisted := oqs.statistics.ForServer(destination).BackoffInfo(); blacklisted { + return nil + } oqs.queuesMutex.Lock() defer oqs.queuesMutex.Unlock() oq := oqs.queues[destination] @@ -172,24 +174,10 @@ func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *d return oq } -func (oqs *OutgoingQueues) clearQueues() { - oqs.queuesMutex.Lock() - defer oqs.queuesMutex.Unlock() - for _, q := range oqs.queues { - oqs.clearQueue(q) - } - time.AfterFunc(time.Minute, oqs.clearQueues) -} - func (oqs *OutgoingQueues) clearQueue(oq *destinationQueue) { oqs.queuesMutex.Lock() defer oqs.queuesMutex.Unlock() - switch { - case oq.running.Load(): - return - case oq.backingOff.Load(): - return - } + close(oq.notify) close(oq.interruptBackoff) delete(oqs.queues, oq.destination) @@ -262,7 +250,9 @@ func (oqs *OutgoingQueues) SendEvent( } for destination := range destmap { - oqs.getQueue(destination).sendEvent(ev, nid) + if queue := oqs.getQueue(destination); queue != nil { + queue.sendEvent(ev, nid) + } } return nil @@ -332,7 +322,9 @@ func (oqs *OutgoingQueues) SendEDU( } for destination := range destmap { - oqs.getQueue(destination).sendEDU(e, nid) + if queue := oqs.getQueue(destination); queue != nil { + queue.sendEDU(e, nid) + } } return nil @@ -343,9 +335,7 @@ func (oqs *OutgoingQueues) RetryServer(srv gomatrixserverlib.ServerName) { if oqs.disabled { return } - q := oqs.getQueue(srv) - if q == nil { - return + if queue := oqs.getQueue(srv); queue != nil { + queue.wakeQueueIfNeeded() } - q.wakeQueueIfNeeded() }