Use a nice const

Neil Alexander 2022-10-14 13:16:46 +01:00
parent 54ce7b5c9a
commit a1abc8ee9c


@@ -43,8 +43,6 @@ import (
 	"github.com/matrix-org/dendrite/setup/process"
 )
 
-const inactiveThreshold = time.Hour
-
 // Inputer is responsible for consuming from the roomserver input
 // streams and processing the events. All input events are queued
 // into a single NATS stream and the order is preserved strictly.
@@ -91,6 +89,13 @@ type Inputer struct {
 	Queryer *query.Queryer
 }
 
+// If a room consumer is inactive for a while then we will allow NATS
+// to clean it up. This stops us from holding onto durable consumers
+// indefinitely for rooms that might no longer be active, since they do
+// have an interest overhead in the NATS Server. If the room becomes
+// active again then we'll recreate the consumer anyway.
+const inactiveThreshold = time.Minute
+
 type worker struct {
 	phony.Inbox
 	sync.Mutex
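
The comment added here describes the lifecycle that this constant drives: NATS may delete a durable room consumer once it has been idle for inactiveThreshold, and the consumer is simply created again the next time the room becomes active. As a rough illustration of that "recreate on demand" idea (this is a minimal sketch with the nats.go client, not Dendrite's actual code; the stream, subject and consumer names are placeholders):

package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/nats-io/nats.go"
)

// Mirrors the constant added above: durable consumers that stay idle for
// longer than this may be cleaned up by the NATS server.
const inactiveThreshold = time.Minute

// ensureRoomConsumer recreates the durable consumer for a room if the
// server has already removed it. All names here are placeholders.
func ensureRoomConsumer(js nats.JetStreamContext, stream, subject, durable string) error {
	if _, err := js.ConsumerInfo(stream, durable); err == nil {
		return nil // Consumer still exists, nothing to do.
	} else if !errors.Is(err, nats.ErrConsumerNotFound) {
		return fmt.Errorf("ConsumerInfo: %w", err)
	}
	// The consumer is gone, e.g. because the inactive threshold elapsed,
	// so create it again with the same durable name and config.
	_, err := js.AddConsumer(stream, &nats.ConsumerConfig{
		Durable:           durable,
		AckPolicy:         nats.AckAllPolicy,
		DeliverPolicy:     nats.DeliverAllPolicy,
		FilterSubject:     subject,
		InactiveThreshold: inactiveThreshold,
	})
	return err
}

func main() {
	nc, err := nats.Connect(nats.DefaultURL) // assumes a local JetStream-enabled server
	if err != nil {
		panic(err)
	}
	defer nc.Close()
	js, err := nc.JetStream()
	if err != nil {
		panic(err)
	}
	if err := ensureRoomConsumer(js, "InputRoomEvent", "InputRoomEvent.example", "RoomInputExample"); err != nil {
		panic(err)
	}
}
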
@@ -127,17 +132,11 @@ func (r *Inputer) startWorkerForRoom(roomID string) {
 		if _, err := w.r.JetStream.AddConsumer(
 			r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent),
 			&nats.ConsumerConfig{
 				Durable:           consumer,
 				AckPolicy:         nats.AckAllPolicy,
 				DeliverPolicy:     nats.DeliverAllPolicy,
 				FilterSubject:     subject,
 				AckWait:           MaximumMissingProcessingTime + (time.Second * 10),
-				// If the consumer is inactive for a while then we will allow NATS
-				// to clean it up. This prevents us from holding onto durable
-				// consumers indefinitely for rooms that might no longer be active,
-				// since they do have a small overhead. If the room becomes active
-				// again then we'll recreate the consumer anyway.
 				InactiveThreshold: inactiveThreshold,
 			},
 		); err != nil {
@@ -191,15 +190,13 @@ func (r *Inputer) Start() error {
 		nats.BindStream(r.InputRoomEventTopic),
 	)
 
-	// Make sure that we match the expected inactivity threshold.
+	// Make sure that the room consumers have the right config.
 	stream := r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent)
 	for consumer := range r.JetStream.Consumers(stream) {
 		switch {
 		case consumer.Config.Durable == "":
 			continue // Ignore ephemeral consumers
-		case consumer.Config.InactiveThreshold == inactiveThreshold:
-			continue // Ignore consumers that already have the correct threshold
-		default:
+		case consumer.Config.InactiveThreshold != inactiveThreshold:
 			consumer.Config.InactiveThreshold = inactiveThreshold
 			if _, cerr := r.JetStream.UpdateConsumer(stream, &consumer.Config); cerr != nil {
 				logrus.WithError(cerr).Warnf("Failed to update inactive threshold on consumer %q", consumer.Name)
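
The loop in this last hunk is one straightforward way, with the nats.go client, to reconcile existing durable consumers after a config change: list every consumer on the stream, skip ephemeral ones, and push an updated config for any whose InactiveThreshold no longer matches. A standalone sketch of the same pattern (the connection details and stream name are placeholders, not Dendrite's configuration):

package main

import (
	"log"
	"time"

	"github.com/nats-io/nats.go"
)

const inactiveThreshold = time.Minute

func main() {
	nc, err := nats.Connect(nats.DefaultURL) // assumes a local JetStream-enabled server
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()
	js, err := nc.JetStream()
	if err != nil {
		log.Fatal(err)
	}
	stream := "InputRoomEvent" // placeholder stream name
	for consumer := range js.Consumers(stream) {
		switch {
		case consumer.Config.Durable == "":
			continue // Ephemeral consumers clean themselves up anyway.
		case consumer.Config.InactiveThreshold != inactiveThreshold:
			consumer.Config.InactiveThreshold = inactiveThreshold
			if _, err := js.UpdateConsumer(stream, &consumer.Config); err != nil {
				log.Printf("failed to update consumer %q: %v", consumer.Name, err)
			}
		}
	}
}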