Don't hold destination queues in memory forever (#1769)
* Don't hold destination queues in memory forever * Close channels * Fix ordering * Clear more aggressively * clearQueue only called by defer so should be safe to delete queue in any case * Wake queue when created, otherwise cleanup doesn't get called in all cases * Clean up periodically, we hit a race condition otherwise * Tweaks * Don't create queues for blacklisted hosts * Check blacklist properly
This commit is contained in:
parent
da797c7998
commit
8b5cd256cb
|
@ -46,6 +46,7 @@ const (
|
||||||
// ensures that only one request is in flight to a given destination
|
// ensures that only one request is in flight to a given destination
|
||||||
// at a time.
|
// at a time.
|
||||||
type destinationQueue struct {
|
type destinationQueue struct {
|
||||||
|
queues *OutgoingQueues
|
||||||
db storage.Database
|
db storage.Database
|
||||||
process *process.ProcessContext
|
process *process.ProcessContext
|
||||||
signing *SigningInfo
|
signing *SigningInfo
|
||||||
|
@ -246,6 +247,7 @@ func (oq *destinationQueue) backgroundSend() {
|
||||||
}
|
}
|
||||||
destinationQueueRunning.Inc()
|
destinationQueueRunning.Inc()
|
||||||
defer destinationQueueRunning.Dec()
|
defer destinationQueueRunning.Dec()
|
||||||
|
defer oq.queues.clearQueue(oq)
|
||||||
defer oq.running.Store(false)
|
defer oq.running.Store(false)
|
||||||
|
|
||||||
// Mark the queue as overflowed, so we will consult the database
|
// Mark the queue as overflowed, so we will consult the database
|
||||||
|
|
|
@ -120,7 +120,7 @@ func NewOutgoingQueues(
|
||||||
log.WithError(err).Error("Failed to get EDU server names for destination queue hydration")
|
log.WithError(err).Error("Failed to get EDU server names for destination queue hydration")
|
||||||
}
|
}
|
||||||
for serverName := range serverNames {
|
for serverName := range serverNames {
|
||||||
if queue := queues.getQueue(serverName); !queue.statistics.Blacklisted() {
|
if queue := queues.getQueue(serverName); queue != nil {
|
||||||
queue.wakeQueueIfNeeded()
|
queue.wakeQueueIfNeeded()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -148,12 +148,16 @@ type queuedEDU struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *destinationQueue {
|
func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *destinationQueue {
|
||||||
|
if oqs.statistics.ForServer(destination).Blacklisted() {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
oqs.queuesMutex.Lock()
|
oqs.queuesMutex.Lock()
|
||||||
defer oqs.queuesMutex.Unlock()
|
defer oqs.queuesMutex.Unlock()
|
||||||
oq := oqs.queues[destination]
|
oq, ok := oqs.queues[destination]
|
||||||
if oq == nil {
|
if !ok {
|
||||||
destinationQueueTotal.Inc()
|
destinationQueueTotal.Inc()
|
||||||
oq = &destinationQueue{
|
oq = &destinationQueue{
|
||||||
|
queues: oqs,
|
||||||
db: oqs.db,
|
db: oqs.db,
|
||||||
process: oqs.process,
|
process: oqs.process,
|
||||||
rsAPI: oqs.rsAPI,
|
rsAPI: oqs.rsAPI,
|
||||||
|
@ -170,6 +174,16 @@ func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *d
|
||||||
return oq
|
return oq
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (oqs *OutgoingQueues) clearQueue(oq *destinationQueue) {
|
||||||
|
oqs.queuesMutex.Lock()
|
||||||
|
defer oqs.queuesMutex.Unlock()
|
||||||
|
|
||||||
|
close(oq.notify)
|
||||||
|
close(oq.interruptBackoff)
|
||||||
|
delete(oqs.queues, oq.destination)
|
||||||
|
destinationQueueTotal.Dec()
|
||||||
|
}
|
||||||
|
|
||||||
type ErrorFederationDisabled struct {
|
type ErrorFederationDisabled struct {
|
||||||
Message string
|
Message string
|
||||||
}
|
}
|
||||||
|
@ -236,7 +250,9 @@ func (oqs *OutgoingQueues) SendEvent(
|
||||||
}
|
}
|
||||||
|
|
||||||
for destination := range destmap {
|
for destination := range destmap {
|
||||||
oqs.getQueue(destination).sendEvent(ev, nid)
|
if queue := oqs.getQueue(destination); queue != nil {
|
||||||
|
queue.sendEvent(ev, nid)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -306,7 +322,9 @@ func (oqs *OutgoingQueues) SendEDU(
|
||||||
}
|
}
|
||||||
|
|
||||||
for destination := range destmap {
|
for destination := range destmap {
|
||||||
oqs.getQueue(destination).sendEDU(e, nid)
|
if queue := oqs.getQueue(destination); queue != nil {
|
||||||
|
queue.sendEDU(e, nid)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -317,9 +335,7 @@ func (oqs *OutgoingQueues) RetryServer(srv gomatrixserverlib.ServerName) {
|
||||||
if oqs.disabled {
|
if oqs.disabled {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
q := oqs.getQueue(srv)
|
if queue := oqs.getQueue(srv); queue != nil {
|
||||||
if q == nil {
|
queue.wakeQueueIfNeeded()
|
||||||
return
|
|
||||||
}
|
}
|
||||||
q.wakeQueueIfNeeded()
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue