diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index 9f3be8875..5f1c4ddec 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -51,7 +51,7 @@ type destinationQueue struct { incomingEDUs chan *gomatrixserverlib.EDU // EDUs to send transactionID gomatrixserverlib.TransactionID // last transaction ID transactionCount int // how many events in this transaction so far - pendingPDUs atomic.Bool // there are PDUs waiting to be sent + pendingPDUs atomic.Int32 // how many PDUs are waiting to be sent pendingEDUs []*gomatrixserverlib.EDU // owned by backgroundSend pendingInvites []*gomatrixserverlib.InviteV2Request // owned by backgroundSend retryServerCh chan bool // interrupts backoff @@ -119,9 +119,8 @@ func (oq *destinationQueue) sendEvent(nid int64) { } // Signal that we've sent a new PDU. This will cause the queue to // wake up if it's asleep. - if oq.pendingPDUs.CAS(false, true) { - oq.incomingPDUs <- struct{}{} - } + oq.pendingPDUs.Add(1) + oq.incomingPDUs <- struct{}{} } // sendEDU adds the EDU event to the pending queue for the destination. @@ -224,7 +223,7 @@ func (oq *destinationQueue) backgroundSend() { } // If we have pending PDUs or EDUs then construct a transaction. - if oq.pendingPDUs.Load() || len(oq.pendingEDUs) > 0 { + if oq.pendingPDUs.Load() > 0 || len(oq.pendingEDUs) > 0 { // If we haven't got a transaction ID then we should generate // one. Ideally we'd know this already because something queued // in the database would give us one, but if we're dealing with @@ -250,7 +249,6 @@ func (oq *destinationQueue) backgroundSend() { // If we successfully sent the transaction then clear out // the pending events and EDUs, and wipe our transaction ID. oq.statistics.Success() - oq.pendingPDUs.Store(false) oq.transactionID = "" // Clean up the in-memory buffers. oq.cleanPendingEDUs() @@ -357,8 +355,9 @@ func (oq *destinationQueue) nextTransaction( switch e := err.(type) { case nil: // No error was returned so the transaction looks to have - // been successfully sent. Clean up the transaction in the - // database. + // been successfully sent. + oq.pendingPDUs.Sub(int32(len(t.PDUs))) + // Clean up the transaction in the database. if err = oq.db.CleanTransactionPDUs( context.TODO(), t.Destination,