mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-06 22:43:10 -06:00
Use AckExplicitPolicy instead of AckAllPolicy, fix back pressure metric
This commit is contained in:
parent
d65449c782
commit
1f7e110a7f
|
|
@ -119,9 +119,14 @@ func (r *Inputer) startWorkerForRoom(roomID string) {
|
||||||
w.Lock()
|
w.Lock()
|
||||||
defer w.Unlock()
|
defer w.Unlock()
|
||||||
if !loaded || w.subscription == nil {
|
if !loaded || w.subscription == nil {
|
||||||
|
streamName := r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent)
|
||||||
consumer := r.Cfg.Matrix.JetStream.Prefixed("RoomInput" + jetstream.Tokenise(w.roomID))
|
consumer := r.Cfg.Matrix.JetStream.Prefixed("RoomInput" + jetstream.Tokenise(w.roomID))
|
||||||
subject := r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEventSubj(w.roomID))
|
subject := r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEventSubj(w.roomID))
|
||||||
|
|
||||||
|
logger := logrus.WithFields(logrus.Fields{
|
||||||
|
"stream_name": streamName,
|
||||||
|
"consumer": consumer,
|
||||||
|
})
|
||||||
// Create the consumer. We do this as a specific step rather than
|
// Create the consumer. We do this as a specific step rather than
|
||||||
// letting PullSubscribe create it for us because we need the consumer
|
// letting PullSubscribe create it for us because we need the consumer
|
||||||
// to outlive the subscription. If we do it this way, we can Bind in the
|
// to outlive the subscription. If we do it this way, we can Bind in the
|
||||||
|
|
@ -135,20 +140,51 @@ func (r *Inputer) startWorkerForRoom(roomID string) {
|
||||||
// before it. This is necessary because otherwise our consumer will never
|
// before it. This is necessary because otherwise our consumer will never
|
||||||
// acknowledge things we filtered out for other subjects and therefore they
|
// acknowledge things we filtered out for other subjects and therefore they
|
||||||
// will linger around forever.
|
// will linger around forever.
|
||||||
if _, err := w.r.JetStream.AddConsumer(
|
|
||||||
r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent),
|
info, err := w.r.JetStream.ConsumerInfo(streamName, consumer)
|
||||||
&nats.ConsumerConfig{
|
if err != nil {
|
||||||
|
// log and return, we will retry anyway
|
||||||
|
logger.WithError(err).Errorf("failed to get consumer info")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
consumerConfig := &nats.ConsumerConfig{
|
||||||
Durable: consumer,
|
Durable: consumer,
|
||||||
AckPolicy: nats.AckAllPolicy,
|
AckPolicy: nats.AckExplicitPolicy,
|
||||||
DeliverPolicy: nats.DeliverAllPolicy,
|
DeliverPolicy: nats.DeliverAllPolicy,
|
||||||
FilterSubject: subject,
|
FilterSubject: subject,
|
||||||
AckWait: MaximumMissingProcessingTime + (time.Second * 10),
|
AckWait: MaximumMissingProcessingTime + (time.Second * 10),
|
||||||
InactiveThreshold: inactiveThreshold,
|
InactiveThreshold: inactiveThreshold,
|
||||||
},
|
}
|
||||||
); err != nil {
|
|
||||||
logrus.WithError(err).Errorf("Failed to create consumer for room %q", w.roomID)
|
// The consumer already exists, try to update if necessary.
|
||||||
|
if info != nil {
|
||||||
|
switch {
|
||||||
|
case info.Config.AckPolicy != consumerConfig.AckPolicy:
|
||||||
|
logger.Warn("Consumer already exists, trying to update it.")
|
||||||
|
// Try updating the consumer first
|
||||||
|
if _, err = w.r.JetStream.UpdateConsumer(streamName, consumerConfig); err != nil {
|
||||||
|
// We failed to update the consumer, recreate it
|
||||||
|
logger.WithError(err).Warnf("Unable to update consumer %q, recreating...", consumer)
|
||||||
|
if err = w.r.JetStream.DeleteConsumer(streamName, consumer); err != nil {
|
||||||
|
logger.WithError(err).Fatalf("Unable to delete consumer %q", consumer)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
info = nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if info == nil {
|
||||||
|
// Create the consumer with the correct config
|
||||||
|
if _, err = w.r.JetStream.AddConsumer(
|
||||||
|
r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent),
|
||||||
|
consumerConfig,
|
||||||
|
); err != nil {
|
||||||
|
logger.WithError(err).Errorf("Failed to create consumer for room %q", w.roomID)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Bind to our durable consumer. We want to receive all messages waiting
|
// Bind to our durable consumer. We want to receive all messages waiting
|
||||||
// for this subject and we want to manually acknowledge them, so that we
|
// for this subject and we want to manually acknowledge them, so that we
|
||||||
|
|
@ -162,7 +198,7 @@ func (r *Inputer) startWorkerForRoom(roomID string) {
|
||||||
nats.InactiveThreshold(inactiveThreshold),
|
nats.InactiveThreshold(inactiveThreshold),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithError(err).Errorf("Failed to subscribe to stream for room %q", w.roomID)
|
logger.WithError(err).Errorf("Failed to subscribe to stream for room %q", w.roomID)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -276,7 +312,7 @@ func (w *worker) _next() {
|
||||||
if scope := sentry.CurrentHub().Scope(); scope != nil {
|
if scope := sentry.CurrentHub().Scope(); scope != nil {
|
||||||
scope.SetTag("event_id", inputRoomEvent.Event.EventID())
|
scope.SetTag("event_id", inputRoomEvent.Event.EventID())
|
||||||
}
|
}
|
||||||
roomserverInputBackpressure.With(prometheus.Labels{"room_id": w.roomID}).Inc()
|
|
||||||
defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": w.roomID}).Dec()
|
defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": w.roomID}).Dec()
|
||||||
|
|
||||||
// Process the room event. If something goes wrong then we'll tell
|
// Process the room event. If something goes wrong then we'll tell
|
||||||
|
|
@ -381,6 +417,7 @@ func (r *Inputer) queueInputRoomEvents(
|
||||||
}).Error("Roomserver failed to queue async event")
|
}).Error("Roomserver failed to queue async event")
|
||||||
return nil, fmt.Errorf("r.JetStream.PublishMsg: %w", err)
|
return nil, fmt.Errorf("r.JetStream.PublishMsg: %w", err)
|
||||||
}
|
}
|
||||||
|
roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc()
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue