From c68037b3e83caae7eee39b86e717957477e3b92d Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 24 Jan 2022 10:37:19 +0000 Subject: [PATCH] Apply backpressure to consumers/synchronous requests to hopefully stop things being overwhelmed --- roomserver/internal/input/input.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index 2626b5872..fbd0c79f6 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -77,7 +77,8 @@ func (r *Inputer) Start() error { return } roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc() - r.workerForRoom(roomID).Act(nil, func() { + worker := r.workerForRoom(roomID) + worker.Act(worker, func() { defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec() if err := r.processRoomEvent(context.TODO(), &inputRoomEvent); err != nil { sentry.CaptureException(err) @@ -133,7 +134,8 @@ func (r *Inputer) InputRoomEvents( inputRoomEvent := e roomID := inputRoomEvent.Event.RoomID() roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc() - r.workerForRoom(roomID).Act(nil, func() { + worker := r.workerForRoom(roomID) + worker.Act(worker, func() { defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec() err := r.processRoomEvent(ctx, &inputRoomEvent) if err != nil {