From be24eea797f26a0868d4ca79c1c11004d6939f2d Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 4 Apr 2022 13:17:08 +0100 Subject: [PATCH] Wake destination queues gradually, rather than all at once --- federationapi/queue/queue.go | 38 ++++++++++++++++++------------------ 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/federationapi/queue/queue.go b/federationapi/queue/queue.go index dcd090856..63861e458 100644 --- a/federationapi/queue/queue.go +++ b/federationapi/queue/queue.go @@ -104,28 +104,28 @@ func NewOutgoingQueues( } // Look up which servers we have pending items for and then rehydrate those queues. if !disabled { - time.AfterFunc(time.Second*5, func() { - serverNames := map[gomatrixserverlib.ServerName]struct{}{} - if names, err := db.GetPendingPDUServerNames(context.Background()); err == nil { - for _, serverName := range names { - serverNames[serverName] = struct{}{} - } - } else { - log.WithError(err).Error("Failed to get PDU server names for destination queue hydration") + serverNames := map[gomatrixserverlib.ServerName]struct{}{} + if names, err := db.GetPendingPDUServerNames(context.Background()); err == nil { + for _, serverName := range names { + serverNames[serverName] = struct{}{} } - if names, err := db.GetPendingEDUServerNames(context.Background()); err == nil { - for _, serverName := range names { - serverNames[serverName] = struct{}{} - } - } else { - log.WithError(err).Error("Failed to get EDU server names for destination queue hydration") + } else { + log.WithError(err).Error("Failed to get PDU server names for destination queue hydration") + } + if names, err := db.GetPendingEDUServerNames(context.Background()); err == nil { + for _, serverName := range names { + serverNames[serverName] = struct{}{} } - for serverName := range serverNames { - if queue := queues.getQueue(serverName); queue != nil { - queue.wakeQueueIfNeeded() - } + } else { + log.WithError(err).Error("Failed to get EDU server names for destination queue hydration") + } + offset := time.Duration(5) + for serverName := range serverNames { + if queue := queues.getQueue(serverName); queue != nil { + time.AfterFunc(time.Second*offset, queue.wakeQueueIfNeeded) + offset++ } - }) + } } return queues }