Fix panic at startup if roomserver was not given federation API reference by the time NATS consumes an event, tweak backpressure metrics

This commit is contained in:
Neil Alexander 2022-01-07 13:41:53 +00:00
parent 173b1e8d3e
commit a422321435
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
2 changed files with 10 additions and 6 deletions

View file

@ -72,9 +72,6 @@ func NewRoomserverAPI(
}, },
// perform-er structs get initialised when we have a federation sender to use // perform-er structs get initialised when we have a federation sender to use
} }
if err := a.Inputer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start roomserver input API")
}
return a return a
} }
@ -140,6 +137,10 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.FederationInternalA
r.Forgetter = &perform.Forgetter{ r.Forgetter = &perform.Forgetter{
DB: r.DB, DB: r.DB,
} }
if err := r.Inputer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start roomserver input API")
}
} }
func (r *RoomserverInternalAPI) SetAppserviceAPI(asAPI asAPI.AppServiceQueryAPI) { func (r *RoomserverInternalAPI) SetAppserviceAPI(asAPI asAPI.AppServiceQueryAPI) {

View file

@ -59,14 +59,15 @@ func (r *Inputer) Start() error {
// later, possibly with an error response to the inputter if synchronous. // later, possibly with an error response to the inputter if synchronous.
func(msg *nats.Msg) { func(msg *nats.Msg) {
roomID := msg.Header.Get("room_id") roomID := msg.Header.Get("room_id")
defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec()
var inputRoomEvent api.InputRoomEvent var inputRoomEvent api.InputRoomEvent
if err := json.Unmarshal(msg.Data, &inputRoomEvent); err != nil { if err := json.Unmarshal(msg.Data, &inputRoomEvent); err != nil {
_ = msg.Term() _ = msg.Term()
return return
} }
inbox, _ := r.workers.LoadOrStore(roomID, &phony.Inbox{}) inbox, _ := r.workers.LoadOrStore(roomID, &phony.Inbox{})
roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc()
inbox.(*phony.Inbox).Act(nil, func() { inbox.(*phony.Inbox).Act(nil, func() {
defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec()
if err := r.processRoomEvent(context.TODO(), &inputRoomEvent); err != nil { if err := r.processRoomEvent(context.TODO(), &inputRoomEvent); err != nil {
sentry.CaptureException(err) sentry.CaptureException(err)
} else { } else {
@ -111,15 +112,17 @@ func (r *Inputer) InputRoomEvents(
if _, err = r.JetStream.PublishMsg(msg); err != nil { if _, err = r.JetStream.PublishMsg(msg); err != nil {
return return
} }
roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc()
} }
} else { } else {
responses := make(chan error, len(request.InputRoomEvents)) responses := make(chan error, len(request.InputRoomEvents))
defer close(responses) defer close(responses)
for _, e := range request.InputRoomEvents { for _, e := range request.InputRoomEvents {
inputRoomEvent := e inputRoomEvent := e
inbox, _ := r.workers.LoadOrStore(inputRoomEvent.Event.RoomID(), &phony.Inbox{}) roomID := inputRoomEvent.Event.RoomID()
inbox, _ := r.workers.LoadOrStore(roomID, &phony.Inbox{})
roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc()
inbox.(*phony.Inbox).Act(nil, func() { inbox.(*phony.Inbox).Act(nil, func() {
defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec()
err := r.processRoomEvent(context.TODO(), &inputRoomEvent) err := r.processRoomEvent(context.TODO(), &inputRoomEvent)
if err != nil { if err != nil {
sentry.CaptureException(err) sentry.CaptureException(err)