From c6c5c553dd19d41e8dfd81002bcf94bfd6c5d389 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 17 Feb 2021 13:17:58 +0000 Subject: [PATCH] Don't hold destination queues in memory forever --- federationsender/queue/destinationqueue.go | 2 ++ federationsender/queue/queue.go | 17 +++++++++++++++++ 2 files changed, 19 insertions(+) diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index d9567eeba..96d7783e0 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -46,6 +46,7 @@ const ( // ensures that only one request is in flight to a given destination // at a time. type destinationQueue struct { + queues *OutgoingQueues db storage.Database process *process.ProcessContext signing *SigningInfo @@ -270,6 +271,7 @@ func (oq *destinationQueue) backgroundSend() { // The worker is idle so stop the goroutine. It'll get // restarted automatically the next time we have an event to // send. + oq.queues.clearQueue(oq.destination) return } diff --git a/federationsender/queue/queue.go b/federationsender/queue/queue.go index 4453ddb01..d91b36bba 100644 --- a/federationsender/queue/queue.go +++ b/federationsender/queue/queue.go @@ -154,6 +154,7 @@ func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *d if oq == nil { destinationQueueTotal.Inc() oq = &destinationQueue{ + queues: oqs, db: oqs.db, process: oqs.process, rsAPI: oqs.rsAPI, @@ -170,6 +171,22 @@ func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *d return oq } +func (oqs *OutgoingQueues) clearQueue(destination gomatrixserverlib.ServerName) { + oqs.queuesMutex.Lock() + defer oqs.queuesMutex.Unlock() + oq, ok := oqs.queues[destination] + switch { + case !ok: + return + case oq.running.Load(): + return + case oq.backingOff.Load(): + return + } + delete(oqs.queues, destination) + destinationQueueTotal.Dec() +} + type ErrorFederationDisabled struct { Message string }