This commit is contained in:
Neil Alexander 2020-12-08 16:46:07 +00:00
parent 3a15792b31
commit b64e79e3ac
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944

View file

@ -204,6 +204,7 @@ func (oq *destinationQueue) getPendingFromDatabase() {
} }
} else { } else {
logrus.WithError(err).Errorf("Failed to get pending PDUs for %q", oq.destination) logrus.WithError(err).Errorf("Failed to get pending PDUs for %q", oq.destination)
return
} }
} }
if eduCapacity := maxEDUsInMemory - len(oq.pendingEDUs); eduCapacity > 0 { if eduCapacity := maxEDUsInMemory - len(oq.pendingEDUs); eduCapacity > 0 {
@ -218,6 +219,7 @@ func (oq *destinationQueue) getPendingFromDatabase() {
} }
} else { } else {
logrus.WithError(err).Errorf("Failed to get pending EDUs for %q", oq.destination) logrus.WithError(err).Errorf("Failed to get pending EDUs for %q", oq.destination)
return
} }
} }
// If we've retrieved all of the events from the database with room to spare // If we've retrieved all of the events from the database with room to spare
@ -326,6 +328,12 @@ func (oq *destinationQueue) backgroundSend() {
// If we successfully sent the transaction then clear out // If we successfully sent the transaction then clear out
// the pending events and EDUs, and wipe our transaction ID. // the pending events and EDUs, and wipe our transaction ID.
oq.statistics.Success() oq.statistics.Success()
oq.deleteUpto(pc, ec)
}
}
}
func (oq *destinationQueue) deleteUpto(pc, ec int) {
oq.pendingMutex.Lock() oq.pendingMutex.Lock()
for i := range oq.pendingPDUs[:pc] { for i := range oq.pendingPDUs[:pc] {
oq.pendingPDUs[i] = nil oq.pendingPDUs[i] = nil
@ -333,11 +341,9 @@ func (oq *destinationQueue) backgroundSend() {
for i := range oq.pendingEDUs[:ec] { for i := range oq.pendingEDUs[:ec] {
oq.pendingEDUs[i] = nil oq.pendingEDUs[i] = nil
} }
oq.pendingPDUs = oq.pendingPDUs[pc:] oq.pendingPDUs = nil
oq.pendingEDUs = oq.pendingEDUs[ec:] oq.pendingEDUs = nil
oq.pendingMutex.Unlock() oq.pendingMutex.Unlock()
}
}
} }
// nextTransaction creates a new transaction from the pending event // nextTransaction creates a new transaction from the pending event