Don't overflow for blacklisted servers

This commit is contained in:
Till Faelligen 2022-10-21 11:15:14 +02:00
parent 1518087d0a
commit d321a13123
No known key found for this signature in database
GPG key ID: 3DF82D8AB9211D4E
2 changed files with 28 additions and 39 deletions

View file

@ -76,28 +76,22 @@ func (oq *destinationQueue) sendEvent(event *gomatrixserverlib.HeaderedEvent, re
return return
} }
// Check if the destination is blacklisted. If it isn't then wake // If there's room in memory to hold the event then add it to the
// up the queue. // list.
if !oq.statistics.Blacklisted() { oq.pendingMutex.Lock()
// If there's room in memory to hold the event then add it to the if len(oq.pendingPDUs) < maxPDUsInMemory {
// list. oq.pendingPDUs = append(oq.pendingPDUs, &queuedPDU{
oq.pendingMutex.Lock() pdu: event,
if len(oq.pendingPDUs) < maxPDUsInMemory { receipt: receipt,
oq.pendingPDUs = append(oq.pendingPDUs, &queuedPDU{ })
pdu: event,
receipt: receipt,
})
} else {
oq.overflowed.Store(true)
}
oq.pendingMutex.Unlock()
if !oq.backingOff.Load() {
oq.wakeQueueAndNotify()
}
} else { } else {
oq.overflowed.Store(true) oq.overflowed.Store(true)
} }
oq.pendingMutex.Unlock()
if !oq.backingOff.Load() {
oq.wakeQueueAndNotify()
}
} }
// sendEDU adds the EDU event to the pending queue for the destination. // sendEDU adds the EDU event to the pending queue for the destination.
@ -108,28 +102,23 @@ func (oq *destinationQueue) sendEDU(event *gomatrixserverlib.EDU, receipt *share
logrus.Errorf("attempt to send nil EDU with destination %q", oq.destination) logrus.Errorf("attempt to send nil EDU with destination %q", oq.destination)
return return
} }
// Check if the destination is blacklisted. If it isn't then wake
// up the queue.
if !oq.statistics.Blacklisted() {
// If there's room in memory to hold the event then add it to the
// list.
oq.pendingMutex.Lock()
if len(oq.pendingEDUs) < maxEDUsInMemory {
oq.pendingEDUs = append(oq.pendingEDUs, &queuedEDU{
edu: event,
receipt: receipt,
})
} else {
oq.overflowed.Store(true)
}
oq.pendingMutex.Unlock()
if !oq.backingOff.Load() { // If there's room in memory to hold the event then add it to the
oq.wakeQueueAndNotify() // list.
} oq.pendingMutex.Lock()
if len(oq.pendingEDUs) < maxEDUsInMemory {
oq.pendingEDUs = append(oq.pendingEDUs, &queuedEDU{
edu: event,
receipt: receipt,
})
} else { } else {
oq.overflowed.Store(true) oq.overflowed.Store(true)
} }
oq.pendingMutex.Unlock()
if !oq.backingOff.Load() {
oq.wakeQueueAndNotify()
}
} }
// handleBackoffNotifier is registered as the backoff notification // handleBackoffNotifier is registered as the backoff notification

View file

@ -248,7 +248,7 @@ func (oqs *OutgoingQueues) SendEvent(
} }
for destination := range destmap { for destination := range destmap {
if queue := oqs.getQueue(destination); queue != nil { if queue := oqs.getQueue(destination); queue != nil && !queue.statistics.Blacklisted() {
queue.sendEvent(ev, nid) queue.sendEvent(ev, nid)
} else { } else {
delete(destmap, destination) delete(destmap, destination)
@ -336,7 +336,7 @@ func (oqs *OutgoingQueues) SendEDU(
} }
for destination := range destmap { for destination := range destmap {
if queue := oqs.getQueue(destination); queue != nil { if queue := oqs.getQueue(destination); queue != nil && !queue.statistics.Blacklisted() {
queue.sendEDU(e, nid) queue.sendEDU(e, nid)
} else { } else {
delete(destmap, destination) delete(destmap, destination)