From a1abc8ee9c3bd5fcc73e8c665caff023defc1fca Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Fri, 14 Oct 2022 13:16:46 +0100 Subject: [PATCH] Use a nice const --- roomserver/internal/input/input.go | 31 ++++++++++++++---------------- 1 file changed, 14 insertions(+), 17 deletions(-) diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index 3a9335e4c..ca5db9434 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -43,8 +43,6 @@ import ( "github.com/matrix-org/dendrite/setup/process" ) -const inactiveThreshold = time.Hour - // Inputer is responsible for consuming from the roomserver input // streams and processing the events. All input events are queued // into a single NATS stream and the order is preserved strictly. @@ -91,6 +89,13 @@ type Inputer struct { Queryer *query.Queryer } +// If a room consumer is inactive for a while then we will allow NATS +// to clean it up. This stops us from holding onto durable consumers +// indefinitely for rooms that might no longer be active, since they do +// have an interest overhead in the NATS Server. If the room becomes +// active again then we'll recreate the consumer anyway. +const inactiveThreshold = time.Minute + type worker struct { phony.Inbox sync.Mutex @@ -127,17 +132,11 @@ func (r *Inputer) startWorkerForRoom(roomID string) { if _, err := w.r.JetStream.AddConsumer( r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent), &nats.ConsumerConfig{ - Durable: consumer, - AckPolicy: nats.AckAllPolicy, - DeliverPolicy: nats.DeliverAllPolicy, - FilterSubject: subject, - AckWait: MaximumMissingProcessingTime + (time.Second * 10), - - // If the consumer is inactive for a while then we will allow NATS - // to clean it up. This prevents us from holding onto durable - // consumers indefinitely for rooms that might no longer be active, - // since they do have a small overhead. If the room becomes active - // again then we'll recreate the consumer anyway. + Durable: consumer, + AckPolicy: nats.AckAllPolicy, + DeliverPolicy: nats.DeliverAllPolicy, + FilterSubject: subject, + AckWait: MaximumMissingProcessingTime + (time.Second * 10), InactiveThreshold: inactiveThreshold, }, ); err != nil { @@ -191,15 +190,13 @@ func (r *Inputer) Start() error { nats.BindStream(r.InputRoomEventTopic), ) - // Make sure that we match the expected inactivity threshold. + // Make sure that the room consumers have the right config. stream := r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent) for consumer := range r.JetStream.Consumers(stream) { switch { case consumer.Config.Durable == "": continue // Ignore ephemeral consumers - case consumer.Config.InactiveThreshold == inactiveThreshold: - continue // Ignore consumers that already have the correct threshold - default: + case consumer.Config.InactiveThreshold != inactiveThreshold: consumer.Config.InactiveThreshold = inactiveThreshold if _, cerr := r.JetStream.UpdateConsumer(stream, &consumer.Config); cerr != nil { logrus.WithError(cerr).Warnf("Failed to update inactive threshold on consumer %q", consumer.Name)