mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-31 10:43:10 -06:00
Update consumers for the roomserver output stream
This commit is contained in:
parent
0e8bb6b762
commit
98de7c2f77
|
|
@ -66,7 +66,11 @@ func NewOutputRoomEventConsumer(
|
||||||
|
|
||||||
// Start consuming from room servers
|
// Start consuming from room servers
|
||||||
func (s *OutputRoomEventConsumer) Start() error {
|
func (s *OutputRoomEventConsumer) Start() error {
|
||||||
_, err := s.jetstream.Subscribe(s.topic, s.onMessage, s.durable)
|
_, err := s.jetstream.Subscribe(
|
||||||
|
s.topic, s.onMessage, s.durable,
|
||||||
|
nats.DeliverAll(),
|
||||||
|
nats.ManualAck(),
|
||||||
|
)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -3,6 +3,7 @@ package jetstream
|
||||||
import "github.com/nats-io/nats.go"
|
import "github.com/nats-io/nats.go"
|
||||||
|
|
||||||
func WithJetStreamMessage(msg *nats.Msg, f func(msg *nats.Msg) bool) {
|
func WithJetStreamMessage(msg *nats.Msg, f func(msg *nats.Msg) bool) {
|
||||||
|
_ = msg.InProgress()
|
||||||
if f(msg) {
|
if f(msg) {
|
||||||
_ = msg.Ack()
|
_ = msg.Ack()
|
||||||
} else {
|
} else {
|
||||||
|
|
|
||||||
|
|
@ -73,7 +73,11 @@ func NewOutputRoomEventConsumer(
|
||||||
|
|
||||||
// Start consuming from room servers
|
// Start consuming from room servers
|
||||||
func (s *OutputRoomEventConsumer) Start() error {
|
func (s *OutputRoomEventConsumer) Start() error {
|
||||||
_, err := s.jetstream.Subscribe(s.topic, s.onMessage, s.durable)
|
_, err := s.jetstream.Subscribe(
|
||||||
|
s.topic, s.onMessage, s.durable,
|
||||||
|
nats.DeliverAll(),
|
||||||
|
nats.ManualAck(),
|
||||||
|
)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue