diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index accd39f4d..1d159b9c2 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -225,7 +225,7 @@ func (oq *destinationQueue) backgroundSend() { } // If we have pending PDUs or EDUs then construct a transaction. - for oq.pendingPDUs.Load() > 0 || len(oq.pendingEDUs) > 0 { + if oq.pendingPDUs.Load() > 0 || len(oq.pendingEDUs) > 0 { // Try sending the next transaction and see what happens. transaction, terr := oq.nextTransaction(oq.pendingEDUs) if terr != nil { @@ -275,6 +275,17 @@ func (oq *destinationQueue) backgroundSend() { oq.cleanPendingInvites() } } + + // If something else has come along since we sent the previous + // transactions then we want the next loop iteration to skip the + // wait and not go to sleep. In which case, if there isn't a + // wake-up message already, send one. + if oq.pendingPDUs.Load() > 0 { + select { + case oq.notifyPDUs <- true: + default: + } + } } } diff --git a/federationsender/queue/queue.go b/federationsender/queue/queue.go index 6c7fca54e..2288689ee 100644 --- a/federationsender/queue/queue.go +++ b/federationsender/queue/queue.go @@ -93,7 +93,7 @@ func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *d statistics: oqs.statistics.ForServer(destination), incomingEDUs: make(chan *gomatrixserverlib.EDU, 128), incomingInvites: make(chan *gomatrixserverlib.InviteV2Request, 128), - notifyPDUs: make(chan bool, 128), + notifyPDUs: make(chan bool, 1), interruptBackoff: make(chan bool), signing: oqs.signing, }