diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index ae2674c62..20d2cfc7a 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -159,10 +159,17 @@ func (r *Inputer) startWorkerForRoom(roomID string) { // The consumer already exists, try to update if necessary. if info != nil { + // Not using reflect.DeepEqual here, since consumerConfig does not explicitly set + // e.g. the consumerName, which is added by NATS later. So this would result + // in constantly updating/recreating the consumer. switch { case info.Config.AckWait.Nanoseconds() != consumerConfig.AckWait.Nanoseconds(): + // Initially we had a AckWait of 2m 10s, now we have 5m 10s, so we need to update + // existing consumers. fallthrough case info.Config.AckPolicy != consumerConfig.AckPolicy: + // We've changed the AckPolicy from AckAll to AckExplicit, this needs a + // recreation of the consumer. (Note: Only a few changes actually need a recreat) logger.Warn("Consumer already exists, trying to update it.") // Try updating the consumer first if _, err = w.r.JetStream.UpdateConsumer(streamName, consumerConfig); err != nil { @@ -172,6 +179,7 @@ func (r *Inputer) startWorkerForRoom(roomID string) { logger.WithError(err).Fatal("Unable to delete consumer") return } + // Set info to nil, so it can be recreated with the correct config. info = nil } } @@ -310,7 +318,7 @@ func (w *worker) _next() { msg := msgs[0] var inputRoomEvent api.InputRoomEvent if err = json.Unmarshal(msg.Data, &inputRoomEvent); err != nil { - // using AckWait here makes the call synchronous; 5 seconds is default value + // using AckWait here makes the call synchronous; 5 seconds is the default value used by NATS _ = msg.Term(nats.AckWait(time.Second * 5)) return } diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go index f2737ec1c..1d9208434 100644 --- a/roomserver/internal/input/input_events.go +++ b/roomserver/internal/input/input_events.go @@ -48,7 +48,9 @@ import ( "github.com/matrix-org/dendrite/roomserver/types" ) -// TODO: Does this value make sense? +// MaximumMissingProcessingTime is the maximum time we allow "processRoomEvent" to fetch +// e.g. missing auth/prev events. This duration is used for AckWait, and if it is exceeded +// NATS queues the event for redelivery. const MaximumMissingProcessingTime = time.Minute * 5 var processRoomEventDuration = prometheus.NewHistogramVec(