Add prometheus metrics for destination queues, sync requests
Squashed commit of the following: commit 7ed1c6cfe67429dbe378a763d832c150eb0f781d Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Wed Dec 16 14:53:27 2020 +0000 Updates commit 8442099d08760b8d086e6d58f9f30284e378a2cd Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Wed Dec 16 14:43:18 2020 +0000 Add some sync statistics commit ffe2a11644ed3d5297d1775a680886c574143fdb Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Wed Dec 16 14:37:00 2020 +0000 Fix backing off display commit 27443a93855aa60a49806ecabbf9b09f818301bd Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Wed Dec 16 14:28:43 2020 +0000 Add some destination queue metrics
This commit is contained in:
parent
b891c00b09
commit
56b5847c74
|
@ -242,6 +242,8 @@ func (oq *destinationQueue) backgroundSend() {
|
||||||
if !oq.running.CAS(false, true) {
|
if !oq.running.CAS(false, true) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
destinationQueueRunning.Inc()
|
||||||
|
defer destinationQueueRunning.Dec()
|
||||||
defer oq.running.Store(false)
|
defer oq.running.Store(false)
|
||||||
|
|
||||||
// Mark the queue as overflowed, so we will consult the database
|
// Mark the queue as overflowed, so we will consult the database
|
||||||
|
@ -295,10 +297,14 @@ func (oq *destinationQueue) backgroundSend() {
|
||||||
// time.
|
// time.
|
||||||
duration := time.Until(*until)
|
duration := time.Until(*until)
|
||||||
log.Warnf("Backing off %q for %s", oq.destination, duration)
|
log.Warnf("Backing off %q for %s", oq.destination, duration)
|
||||||
|
oq.backingOff.Store(true)
|
||||||
|
destinationQueueBackingOff.Inc()
|
||||||
select {
|
select {
|
||||||
case <-time.After(duration):
|
case <-time.After(duration):
|
||||||
case <-oq.interruptBackoff:
|
case <-oq.interruptBackoff:
|
||||||
}
|
}
|
||||||
|
destinationQueueBackingOff.Dec()
|
||||||
|
oq.backingOff.Store(false)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Work out which PDUs/EDUs to include in the next transaction.
|
// Work out which PDUs/EDUs to include in the next transaction.
|
||||||
|
|
|
@ -27,6 +27,7 @@ import (
|
||||||
"github.com/matrix-org/dendrite/federationsender/storage/shared"
|
"github.com/matrix-org/dendrite/federationsender/storage/shared"
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/tidwall/gjson"
|
"github.com/tidwall/gjson"
|
||||||
)
|
)
|
||||||
|
@ -45,6 +46,37 @@ type OutgoingQueues struct {
|
||||||
queues map[gomatrixserverlib.ServerName]*destinationQueue
|
queues map[gomatrixserverlib.ServerName]*destinationQueue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
prometheus.MustRegister(
|
||||||
|
destinationQueueTotal, destinationQueueRunning,
|
||||||
|
destinationQueueBackingOff,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
var destinationQueueTotal = prometheus.NewGauge(
|
||||||
|
prometheus.GaugeOpts{
|
||||||
|
Namespace: "dendrite",
|
||||||
|
Subsystem: "federationsender",
|
||||||
|
Name: "destination_queues_total",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
var destinationQueueRunning = prometheus.NewGauge(
|
||||||
|
prometheus.GaugeOpts{
|
||||||
|
Namespace: "dendrite",
|
||||||
|
Subsystem: "federationsender",
|
||||||
|
Name: "destination_queues_running",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
var destinationQueueBackingOff = prometheus.NewGauge(
|
||||||
|
prometheus.GaugeOpts{
|
||||||
|
Namespace: "dendrite",
|
||||||
|
Subsystem: "federationsender",
|
||||||
|
Name: "destination_queues_backing_off",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
// NewOutgoingQueues makes a new OutgoingQueues
|
// NewOutgoingQueues makes a new OutgoingQueues
|
||||||
func NewOutgoingQueues(
|
func NewOutgoingQueues(
|
||||||
db storage.Database,
|
db storage.Database,
|
||||||
|
@ -116,6 +148,7 @@ func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *d
|
||||||
defer oqs.queuesMutex.Unlock()
|
defer oqs.queuesMutex.Unlock()
|
||||||
oq := oqs.queues[destination]
|
oq := oqs.queues[destination]
|
||||||
if oq == nil {
|
if oq == nil {
|
||||||
|
destinationQueueTotal.Inc()
|
||||||
oq = &destinationQueue{
|
oq = &destinationQueue{
|
||||||
db: oqs.db,
|
db: oqs.db,
|
||||||
rsAPI: oqs.rsAPI,
|
rsAPI: oqs.rsAPI,
|
||||||
|
|
|
@ -35,6 +35,7 @@ import (
|
||||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/matrix-org/util"
|
"github.com/matrix-org/util"
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -99,6 +100,30 @@ func (rp *RequestPool) updateLastSeen(req *http.Request, device *userapi.Device)
|
||||||
rp.lastseen.Store(device.UserID+device.ID, time.Now())
|
rp.lastseen.Store(device.UserID+device.ID, time.Now())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
prometheus.MustRegister(
|
||||||
|
activeSyncRequests, waitingSyncRequests,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
var activeSyncRequests = prometheus.NewGauge(
|
||||||
|
prometheus.GaugeOpts{
|
||||||
|
Namespace: "dendrite",
|
||||||
|
Subsystem: "syncapi",
|
||||||
|
Name: "active_sync_requests",
|
||||||
|
Help: "The number of sync requests that are active right now",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
var waitingSyncRequests = prometheus.NewGauge(
|
||||||
|
prometheus.GaugeOpts{
|
||||||
|
Namespace: "dendrite",
|
||||||
|
Subsystem: "syncapi",
|
||||||
|
Name: "waiting_sync_requests",
|
||||||
|
Help: "The number of sync requests that are waiting to be woken by a notifier",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
// OnIncomingSyncRequest is called when a client makes a /sync request. This function MUST be
|
// OnIncomingSyncRequest is called when a client makes a /sync request. This function MUST be
|
||||||
// called in a dedicated goroutine for this request. This function will block the goroutine
|
// called in a dedicated goroutine for this request. This function will block the goroutine
|
||||||
// until a response is ready, or it times out.
|
// until a response is ready, or it times out.
|
||||||
|
@ -122,6 +147,9 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
|
||||||
"limit": syncReq.limit,
|
"limit": syncReq.limit,
|
||||||
})
|
})
|
||||||
|
|
||||||
|
activeSyncRequests.Inc()
|
||||||
|
defer activeSyncRequests.Dec()
|
||||||
|
|
||||||
rp.updateLastSeen(req, device)
|
rp.updateLastSeen(req, device)
|
||||||
|
|
||||||
currPos := rp.notifier.CurrentPosition()
|
currPos := rp.notifier.CurrentPosition()
|
||||||
|
@ -139,6 +167,9 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
waitingSyncRequests.Inc()
|
||||||
|
defer waitingSyncRequests.Dec()
|
||||||
|
|
||||||
// Otherwise, we wait for the notifier to tell us if something *may* have
|
// Otherwise, we wait for the notifier to tell us if something *may* have
|
||||||
// happened. We loop in case it turns out that nothing did happen.
|
// happened. We loop in case it turns out that nothing did happen.
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue