diff --git a/userapi/internal/device_list_update.go b/userapi/internal/device_list_update.go index 80631456a..e17549ad9 100644 --- a/userapi/internal/device_list_update.go +++ b/userapi/internal/device_list_update.go @@ -367,8 +367,6 @@ func (u *DeviceListUpdater) notifyWorkers(userID string) { index := int(int64(hash.Sum32()) % int64(len(u.workerChans))) ch := u.assignChannel(userID) - deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(index)}).Inc() - defer deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(index)}).Dec() u.workerChans[index] <- remoteServer select { case <-ch: @@ -420,17 +418,17 @@ func (u *DeviceListUpdater) worker(ch chan spec.ServerName, workerID int) { deviceListUpdaterServersRetrying.With(prometheus.Labels{"worker_id": strconv.Itoa(workerID)}).Set(float64(len(retries))) retriesMu.Unlock() for _, srv := range serversToRetry { - deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(workerID)}).Inc() ch <- srv - deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(workerID)}).Dec() } } }() for serverName := range ch { + deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(workerID)}).Inc() retriesMu.Lock() _, exists := retries[serverName] retriesMu.Unlock() if exists { + deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(workerID)}).Dec() // Don't retry a server that we're already waiting for. continue } @@ -442,6 +440,7 @@ func (u *DeviceListUpdater) worker(ch chan spec.ServerName, workerID int) { } retriesMu.Unlock() } + deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(workerID)}).Dec() } }