Clean up periodically, we hit a race condition otherwise

This commit is contained in:
Neil Alexander 2021-02-17 14:44:02 +00:00
parent 8cbba695e1
commit cbf5f6ee89
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
2 changed files with 21 additions and 1 deletions

View file

@ -159,6 +159,10 @@ func (oq *destinationQueue) sendEDU(event *gomatrixserverlib.EDU, receipt *share
// then we will interrupt the backoff, causing any federation
// requests to retry.
func (oq *destinationQueue) wakeQueueIfNeeded() {
// If the destination is blacklisted then do nothing.
if _, blacklisted := oq.statistics.BackoffInfo(); blacklisted {
return
}
// If we are backing off then interrupt the backoff.
if oq.backingOff.CAS(true, false) {
oq.interruptBackoff <- true

View file

@ -101,6 +101,7 @@ func NewOutgoingQueues(
signing: signing,
queues: map[gomatrixserverlib.ServerName]*destinationQueue{},
}
queues.clearQueues()
// Look up which servers we have pending items for and then rehydrate those queues.
if !disabled {
time.AfterFunc(time.Second*5, func() {
@ -167,14 +168,29 @@ func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *d
signing: oqs.signing,
}
oqs.queues[destination] = oq
oq.wakeQueueIfNeeded()
}
return oq
}
func (oqs *OutgoingQueues) clearQueues() {
oqs.queuesMutex.Lock()
queues := oqs.queues
oqs.queuesMutex.Unlock()
for _, q := range queues {
oqs.clearQueue(q)
}
time.AfterFunc(time.Minute, oqs.clearQueues)
}
func (oqs *OutgoingQueues) clearQueue(oq *destinationQueue) {
oqs.queuesMutex.Lock()
defer oqs.queuesMutex.Unlock()
switch {
case oq.running.Load():
return
case oq.backingOff.Load():
return
}
close(oq.notify)
close(oq.interruptBackoff)