From c71bcb2cdf1686256771f1520303fe3b6fc462eb Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Tue, 5 May 2020 14:37:06 +0100 Subject: [PATCH] Tweaks --- federationsender/queue/destinationqueue.go | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index dc0cd41f8..6e7774111 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -81,7 +81,6 @@ func (oq *destinationQueue) backoff() bool { logrus.WithField("server_name", oq.destination).Infof("Increasing backoff to %s", backoffSeconds) return false // Don't give up yet. } else { - logrus.WithField("server_name", oq.destination).Infof("Backlisting due to errors") // We've exceeded the maximum amount of times we're willing // to back off, which is probably in the region of hours by // now. Just give up - clear the queues and reset the queue @@ -92,6 +91,7 @@ func (oq *destinationQueue) backoff() bool { oq.pendingEDUs = nil oq.pendingInvites = nil oq.runningMutex.Unlock() + logrus.WithField("server_name", oq.destination).Infof("Blacklisting server due to %d consecutive errors", failCounter) return true // Give up. } } @@ -115,8 +115,7 @@ func (oq *destinationQueue) sendEvent(ev *gomatrixserverlib.HeaderedEvent) { return } oq.runningMutex.Lock() - evCopy := *ev - oq.pendingPDUs = append(oq.pendingPDUs, &evCopy) + oq.pendingPDUs = append(oq.pendingPDUs, ev) oq.runningMutex.Unlock() if !oq.running.Load() { go oq.backgroundSend() @@ -128,14 +127,13 @@ func (oq *destinationQueue) sendEvent(ev *gomatrixserverlib.HeaderedEvent) { // sendEDU adds the EDU event to the pending queue for the destination. // If the queue is empty then it starts a background goroutine to // start sending events to that destination. -func (oq *destinationQueue) sendEDU(e *gomatrixserverlib.EDU) { +func (oq *destinationQueue) sendEDU(ev *gomatrixserverlib.EDU) { if oq.blacklisted.Load() { // If the destination is blacklisted then drop the event. return } oq.runningMutex.Lock() - eCopy := *e - oq.pendingEDUs = append(oq.pendingEDUs, &eCopy) + oq.pendingEDUs = append(oq.pendingEDUs, ev) oq.runningMutex.Unlock() if !oq.running.Load() { go oq.backgroundSend() @@ -153,8 +151,7 @@ func (oq *destinationQueue) sendInvite(ev *gomatrixserverlib.InviteV2Request) { return } oq.runningMutex.Lock() - evCopy := *ev - oq.pendingInvites = append(oq.pendingInvites, &evCopy) + oq.pendingInvites = append(oq.pendingInvites, ev) oq.runningMutex.Unlock() if !oq.running.Load() { go oq.backgroundSend() @@ -164,6 +161,7 @@ func (oq *destinationQueue) sendInvite(ev *gomatrixserverlib.InviteV2Request) { } // backgroundSend is the worker goroutine for sending events. +// nolint:gocyclo func (oq *destinationQueue) backgroundSend() { // Mark the worker as running for its lifetime. oq.wakeup = make(chan bool) @@ -207,7 +205,6 @@ func (oq *destinationQueue) backgroundSend() { // If we successfully sent the transaction then clear out // the pending events and EDUs. if transaction { - oq.success() oq.runningMutex.Lock() oq.pendingPDUs = oq.pendingPDUs[:0] oq.pendingEDUs = oq.pendingEDUs[:0] @@ -234,6 +231,10 @@ func (oq *destinationQueue) backgroundSend() { } } + // If everything was fine at this point then we can update + // the counters for the transaction IDs. + oq.success() + // Wait either for a few seconds, or until a new event is // available. select {