diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index e2314ebbe..70e50d2e8 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -55,32 +55,8 @@ type destinationQueue struct { pendingPDUs atomic.Int64 // how many PDUs are waiting to be sent pendingEDUs []*gomatrixserverlib.EDU // owned by backgroundSend pendingInvites []*gomatrixserverlib.InviteV2Request // owned by backgroundSend - wakeServerCh chan bool // interrupts idle wait - retryServerCh chan bool // interrupts backoff -} - -// retry will clear the blacklist state and attempt to send built up events to the server, -// resetting and interrupting any backoff timers. -func (oq *destinationQueue) retry() { - // TODO: We don't send all events in the case where the server has been blacklisted as we - // drop events instead then. This means we will send the oldest N events (chan size, currently 128) - // and then skip ahead a lot which feels non-ideal but equally we can't persist thousands of events - // in-memory to maybe-send it one day. Ideally we would just shove these pending events in a database - // so we can send a lot of events. - // - // Interrupt the backoff. If the federation request that happens as a result of this is successful - // then the counters will be reset there and the backoff will cancel. If the federation request - // fails then we will retry at the current backoff interval, so as to prevent us from spamming - // homeservers which are behaving badly. - // We need to use an atomic bool here to prevent multiple calls to retry() blocking on the channel - // as it is unbuffered. - if oq.backingOff.CAS(true, false) { - oq.retryServerCh <- true - } - if !oq.running.Load() { - log.Infof("Restarting queue for %s", oq.destination) - go oq.backgroundSend() - } + notifyPDUs chan bool // interrupts idle wait for PDUs + interruptBackoff chan bool // interrupts backoff } // Send event adds the event to the pending queue for the destination. @@ -122,7 +98,7 @@ func (oq *destinationQueue) sendEvent(nid int64) { // wake up if it's asleep. The return to the Add function will only // be 1 if the previous value was 0, e.g. nothing was waiting before. if oq.pendingPDUs.Add(1) == 1 { - oq.wakeServerCh <- true + oq.notifyPDUs <- true } } @@ -150,7 +126,16 @@ func (oq *destinationQueue) sendInvite(ev *gomatrixserverlib.InviteV2Request) { oq.incomingInvites <- ev } +// wakeQueueIfNeeded will wake up the destination queue if it is +// not already running. If it is running but it is backing off +// then we will interrupt the backoff, causing any federation +// requests to retry. func (oq *destinationQueue) wakeQueueIfNeeded() { + // If we are backing off then interrupt the backoff. + if oq.backingOff.CAS(true, false) { + oq.interruptBackoff <- true + } + // If we aren't running then wake up the queue. if !oq.running.Load() { // Look up how many events are pending in this queue. We need // to do this so that the queue thinks it has work to do. @@ -165,7 +150,7 @@ func (oq *destinationQueue) wakeQueueIfNeeded() { log.WithError(err).Errorf("Can't get pending PDU count for %q destination queue", oq.destination) } if count > 0 { - oq.wakeServerCh <- true + oq.notifyPDUs <- true } // Then start the queue. go oq.backgroundSend() @@ -186,7 +171,7 @@ func (oq *destinationQueue) backgroundSend() { // If we have nothing to do then wait either for incoming events, or // until we hit an idle timeout. select { - case <-oq.wakeServerCh: + case <-oq.notifyPDUs: // We were woken up because there are new PDUs waiting in the // database. case edu := <-oq.incomingEDUs: @@ -232,7 +217,7 @@ func (oq *destinationQueue) backgroundSend() { oq.backingOff.Store(true) select { case <-time.After(duration): - case <-oq.retryServerCh: + case <-oq.interruptBackoff: } oq.backingOff.Store(false) } @@ -255,7 +240,7 @@ func (oq *destinationQueue) backgroundSend() { // PDUs waiting to be sent. By sending a message into the wake chan, // the next loop iteration will try processing these PDUs again, // subject to the backoff. - oq.wakeServerCh <- true + oq.notifyPDUs <- true } } else if transaction { // If we successfully sent the transaction then clear out diff --git a/federationsender/queue/queue.go b/federationsender/queue/queue.go index bc7ec0f93..6c7fca54e 100644 --- a/federationsender/queue/queue.go +++ b/federationsender/queue/queue.go @@ -79,29 +79,23 @@ type SigningInfo struct { PrivateKey ed25519.PrivateKey } -func (oqs *OutgoingQueues) getQueueIfExists(destination gomatrixserverlib.ServerName) *destinationQueue { - oqs.queuesMutex.Lock() - defer oqs.queuesMutex.Unlock() - return oqs.queues[destination] -} - func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *destinationQueue { oqs.queuesMutex.Lock() defer oqs.queuesMutex.Unlock() oq := oqs.queues[destination] if oq == nil { oq = &destinationQueue{ - db: oqs.db, - rsAPI: oqs.rsAPI, - origin: oqs.origin, - destination: destination, - client: oqs.client, - statistics: oqs.statistics.ForServer(destination), - incomingEDUs: make(chan *gomatrixserverlib.EDU, 128), - incomingInvites: make(chan *gomatrixserverlib.InviteV2Request, 128), - wakeServerCh: make(chan bool, 128), - retryServerCh: make(chan bool), - signing: oqs.signing, + db: oqs.db, + rsAPI: oqs.rsAPI, + origin: oqs.origin, + destination: destination, + client: oqs.client, + statistics: oqs.statistics.ForServer(destination), + incomingEDUs: make(chan *gomatrixserverlib.EDU, 128), + incomingInvites: make(chan *gomatrixserverlib.InviteV2Request, 128), + notifyPDUs: make(chan bool, 128), + interruptBackoff: make(chan bool), + signing: oqs.signing, } oqs.queues[destination] = oq } @@ -211,11 +205,11 @@ func (oqs *OutgoingQueues) SendEDU( // RetryServer attempts to resend events to the given server if we had given up. func (oqs *OutgoingQueues) RetryServer(srv gomatrixserverlib.ServerName) { - q := oqs.getQueueIfExists(srv) + q := oqs.getQueue(srv) if q == nil { return } - q.retry() + q.wakeQueueIfNeeded() } // filterAndDedupeDests removes our own server from the list of destinations