diff --git a/roomserver/internal/api.go b/roomserver/internal/api.go index 91001e418..f96cefcb3 100644 --- a/roomserver/internal/api.go +++ b/roomserver/internal/api.go @@ -90,6 +90,7 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.FederationInternalA r.KeyRing = keyRing r.Inputer = &input.Inputer{ + Cfg: r.Cfg, ProcessContext: r.ProcessContext, DB: r.DB, InputRoomEventTopic: r.InputRoomEventTopic, diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index 70edcf1df..d267599ec 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -29,6 +29,7 @@ import ( "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/internal/query" "github.com/matrix-org/dendrite/roomserver/storage" + "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/gomatrixserverlib" @@ -46,6 +47,7 @@ var keyContentFields = map[string]string{ } type Inputer struct { + Cfg *config.RoomServer ProcessContext *process.ProcessContext DB storage.Database NATSClient *nats.Conn @@ -69,7 +71,7 @@ type worker struct { subscription *nats.Subscription } -func (r *Inputer) workerForRoom(roomID string) *worker { +func (r *Inputer) startWorkerForRoom(roomID string) { v, loaded := r.workers.LoadOrStore(roomID, &worker{ r: r, roomID: roomID, @@ -77,23 +79,43 @@ func (r *Inputer) workerForRoom(roomID string) *worker { w := v.(*worker) if !loaded { consumer := "DendriteRoomInputConsumerPull" + jetstream.Tokenise(w.roomID) + subject := jetstream.InputRoomEventSubj(w.roomID) + + if info, err := w.r.JetStream.ConsumerInfo( + jetstream.InputRoomEvent, + consumer, + ); err != nil || info == nil { + if _, err := w.r.JetStream.AddConsumer( + r.Cfg.Matrix.JetStream.TopicFor(jetstream.InputRoomEvent), + &nats.ConsumerConfig{ + Durable: consumer, + AckPolicy: nats.AckExplicitPolicy, + DeliverPolicy: nats.DeliverAllPolicy, + FilterSubject: subject, + AckWait: MaximumMissingProcessingTime + (time.Second * 10), + }, + ); err != nil { + logrus.WithError(err).Errorf("Failed to create consumer for room %q", w.roomID) + return + } + } + sub, err := w.r.JetStream.PullSubscribe( - jetstream.InputRoomEventSubj(w.roomID), consumer, + subject, consumer, nats.ManualAck(), nats.DeliverAll(), - nats.MaxAckPending(-1), nats.AckWait(MaximumMissingProcessingTime+(time.Second*10)), nats.Bind(r.InputRoomEventTopic, consumer), ) if err != nil { logrus.WithError(err).Errorf("Failed to subscribe to stream for room %q", w.roomID) - return nil + return } - logrus.Infof("Subscribed to stream for room %q", w.roomID) + + logrus.Infof("Started stream for room %q", w.roomID) w.subscription = sub w.Act(nil, w.next) } - return w } // onMessage is called when a new event arrives in the roomserver input stream. @@ -102,7 +124,7 @@ func (r *Inputer) Start() error { "", // don't supply a subject because we're using BindStream func(m *nats.Msg) { roomID := m.Header.Get(jetstream.RoomID) - r.workerForRoom(roomID) // start the room worker + r.startWorkerForRoom(roomID) }, nats.DeliverAll(), nats.AckNone(),