diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index 2b63a0f22..3720061ee 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -43,6 +43,8 @@ import ( "github.com/matrix-org/dendrite/setup/process" ) +const inactiveThreshold = time.Hour + // Inputer is responsible for consuming from the roomserver input // streams and processing the events. All input events are queued // into a single NATS stream and the order is preserved strictly. @@ -136,7 +138,7 @@ func (r *Inputer) startWorkerForRoom(roomID string) { // consumers indefinitely for rooms that might no longer be active, // since they do have a small overhead. If the room becomes active // again then we'll recreate the consumer anyway. - InactiveThreshold: time.Hour, + InactiveThreshold: inactiveThreshold, }, ); err != nil { logrus.WithError(err).Errorf("Failed to create consumer for room %q", w.roomID) @@ -152,7 +154,7 @@ func (r *Inputer) startWorkerForRoom(roomID string) { nats.DeliverAll(), nats.AckWait(MaximumMissingProcessingTime+(time.Second*10)), nats.Bind(r.InputRoomEventTopic, consumer), - nats.InactiveThreshold(time.Hour), + nats.InactiveThreshold(inactiveThreshold), ) if err != nil { logrus.WithError(err).Errorf("Failed to subscribe to stream for room %q", w.roomID) @@ -188,6 +190,19 @@ func (r *Inputer) Start() error { nats.ReplayInstant(), nats.BindStream(r.InputRoomEventTopic), ) + + // Make sure that we match the expected inactivity threshold. + stream := r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent) + for consumer := range r.JetStream.Consumers(stream) { + if consumer.Config.InactiveThreshold == inactiveThreshold { + continue + } + consumer.Config.InactiveThreshold = inactiveThreshold + if _, cerr := r.JetStream.UpdateConsumer(stream, &consumer.Config); cerr != nil { + logrus.WithError(cerr).Warnf("Failed to update inactive threshold on consumer %q", consumer.Name) + } + } + return err }