diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index 1dbe903f2..9601e018d 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -63,6 +63,13 @@ func (r *Inputer) workerForRoom(roomID string) *phony.Inbox { return inbox.(*phony.Inbox) } +// eventsInProgress is an in-memory map to keep track of which events we have +// queued up for processing. If we get a redelivery from NATS and we still have +// the queued up item then we won't do anything with the redelivered message. If +// we've restarted Dendrite and now this map is empty then it means that we will +// reload pending work from NATS. Keys are the room ID, a NUL byte and the event ID. +var eventsInProgress sync.Map + // onMessage is called when a new event arrives in the roomserver input stream. func (r *Inputer) Start() error { _, err := r.JetStream.Subscribe( @@ -79,9 +86,19 @@ func (r *Inputer) Start() error { } _ = msg.InProgress() + index := roomID + "\000" + inputRoomEvent.Event.EventID() + if _, ok := eventsInProgress.LoadOrStore(index, struct{}{}); ok { + // We're already waiting to deal with this event, so there's no + // point in queuing it up again. We've notified NATS that we're + // working on the message still, so that will have deferred the + // redelivery by a bit. 
+ return } + roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc() r.workerForRoom(roomID).Act(nil, func() { _ = msg.InProgress() // resets the acknowledgement wait timer + defer eventsInProgress.Delete(index) defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec() if err := r.processRoomEvent(context.Background(), &inputRoomEvent); err != nil { sentry.CaptureException(err) @@ -138,9 +155,18 @@ func (r *Inputer) InputRoomEvents( for _, e := range request.InputRoomEvents { inputRoomEvent := e roomID := inputRoomEvent.Event.RoomID() + index := roomID + "\000" + inputRoomEvent.Event.EventID() + if _, ok := eventsInProgress.LoadOrStore(index, struct{}{}); ok { + // We're already waiting to deal with this event, so there's no + // point in queuing it up again. NOTE(review): this hunk shows no NATS + // message being acknowledged here, and this return leaves the whole loop over + // request.InputRoomEvents, so later events in the request are not queued - confirm intended. + return + } roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc() worker := r.workerForRoom(roomID) worker.Act(nil, func() { + defer eventsInProgress.Delete(index) defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec() err := r.processRoomEvent(ctx, &inputRoomEvent) if err != nil {