Don't create queues for blacklisted hosts

This commit is contained in:
Neil Alexander 2021-02-17 14:55:54 +00:00
parent 89bf8c493e
commit db78ec73e8
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
2 changed files with 13 additions and 27 deletions

View file

@ -159,10 +159,6 @@ func (oq *destinationQueue) sendEDU(event *gomatrixserverlib.EDU, receipt *share
// then we will interrupt the backoff, causing any federation
// requests to retry.
func (oq *destinationQueue) wakeQueueIfNeeded() {
// If the destination is blacklisted then do nothing.
if _, blacklisted := oq.statistics.BackoffInfo(); blacklisted {
return
}
// If we are backing off then interrupt the backoff.
if oq.backingOff.CAS(true, false) {
oq.interruptBackoff <- true

View file

@ -101,7 +101,6 @@ func NewOutgoingQueues(
signing: signing,
queues: map[gomatrixserverlib.ServerName]*destinationQueue{},
}
queues.clearQueues()
// Look up which servers we have pending items for and then rehydrate those queues.
if !disabled {
time.AfterFunc(time.Second*5, func() {
@ -121,7 +120,7 @@ func NewOutgoingQueues(
log.WithError(err).Error("Failed to get EDU server names for destination queue hydration")
}
for serverName := range serverNames {
if queue := queues.getQueue(serverName); !queue.statistics.Blacklisted() {
if queue := queues.getQueue(serverName); queue != nil {
queue.wakeQueueIfNeeded()
}
}
@ -149,6 +148,9 @@ type queuedEDU struct {
}
func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *destinationQueue {
if _, blacklisted := oqs.statistics.ForServer(destination).BackoffInfo(); blacklisted {
return nil
}
oqs.queuesMutex.Lock()
defer oqs.queuesMutex.Unlock()
oq := oqs.queues[destination]
@ -172,24 +174,10 @@ func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *d
return oq
}
func (oqs *OutgoingQueues) clearQueues() {
oqs.queuesMutex.Lock()
defer oqs.queuesMutex.Unlock()
for _, q := range oqs.queues {
oqs.clearQueue(q)
}
time.AfterFunc(time.Minute, oqs.clearQueues)
}
func (oqs *OutgoingQueues) clearQueue(oq *destinationQueue) {
oqs.queuesMutex.Lock()
defer oqs.queuesMutex.Unlock()
switch {
case oq.running.Load():
return
case oq.backingOff.Load():
return
}
close(oq.notify)
close(oq.interruptBackoff)
delete(oqs.queues, oq.destination)
@ -262,7 +250,9 @@ func (oqs *OutgoingQueues) SendEvent(
}
for destination := range destmap {
oqs.getQueue(destination).sendEvent(ev, nid)
if queue := oqs.getQueue(destination); queue != nil {
queue.sendEvent(ev, nid)
}
}
return nil
@ -332,7 +322,9 @@ func (oqs *OutgoingQueues) SendEDU(
}
for destination := range destmap {
oqs.getQueue(destination).sendEDU(e, nid)
if queue := oqs.getQueue(destination); queue != nil {
queue.sendEDU(e, nid)
}
}
return nil
@ -343,9 +335,7 @@ func (oqs *OutgoingQueues) RetryServer(srv gomatrixserverlib.ServerName) {
if oqs.disabled {
return
}
q := oqs.getQueue(srv)
if q == nil {
return
if queue := oqs.getQueue(srv); queue != nil {
queue.wakeQueueIfNeeded()
}
q.wakeQueueIfNeeded()
}