Update comments

This commit is contained in:
Till Faelligen 2023-12-19 08:10:15 +01:00
parent 96d1fb8a52
commit d395a91d4e
No known key found for this signature in database
GPG key ID: 3DF82D8AB9211D4E
2 changed files with 12 additions and 2 deletions

View file

@ -159,10 +159,17 @@ func (r *Inputer) startWorkerForRoom(roomID string) {
// The consumer already exists, try to update if necessary. // The consumer already exists, try to update if necessary.
if info != nil { if info != nil {
// Not using reflect.DeepEqual here, since consumerConfig does not explicitly set
// e.g. the consumerName, which is added by NATS later. So this would result
// in constantly updating/recreating the consumer.
switch { switch {
case info.Config.AckWait.Nanoseconds() != consumerConfig.AckWait.Nanoseconds(): case info.Config.AckWait.Nanoseconds() != consumerConfig.AckWait.Nanoseconds():
// Initially we had a AckWait of 2m 10s, now we have 5m 10s, so we need to update
// existing consumers.
fallthrough fallthrough
case info.Config.AckPolicy != consumerConfig.AckPolicy: case info.Config.AckPolicy != consumerConfig.AckPolicy:
// We've changed the AckPolicy from AckAll to AckExplicit, this needs a
// recreation of the consumer. (Note: Only a few changes actually need a recreat)
logger.Warn("Consumer already exists, trying to update it.") logger.Warn("Consumer already exists, trying to update it.")
// Try updating the consumer first // Try updating the consumer first
if _, err = w.r.JetStream.UpdateConsumer(streamName, consumerConfig); err != nil { if _, err = w.r.JetStream.UpdateConsumer(streamName, consumerConfig); err != nil {
@ -172,6 +179,7 @@ func (r *Inputer) startWorkerForRoom(roomID string) {
logger.WithError(err).Fatal("Unable to delete consumer") logger.WithError(err).Fatal("Unable to delete consumer")
return return
} }
// Set info to nil, so it can be recreated with the correct config.
info = nil info = nil
} }
} }
@ -310,7 +318,7 @@ func (w *worker) _next() {
msg := msgs[0] msg := msgs[0]
var inputRoomEvent api.InputRoomEvent var inputRoomEvent api.InputRoomEvent
if err = json.Unmarshal(msg.Data, &inputRoomEvent); err != nil { if err = json.Unmarshal(msg.Data, &inputRoomEvent); err != nil {
// using AckWait here makes the call synchronous; 5 seconds is default value // using AckWait here makes the call synchronous; 5 seconds is the default value used by NATS
_ = msg.Term(nats.AckWait(time.Second * 5)) _ = msg.Term(nats.AckWait(time.Second * 5))
return return
} }

View file

@ -48,7 +48,9 @@ import (
"github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/roomserver/types"
) )
// TODO: Does this value make sense? // MaximumMissingProcessingTime is the maximum time we allow "processRoomEvent" to fetch
// e.g. missing auth/prev events. This duration is used for AckWait, and if it is exceeded
// NATS queues the event for redelivery.
const MaximumMissingProcessingTime = time.Minute * 5 const MaximumMissingProcessingTime = time.Minute * 5
var processRoomEventDuration = prometheus.NewHistogramVec( var processRoomEventDuration = prometheus.NewHistogramVec(