From e62ee5cd8ee2eaffddd75ad1c49aa13917806b29 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 23 Mar 2022 09:47:28 +0000 Subject: [PATCH] Separate `queueInputRoomEvents` function --- roomserver/internal/input/input.go | 70 ++++++++++++++++++------------ 1 file changed, 42 insertions(+), 28 deletions(-) diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index f57bd3109..c90c3536f 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -278,6 +278,44 @@ func (w *worker) _next() { } } +// queueInputRoomEvents queues events into the roomserver input +// stream in NATS. +func (r *Inputer) queueInputRoomEvents( + ctx context.Context, + request *api.InputRoomEventsRequest, + response *api.InputRoomEventsResponse, + replyTo string, // empty string if not synchronous +) (err error) { + // For each event, marshal the input room event and then + // send it into the input queue. + for _, e := range request.InputRoomEvents { + roomID := e.Event.RoomID() + subj := r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEventSubj(roomID)) + msg := &nats.Msg{ + Subject: subj, + Header: nats.Header{}, + } + msg.Header.Set("room_id", roomID) + if replyTo != "" { + msg.Header.Set("sync", replyTo) + } + msg.Data, err = json.Marshal(e) + if err != nil { + response.ErrMsg = err.Error() + return + } + if _, err = r.JetStream.PublishMsg(msg, nats.Context(ctx)); err != nil { + logrus.WithError(err).WithFields(logrus.Fields{ + "room_id": roomID, + "event_id": e.Event.EventID(), + "subj": subj, + }).Error("Roomserver failed to queue async event") + return + } + } + return +} + // InputRoomEvents implements api.RoomserverInternalAPI func (r *Inputer) InputRoomEvents( ctx context.Context, @@ -307,34 +345,10 @@ func (r *Inputer) InputRoomEvents( } } - // For each event, marshal the input room event and then - // send it into the input queue. - var err error - for _, e := range request.InputRoomEvents { - roomID := e.Event.RoomID() - subj := r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEventSubj(roomID)) - msg := &nats.Msg{ - Subject: subj, - Header: nats.Header{}, - Reply: replyTo, - } - msg.Header.Set("room_id", roomID) - if replyTo != "" { - msg.Header.Set("sync", replyTo) - } - msg.Data, err = json.Marshal(e) - if err != nil { - response.ErrMsg = err.Error() - return - } - if _, err = r.JetStream.PublishMsg(msg); err != nil { - logrus.WithError(err).WithFields(logrus.Fields{ - "room_id": roomID, - "event_id": e.Event.EventID(), - "subj": subj, - }).Error("Roomserver failed to queue async event") - return - } + // Queue up the event into the roomserver. + if err := r.queueInputRoomEvents(ctx, request, response, replyTo); err != nil { + response.ErrMsg = err.Error() + return } // If we aren't waiting for synchronous responses then we can