diff --git a/keyserver/internal/device_list_update.go b/keyserver/internal/device_list_update.go index 36918256c..3fbf31f1e 100644 --- a/keyserver/internal/device_list_update.go +++ b/keyserver/internal/device_list_update.go @@ -310,24 +310,25 @@ func (u *DeviceListUpdater) worker(ch chan gomatrixserverlib.ServerName) { } } lastProcessed[serverName] = time.Now() - shouldRetry := u.processServer(serverName) + waitTime, shouldRetry := u.processServer(serverName) if shouldRetry { - scheduledRetries[serverName] = time.Now().Add(cooloffPeriod) - go inject(serverName, cooloffPeriod) + scheduledRetries[serverName] = time.Now().Add(waitTime) + go inject(serverName, waitTime) } } } -func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerName) bool { +func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerName) (time.Duration, bool) { requestTimeout := time.Minute // max amount of time we want to spend on each request ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) defer cancel() logger := util.GetLogger(ctx).WithField("server_name", serverName) + waitTime := 2 * time.Second // fetch stale device lists userIDs, err := u.db.StaleDeviceLists(ctx, []gomatrixserverlib.ServerName{serverName}) if err != nil { logger.WithError(err).Error("failed to load stale device lists") - return true + return waitTime, true } hasFailures := false for _, userID := range userIDs { @@ -339,6 +340,10 @@ func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerNam res, err := u.fedClient.GetUserDevices(ctx, serverName, userID) if err != nil { logger.WithError(err).WithField("user_id", userID).Error("failed to query device keys for user") + fcerr, ok := err.(*fedsenderapi.FederationClientError) + if ok && fcerr.RetryAfter > 0 { + waitTime = fcerr.RetryAfter + } hasFailures = true continue } @@ -352,7 +357,7 @@ func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerNam // always clear the channel to unblock Update calls regardless of success/failure u.clearChannel(userID) } - return hasFailures + return waitTime, hasFailures } func (u *DeviceListUpdater) updateDeviceList(res *gomatrixserverlib.RespUserDevices) error {