diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index 0d62b4361..9f3be8875 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -51,6 +51,7 @@ type destinationQueue struct { incomingEDUs chan *gomatrixserverlib.EDU // EDUs to send transactionID gomatrixserverlib.TransactionID // last transaction ID transactionCount int // how many events in this transaction so far + pendingPDUs atomic.Bool // there are PDUs waiting to be sent pendingEDUs []*gomatrixserverlib.EDU // owned by backgroundSend pendingInvites []*gomatrixserverlib.InviteV2Request // owned by backgroundSend retryServerCh chan bool // interrupts backoff @@ -118,7 +119,9 @@ func (oq *destinationQueue) sendEvent(nid int64) { } // Signal that we've sent a new PDU. This will cause the queue to // wake up if it's asleep. - oq.incomingPDUs <- struct{}{} + if oq.pendingPDUs.CAS(false, true) { + oq.incomingPDUs <- struct{}{} + } } // sendEDU adds the EDU event to the pending queue for the destination. @@ -160,8 +163,6 @@ func (oq *destinationQueue) backgroundSend() { defer oq.running.Store(false) for { - pendingPDUs := false - // For now we don't know the next transaction ID. Set it to an // empty one. The next step will populate it if we have pending // PDUs in the database. Otherwise we'll generate one later on, @@ -172,8 +173,8 @@ func (oq *destinationQueue) backgroundSend() { // idle timeout. select { case <-oq.incomingPDUs: - // There are new PDUs waiting in the database. - pendingPDUs = true + // We were woken up because there are new PDUs waiting in the + // database. case edu := <-oq.incomingEDUs: // EDUs are handled in-memory for now. We will try to keep // the ordering intact. @@ -223,7 +224,7 @@ func (oq *destinationQueue) backgroundSend() { } // If we have pending PDUs or EDUs then construct a transaction. - if pendingPDUs || len(oq.pendingEDUs) > 0 { + if oq.pendingPDUs.Load() || len(oq.pendingEDUs) > 0 { // If we haven't got a transaction ID then we should generate // one. Ideally we'd know this already because something queued // in the database would give us one, but if we're dealing with @@ -249,6 +250,7 @@ func (oq *destinationQueue) backgroundSend() { // If we successfully sent the transaction then clear out // the pending events and EDUs, and wipe our transaction ID. oq.statistics.Success() + oq.pendingPDUs.Store(false) oq.transactionID = "" // Clean up the in-memory buffers. oq.cleanPendingEDUs()