From 98de7c2f77435934928dc3139d25372dfe2297cc Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Tue, 25 Jan 2022 17:07:37 +0000 Subject: [PATCH] Update consumers for the roomserver output stream --- federationapi/consumers/roomserver.go | 6 +++++- setup/jetstream/helpers.go | 1 + syncapi/consumers/roomserver.go | 6 +++++- 3 files changed, 11 insertions(+), 2 deletions(-) diff --git a/federationapi/consumers/roomserver.go b/federationapi/consumers/roomserver.go index 632adae34..25ea78274 100644 --- a/federationapi/consumers/roomserver.go +++ b/federationapi/consumers/roomserver.go @@ -66,7 +66,11 @@ func NewOutputRoomEventConsumer( // Start consuming from room servers func (s *OutputRoomEventConsumer) Start() error { - _, err := s.jetstream.Subscribe(s.topic, s.onMessage, s.durable) + _, err := s.jetstream.Subscribe( + s.topic, s.onMessage, s.durable, + nats.DeliverAll(), + nats.ManualAck(), + ) return err } diff --git a/setup/jetstream/helpers.go b/setup/jetstream/helpers.go index 2d5632269..1891b96b3 100644 --- a/setup/jetstream/helpers.go +++ b/setup/jetstream/helpers.go @@ -3,6 +3,7 @@ package jetstream import "github.com/nats-io/nats.go" func WithJetStreamMessage(msg *nats.Msg, f func(msg *nats.Msg) bool) { + _ = msg.InProgress() if f(msg) { _ = msg.Ack() } else { diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index 6b3ebe53e..e9c4abe88 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -73,7 +73,11 @@ func NewOutputRoomEventConsumer( // Start consuming from room servers func (s *OutputRoomEventConsumer) Start() error { - _, err := s.jetstream.Subscribe(s.topic, s.onMessage, s.durable) + _, err := s.jetstream.Subscribe( + s.topic, s.onMessage, s.durable, + nats.DeliverAll(), + nats.ManualAck(), + ) return err }