mirror of
https://github.com/matrix-org/dendrite.git
synced 2024-11-23 14:51:56 -06:00
Prometheus metrics
This commit is contained in:
parent
e745a7663f
commit
f484285f17
|
@ -57,12 +57,14 @@ func (r *Inputer) Start() error {
|
|||
r.InputRoomEventTopic,
|
||||
func(msg *nats.Msg) {
|
||||
_ = msg.InProgress()
|
||||
roomID := msg.Header.Get("room_id")
|
||||
defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec()
|
||||
var inputRoomEvent api.InputRoomEvent
|
||||
if err := json.Unmarshal(msg.Data, &inputRoomEvent); err != nil {
|
||||
_ = msg.Nak()
|
||||
return
|
||||
}
|
||||
inbox, _ := r.workers.LoadOrStore(msg.Header.Get("room_id"), &phony.Inbox{})
|
||||
inbox, _ := r.workers.LoadOrStore(roomID, &phony.Inbox{})
|
||||
inbox.(*phony.Inbox).Act(nil, func() {
|
||||
if _, err := r.processRoomEvent(context.TODO(), &inputRoomEvent); err != nil {
|
||||
sentry.CaptureException(err)
|
||||
|
@ -90,7 +92,8 @@ func (r *Inputer) InputRoomEvents(
|
|||
Subject: r.InputRoomEventTopic,
|
||||
Header: nats.Header{},
|
||||
}
|
||||
msg.Header.Set("room_id", e.Event.RoomID())
|
||||
roomID := e.Event.RoomID()
|
||||
msg.Header.Set("room_id", roomID)
|
||||
msg.Data, err = json.Marshal(e)
|
||||
if err != nil {
|
||||
response.ErrMsg = err.Error()
|
||||
|
@ -100,6 +103,7 @@ func (r *Inputer) InputRoomEvents(
|
|||
response.ErrMsg = err.Error()
|
||||
return
|
||||
}
|
||||
roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue