Prefix subjects, preserve inboxes

This commit is contained in:
Neil Alexander 2022-03-22 14:35:56 +00:00
parent 4d636428c0
commit 1170030352
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944

View file

@ -66,6 +66,7 @@ type Inputer struct {
type worker struct { type worker struct {
phony.Inbox phony.Inbox
sync.Mutex
r *Inputer r *Inputer
roomID string roomID string
subscription *nats.Subscription subscription *nats.Subscription
@ -77,9 +78,11 @@ func (r *Inputer) startWorkerForRoom(roomID string) {
roomID: roomID, roomID: roomID,
}) })
w := v.(*worker) w := v.(*worker)
if !loaded { w.Lock()
defer w.Lock()
if !loaded || w.subscription == nil {
consumer := r.Cfg.Matrix.JetStream.Prefixed("RoomInput" + jetstream.Tokenise(w.roomID)) consumer := r.Cfg.Matrix.JetStream.Prefixed("RoomInput" + jetstream.Tokenise(w.roomID))
subject := jetstream.InputRoomEventSubj(w.roomID) subject := r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEventSubj(w.roomID))
if _, err := w.r.JetStream.AddConsumer( if _, err := w.r.JetStream.AddConsumer(
r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent), r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent),
@ -143,7 +146,9 @@ func (w *worker) next() {
if err = w.subscription.Unsubscribe(); err != nil { if err = w.subscription.Unsubscribe(); err != nil {
logrus.WithError(err).Errorf("Failed to unsubscribe to stream for room %q", w.roomID) logrus.WithError(err).Errorf("Failed to unsubscribe to stream for room %q", w.roomID)
} }
w.r.workers.Delete(w.roomID) w.Lock()
w.subscription = nil
w.Unlock()
return return
case nats.ErrTimeout: case nats.ErrTimeout:
w.Act(nil, w.next) w.Act(nil, w.next)
@ -153,7 +158,9 @@ func (w *worker) next() {
if err = w.subscription.Unsubscribe(); err != nil { if err = w.subscription.Unsubscribe(); err != nil {
logrus.WithError(err).Errorf("Failed to unsubscribe to stream for room %q", w.roomID) logrus.WithError(err).Errorf("Failed to unsubscribe to stream for room %q", w.roomID)
} }
w.r.workers.Delete(w.roomID) w.Lock()
w.subscription = nil
w.Unlock()
return return
} }
@ -214,7 +221,7 @@ func (r *Inputer) InputRoomEvents(
var err error var err error
for _, e := range request.InputRoomEvents { for _, e := range request.InputRoomEvents {
roomID := e.Event.RoomID() roomID := e.Event.RoomID()
subj := jetstream.InputRoomEventSubj(roomID) subj := r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEventSubj(roomID))
msg := &nats.Msg{ msg := &nats.Msg{
Subject: subj, Subject: subj,
Header: nats.Header{}, Header: nats.Header{},