Add another metric

This commit is contained in:
Till Faelligen 2023-10-31 11:47:44 +01:00
parent f2dcdf6213
commit 407f25008c
No known key found for this signature in database
GPG key ID: ACCDC9606D472758

View file

@ -152,6 +152,15 @@ var deviceListUpdaterBackpressure = prometheus.NewGaugeVec(
}, },
[]string{"worker_id"}, []string{"worker_id"},
) )
// deviceListUpdaterServersRetrying is a gauge, partitioned by the "worker_id"
// label, reporting how many servers a device list updater worker currently
// has queued for retry. The namespace, subsystem and name below are combined
// by the Prometheus client into the exported metric name.
var deviceListUpdaterServersRetrying = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Name:      "worker_servers_retrying",
		Help:      "How many servers are queued for retry",
		Namespace: "dendrite",
		Subsystem: "keyserver",
	},
	[]string{"worker_id"},
)
// NewDeviceListUpdater creates a new updater which fetches fresh device lists when they go stale. // NewDeviceListUpdater creates a new updater which fetches fresh device lists when they go stale.
func NewDeviceListUpdater( func NewDeviceListUpdater(
@ -162,7 +171,7 @@ func NewDeviceListUpdater(
enableMetrics bool, enableMetrics bool,
) *DeviceListUpdater { ) *DeviceListUpdater {
if enableMetrics { if enableMetrics {
prometheus.MustRegister(deviceListUpdaterBackpressure) prometheus.MustRegister(deviceListUpdaterBackpressure, deviceListUpdaterServersRetrying)
} }
return &DeviceListUpdater{ return &DeviceListUpdater{
process: process, process: process,
@ -188,7 +197,7 @@ func (u *DeviceListUpdater) Start() error {
// to stop (in this transaction) until key requests can be made. // to stop (in this transaction) until key requests can be made.
ch := make(chan spec.ServerName, 10) ch := make(chan spec.ServerName, 10)
u.workerChans[i] = ch u.workerChans[i] = ch
go u.worker(ch) go u.worker(ch, i)
} }
staleLists, err := u.db.StaleDeviceLists(u.process.Context(), []spec.ServerName{}) staleLists, err := u.db.StaleDeviceLists(u.process.Context(), []spec.ServerName{})
@ -389,7 +398,7 @@ func (u *DeviceListUpdater) clearChannel(userID string) {
} }
} }
func (u *DeviceListUpdater) worker(ch chan spec.ServerName) { func (u *DeviceListUpdater) worker(ch chan spec.ServerName, workerID int) {
retries := make(map[spec.ServerName]time.Time) retries := make(map[spec.ServerName]time.Time)
retriesMu := &sync.Mutex{} retriesMu := &sync.Mutex{}
// restarter goroutine which will inject failed servers into ch when it is time // restarter goroutine which will inject failed servers into ch when it is time
@ -408,9 +417,12 @@ func (u *DeviceListUpdater) worker(ch chan spec.ServerName) {
for _, srv := range serversToRetry { for _, srv := range serversToRetry {
delete(retries, srv) delete(retries, srv)
} }
deviceListUpdaterServersRetrying.With(prometheus.Labels{"worker_id": strconv.Itoa(workerID)}).Set(float64(len(retries)))
retriesMu.Unlock() retriesMu.Unlock()
for _, srv := range serversToRetry { for _, srv := range serversToRetry {
deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(workerID)}).Inc()
ch <- srv ch <- srv
deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(workerID)}).Dec()
} }
} }
}() }()