diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index c47793f0a..f5099ca11 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -89,6 +89,13 @@ type Inputer struct { Queryer *query.Queryer } +// If a room consumer is inactive for a while then we will allow NATS +// to clean it up. This stops us from holding onto durable consumers +// indefinitely for rooms that might no longer be active, since they do +// have an interest overhead in the NATS Server. If the room becomes +// active again then we'll recreate the consumer anyway. +const inactiveThreshold = time.Hour * 24 + type worker struct { phony.Inbox sync.Mutex @@ -125,11 +132,12 @@ func (r *Inputer) startWorkerForRoom(roomID string) { if _, err := w.r.JetStream.AddConsumer( r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent), &nats.ConsumerConfig{ - Durable: consumer, - AckPolicy: nats.AckAllPolicy, - DeliverPolicy: nats.DeliverAllPolicy, - FilterSubject: subject, - AckWait: MaximumMissingProcessingTime + (time.Second * 10), + Durable: consumer, + AckPolicy: nats.AckAllPolicy, + DeliverPolicy: nats.DeliverAllPolicy, + FilterSubject: subject, + AckWait: MaximumMissingProcessingTime + (time.Second * 10), + InactiveThreshold: inactiveThreshold, }, ); err != nil { logrus.WithError(err).Errorf("Failed to create consumer for room %q", w.roomID) @@ -145,6 +153,7 @@ func (r *Inputer) startWorkerForRoom(roomID string) { nats.DeliverAll(), nats.AckWait(MaximumMissingProcessingTime+(time.Second*10)), nats.Bind(r.InputRoomEventTopic, consumer), + nats.InactiveThreshold(inactiveThreshold), ) if err != nil { logrus.WithError(err).Errorf("Failed to subscribe to stream for room %q", w.roomID) @@ -180,6 +189,21 @@ func (r *Inputer) Start() error { nats.ReplayInstant(), nats.BindStream(r.InputRoomEventTopic), ) + + // Make sure that the room consumers have the right config. + stream := r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent) + for consumer := range r.JetStream.Consumers(stream) { + switch { + case consumer.Config.Durable == "": + continue // Ignore ephemeral consumers + case consumer.Config.InactiveThreshold != inactiveThreshold: + consumer.Config.InactiveThreshold = inactiveThreshold + if _, cerr := r.JetStream.UpdateConsumer(stream, &consumer.Config); cerr != nil { + logrus.WithError(cerr).Warnf("Failed to update inactive threshold on consumer %q", consumer.Name) + } + } + } + return err }