Wake destination queues gradually, rather than all at once

This commit is contained in:
Neil Alexander 2022-04-04 13:17:08 +01:00
parent 6748a2a823
commit be24eea797
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944

View file

@ -104,28 +104,28 @@ func NewOutgoingQueues(
}
// Look up which servers we have pending items for and then rehydrate those queues.
if !disabled {
time.AfterFunc(time.Second*5, func() {
serverNames := map[gomatrixserverlib.ServerName]struct{}{}
if names, err := db.GetPendingPDUServerNames(context.Background()); err == nil {
for _, serverName := range names {
serverNames[serverName] = struct{}{}
}
} else {
log.WithError(err).Error("Failed to get PDU server names for destination queue hydration")
serverNames := map[gomatrixserverlib.ServerName]struct{}{}
if names, err := db.GetPendingPDUServerNames(context.Background()); err == nil {
for _, serverName := range names {
serverNames[serverName] = struct{}{}
}
if names, err := db.GetPendingEDUServerNames(context.Background()); err == nil {
for _, serverName := range names {
serverNames[serverName] = struct{}{}
}
} else {
log.WithError(err).Error("Failed to get EDU server names for destination queue hydration")
} else {
log.WithError(err).Error("Failed to get PDU server names for destination queue hydration")
}
if names, err := db.GetPendingEDUServerNames(context.Background()); err == nil {
for _, serverName := range names {
serverNames[serverName] = struct{}{}
}
for serverName := range serverNames {
if queue := queues.getQueue(serverName); queue != nil {
queue.wakeQueueIfNeeded()
}
} else {
log.WithError(err).Error("Failed to get EDU server names for destination queue hydration")
}
offset := time.Duration(5)
for serverName := range serverNames {
if queue := queues.getQueue(serverName); queue != nil {
time.AfterFunc(time.Second*offset, queue.wakeQueueIfNeeded)
offset++
}
})
}
}
return queues
}