diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index 07d84dd7c..dc0cd41f8 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -24,7 +24,7 @@ import ( "github.com/matrix-org/dendrite/federationsender/producers" "github.com/matrix-org/gomatrixserverlib" - "github.com/matrix-org/util" + "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus" "go.uber.org/atomic" ) @@ -51,6 +51,7 @@ type destinationQueue struct { idleCounter atomic.Uint32 // how many ticks have we done nothing? failCounter atomic.Uint32 // how many times have we failed? sentCounter atomic.Uint32 // how many times have we succeeded? + wakeup chan bool // wakes up a sleeping worker runningMutex sync.RWMutex // protects the below lastTransactionIDs []gomatrixserverlib.TransactionID // protected by runningMutex pendingPDUs []*gomatrixserverlib.HeaderedEvent // protected by runningMutex @@ -73,12 +74,14 @@ func (oq *destinationQueue) backoff() bool { // backoff based on how many times we have failed already. The // worker goroutine will wait until this time before processing // anything from the queue. - backoffSeconds := math.Exp2(float64(failCounter)) + backoffSeconds := time.Second * time.Duration(math.Exp2(float64(failCounter))) oq.backoffUntil.Store( - time.Now().Add(time.Second * time.Duration(backoffSeconds)), + time.Now().Add(backoffSeconds), ) + logrus.WithField("server_name", oq.destination).Infof("Increasing backoff to %s", backoffSeconds) return false // Don't give up yet. } else { + logrus.WithField("server_name", oq.destination).Infof("Backlisting due to errors") // We've exceeded the maximum amount of times we're willing // to back off, which is probably in the region of hours by // now. Just give up - clear the queues and reset the queue @@ -112,10 +115,13 @@ func (oq *destinationQueue) sendEvent(ev *gomatrixserverlib.HeaderedEvent) { return } oq.runningMutex.Lock() - oq.pendingPDUs = append(oq.pendingPDUs, ev) + evCopy := *ev + oq.pendingPDUs = append(oq.pendingPDUs, &evCopy) oq.runningMutex.Unlock() if !oq.running.Load() { go oq.backgroundSend() + } else { + oq.wakeup <- true } } @@ -128,10 +134,13 @@ func (oq *destinationQueue) sendEDU(e *gomatrixserverlib.EDU) { return } oq.runningMutex.Lock() - oq.pendingEDUs = append(oq.pendingEDUs, e) + eCopy := *e + oq.pendingEDUs = append(oq.pendingEDUs, &eCopy) oq.runningMutex.Unlock() if !oq.running.Load() { go oq.backgroundSend() + } else { + oq.wakeup <- true } } @@ -144,18 +153,23 @@ func (oq *destinationQueue) sendInvite(ev *gomatrixserverlib.InviteV2Request) { return } oq.runningMutex.Lock() - oq.pendingInvites = append(oq.pendingInvites, ev) + evCopy := *ev + oq.pendingInvites = append(oq.pendingInvites, &evCopy) oq.runningMutex.Unlock() if !oq.running.Load() { go oq.backgroundSend() + } else { + oq.wakeup <- true } } // backgroundSend is the worker goroutine for sending events. func (oq *destinationQueue) backgroundSend() { // Mark the worker as running for its lifetime. + oq.wakeup = make(chan bool) oq.running.Store(true) defer oq.running.Store(false) + defer close(oq.wakeup) for { // Wait for our backoff timer. @@ -163,6 +177,8 @@ func (oq *destinationQueue) backgroundSend() { if b, ok := oq.backoffUntil.Load().(time.Time); ok { backoffUntil = b } + + // If we have a backoff period then sit and wait for it. if backoffUntil.After(time.Now()) { <-time.After(time.Until(backoffUntil)) } @@ -212,27 +228,24 @@ func (oq *destinationQueue) backgroundSend() { // If we successfully sent the invites then clear out // the pending invites. if invites { - oq.success() oq.runningMutex.Lock() oq.pendingInvites = oq.pendingInvites[:0] oq.runningMutex.Unlock() } } - // At this point, if we did everything successfully, - // we can reset the backoff duration. - if idleCounter >= 5 { + // Wait either for a few seconds, or until a new event is + // available. + select { + case <-oq.wakeup: + case <-time.After(time.Second * 5): // If this worker has been idle for a while then stop // running it, otherwise the goroutine will just tick // endlessly. It'll get automatically restarted when // a new event needs to be sent. - return - } else { - // Otherwise, add to the ticker counter and ask the - // next iteration to wait for a second (to stop CPU - // spinning). - oq.idleCounter.Store(idleCounter + 1) - oq.backoffUntil.Store(time.Now().Add(time.Second)) + if idleCounter >= 5 { + return + } } } } @@ -271,7 +284,7 @@ func (oq *destinationQueue) nextTransaction( t.EDUs = append(t.EDUs, *edu) } - util.GetLogger(context.TODO()).Infof("Sending transaction %q containing %d PDUs, %d EDUs", t.TransactionID, len(t.PDUs), len(t.EDUs)) + logrus.WithField("server_name", oq.destination).Infof("Sending transaction %q containing %d PDUs, %d EDUs", t.TransactionID, len(t.PDUs), len(t.EDUs)) _, err := oq.client.SendTransaction(context.TODO(), t) if err != nil {