From 7246580d54f00358f6e2d78c35a495c1c0ac98db Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 7 Dec 2020 10:28:34 +0000 Subject: [PATCH] Update comments --- federationsender/queue/destinationqueue.go | 25 +++++++++++----------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index c1ec40ba9..ae1a3e9ee 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -53,15 +53,15 @@ type destinationQueue struct { destination gomatrixserverlib.ServerName // destination of requests running atomic.Bool // is the queue worker running? backingOff atomic.Bool // true if we're backing off - overflowed atomic.Bool // exceeded in-memory space? + overflowed atomic.Bool // the queues exceed maxPDUsInMemory/maxEDUsInMemory, so we should consult the database for more statistics *statistics.ServerStatistics // statistics about this remote server transactionIDMutex sync.Mutex // protects transactionID transactionID gomatrixserverlib.TransactionID // last transaction ID - notifyPDUs chan *queuedPDU // interrupts idle wait for PDUs - notifyEDUs chan *queuedEDU // interrupts idle wait for EDUs - notifyOverflow chan struct{} // interrupts idle wait for overflowed PDUs/EDUs - pendingPDUs []*queuedPDU // owned by backgroundSender goroutine once started - pendingEDUs []*queuedEDU // owned by backgroundSender goroutine once started + notifyPDUs chan *queuedPDU // interrupts idle wait for PDUs that have just been queued + notifyEDUs chan *queuedEDU // interrupts idle wait for EDUs that have just been queued + notifyOverflow chan struct{} // interrupts idle wait for overflowed PDUs/EDUs from the database + pendingPDUs []*queuedPDU // PDUs waiting to be sent, owned by backgroundSender goroutine once started + pendingEDUs []*queuedEDU // EDUs waiting to be sent, owned by backgroundSender goroutine once started interruptBackoff chan bool // interrupts backoff } @@ -78,7 +78,7 @@ func (oq *destinationQueue) sendEvent(event *gomatrixserverlib.HeaderedEvent, re // later. if err := oq.db.AssociatePDUWithDestination( context.TODO(), - "", // the current transaction ID, TODO: do something about this + "", // TODO: remove this, as we don't need to persist the transaction ID oq.destination, // the destination server name receipt, // NIDs from federationsender_queue_json table ); err != nil { @@ -90,8 +90,7 @@ func (oq *destinationQueue) sendEvent(event *gomatrixserverlib.HeaderedEvent, re if !oq.statistics.Blacklisted() { // Wake up the queue if it's asleep. oq.wakeQueueIfNeeded() - // If we're blocking on waiting PDUs then tell the queue that we - // have work to do. + // Queue the PDU. select { case oq.notifyPDUs <- &queuedPDU{ receipt: receipt, @@ -126,8 +125,7 @@ func (oq *destinationQueue) sendEDU(event *gomatrixserverlib.EDU, receipt *share if !oq.statistics.Blacklisted() { // Wake up the queue if it's asleep. oq.wakeQueueIfNeeded() - // If we're blocking on waiting EDUs then tell the queue that we - // have work to do. + // Queue the EDU. select { case oq.notifyEDUs <- &queuedEDU{ receipt: receipt, @@ -156,7 +154,10 @@ func (oq *destinationQueue) wakeQueueIfNeeded() { // getPendingFromDatabase will look at the database and see if // there are any persisted events that haven't been sent to this -// destination yet. If so, they will be queued up. +// destination yet. If so, they will be queued up. This function +// MUST be called from backgroundSend() goroutine ONLY because +// it modifies oq.pendingPDUs/oq.pendingEDUs and they aren't +// mutexed. func (oq *destinationQueue) getPendingFromDatabase() { // Check to see if there's anything to do for this server // in the database.