diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index e9d137be8..70edcf1df 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -76,14 +76,14 @@ func (r *Inputer) workerForRoom(roomID string) *worker { }) w := v.(*worker) if !loaded { + consumer := "DendriteRoomInputConsumerPull" + jetstream.Tokenise(w.roomID) sub, err := w.r.JetStream.PullSubscribe( - jetstream.InputRoomEventSubj(w.roomID), - "DendriteRoomInputConsumerPull"+jetstream.Tokenise(w.roomID), + jetstream.InputRoomEventSubj(w.roomID), consumer, nats.ManualAck(), nats.DeliverAll(), nats.MaxAckPending(-1), nats.AckWait(MaximumMissingProcessingTime+(time.Second*10)), - nats.BindStream(r.InputRoomEventTopic), + nats.Bind(r.InputRoomEventTopic, consumer), ) if err != nil { logrus.WithError(err).Errorf("Failed to subscribe to stream for room %q", w.roomID)