diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index 7751326c3..de4d487ee 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -32,6 +32,7 @@ import ( ) const maxPDUsPerTransaction = 50 +const queueIdleTimeout = time.Second * 30 // destinationQueue is a queue of events for a single destination. // It is responsible for sending the events to the destination and @@ -52,7 +53,6 @@ type destinationQueue struct { transactionIDMutex sync.Mutex // protects transactionID transactionID gomatrixserverlib.TransactionID // last transaction ID transactionCount atomic.Int32 // how many events in this transaction so far - pendingPDUs atomic.Int64 // how many PDUs are waiting to be sent pendingEDUs []*gomatrixserverlib.EDU // owned by backgroundSend pendingInvites []*gomatrixserverlib.InviteV2Request // owned by backgroundSend notifyPDUs chan bool // interrupts idle wait for PDUs @@ -68,7 +68,6 @@ func (oq *destinationQueue) sendEvent(nid int64) { log.Infof("%s is blacklisted; dropping event", oq.destination) return } - oq.wakeQueueIfNeeded() // Create a transaction ID. We'll either do this if we don't have // one made up yet, or if we've exceeded the number of maximum // events allowed in a single tranaction. We'll reset the counter @@ -95,11 +94,13 @@ func (oq *destinationQueue) sendEvent(nid int64) { // We've successfully added a PDU to the transaction so increase // the counter. oq.transactionCount.Add(1) - // Signal that we've sent a new PDU. This will cause the queue to - // wake up if it's asleep. The return to the Add function will only - // be 1 if the previous value was 0, e.g. nothing was waiting before. - if oq.pendingPDUs.Add(1) == 1 { - oq.notifyPDUs <- true + // Wake up the queue if it's asleep. + oq.wakeQueueIfNeeded() + // If we're blocking on waiting PDUs then tell the queue that we + // have work to do. + select { + case oq.notifyPDUs <- true: + default: } } @@ -138,26 +139,33 @@ func (oq *destinationQueue) wakeQueueIfNeeded() { } // If we aren't running then wake up the queue. if !oq.running.Load() { - // Look up how many events are pending in this queue. We need - // to do this so that the queue thinks it has work to do. - count, err := oq.db.GetPendingPDUCount( - context.TODO(), - oq.destination, - ) - if err == nil { - oq.pendingPDUs.Store(count) - log.Printf("Destination queue %q has %d pending PDUs", oq.destination, count) - } else { - log.WithError(err).Errorf("Can't get pending PDU count for %q destination queue", oq.destination) - } - if count > 0 { - oq.notifyPDUs <- true - } - // Then start the queue. + // Start the queue. go oq.backgroundSend() } } +// waitForPDUs returns a channel for pending PDUs, which will be +// used in backgroundSend select. It returns a closed channel if +// there is something pending right now, or an open channel if +// we're waiting for something. +func (oq *destinationQueue) waitForPDUs() chan bool { + pendingPDUs, err := oq.db.GetPendingPDUCount(context.TODO(), oq.destination) + if err != nil { + log.WithError(err).Errorf("Failed to get pending PDU count on queue %q", oq.destination) + } + // If there are PDUs pending right now then we'll return a closed + // channel. This will mean that the backgroundSend will not block. + if pendingPDUs > 0 { + ch := make(chan bool, 1) + close(ch) + return ch + } + // If there are no PDUs pending right now then instead we'll return + // the notify channel, so that backgroundSend can pick up normal + // notifications from sendEvent. + return oq.notifyPDUs +} + // backgroundSend is the worker goroutine for sending events. // nolint:gocyclo func (oq *destinationQueue) backgroundSend() { @@ -169,12 +177,15 @@ func (oq *destinationQueue) backgroundSend() { defer oq.running.Store(false) for { + pendingPDUs := false + // If we have nothing to do then wait either for incoming events, or // until we hit an idle timeout. select { - case <-oq.notifyPDUs: + case <-oq.waitForPDUs(): // We were woken up because there are new PDUs waiting in the // database. + pendingPDUs = true case edu := <-oq.incomingEDUs: // EDUs are handled in-memory for now. We will try to keep // the ordering intact. @@ -204,10 +215,11 @@ func (oq *destinationQueue) backgroundSend() { for len(oq.incomingInvites) > 0 { oq.pendingInvites = append(oq.pendingInvites, <-oq.incomingInvites) } - case <-time.After(time.Second * 30): + case <-time.After(queueIdleTimeout): // The worker is idle so stop the goroutine. It'll get // restarted automatically the next time we have an event to // send. + log.Infof("Queue %q has been idle for %s, going to sleep", oq.destination, queueIdleTimeout) return } @@ -220,12 +232,13 @@ func (oq *destinationQueue) backgroundSend() { select { case <-time.After(duration): case <-oq.interruptBackoff: + log.Infof("Interrupting backoff for %q", oq.destination) } oq.backingOff.Store(false) } // If we have pending PDUs or EDUs then construct a transaction. - if oq.pendingPDUs.Load() > 0 || len(oq.pendingEDUs) > 0 { + if pendingPDUs || len(oq.pendingEDUs) > 0 { // Try sending the next transaction and see what happens. transaction, terr := oq.nextTransaction(oq.pendingEDUs) if terr != nil { @@ -236,6 +249,7 @@ func (oq *destinationQueue) backgroundSend() { // buffers at this point. The PDU clean-up is already on a defer. oq.cleanPendingEDUs() oq.cleanPendingInvites() + log.Infof("Blacklisting %q due to errors", oq.destination) return } else { // We haven't been told to give up terminally yet but we still have @@ -262,6 +276,7 @@ func (oq *destinationQueue) backgroundSend() { if giveUp := oq.statistics.Failure(); giveUp { // It's been suggested that we should give up because // the backoff has exceeded a maximum allowable value. + log.Infof("Blacklisting %q due to errors", oq.destination) return } } else if sent > 0 { @@ -273,17 +288,6 @@ func (oq *destinationQueue) backgroundSend() { oq.cleanPendingInvites() } } - - // If something else has come along while we were busy sending - // the previous transaction then we don't want the next loop - // iteration to sleep. Send a message if someone else hasn't - // already sent a wake-up. - if oq.pendingPDUs.Load() > 0 { - select { - case oq.notifyPDUs <- true: - default: - } - } } } @@ -350,16 +354,6 @@ func (oq *destinationQueue) nextTransaction( // pending EDUs then there's nothing to do - stop here. if len(pdus) == 0 && len(pendingEDUs) == 0 { log.Warnf("Expected PDUs/EDUs for destination %q but got none", oq.destination) - // This shouldn't really happen but since it has, let's check - // how many events are *really* in the database that are waiting. - if count, cerr := oq.db.GetPendingPDUCount( - context.TODO(), - oq.destination, - ); cerr == nil { - oq.pendingPDUs.Store(count) - } else { - log.Warnf("Failed to retrieve pending PDU count for %q", oq.destination) - } return false, nil } @@ -396,9 +390,6 @@ func (oq *destinationQueue) nextTransaction( _, err = oq.client.SendTransaction(ctx, t) switch err.(type) { case nil: - // No error was returned so the transaction looks to have - // been successfully sent. - oq.pendingPDUs.Sub(int64(len(t.PDUs))) // Clean up the transaction in the database. if err = oq.db.CleanTransactionPDUs( context.Background(), diff --git a/federationsender/queue/queue.go b/federationsender/queue/queue.go index 2288689ee..0858d813f 100644 --- a/federationsender/queue/queue.go +++ b/federationsender/queue/queue.go @@ -136,7 +136,7 @@ func (oqs *OutgoingQueues) SendEvent( } for _, destination := range destinations { - oqs.getQueue(destination).sendEvent(nid) + go oqs.getQueue(destination).sendEvent(nid) } return nil