From 7faef153394a843934043b2e0dbb3efcdcc06056 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Fri, 4 Dec 2020 17:37:48 +0000 Subject: [PATCH] Comments --- federationsender/queue/destinationqueue.go | 11 +++++++++-- federationsender/queue/queue.go | 3 +-- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index e20500315..d4e28a9c4 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -60,8 +60,8 @@ type destinationQueue struct { transactionCount atomic.Int32 // how many events in this transaction so far notifyPDUs chan *queuedPDU // interrupts idle wait for PDUs notifyEDUs chan *queuedEDU // interrupts idle wait for EDUs - pendingPDUs []*queuedPDU // owned by backgroundSender goroutine - pendingEDUs []*queuedEDU // owned by backgroundSender goroutine + pendingPDUs []*queuedPDU // owned by backgroundSender goroutine once started + pendingEDUs []*queuedEDU // owned by backgroundSender goroutine once started interruptBackoff chan bool // interrupts backoff } @@ -171,6 +171,7 @@ func (oq *destinationQueue) getPendingFromDatabase() { // in the database. ctx := context.Background() if pduCapacity := maxPDUsInMemory - len(oq.pendingPDUs); pduCapacity > 0 { + // We have room in memory for some PDUs - let's request no more than that. if pdus, err := oq.db.GetPendingPDUs(ctx, oq.destination, pduCapacity); err == nil { for receipt, pdu := range pdus { oq.pendingPDUs = append(oq.pendingPDUs, &queuedPDU{receipt, pdu}) @@ -180,6 +181,7 @@ func (oq *destinationQueue) getPendingFromDatabase() { } } if eduCapacity := maxPDUsInMemory - len(oq.pendingPDUs); eduCapacity > 0 { + // We have room in memory for some EDUs - let's request no more than that. if edus, err := oq.db.GetPendingEDUs(ctx, oq.destination, eduCapacity); err == nil { for receipt, edu := range edus { oq.pendingEDUs = append(oq.pendingEDUs, &queuedEDU{receipt, edu}) @@ -188,6 +190,11 @@ func (oq *destinationQueue) getPendingFromDatabase() { logrus.WithError(err).Errorf("Failed to get pending EDUs for %q", oq.destination) } } + // If we've retrieved all of the events from the database with room to spare + // in memory then we'll no longer consider this queue to be overflowed. + if len(oq.pendingPDUs) < maxPDUsInMemory && len(oq.pendingEDUs) < maxEDUsInMemory { + oq.overflowed.Store(false) + } } // backgroundSend is the worker goroutine for sending events. diff --git a/federationsender/queue/queue.go b/federationsender/queue/queue.go index 290b5a4bb..96c173687 100644 --- a/federationsender/queue/queue.go +++ b/federationsender/queue/queue.go @@ -84,8 +84,7 @@ func NewOutgoingQueues( log.WithError(err).Error("Failed to get EDU server names for destination queue hydration") } for serverName := range serverNames { - queue := queues.getQueue(serverName) - if !queue.statistics.Blacklisted() { + if queue := queues.getQueue(serverName); !queue.statistics.Blacklisted() { queue.wakeQueueIfNeeded() } }