From 117003035289a77fbbbf175eeb7045cdd98882f3 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Tue, 22 Mar 2022 14:35:56 +0000 Subject: [PATCH] Prefix subjects, preserve inboxes --- roomserver/internal/input/input.go | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index 66548358b..47844e0f6 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -66,6 +66,7 @@ type Inputer struct { type worker struct { phony.Inbox + sync.Mutex r *Inputer roomID string subscription *nats.Subscription @@ -77,9 +78,11 @@ func (r *Inputer) startWorkerForRoom(roomID string) { roomID: roomID, }) w := v.(*worker) - if !loaded { + w.Lock() + defer w.Lock() + if !loaded || w.subscription == nil { consumer := r.Cfg.Matrix.JetStream.Prefixed("RoomInput" + jetstream.Tokenise(w.roomID)) - subject := jetstream.InputRoomEventSubj(w.roomID) + subject := r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEventSubj(w.roomID)) if _, err := w.r.JetStream.AddConsumer( r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent), @@ -143,7 +146,9 @@ func (w *worker) next() { if err = w.subscription.Unsubscribe(); err != nil { logrus.WithError(err).Errorf("Failed to unsubscribe to stream for room %q", w.roomID) } - w.r.workers.Delete(w.roomID) + w.Lock() + w.subscription = nil + w.Unlock() return case nats.ErrTimeout: w.Act(nil, w.next) @@ -153,7 +158,9 @@ func (w *worker) next() { if err = w.subscription.Unsubscribe(); err != nil { logrus.WithError(err).Errorf("Failed to unsubscribe to stream for room %q", w.roomID) } - w.r.workers.Delete(w.roomID) + w.Lock() + w.subscription = nil + w.Unlock() return } @@ -214,7 +221,7 @@ func (r *Inputer) InputRoomEvents( var err error for _, e := range request.InputRoomEvents { roomID := e.Event.RoomID() - subj := jetstream.InputRoomEventSubj(roomID) + subj := r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEventSubj(roomID)) msg := &nats.Msg{ Subject: subj, Header: nats.Header{},