This commit is contained in:
Neil Alexander 2020-05-05 14:59:43 +01:00
parent c71bcb2cdf
commit 74ed7140f9

View file

@ -59,6 +59,16 @@ type destinationQueue struct {
pendingInvites []*gomatrixserverlib.InviteV2Request // protected by runningMutex pendingInvites []*gomatrixserverlib.InviteV2Request // protected by runningMutex
} }
// Start the destination queue if it needs to be started, or
// otherwise signal to it that it should wake up from sleep.
func (oq *destinationQueue) wake() {
if !oq.running.Load() {
go oq.backgroundSend()
} else {
oq.wakeup <- true
}
}
// Backoff marks a failure and works out when to back off until. It // Backoff marks a failure and works out when to back off until. It
// returns true if the worker should give up altogether because of // returns true if the worker should give up altogether because of
// too many consecutive failures. // too many consecutive failures.
@ -117,11 +127,7 @@ func (oq *destinationQueue) sendEvent(ev *gomatrixserverlib.HeaderedEvent) {
oq.runningMutex.Lock() oq.runningMutex.Lock()
oq.pendingPDUs = append(oq.pendingPDUs, ev) oq.pendingPDUs = append(oq.pendingPDUs, ev)
oq.runningMutex.Unlock() oq.runningMutex.Unlock()
if !oq.running.Load() { oq.wake()
go oq.backgroundSend()
} else {
oq.wakeup <- true
}
} }
// sendEDU adds the EDU event to the pending queue for the destination. // sendEDU adds the EDU event to the pending queue for the destination.
@ -135,11 +141,7 @@ func (oq *destinationQueue) sendEDU(ev *gomatrixserverlib.EDU) {
oq.runningMutex.Lock() oq.runningMutex.Lock()
oq.pendingEDUs = append(oq.pendingEDUs, ev) oq.pendingEDUs = append(oq.pendingEDUs, ev)
oq.runningMutex.Unlock() oq.runningMutex.Unlock()
if !oq.running.Load() { oq.wake()
go oq.backgroundSend()
} else {
oq.wakeup <- true
}
} }
// sendInvite adds the invite event to the pending queue for the // sendInvite adds the invite event to the pending queue for the
@ -153,11 +155,7 @@ func (oq *destinationQueue) sendInvite(ev *gomatrixserverlib.InviteV2Request) {
oq.runningMutex.Lock() oq.runningMutex.Lock()
oq.pendingInvites = append(oq.pendingInvites, ev) oq.pendingInvites = append(oq.pendingInvites, ev)
oq.runningMutex.Unlock() oq.runningMutex.Unlock()
if !oq.running.Load() { oq.wake()
go oq.backgroundSend()
} else {
oq.wakeup <- true
}
} }
// backgroundSend is the worker goroutine for sending events. // backgroundSend is the worker goroutine for sending events.