Reduce unnecessary retries

This commit is contained in:
Neil Alexander 2020-07-01 10:25:35 +01:00
parent c4571c9cdd
commit ead6ee69c3

View file

@ -51,6 +51,7 @@ type destinationQueue struct {
incomingEDUs chan *gomatrixserverlib.EDU // EDUs to send incomingEDUs chan *gomatrixserverlib.EDU // EDUs to send
transactionID gomatrixserverlib.TransactionID // last transaction ID transactionID gomatrixserverlib.TransactionID // last transaction ID
transactionCount int // how many events in this transaction so far transactionCount int // how many events in this transaction so far
pendingPDUs atomic.Bool // there are PDUs waiting to be sent
pendingEDUs []*gomatrixserverlib.EDU // owned by backgroundSend pendingEDUs []*gomatrixserverlib.EDU // owned by backgroundSend
pendingInvites []*gomatrixserverlib.InviteV2Request // owned by backgroundSend pendingInvites []*gomatrixserverlib.InviteV2Request // owned by backgroundSend
retryServerCh chan bool // interrupts backoff retryServerCh chan bool // interrupts backoff
@ -118,7 +119,9 @@ func (oq *destinationQueue) sendEvent(nid int64) {
} }
// Signal that we've sent a new PDU. This will cause the queue to // Signal that we've sent a new PDU. This will cause the queue to
// wake up if it's asleep. // wake up if it's asleep.
oq.incomingPDUs <- struct{}{} if oq.pendingPDUs.CAS(false, true) {
oq.incomingPDUs <- struct{}{}
}
} }
// sendEDU adds the EDU event to the pending queue for the destination. // sendEDU adds the EDU event to the pending queue for the destination.
@ -160,8 +163,6 @@ func (oq *destinationQueue) backgroundSend() {
defer oq.running.Store(false) defer oq.running.Store(false)
for { for {
pendingPDUs := false
// For now we don't know the next transaction ID. Set it to an // For now we don't know the next transaction ID. Set it to an
// empty one. The next step will populate it if we have pending // empty one. The next step will populate it if we have pending
// PDUs in the database. Otherwise we'll generate one later on, // PDUs in the database. Otherwise we'll generate one later on,
@ -172,8 +173,8 @@ func (oq *destinationQueue) backgroundSend() {
// idle timeout. // idle timeout.
select { select {
case <-oq.incomingPDUs: case <-oq.incomingPDUs:
// There are new PDUs waiting in the database. // We were woken up because there are new PDUs waiting in the
pendingPDUs = true // database.
case edu := <-oq.incomingEDUs: case edu := <-oq.incomingEDUs:
// EDUs are handled in-memory for now. We will try to keep // EDUs are handled in-memory for now. We will try to keep
// the ordering intact. // the ordering intact.
@ -223,7 +224,7 @@ func (oq *destinationQueue) backgroundSend() {
} }
// If we have pending PDUs or EDUs then construct a transaction. // If we have pending PDUs or EDUs then construct a transaction.
if pendingPDUs || len(oq.pendingEDUs) > 0 { if oq.pendingPDUs.Load() || len(oq.pendingEDUs) > 0 {
// If we haven't got a transaction ID then we should generate // If we haven't got a transaction ID then we should generate
// one. Ideally we'd know this already because something queued // one. Ideally we'd know this already because something queued
// in the database would give us one, but if we're dealing with // in the database would give us one, but if we're dealing with
@ -249,6 +250,7 @@ func (oq *destinationQueue) backgroundSend() {
// If we successfully sent the transaction then clear out // If we successfully sent the transaction then clear out
// the pending events and EDUs, and wipe our transaction ID. // the pending events and EDUs, and wipe our transaction ID.
oq.statistics.Success() oq.statistics.Success()
oq.pendingPDUs.Store(false)
oq.transactionID = "" oq.transactionID = ""
// Clean up the in-memory buffers. // Clean up the in-memory buffers.
oq.cleanPendingEDUs() oq.cleanPendingEDUs()