This probably makes more sense

This commit is contained in:
Till Faelligen 2023-11-01 12:27:47 +01:00
parent 116b0b96e2
commit be0c6995e9
No known key found for this signature in database
GPG key ID: ACCDC9606D472758

View file

@ -367,8 +367,6 @@ func (u *DeviceListUpdater) notifyWorkers(userID string) {
index := int(int64(hash.Sum32()) % int64(len(u.workerChans)))
ch := u.assignChannel(userID)
deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(index)}).Inc()
defer deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(index)}).Dec()
u.workerChans[index] <- remoteServer
select {
case <-ch:
@ -420,17 +418,17 @@ func (u *DeviceListUpdater) worker(ch chan spec.ServerName, workerID int) {
deviceListUpdaterServersRetrying.With(prometheus.Labels{"worker_id": strconv.Itoa(workerID)}).Set(float64(len(retries)))
retriesMu.Unlock()
for _, srv := range serversToRetry {
deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(workerID)}).Inc()
ch <- srv
deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(workerID)}).Dec()
}
}
}()
for serverName := range ch {
deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(workerID)}).Inc()
retriesMu.Lock()
_, exists := retries[serverName]
retriesMu.Unlock()
if exists {
deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(workerID)}).Dec()
// Don't retry a server that we're already waiting for.
continue
}
@ -442,6 +440,7 @@ func (u *DeviceListUpdater) worker(ch chan spec.ServerName, workerID int) {
}
retriesMu.Unlock()
}
deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(workerID)}).Dec()
}
}