Separate queueInputRoomEvents function

This commit is contained in:
Neil Alexander 2022-03-23 09:47:28 +00:00
parent 01403f9d62
commit e62ee5cd8e
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944

View file

@ -278,6 +278,44 @@ func (w *worker) _next() {
}
}
// queueInputRoomEvents queues events into the roomserver input
// stream in NATS.
func (r *Inputer) queueInputRoomEvents(
ctx context.Context,
request *api.InputRoomEventsRequest,
response *api.InputRoomEventsResponse,
replyTo string, // empty string if not synchronous
) (err error) {
// For each event, marshal the input room event and then
// send it into the input queue.
for _, e := range request.InputRoomEvents {
roomID := e.Event.RoomID()
subj := r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEventSubj(roomID))
msg := &nats.Msg{
Subject: subj,
Header: nats.Header{},
}
msg.Header.Set("room_id", roomID)
if replyTo != "" {
msg.Header.Set("sync", replyTo)
}
msg.Data, err = json.Marshal(e)
if err != nil {
response.ErrMsg = err.Error()
return
}
if _, err = r.JetStream.PublishMsg(msg, nats.Context(ctx)); err != nil {
logrus.WithError(err).WithFields(logrus.Fields{
"room_id": roomID,
"event_id": e.Event.EventID(),
"subj": subj,
}).Error("Roomserver failed to queue async event")
return
}
}
return
}
// InputRoomEvents implements api.RoomserverInternalAPI
func (r *Inputer) InputRoomEvents(
ctx context.Context,
@ -307,34 +345,10 @@ func (r *Inputer) InputRoomEvents(
}
}
// For each event, marshal the input room event and then
// send it into the input queue.
var err error
for _, e := range request.InputRoomEvents {
roomID := e.Event.RoomID()
subj := r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEventSubj(roomID))
msg := &nats.Msg{
Subject: subj,
Header: nats.Header{},
Reply: replyTo,
}
msg.Header.Set("room_id", roomID)
if replyTo != "" {
msg.Header.Set("sync", replyTo)
}
msg.Data, err = json.Marshal(e)
if err != nil {
response.ErrMsg = err.Error()
return
}
if _, err = r.JetStream.PublishMsg(msg); err != nil {
logrus.WithError(err).WithFields(logrus.Fields{
"room_id": roomID,
"event_id": e.Event.EventID(),
"subj": subj,
}).Error("Roomserver failed to queue async event")
return
}
// Queue up the event into the roomserver.
if err := r.queueInputRoomEvents(ctx, request, response, replyTo); err != nil {
response.ErrMsg = err.Error()
return
}
// If we aren't waiting for synchronous responses then we can