diff --git a/federationapi/consumers/roomserver.go b/federationapi/consumers/roomserver.go index 632adae34..25ea78274 100644 --- a/federationapi/consumers/roomserver.go +++ b/federationapi/consumers/roomserver.go @@ -66,7 +66,11 @@ func NewOutputRoomEventConsumer( // Start consuming from room servers func (s *OutputRoomEventConsumer) Start() error { - _, err := s.jetstream.Subscribe(s.topic, s.onMessage, s.durable) + _, err := s.jetstream.Subscribe( + s.topic, s.onMessage, s.durable, + nats.DeliverAll(), + nats.ManualAck(), + ) return err } diff --git a/setup/jetstream/helpers.go b/setup/jetstream/helpers.go index 2d5632269..1891b96b3 100644 --- a/setup/jetstream/helpers.go +++ b/setup/jetstream/helpers.go @@ -3,6 +3,7 @@ package jetstream import "github.com/nats-io/nats.go" func WithJetStreamMessage(msg *nats.Msg, f func(msg *nats.Msg) bool) { + _ = msg.InProgress() if f(msg) { _ = msg.Ack() } else { diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index 6b3ebe53e..e9c4abe88 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -73,7 +73,11 @@ func NewOutputRoomEventConsumer( // Start consuming from room servers func (s *OutputRoomEventConsumer) Start() error { - _, err := s.jetstream.Subscribe(s.topic, s.onMessage, s.durable) + _, err := s.jetstream.Subscribe( + s.topic, s.onMessage, s.durable, + nats.DeliverAll(), + nats.ManualAck(), + ) return err }