diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index fc37165fd..4ab610de7 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -39,6 +39,7 @@ type destinationQueue struct { origin gomatrixserverlib.ServerName // origin of requests destination gomatrixserverlib.ServerName // destination of requests running atomic.Bool // is the queue worker running? + backingOff atomic.Bool // true if we're backing off statistics *types.ServerStatistics // statistics about this remote server incomingPDUs chan *gomatrixserverlib.HeaderedEvent // PDUs to send incomingEDUs chan *gomatrixserverlib.EDU // EDUs to send @@ -59,7 +60,12 @@ func (oq *destinationQueue) retry() { // in-memory to maybe-send it one day. Ideally we would just shove these pending events in a database // so we can send a lot of events. oq.statistics.Success() - oq.retryServerCh <- true + // if we were backing off, swap to not backing off and interrupt the select. + // We need to use an atomic bool here to prevent multiple calls to retry() blocking on the channel + // as it is unbuffered. + if oq.backingOff.CAS(true, false) { + oq.retryServerCh <- true + } if !oq.running.Load() { log.Infof("Restarting queue for %s", oq.destination) go oq.backgroundSend() @@ -175,10 +181,12 @@ func (oq *destinationQueue) backgroundSend() { // backoff duration to complete first, or until explicitly // told to retry. if backoff, duration := oq.statistics.BackoffDuration(); backoff { + oq.backingOff.Store(true) select { case <-time.After(duration): case <-oq.retryServerCh: } + oq.backingOff.Store(false) } // How many things do we have waiting?