From d321a1312362bb8e0cba92c20c61b30c51f09dad Mon Sep 17 00:00:00 2001 From: Till Faelligen <2353100+S7evinK@users.noreply.github.com> Date: Fri, 21 Oct 2022 11:15:14 +0200 Subject: [PATCH] Don't overflow for blacklisted servers --- federationapi/queue/destinationqueue.go | 63 ++++++++++--------------- federationapi/queue/queue.go | 4 +- 2 files changed, 28 insertions(+), 39 deletions(-) diff --git a/federationapi/queue/destinationqueue.go b/federationapi/queue/destinationqueue.go index 256359ad0..1b7670e9a 100644 --- a/federationapi/queue/destinationqueue.go +++ b/federationapi/queue/destinationqueue.go @@ -76,28 +76,22 @@ func (oq *destinationQueue) sendEvent(event *gomatrixserverlib.HeaderedEvent, re return } - // Check if the destination is blacklisted. If it isn't then wake - // up the queue. - if !oq.statistics.Blacklisted() { - // If there's room in memory to hold the event then add it to the - // list. - oq.pendingMutex.Lock() - if len(oq.pendingPDUs) < maxPDUsInMemory { - oq.pendingPDUs = append(oq.pendingPDUs, &queuedPDU{ - pdu: event, - receipt: receipt, - }) - } else { - oq.overflowed.Store(true) - } - oq.pendingMutex.Unlock() - - if !oq.backingOff.Load() { - oq.wakeQueueAndNotify() - } + // If there's room in memory to hold the event then add it to the + // list. + oq.pendingMutex.Lock() + if len(oq.pendingPDUs) < maxPDUsInMemory { + oq.pendingPDUs = append(oq.pendingPDUs, &queuedPDU{ + pdu: event, + receipt: receipt, + }) } else { oq.overflowed.Store(true) } + oq.pendingMutex.Unlock() + + if !oq.backingOff.Load() { + oq.wakeQueueAndNotify() + } } // sendEDU adds the EDU event to the pending queue for the destination. @@ -108,28 +102,23 @@ func (oq *destinationQueue) sendEDU(event *gomatrixserverlib.EDU, receipt *share logrus.Errorf("attempt to send nil EDU with destination %q", oq.destination) return } - // Check if the destination is blacklisted. If it isn't then wake - // up the queue. - if !oq.statistics.Blacklisted() { - // If there's room in memory to hold the event then add it to the - // list. - oq.pendingMutex.Lock() - if len(oq.pendingEDUs) < maxEDUsInMemory { - oq.pendingEDUs = append(oq.pendingEDUs, &queuedEDU{ - edu: event, - receipt: receipt, - }) - } else { - oq.overflowed.Store(true) - } - oq.pendingMutex.Unlock() - if !oq.backingOff.Load() { - oq.wakeQueueAndNotify() - } + // If there's room in memory to hold the event then add it to the + // list. + oq.pendingMutex.Lock() + if len(oq.pendingEDUs) < maxEDUsInMemory { + oq.pendingEDUs = append(oq.pendingEDUs, &queuedEDU{ + edu: event, + receipt: receipt, + }) } else { oq.overflowed.Store(true) } + oq.pendingMutex.Unlock() + + if !oq.backingOff.Load() { + oq.wakeQueueAndNotify() + } } // handleBackoffNotifier is registered as the backoff notification diff --git a/federationapi/queue/queue.go b/federationapi/queue/queue.go index fb6d45a90..328334379 100644 --- a/federationapi/queue/queue.go +++ b/federationapi/queue/queue.go @@ -248,7 +248,7 @@ func (oqs *OutgoingQueues) SendEvent( } for destination := range destmap { - if queue := oqs.getQueue(destination); queue != nil { + if queue := oqs.getQueue(destination); queue != nil && !queue.statistics.Blacklisted() { queue.sendEvent(ev, nid) } else { delete(destmap, destination) @@ -336,7 +336,7 @@ func (oqs *OutgoingQueues) SendEDU( } for destination := range destmap { - if queue := oqs.getQueue(destination); queue != nil { + if queue := oqs.getQueue(destination); queue != nil && !queue.statistics.Blacklisted() { queue.sendEDU(e, nid) } else { delete(destmap, destination)