From 3abb6160748d2cd5af782067c4bb70f267d8b87d Mon Sep 17 00:00:00 2001 From: Till Faelligen <2353100+S7evinK@users.noreply.github.com> Date: Sat, 16 Dec 2023 20:13:52 +0100 Subject: [PATCH] Comments --- roomserver/internal/input/input.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index 3f557d3e7..5f67e0d25 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -299,6 +299,9 @@ func (w *worker) _next() { return } + // Since we either Ack() or Term() the message at this point, we can defer decrementing the room backpressure + defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": w.roomID}).Dec() + // Try to unmarshal the input room event. If the JSON unmarshalling // fails then we'll terminate the message — this notifies NATS that // we are done with the message and never want to see it again. @@ -313,8 +316,6 @@ func (w *worker) _next() { scope.SetTag("event_id", inputRoomEvent.Event.EventID()) } - defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": w.roomID}).Dec() - // Process the room event. If something goes wrong then we'll tell // NATS to terminate the message. We'll store the error result as // a string, because we might want to return that to the caller if @@ -417,6 +418,8 @@ func (r *Inputer) queueInputRoomEvents( }).Error("Roomserver failed to queue async event") return nil, fmt.Errorf("r.JetStream.PublishMsg: %w", err) } + + // Now that the event is queued, increment the room backpressure roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc() } return