Update existing consumers

This commit is contained in:
Neil Alexander 2022-10-14 12:24:08 +01:00
parent 4def2f695d
commit cce0567289
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944

View file

@ -43,6 +43,8 @@ import (
"github.com/matrix-org/dendrite/setup/process"
)
const inactiveThreshold = time.Hour
// Inputer is responsible for consuming from the roomserver input
// streams and processing the events. All input events are queued
// into a single NATS stream and the order is preserved strictly.
@ -136,7 +138,7 @@ func (r *Inputer) startWorkerForRoom(roomID string) {
// consumers indefinitely for rooms that might no longer be active,
// since they do have a small overhead. If the room becomes active
// again then we'll recreate the consumer anyway.
InactiveThreshold: time.Hour,
InactiveThreshold: inactiveThreshold,
},
); err != nil {
logrus.WithError(err).Errorf("Failed to create consumer for room %q", w.roomID)
@ -152,7 +154,7 @@ func (r *Inputer) startWorkerForRoom(roomID string) {
nats.DeliverAll(),
nats.AckWait(MaximumMissingProcessingTime+(time.Second*10)),
nats.Bind(r.InputRoomEventTopic, consumer),
nats.InactiveThreshold(time.Hour),
nats.InactiveThreshold(inactiveThreshold),
)
if err != nil {
logrus.WithError(err).Errorf("Failed to subscribe to stream for room %q", w.roomID)
@ -188,6 +190,19 @@ func (r *Inputer) Start() error {
nats.ReplayInstant(),
nats.BindStream(r.InputRoomEventTopic),
)
// Make sure that we match the expected inactivity threshold.
stream := r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent)
for consumer := range r.JetStream.Consumers(stream) {
if consumer.Config.InactiveThreshold == inactiveThreshold {
continue
}
consumer.Config.InactiveThreshold = inactiveThreshold
if _, cerr := r.JetStream.UpdateConsumer(stream, &consumer.Config); cerr != nil {
logrus.WithError(cerr).Warnf("Failed to update inactive threshold on consumer %q", consumer.Name)
}
}
return err
}