Improve logic a bit, don't block on wakeup, move idle check

This commit is contained in:
Neil Alexander 2020-05-06 12:39:55 +01:00
parent 0eee57cbea
commit 53f690f2d3
2 changed files with 25 additions and 24 deletions

View file

@ -55,7 +55,10 @@ func (oq *destinationQueue) wake() {
if !oq.running.Load() { if !oq.running.Load() {
go oq.backgroundSend() go oq.backgroundSend()
} else { } else {
oq.wakeup <- true select {
case oq.wakeup <- true:
default:
}
} }
} }
@ -124,6 +127,17 @@ func (oq *destinationQueue) backgroundSend() {
idleCounter, sentCounter := oq.idleCounter.Load(), oq.statistics.SuccessCount() idleCounter, sentCounter := oq.idleCounter.Load(), oq.statistics.SuccessCount()
oq.runningMutex.RUnlock() oq.runningMutex.RUnlock()
// If this worker has been idle for a while then stop
// running it, otherwise the goroutine will just tick
// endlessly. It'll get automatically restarted when
// a new event needs to be sent.
if idleCounter >= 5 {
return
}
if len(pendingInvites) == 0 && len(pendingPDUs) == 0 && len(pendingEDUs) == 0 {
oq.idleCounter.Add(1)
}
// If we have pending PDUs or EDUs then construct a transaction. // If we have pending PDUs or EDUs then construct a transaction.
if len(pendingPDUs) > 0 || len(pendingEDUs) > 0 { if len(pendingPDUs) > 0 || len(pendingEDUs) > 0 {
// Try sending the next transaction and see what happens. // Try sending the next transaction and see what happens.
@ -135,12 +149,10 @@ func (oq *destinationQueue) backgroundSend() {
// the backoff has exceeded a maximum allowable value. // the backoff has exceeded a maximum allowable value.
return return
} }
continue } else if transaction {
}
// If we successfully sent the transaction then clear out // If we successfully sent the transaction then clear out
// the pending events and EDUs. // the pending events and EDUs.
if transaction { oq.statistics.Success()
oq.runningMutex.Lock() oq.runningMutex.Lock()
// Reallocate so that the underlying arrays can be GC'd, as // Reallocate so that the underlying arrays can be GC'd, as
// opposed to growing forever. // opposed to growing forever.
@ -168,11 +180,10 @@ func (oq *destinationQueue) backgroundSend() {
return return
} }
continue continue
} } else if invites {
// If we successfully sent the invites then clear out // If we successfully sent the invites then clear out
// the pending invites. // the pending invites.
if invites { oq.statistics.Success()
oq.runningMutex.Lock() oq.runningMutex.Lock()
// Reallocate so that the underlying array can be GC'd, as // Reallocate so that the underlying array can be GC'd, as
// opposed to growing forever. // opposed to growing forever.
@ -184,22 +195,11 @@ func (oq *destinationQueue) backgroundSend() {
} }
} }
// If everything was fine at this point then we can update
// the counters for the transaction IDs.
oq.statistics.Success()
// Wait either for a few seconds, or until a new event is // Wait either for a few seconds, or until a new event is
// available. // available.
select { select {
case <-oq.wakeup: case <-oq.wakeup:
case <-time.After(time.Second * 5): case <-time.After(time.Second * 5):
// If this worker has been idle for a while then stop
// running it, otherwise the goroutine will just tick
// endlessly. It'll get automatically restarted when
// a new event needs to be sent.
if idleCounter >= 5 {
return
}
} }
} }
} }

View file

@ -118,6 +118,7 @@ func (oqs *OutgoingQueues) SendInvite(
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"event_id": ev.EventID(), "event_id": ev.EventID(),
"server_name": destination,
}).Info("Sending invite") }).Info("Sending invite")
oqs.queuesMutex.RLock() oqs.queuesMutex.RLock()