Update msg.Term()/msg.Ack()

This commit is contained in:
Till Faelligen 2023-12-18 13:00:57 +01:00
parent d8f31484a4
commit 96d1fb8a52
No known key found for this signature in database
GPG key ID: ACCDC9606D472758

View file

@ -310,7 +310,8 @@ func (w *worker) _next() {
msg := msgs[0] msg := msgs[0]
var inputRoomEvent api.InputRoomEvent var inputRoomEvent api.InputRoomEvent
if err = json.Unmarshal(msg.Data, &inputRoomEvent); err != nil { if err = json.Unmarshal(msg.Data, &inputRoomEvent); err != nil {
_ = msg.Term() // using AckWait here makes the call synchronous; 5 seconds is default value
_ = msg.Term(nats.AckWait(time.Second * 5))
return return
} }
@ -346,10 +347,15 @@ func (w *worker) _next() {
"type": inputRoomEvent.Event.Type(), "type": inputRoomEvent.Event.Type(),
}).Warn("Roomserver failed to process event") }).Warn("Roomserver failed to process event")
} }
_ = msg.Term() // Even though we failed to process this message (e.g. due to Dendrite restarting and receiving a context canceled),
// the message may already have been queued for redelivery or will be, so this makes sure that we still reprocess the msg
// after restarting. We only Ack if the context was not yet canceled.
if w.r.ProcessContext.Context().Err() == nil {
_ = msg.AckSync()
}
errString = err.Error() errString = err.Error()
} else { } else {
_ = msg.Ack() _ = msg.AckSync()
} }
// If it was a synchronous input request then the "sync" field // If it was a synchronous input request then the "sync" field