diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index 23294022a..c8d12e889 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -204,6 +204,7 @@ func (oq *destinationQueue) getPendingFromDatabase() { } } else { logrus.WithError(err).Errorf("Failed to get pending PDUs for %q", oq.destination) + return } } if eduCapacity := maxEDUsInMemory - len(oq.pendingEDUs); eduCapacity > 0 { @@ -218,6 +219,7 @@ func (oq *destinationQueue) getPendingFromDatabase() { } } else { logrus.WithError(err).Errorf("Failed to get pending EDUs for %q", oq.destination) + return } } // If we've retrieved all of the events from the database with room to spare @@ -326,20 +328,24 @@ func (oq *destinationQueue) backgroundSend() { // If we successfully sent the transaction then clear out // the pending events and EDUs, and wipe our transaction ID. oq.statistics.Success() - oq.pendingMutex.Lock() - for i := range oq.pendingPDUs[:pc] { - oq.pendingPDUs[i] = nil - } - for i := range oq.pendingEDUs[:ec] { - oq.pendingEDUs[i] = nil - } - oq.pendingPDUs = oq.pendingPDUs[pc:] - oq.pendingEDUs = oq.pendingEDUs[ec:] - oq.pendingMutex.Unlock() + oq.deleteUpto(pc, ec) } } } +func (oq *destinationQueue) deleteUpto(pc, ec int) { + oq.pendingMutex.Lock() + for i := range oq.pendingPDUs[:pc] { + oq.pendingPDUs[i] = nil + } + for i := range oq.pendingEDUs[:ec] { + oq.pendingEDUs[i] = nil + } + oq.pendingPDUs = nil + oq.pendingEDUs = nil + oq.pendingMutex.Unlock() +} + // nextTransaction creates a new transaction from the pending event // queue and sends it. Returns true if a transaction was sent or // false otherwise.