diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index 4fe73be08..63c8cbd0f 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -55,7 +55,10 @@ func (oq *destinationQueue) wake() { if !oq.running.Load() { go oq.backgroundSend() } else { - oq.wakeup <- true + select { + case oq.wakeup <- true: + default: + } } } @@ -124,6 +127,17 @@ func (oq *destinationQueue) backgroundSend() { idleCounter, sentCounter := oq.idleCounter.Load(), oq.statistics.SuccessCount() oq.runningMutex.RUnlock() + // If this worker has been idle for a while then stop + // running it, otherwise the goroutine will just tick + // endlessly. It'll get automatically restarted when + // a new event needs to be sent. + if idleCounter >= 5 { + return + } + if len(pendingInvites) == 0 && len(pendingPDUs) == 0 && len(pendingEDUs) == 0 { + oq.idleCounter.Add(1) + } + // If we have pending PDUs or EDUs then construct a transaction. if len(pendingPDUs) > 0 || len(pendingEDUs) > 0 { // Try sending the next transaction and see what happens. @@ -135,12 +149,10 @@ func (oq *destinationQueue) backgroundSend() { // the backoff has exceeded a maximum allowable value. return } - continue - } - - // If we successfully sent the transaction then clear out - // the pending events and EDUs. - if transaction { + } else if transaction { + // If we successfully sent the transaction then clear out + // the pending events and EDUs. + oq.statistics.Success() oq.runningMutex.Lock() // Reallocate so that the underlying arrays can be GC'd, as // opposed to growing forever. @@ -168,11 +180,10 @@ func (oq *destinationQueue) backgroundSend() { return } continue - } - - // If we successfully sent the invites then clear out - // the pending invites. - if invites { + } else if invites { + // If we successfully sent the invites then clear out + // the pending invites. + oq.statistics.Success() oq.runningMutex.Lock() // Reallocate so that the underlying array can be GC'd, as // opposed to growing forever. @@ -184,22 +195,11 @@ func (oq *destinationQueue) backgroundSend() { } } - // If everything was fine at this point then we can update - // the counters for the transaction IDs. - oq.statistics.Success() - // Wait either for a few seconds, or until a new event is // available. select { case <-oq.wakeup: case <-time.After(time.Second * 5): - // If this worker has been idle for a while then stop - // running it, otherwise the goroutine will just tick - // endlessly. It'll get automatically restarted when - // a new event needs to be sent. - if idleCounter >= 5 { - return - } } } } diff --git a/federationsender/queue/queue.go b/federationsender/queue/queue.go index 9e6523414..6f066efdd 100644 --- a/federationsender/queue/queue.go +++ b/federationsender/queue/queue.go @@ -117,7 +117,8 @@ func (oqs *OutgoingQueues) SendInvite( } log.WithFields(log.Fields{ - "event_id": ev.EventID(), + "event_id": ev.EventID(), + "server_name": destination, }).Info("Sending invite") oqs.queuesMutex.RLock()