Don't hold destination queues in memory forever

This commit is contained in:
Neil Alexander 2021-02-17 13:17:58 +00:00
parent 5d74a1757f
commit c6c5c553dd
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
2 changed files with 19 additions and 0 deletions

View file

@ -46,6 +46,7 @@ const (
// ensures that only one request is in flight to a given destination // ensures that only one request is in flight to a given destination
// at a time. // at a time.
type destinationQueue struct { type destinationQueue struct {
queues *OutgoingQueues
db storage.Database db storage.Database
process *process.ProcessContext process *process.ProcessContext
signing *SigningInfo signing *SigningInfo
@ -270,6 +271,7 @@ func (oq *destinationQueue) backgroundSend() {
// The worker is idle so stop the goroutine. It'll get // The worker is idle so stop the goroutine. It'll get
// restarted automatically the next time we have an event to // restarted automatically the next time we have an event to
// send. // send.
oq.queues.clearQueue(oq.destination)
return return
} }

View file

@ -154,6 +154,7 @@ func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *d
if oq == nil { if oq == nil {
destinationQueueTotal.Inc() destinationQueueTotal.Inc()
oq = &destinationQueue{ oq = &destinationQueue{
queues: oqs,
db: oqs.db, db: oqs.db,
process: oqs.process, process: oqs.process,
rsAPI: oqs.rsAPI, rsAPI: oqs.rsAPI,
@ -170,6 +171,22 @@ func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *d
return oq return oq
} }
func (oqs *OutgoingQueues) clearQueue(destination gomatrixserverlib.ServerName) {
oqs.queuesMutex.Lock()
defer oqs.queuesMutex.Unlock()
oq, ok := oqs.queues[destination]
switch {
case !ok:
return
case oq.running.Load():
return
case oq.backingOff.Load():
return
}
delete(oqs.queues, destination)
destinationQueueTotal.Dec()
}
type ErrorFederationDisabled struct { type ErrorFederationDisabled struct {
Message string Message string
} }