diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index a94371981..ae2674c62 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -310,7 +310,8 @@ func (w *worker) _next() { msg := msgs[0] var inputRoomEvent api.InputRoomEvent if err = json.Unmarshal(msg.Data, &inputRoomEvent); err != nil { - _ = msg.Term() + // using AckWait here makes the call synchronous; 5 seconds is default value + _ = msg.Term(nats.AckWait(time.Second * 5)) return } @@ -346,10 +347,15 @@ func (w *worker) _next() { "type": inputRoomEvent.Event.Type(), }).Warn("Roomserver failed to process event") } - _ = msg.Term() + // Even though we failed to process this message (e.g. due to Dendrite restarting and receiving a context canceled), + // the message may already have been queued for redelivery or will be, so this makes sure that we still reprocess the msg + // after restarting. We only Ack if the context was not yet canceled. + if w.r.ProcessContext.Context().Err() == nil { + _ = msg.AckSync() + } errString = err.Error() } else { - _ = msg.Ack() + _ = msg.AckSync() } // If it was a synchronous input request then the "sync" field