Count PDUs to make more resilient

This commit is contained in:
Neil Alexander 2020-07-01 10:32:49 +01:00
parent ead6ee69c3
commit a9aa3c263b

View file

@ -51,7 +51,7 @@ type destinationQueue struct {
incomingEDUs chan *gomatrixserverlib.EDU // EDUs to send incomingEDUs chan *gomatrixserverlib.EDU // EDUs to send
transactionID gomatrixserverlib.TransactionID // last transaction ID transactionID gomatrixserverlib.TransactionID // last transaction ID
transactionCount int // how many events in this transaction so far transactionCount int // how many events in this transaction so far
pendingPDUs atomic.Bool // there are PDUs waiting to be sent pendingPDUs atomic.Int32 // how many PDUs are waiting to be sent
pendingEDUs []*gomatrixserverlib.EDU // owned by backgroundSend pendingEDUs []*gomatrixserverlib.EDU // owned by backgroundSend
pendingInvites []*gomatrixserverlib.InviteV2Request // owned by backgroundSend pendingInvites []*gomatrixserverlib.InviteV2Request // owned by backgroundSend
retryServerCh chan bool // interrupts backoff retryServerCh chan bool // interrupts backoff
@ -119,9 +119,8 @@ func (oq *destinationQueue) sendEvent(nid int64) {
} }
// Signal that we've sent a new PDU. This will cause the queue to // Signal that we've sent a new PDU. This will cause the queue to
// wake up if it's asleep. // wake up if it's asleep.
if oq.pendingPDUs.CAS(false, true) { oq.pendingPDUs.Add(1)
oq.incomingPDUs <- struct{}{} oq.incomingPDUs <- struct{}{}
}
} }
// sendEDU adds the EDU event to the pending queue for the destination. // sendEDU adds the EDU event to the pending queue for the destination.
@ -224,7 +223,7 @@ func (oq *destinationQueue) backgroundSend() {
} }
// If we have pending PDUs or EDUs then construct a transaction. // If we have pending PDUs or EDUs then construct a transaction.
if oq.pendingPDUs.Load() || len(oq.pendingEDUs) > 0 { if oq.pendingPDUs.Load() > 0 || len(oq.pendingEDUs) > 0 {
// If we haven't got a transaction ID then we should generate // If we haven't got a transaction ID then we should generate
// one. Ideally we'd know this already because something queued // one. Ideally we'd know this already because something queued
// in the database would give us one, but if we're dealing with // in the database would give us one, but if we're dealing with
@ -250,7 +249,6 @@ func (oq *destinationQueue) backgroundSend() {
// If we successfully sent the transaction then clear out // If we successfully sent the transaction then clear out
// the pending events and EDUs, and wipe our transaction ID. // the pending events and EDUs, and wipe our transaction ID.
oq.statistics.Success() oq.statistics.Success()
oq.pendingPDUs.Store(false)
oq.transactionID = "" oq.transactionID = ""
// Clean up the in-memory buffers. // Clean up the in-memory buffers.
oq.cleanPendingEDUs() oq.cleanPendingEDUs()
@ -357,8 +355,9 @@ func (oq *destinationQueue) nextTransaction(
switch e := err.(type) { switch e := err.(type) {
case nil: case nil:
// No error was returned so the transaction looks to have // No error was returned so the transaction looks to have
// been successfully sent. Clean up the transaction in the // been successfully sent.
// database. oq.pendingPDUs.Sub(int32(len(t.PDUs)))
// Clean up the transaction in the database.
if err = oq.db.CleanTransactionPDUs( if err = oq.db.CleanTransactionPDUs(
context.TODO(), context.TODO(),
t.Destination, t.Destination,