From 880e1514397ff31db778c5139e6d14c58883e54a Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 7 Dec 2020 12:08:23 +0000 Subject: [PATCH] Bug-fixing --- federationsender/queue/destinationqueue.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index a67dd06ae..84fd4bd2c 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -140,7 +140,6 @@ func (oq *destinationQueue) sendEDU(event *gomatrixserverlib.EDU, receipt *share if !oq.statistics.Blacklisted() { // Wake up the queue if it's asleep. oq.wakeQueueIfNeeded() - // Queue the EDU. select { case oq.notify <- struct{}{}: default: @@ -185,7 +184,7 @@ func (oq *destinationQueue) getPendingFromDatabase() { logrus.WithError(err).Errorf("Failed to get pending PDUs for %q", oq.destination) } } - if eduCapacity := maxPDUsInMemory - len(oq.pendingPDUs); eduCapacity > 0 { + if eduCapacity := maxEDUsInMemory - len(oq.pendingEDUs); eduCapacity > 0 { // We have room in memory for some EDUs - let's request no more than that. if edus, err := oq.db.GetPendingEDUs(ctx, oq.destination, eduCapacity); err == nil { for receipt, edu := range edus { @@ -201,8 +200,12 @@ func (oq *destinationQueue) getPendingFromDatabase() { if len(oq.pendingPDUs) < maxPDUsInMemory && len(oq.pendingEDUs) < maxEDUsInMemory { oq.overflowed.Store(false) } + // If we've retrieved some events then notify the destination queue goroutine. if retrieved { - oq.notify <- struct{}{} + select { + case oq.notify <- struct{}{}: + default: + } } } @@ -216,6 +219,10 @@ func (oq *destinationQueue) backgroundSend() { } defer oq.running.Store(false) + // Mark the queue as overflowed, so we will consult the database + // to see if there's anything new to send. + oq.overflowed.Store(true) + for { // If we are overflowing memory and have sent things out to the // database then we can look up what those things are. @@ -364,7 +371,7 @@ func (oq *destinationQueue) nextTransaction( continue } t.EDUs = append(t.EDUs, *edu.edu) - eduReceipts = append(pduReceipts, edu.receipt) + eduReceipts = append(eduReceipts, edu.receipt) } logrus.WithField("server_name", oq.destination).Debugf("Sending transaction %q containing %d PDUs, %d EDUs", t.TransactionID, len(t.PDUs), len(t.EDUs))