Use atomic bool to stop us blocking on the channel

This commit is contained in:
Kegan Dougal 2020-06-01 18:04:24 +01:00
parent 40c89776c5
commit c4501e1f9c

View file

@ -39,6 +39,7 @@ type destinationQueue struct {
origin gomatrixserverlib.ServerName // origin of requests
destination gomatrixserverlib.ServerName // destination of requests
running atomic.Bool // is the queue worker running?
backingOff atomic.Bool // true if we're backing off
statistics *types.ServerStatistics // statistics about this remote server
incomingPDUs chan *gomatrixserverlib.HeaderedEvent // PDUs to send
incomingEDUs chan *gomatrixserverlib.EDU // EDUs to send
@ -59,7 +60,12 @@ func (oq *destinationQueue) retry() {
// in-memory to maybe-send it one day. Ideally we would just shove these pending events in a database
// so we can send a lot of events.
oq.statistics.Success()
oq.retryServerCh <- true
// if we were backing off, swap to not backing off and interrupt the select.
// We need to use an atomic bool here to prevent multiple calls to retry() blocking on the channel
// as it is unbuffered.
if oq.backingOff.CAS(true, false) {
oq.retryServerCh <- true
}
if !oq.running.Load() {
log.Infof("Restarting queue for %s", oq.destination)
go oq.backgroundSend()
@ -175,10 +181,12 @@ func (oq *destinationQueue) backgroundSend() {
// backoff duration to complete first, or until explicitly
// told to retry.
if backoff, duration := oq.statistics.BackoffDuration(); backoff {
oq.backingOff.Store(true)
select {
case <-time.After(duration):
case <-oq.retryServerCh:
}
oq.backingOff.Store(false)
}
// How many things do we have waiting?