diff --git a/userapi/internal/device_list_update.go b/userapi/internal/device_list_update.go index 1fbacaf4c..b40635160 100644 --- a/userapi/internal/device_list_update.go +++ b/userapi/internal/device_list_update.go @@ -381,6 +381,8 @@ func (u *DeviceListUpdater) notifyWorkers(userID string) { index := int(int64(hash.Sum32()) % int64(len(u.workerChans))) ch := u.assignChannel(userID) + // Since workerChans are buffered, we only increment here and let the worker + // decrement it once it is done processing. deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(index)}).Inc() u.workerChans[index] <- remoteServer select { @@ -418,15 +420,13 @@ func (u *DeviceListUpdater) worker(ch chan spec.ServerName, workerID int) { go func() { var serversToRetry []spec.ServerName for { + // nuke serversToRetry by re-slicing it to be "empty". + // The capacity of the slice is unchanged, which ensures we can reuse the memory. + serversToRetry = serversToRetry[:0] + deviceListUpdaterServersRetrying.With(prometheus.Labels{"worker_id": strconv.Itoa(workerID)}).Set(float64(len(retries))) time.Sleep(time.Second * 2) - // The channel is at capacity, don't try to send more work - if len(ch) == cap(ch) { - continue - } - serversToRetry = serversToRetry[:0] // reuse memory - // -2, so we have space for incoming device list updates over federation maxServers := (cap(ch) - len(ch)) - 2 if maxServers <= 0 { @@ -459,9 +459,18 @@ func (u *DeviceListUpdater) worker(ch chan spec.ServerName, workerID int) { retriesMu.Lock() _, exists := retries[serverName] retriesMu.Unlock() - if exists { + + // If the serverName is coming from retries, maybe it was + // blacklisted in the meantime. + _, err := u.isBlacklistedOrBackingOffFn(serverName) + var federationClientError *fedsenderapi.FederationClientError + // unwrap errors and check for FederationClientError, if found, federationClientError will be not nil + errors.As(err, &federationClientError) + isBlacklisted := federationClientError != nil && federationClientError.Blacklisted + + // Don't retry a server that we're already waiting for or is blacklisted by now. + if exists || isBlacklisted { deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(workerID)}).Dec() - // Don't retry a server that we're already waiting for. continue } waitTime, shouldRetry := u.processServer(serverName)