From a42232143526de360309b112b57cf0d95adf47cb Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Fri, 7 Jan 2022 13:41:53 +0000 Subject: [PATCH] Fix panic at startup if roomserver was not given federation API reference by the time NATS consumes an event, tweak backpressure metrics --- roomserver/internal/api.go | 7 ++++--- roomserver/internal/input/input.go | 9 ++++++--- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/roomserver/internal/api.go b/roomserver/internal/api.go index 5cfe68daa..e370f7e44 100644 --- a/roomserver/internal/api.go +++ b/roomserver/internal/api.go @@ -72,9 +72,6 @@ func NewRoomserverAPI( }, // perform-er structs get initialised when we have a federation sender to use } - if err := a.Inputer.Start(); err != nil { - logrus.WithError(err).Panic("failed to start roomserver input API") - } return a } @@ -140,6 +137,10 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.FederationInternalA r.Forgetter = &perform.Forgetter{ DB: r.DB, } + + if err := r.Inputer.Start(); err != nil { + logrus.WithError(err).Panic("failed to start roomserver input API") + } } func (r *RoomserverInternalAPI) SetAppserviceAPI(asAPI asAPI.AppServiceQueryAPI) { diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index 1eab67802..dbff5fdda 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -59,14 +59,15 @@ func (r *Inputer) Start() error { // later, possibly with an error response to the inputter if synchronous. func(msg *nats.Msg) { roomID := msg.Header.Get("room_id") - defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec() var inputRoomEvent api.InputRoomEvent if err := json.Unmarshal(msg.Data, &inputRoomEvent); err != nil { _ = msg.Term() return } inbox, _ := r.workers.LoadOrStore(roomID, &phony.Inbox{}) + roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc() inbox.(*phony.Inbox).Act(nil, func() { + defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec() if err := r.processRoomEvent(context.TODO(), &inputRoomEvent); err != nil { sentry.CaptureException(err) } else { @@ -111,15 +112,17 @@ func (r *Inputer) InputRoomEvents( if _, err = r.JetStream.PublishMsg(msg); err != nil { return } - roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc() } } else { responses := make(chan error, len(request.InputRoomEvents)) defer close(responses) for _, e := range request.InputRoomEvents { inputRoomEvent := e - inbox, _ := r.workers.LoadOrStore(inputRoomEvent.Event.RoomID(), &phony.Inbox{}) + roomID := inputRoomEvent.Event.RoomID() + inbox, _ := r.workers.LoadOrStore(roomID, &phony.Inbox{}) + roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc() inbox.(*phony.Inbox).Act(nil, func() { + defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec() err := r.processRoomEvent(context.TODO(), &inputRoomEvent) if err != nil { sentry.CaptureException(err)