This commit is contained in:
Till Faelligen 2023-12-16 20:13:52 +01:00
parent 61d7b31388
commit 3abb616074
No known key found for this signature in database
GPG key ID: 3DF82D8AB9211D4E

View file

@ -299,6 +299,9 @@ func (w *worker) _next() {
return
}
// Since we either Ack() or Term() the message at this point, we can defer decrementing the room backpressure
defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": w.roomID}).Dec()
// Try to unmarshal the input room event. If the JSON unmarshalling
// fails then we'll terminate the message — this notifies NATS that
// we are done with the message and never want to see it again.
@ -313,8 +316,6 @@ func (w *worker) _next() {
scope.SetTag("event_id", inputRoomEvent.Event.EventID())
}
defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": w.roomID}).Dec()
// Process the room event. If something goes wrong then we'll tell
// NATS to terminate the message. We'll store the error result as
// a string, because we might want to return that to the caller if
@ -417,6 +418,8 @@ func (r *Inputer) queueInputRoomEvents(
}).Error("Roomserver failed to queue async event")
return nil, fmt.Errorf("r.JetStream.PublishMsg: %w", err)
}
// Now that the event is queued, increment the room backpressure
roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc()
}
return