From f8825e0c1202b60abb9f1182043bbddf3c154334 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Fri, 4 Dec 2020 17:18:23 +0000 Subject: [PATCH] Try to get overflowed events from database --- federationsender/queue/destinationqueue.go | 131 +++++++++++++-------- federationsender/queue/queue.go | 10 +- 2 files changed, 86 insertions(+), 55 deletions(-) diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index f3d41515c..9293d7802 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -35,6 +35,8 @@ import ( const ( maxPDUsPerTransaction = 50 maxEDUsPerTransaction = 50 + maxPDUsInMemory = 128 + maxEDUsInMemory = 128 queueIdleTimeout = time.Second * 30 ) @@ -51,12 +53,15 @@ type destinationQueue struct { destination gomatrixserverlib.ServerName // destination of requests running atomic.Bool // is the queue worker running? backingOff atomic.Bool // true if we're backing off + overflowed atomic.Bool // exceeded in-memory space? statistics *statistics.ServerStatistics // statistics about this remote server transactionIDMutex sync.Mutex // protects transactionID transactionID gomatrixserverlib.TransactionID // last transaction ID transactionCount atomic.Int32 // how many events in this transaction so far notifyPDUs chan *queuedPDU // interrupts idle wait for PDUs notifyEDUs chan *queuedEDU // interrupts idle wait for EDUs + pendingPDUs []*queuedPDU // owned by backgroundSender goroutine + pendingEDUs []*queuedEDU // owned by backgroundSender goroutine interruptBackoff chan bool // interrupts backoff } @@ -155,32 +160,35 @@ func (oq *destinationQueue) wakeQueueIfNeeded() { if !oq.running.Load() { // Start the queue. go oq.backgroundSend() + } +} - // Check to see if there's anything to do for this server - // in the database. 
- ctx := context.Background() - go func(ctx context.Context) { - if pdus, err := oq.db.GetPendingPDUs(ctx, oq.destination, maxPDUsPerTransaction); err == nil { - for receipt, pdu := range pdus { - select { - case oq.notifyPDUs <- &queuedPDU{receipt, pdu}: - default: - return - } - } +// getPendingFromDatabase will look at the database and see if +// there are any persisted events that haven't been sent to this +// destination yet. If so, they will be queued up. +func (oq *destinationQueue) getPendingFromDatabase() { + // Check to see if there's anything to do for this server + // in the database. + ctx := context.Background() + if pduCapacity := maxPDUsInMemory - len(oq.pendingPDUs); pduCapacity > 0 { + logrus.Infof("Retrieving up to %d pending PDUs from the database for %q", pduCapacity, oq.destination) + if pdus, err := oq.db.GetPendingPDUs(ctx, oq.destination, pduCapacity); err == nil { + for receipt, pdu := range pdus { + oq.pendingPDUs = append(oq.pendingPDUs, &queuedPDU{receipt, pdu}) } - }(ctx) - go func(ctx context.Context) { - if edus, err := oq.db.GetPendingEDUs(ctx, oq.destination, maxEDUsPerTransaction); err == nil { - for receipt, edu := range edus { - select { - case oq.notifyEDUs <- &queuedEDU{receipt, edu}: - default: - return - } - } + } else { + logrus.WithError(err).Errorf("Failed to get pending PDUs for %q", oq.destination) + } + } + if eduCapacity := maxEDUsInMemory - len(oq.pendingEDUs); eduCapacity > 0 { + logrus.Infof("Retrieving up to %d pending EDUs from the database for %q", eduCapacity, oq.destination) + if edus, err := oq.db.GetPendingEDUs(ctx, oq.destination, eduCapacity); err == nil { + for receipt, edu := range edus { + oq.pendingEDUs = append(oq.pendingEDUs, &queuedEDU{receipt, edu}) } - }(ctx) + } else { + logrus.WithError(err).Errorf("Failed to get pending EDUs for %q", oq.destination) + } } } @@ -194,22 +202,30 @@ func (oq *destinationQueue) backgroundSend() { } defer oq.running.Store(false) - pendingPDUs := []*queuedPDU{} - 
pendingEDUs := []*queuedEDU{} - for { + // If we are overflowing memory and have sent things out to the + // database then we can look up what those things are. + if oq.overflowed.Load() { + oq.getPendingFromDatabase() + } + // If we have nothing to do then wait either for incoming events, or // until we hit an idle timeout. + awaitSelect: select { case pdu := <-oq.notifyPDUs: // We were woken up because there are new PDUs waiting in the // database. - pendingPDUs = append(pendingPDUs, pdu) + if len(oq.pendingPDUs) > maxPDUsInMemory { + oq.overflowed.Store(true) + break awaitSelect + } + oq.pendingPDUs = append(oq.pendingPDUs, pdu) pendingPDULoop: - for i := 1; i < maxPDUsPerTransaction; i++ { + for i := 1; i < maxPDUsInMemory-len(oq.pendingPDUs); i++ { select { - case edu := <-oq.notifyEDUs: - pendingEDUs = append(pendingEDUs, edu) + case pdu := <-oq.notifyPDUs: + oq.pendingPDUs = append(oq.pendingPDUs, pdu) default: break pendingPDULoop } @@ -218,12 +234,16 @@ func (oq *destinationQueue) backgroundSend() { case edu := <-oq.notifyEDUs: // We were woken up because there are new PDUs waiting in the // database. - pendingEDUs = append(pendingEDUs, edu) + if len(oq.pendingEDUs) > maxEDUsInMemory { + oq.overflowed.Store(true) + break awaitSelect + } + oq.pendingEDUs = append(oq.pendingEDUs, edu) pendingEDULoop: - for i := 1; i < maxEDUsPerTransaction; i++ { + for i := 1; i < maxEDUsInMemory-len(oq.pendingEDUs); i++ { select { case edu := <-oq.notifyEDUs: - pendingEDUs = append(pendingEDUs, edu) + oq.pendingEDUs = append(oq.pendingEDUs, edu) default: break pendingEDULoop } @@ -246,14 +266,14 @@ func (oq *destinationQueue) backgroundSend() { // has exceeded a maximum allowable value. Clean up the in-memory // buffers at this point. The PDU clean-up is already on a defer. 
log.Warnf("Blacklisting %q due to exceeding backoff threshold", oq.destination) - for i := range pendingPDUs { - pendingPDUs[i] = nil + for i := range oq.pendingPDUs { + oq.pendingPDUs[i] = nil } - for i := range pendingEDUs { - pendingEDUs[i] = nil + for i := range oq.pendingEDUs { + oq.pendingEDUs[i] = nil } - pendingPDUs = nil - pendingEDUs = nil + oq.pendingPDUs = nil + oq.pendingEDUs = nil return } if until != nil && until.After(time.Now()) { @@ -267,9 +287,18 @@ func (oq *destinationQueue) backgroundSend() { } } + pduCount := len(oq.pendingPDUs) + eduCount := len(oq.pendingEDUs) + if pduCount > maxPDUsPerTransaction { + pduCount = maxPDUsPerTransaction + } + if eduCount > maxEDUsPerTransaction { + eduCount = maxEDUsPerTransaction + } + // If we have pending PDUs or EDUs then construct a transaction. // Try sending the next transaction and see what happens. - transaction, terr := oq.nextTransaction(pendingPDUs, pendingEDUs) + transaction, pc, ec, terr := oq.nextTransaction(oq.pendingPDUs[:pduCount], oq.pendingEDUs[:eduCount]) if terr != nil { // We failed to send the transaction. Mark it as a failure. oq.statistics.Failure() @@ -277,14 +306,14 @@ func (oq *destinationQueue) backgroundSend() { // If we successfully sent the transaction then clear out // the pending events and EDUs, and wipe our transaction ID. 
oq.statistics.Success() - for i := range pendingPDUs { - pendingPDUs[i] = nil + for i := range oq.pendingPDUs[:pc] { + oq.pendingPDUs[i] = nil } - for i := range pendingEDUs { - pendingEDUs[i] = nil + for i := range oq.pendingEDUs[:ec] { + oq.pendingEDUs[i] = nil } - pendingPDUs = pendingPDUs[:0] - pendingEDUs = pendingEDUs[:0] + oq.pendingPDUs = oq.pendingPDUs[pc:] + oq.pendingEDUs = oq.pendingEDUs[ec:] } } } @@ -296,7 +325,7 @@ func (oq *destinationQueue) backgroundSend() { func (oq *destinationQueue) nextTransaction( pdus []*queuedPDU, edus []*queuedEDU, -) (bool, error) { +) (bool, int, int, error) { // Before we do anything, we need to roll over the transaction // ID that is being used to coalesce events into the next TX. // Otherwise it's possible that we'll pick up an incomplete @@ -319,7 +348,7 @@ func (oq *destinationQueue) nextTransaction( // If we didn't get anything from the database and there are no // pending EDUs then there's nothing to do - stop here. if len(pdus) == 0 && len(edus) == 0 { - return false, nil + return false, 0, 0, nil } // Pick out the transaction ID from the database. If we didn't @@ -372,16 +401,16 @@ func (oq *destinationQueue) nextTransaction( log.WithError(err).Errorf("failed to clean EDUs for server %q", t.Destination) } } - return true, nil + return true, len(t.PDUs), len(t.EDUs), nil case gomatrix.HTTPError: // Report that we failed to send the transaction and we // will retry again, subject to backoff. 
- return false, err + return false, 0, 0, err default: log.WithFields(log.Fields{ "destination": oq.destination, log.ErrorKey: err, }).Info("problem sending transaction") - return false, err + return false, 0, 0, err } } diff --git a/federationsender/queue/queue.go b/federationsender/queue/queue.go index 5ef7198a8..290b5a4bb 100644 --- a/federationsender/queue/queue.go +++ b/federationsender/queue/queue.go @@ -84,8 +84,9 @@ func NewOutgoingQueues( log.WithError(err).Error("Failed to get EDU server names for destination queue hydration") } for serverName := range serverNames { - if !queues.getQueue(serverName).statistics.Blacklisted() { - queues.getQueue(serverName).wakeQueueIfNeeded() + queue := queues.getQueue(serverName) + if !queue.statistics.Blacklisted() { + queue.wakeQueueIfNeeded() } } }) @@ -123,12 +124,13 @@ func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *d destination: destination, client: oqs.client, statistics: oqs.statistics.ForServer(destination), - notifyPDUs: make(chan *queuedPDU, 128), - notifyEDUs: make(chan *queuedEDU, 128), + notifyPDUs: make(chan *queuedPDU, 16), + notifyEDUs: make(chan *queuedEDU, 16), interruptBackoff: make(chan bool), signing: oqs.signing, } oqs.queues[destination] = oq + oq.getPendingFromDatabase() } return oq }