diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index 3f557d3e7..5f67e0d25 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -299,6 +299,9 @@ func (w *worker) _next() { return } + // Since we either Ack() or Term() the message at this point, we can defer decrementing the room backpressure + defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": w.roomID}).Dec() + // Try to unmarshal the input room event. If the JSON unmarshalling // fails then we'll terminate the message — this notifies NATS that // we are done with the message and never want to see it again. @@ -313,8 +316,6 @@ func (w *worker) _next() { scope.SetTag("event_id", inputRoomEvent.Event.EventID()) } - defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": w.roomID}).Dec() - // Process the room event. If something goes wrong then we'll tell // NATS to terminate the message. We'll store the error result as // a string, because we might want to return that to the caller if @@ -417,6 +418,8 @@ func (r *Inputer) queueInputRoomEvents( }).Error("Roomserver failed to queue async event") return nil, fmt.Errorf("r.JetStream.PublishMsg: %w", err) } + + // Now that the event is queued, increment the room backpressure roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc() } return