mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-29 09:43:10 -06:00
Don't queue up events more than once in memory
This commit is contained in:
parent
3150f6e64e
commit
ffe93f103f
|
|
@ -63,6 +63,13 @@ func (r *Inputer) workerForRoom(roomID string) *phony.Inbox {
|
|||
return inbox.(*phony.Inbox)
|
||||
}
|
||||
|
||||
// eventsInProgress is an in-memory map to keep a track of which events we have
|
||||
// queued up for processing. If we get a redelivery from NATS and we still have
|
||||
// the queued up item then we won't do anything with the redelivered message. If
|
||||
// we've restarted Dendrite and now this map is empty then it means that we will
|
||||
// reload pending work from NATS.
|
||||
var eventsInProgress sync.Map
|
||||
|
||||
// onMessage is called when a new event arrives in the roomserver input stream.
|
||||
func (r *Inputer) Start() error {
|
||||
_, err := r.JetStream.Subscribe(
|
||||
|
|
@ -79,9 +86,19 @@ func (r *Inputer) Start() error {
|
|||
}
|
||||
|
||||
_ = msg.InProgress()
|
||||
index := roomID + "\000" + inputRoomEvent.Event.EventID()
|
||||
if _, ok := eventsInProgress.LoadOrStore(index, struct{}{}); ok {
|
||||
// We're already waiting to deal with this event, so there's no
|
||||
// point in queuing it up again. We've notified NATS that we're
|
||||
// working on the message still, so that will have deferred the
|
||||
// redelivery by a bit.
|
||||
return
|
||||
}
|
||||
|
||||
roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc()
|
||||
r.workerForRoom(roomID).Act(nil, func() {
|
||||
_ = msg.InProgress() // resets the acknowledgement wait timer
|
||||
defer eventsInProgress.Delete(index)
|
||||
defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec()
|
||||
if err := r.processRoomEvent(context.Background(), &inputRoomEvent); err != nil {
|
||||
sentry.CaptureException(err)
|
||||
|
|
@ -138,9 +155,18 @@ func (r *Inputer) InputRoomEvents(
|
|||
for _, e := range request.InputRoomEvents {
|
||||
inputRoomEvent := e
|
||||
roomID := inputRoomEvent.Event.RoomID()
|
||||
index := roomID + "\000" + inputRoomEvent.Event.EventID()
|
||||
if _, ok := eventsInProgress.LoadOrStore(index, struct{}{}); ok {
|
||||
// We're already waiting to deal with this event, so there's no
|
||||
// point in queuing it up again. We've notified NATS that we're
|
||||
// working on the message still, so that will have deferred the
|
||||
// redelivery by a bit.
|
||||
return
|
||||
}
|
||||
roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc()
|
||||
worker := r.workerForRoom(roomID)
|
||||
worker.Act(nil, func() {
|
||||
defer eventsInProgress.Delete(index)
|
||||
defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec()
|
||||
err := r.processRoomEvent(ctx, &inputRoomEvent)
|
||||
if err != nil {
|
||||
|
|
|
|||
Loading…
Reference in a new issue