From a0116959c32826103254fc068e70a0df74c8a1de Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Tue, 22 Mar 2022 12:14:00 +0000 Subject: [PATCH] Preserve consumer after unsubscribe --- roomserver/internal/input/input.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index e9d137be8..70edcf1df 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -76,14 +76,14 @@ func (r *Inputer) workerForRoom(roomID string) *worker { }) w := v.(*worker) if !loaded { + consumer := "DendriteRoomInputConsumerPull" + jetstream.Tokenise(w.roomID) sub, err := w.r.JetStream.PullSubscribe( - jetstream.InputRoomEventSubj(w.roomID), - "DendriteRoomInputConsumerPull"+jetstream.Tokenise(w.roomID), + jetstream.InputRoomEventSubj(w.roomID), consumer, nats.ManualAck(), nats.DeliverAll(), nats.MaxAckPending(-1), nats.AckWait(MaximumMissingProcessingTime+(time.Second*10)), - nats.BindStream(r.InputRoomEventTopic), + nats.Bind(r.InputRoomEventTopic, consumer), ) if err != nil { logrus.WithError(err).Errorf("Failed to subscribe to stream for room %q", w.roomID)