From 74ed7140f93ee486ddfe27b45cf4dbfee082497c Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Tue, 5 May 2020 14:59:43 +0100 Subject: [PATCH] Tweaks --- federationsender/queue/destinationqueue.go | 28 ++++++++++------------ 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index 6e7774111..6fff11ee1 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -59,6 +59,16 @@ type destinationQueue struct { pendingInvites []*gomatrixserverlib.InviteV2Request // protected by runningMutex } +// Start the destination queue if it needs to be started, or +// otherwise signal to it that it should wake up from sleep. +func (oq *destinationQueue) wake() { + if !oq.running.Load() { + go oq.backgroundSend() + } else { + oq.wakeup <- true + } +} + // Backoff marks a failure and works out when to back off until. It // returns true if the worker should give up altogether because of // too many consecutive failures. @@ -117,11 +127,7 @@ func (oq *destinationQueue) sendEvent(ev *gomatrixserverlib.HeaderedEvent) { oq.runningMutex.Lock() oq.pendingPDUs = append(oq.pendingPDUs, ev) oq.runningMutex.Unlock() - if !oq.running.Load() { - go oq.backgroundSend() - } else { - oq.wakeup <- true - } + oq.wake() } // sendEDU adds the EDU event to the pending queue for the destination. @@ -135,11 +141,7 @@ func (oq *destinationQueue) sendEDU(ev *gomatrixserverlib.EDU) { oq.runningMutex.Lock() oq.pendingEDUs = append(oq.pendingEDUs, ev) oq.runningMutex.Unlock() - if !oq.running.Load() { - go oq.backgroundSend() - } else { - oq.wakeup <- true - } + oq.wake() } // sendInvite adds the invite event to the pending queue for the @@ -153,11 +155,7 @@ func (oq *destinationQueue) sendInvite(ev *gomatrixserverlib.InviteV2Request) { oq.runningMutex.Lock() oq.pendingInvites = append(oq.pendingInvites, ev) oq.runningMutex.Unlock() - if !oq.running.Load() { - go oq.backgroundSend() - } else { - oq.wakeup <- true - } + oq.wake() } // backgroundSend is the worker goroutine for sending events.