Slower federation warm-up (#2320)

* Wake destination queues gradually, rather than all at once

* Delay device list updates too

* Maximum two minute warmup period
This commit is contained in:
Neil Alexander 2022-04-04 15:14:10 +01:00 committed by GitHub
parent 6748a2a823
commit 9b316ac64c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 30 additions and 20 deletions

View file

@ -104,28 +104,31 @@ func NewOutgoingQueues(
} }
// Look up which servers we have pending items for and then rehydrate those queues. // Look up which servers we have pending items for and then rehydrate those queues.
if !disabled { if !disabled {
time.AfterFunc(time.Second*5, func() { serverNames := map[gomatrixserverlib.ServerName]struct{}{}
serverNames := map[gomatrixserverlib.ServerName]struct{}{} if names, err := db.GetPendingPDUServerNames(context.Background()); err == nil {
if names, err := db.GetPendingPDUServerNames(context.Background()); err == nil { for _, serverName := range names {
for _, serverName := range names { serverNames[serverName] = struct{}{}
serverNames[serverName] = struct{}{}
}
} else {
log.WithError(err).Error("Failed to get PDU server names for destination queue hydration")
} }
if names, err := db.GetPendingEDUServerNames(context.Background()); err == nil { } else {
for _, serverName := range names { log.WithError(err).Error("Failed to get PDU server names for destination queue hydration")
serverNames[serverName] = struct{}{} }
} if names, err := db.GetPendingEDUServerNames(context.Background()); err == nil {
} else { for _, serverName := range names {
log.WithError(err).Error("Failed to get EDU server names for destination queue hydration") serverNames[serverName] = struct{}{}
} }
for serverName := range serverNames { } else {
if queue := queues.getQueue(serverName); queue != nil { log.WithError(err).Error("Failed to get EDU server names for destination queue hydration")
queue.wakeQueueIfNeeded() }
} offset, step := time.Second*5, time.Second
if max := len(serverNames); max > 120 {
step = (time.Second * 120) / time.Duration(max)
}
for serverName := range serverNames {
if queue := queues.getQueue(serverName); queue != nil {
time.AfterFunc(offset, queue.wakeQueueIfNeeded)
offset += step
} }
}) }
} }
return queues return queues
} }

View file

@ -157,8 +157,15 @@ func (u *DeviceListUpdater) Start() error {
if err != nil { if err != nil {
return err return err
} }
offset, step := time.Second*10, time.Second
if max := len(staleLists); max > 120 {
step = (time.Second * 120) / time.Duration(max)
}
for _, userID := range staleLists { for _, userID := range staleLists {
u.notifyWorkers(userID) time.AfterFunc(offset, func() {
u.notifyWorkers(userID)
})
offset += step
} }
return nil return nil
} }