Use named NATS durable consumers

This commit is contained in:
Neil Alexander 2022-01-07 16:18:01 +00:00
parent a422321435
commit c73a0d9a07
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
12 changed files with 42 additions and 22 deletions

View file

@ -34,6 +34,7 @@ import (
type OutputRoomEventConsumer struct { type OutputRoomEventConsumer struct {
ctx context.Context ctx context.Context
jetstream nats.JetStreamContext jetstream nats.JetStreamContext
durable nats.SubOpt
topic string topic string
asDB storage.Database asDB storage.Database
rsAPI api.RoomserverInternalAPI rsAPI api.RoomserverInternalAPI
@ -54,7 +55,8 @@ func NewOutputRoomEventConsumer(
return &OutputRoomEventConsumer{ return &OutputRoomEventConsumer{
ctx: process.Context(), ctx: process.Context(),
jetstream: js, jetstream: js,
topic: cfg.Global.JetStream.TopicFor(jetstream.OutputRoomEvent), durable: nats.Durable(cfg.Global.JetStream.Namespaced("AppserviceRoomserverConsumer")),
topic: cfg.Global.JetStream.Namespaced(jetstream.OutputRoomEvent),
asDB: appserviceDB, asDB: appserviceDB,
rsAPI: rsAPI, rsAPI: rsAPI,
serverName: string(cfg.Global.ServerName), serverName: string(cfg.Global.ServerName),
@ -64,7 +66,7 @@ func NewOutputRoomEventConsumer(
// Start consuming from room servers // Start consuming from room servers
func (s *OutputRoomEventConsumer) Start() error { func (s *OutputRoomEventConsumer) Start() error {
_, err := s.jetstream.Subscribe(s.topic, s.onMessage) _, err := s.jetstream.Subscribe(s.topic, s.onMessage, s.durable)
return err return err
} }

View file

@ -34,6 +34,7 @@ import (
type OutputEDUConsumer struct { type OutputEDUConsumer struct {
ctx context.Context ctx context.Context
jetstream nats.JetStreamContext jetstream nats.JetStreamContext
durable nats.SubOpt
db storage.Database db storage.Database
queues *queue.OutgoingQueues queues *queue.OutgoingQueues
ServerName gomatrixserverlib.ServerName ServerName gomatrixserverlib.ServerName
@ -56,21 +57,22 @@ func NewOutputEDUConsumer(
queues: queues, queues: queues,
db: store, db: store,
ServerName: cfg.Matrix.ServerName, ServerName: cfg.Matrix.ServerName,
typingTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent), durable: nats.Durable(cfg.Matrix.JetStream.Namespaced("FederationAPIEDUServerConsumer")),
sendToDeviceTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent), typingTopic: cfg.Matrix.JetStream.Namespaced(jetstream.OutputTypingEvent),
receiptTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent), sendToDeviceTopic: cfg.Matrix.JetStream.Namespaced(jetstream.OutputSendToDeviceEvent),
receiptTopic: cfg.Matrix.JetStream.Namespaced(jetstream.OutputReceiptEvent),
} }
} }
// Start consuming from EDU servers // Start consuming from EDU servers
func (t *OutputEDUConsumer) Start() error { func (t *OutputEDUConsumer) Start() error {
if _, err := t.jetstream.Subscribe(t.typingTopic, t.onTypingEvent); err != nil { if _, err := t.jetstream.Subscribe(t.typingTopic, t.onTypingEvent, t.durable); err != nil {
return err return err
} }
if _, err := t.jetstream.Subscribe(t.sendToDeviceTopic, t.onSendToDeviceEvent); err != nil { if _, err := t.jetstream.Subscribe(t.sendToDeviceTopic, t.onSendToDeviceEvent, t.durable); err != nil {
return err return err
} }
if _, err := t.jetstream.Subscribe(t.receiptTopic, t.onReceiptEvent); err != nil { if _, err := t.jetstream.Subscribe(t.receiptTopic, t.onReceiptEvent, t.durable); err != nil {
return err return err
} }
return nil return nil

View file

@ -57,7 +57,7 @@ func NewKeyChangeConsumer(
consumer: &internal.ContinualConsumer{ consumer: &internal.ContinualConsumer{
Process: process, Process: process,
ComponentName: "federationapi/keychange", ComponentName: "federationapi/keychange",
Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent)), Topic: string(cfg.Matrix.JetStream.Namespaced(jetstream.OutputKeyChangeEvent)),
Consumer: kafkaConsumer, Consumer: kafkaConsumer,
PartitionStore: store, PartitionStore: store,
}, },

View file

@ -37,6 +37,7 @@ type OutputRoomEventConsumer struct {
cfg *config.FederationAPI cfg *config.FederationAPI
rsAPI api.RoomserverInternalAPI rsAPI api.RoomserverInternalAPI
jetstream nats.JetStreamContext jetstream nats.JetStreamContext
durable nats.SubOpt
db storage.Database db storage.Database
queues *queue.OutgoingQueues queues *queue.OutgoingQueues
topic string topic string
@ -58,13 +59,14 @@ func NewOutputRoomEventConsumer(
db: store, db: store,
queues: queues, queues: queues,
rsAPI: rsAPI, rsAPI: rsAPI,
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent), durable: nats.Durable(cfg.Matrix.JetStream.Namespaced("FederationAPIRoomServerConsumer")),
topic: cfg.Matrix.JetStream.Namespaced(jetstream.OutputRoomEvent),
} }
} }
// Start consuming from room servers // Start consuming from room servers
func (s *OutputRoomEventConsumer) Start() error { func (s *OutputRoomEventConsumer) Start() error {
_, err := s.jetstream.Subscribe(s.topic, s.onMessage) _, err := s.jetstream.Subscribe(s.topic, s.onMessage, s.durable)
return err return err
} }

View file

@ -67,6 +67,7 @@ func NewRoomserverAPI(
InputRoomEventTopic: inputRoomEventTopic, InputRoomEventTopic: inputRoomEventTopic,
OutputRoomEventTopic: outputRoomEventTopic, OutputRoomEventTopic: outputRoomEventTopic,
JetStream: consumer, JetStream: consumer,
Durable: cfg.Matrix.JetStream.Namespaced("RoomserverInputConsumer"),
ServerName: cfg.Matrix.ServerName, ServerName: cfg.Matrix.ServerName,
ACLs: serverACLs, ACLs: serverACLs,
}, },

View file

@ -43,6 +43,7 @@ var keyContentFields = map[string]string{
type Inputer struct { type Inputer struct {
DB storage.Database DB storage.Database
JetStream nats.JetStreamContext JetStream nats.JetStreamContext
Durable nats.SubOpt
ServerName gomatrixserverlib.ServerName ServerName gomatrixserverlib.ServerName
ACLs *acls.ServerACLs ACLs *acls.ServerACLs
InputRoomEventTopic string InputRoomEventTopic string
@ -85,6 +86,8 @@ func (r *Inputer) Start() error {
// or nak them within a certain amount of time. This stops that from // or nak them within a certain amount of time. This stops that from
// happening, so we don't end up doing a lot of unnecessary duplicate work. // happening, so we don't end up doing a lot of unnecessary duplicate work.
nats.MaxDeliver(0), nats.MaxDeliver(0),
// Use a durable named consumer.
r.Durable,
) )
return err return err
} }

View file

@ -19,7 +19,7 @@ type JetStream struct {
InMemory bool `yaml:"in_memory"` InMemory bool `yaml:"in_memory"`
} }
func (c *JetStream) TopicFor(name string) string { func (c *JetStream) Namespaced(name string) string {
return fmt.Sprintf("%s%s", c.TopicPrefix, name) return fmt.Sprintf("%s%s", c.TopicPrefix, name)
} }

View file

@ -34,6 +34,7 @@ import (
type OutputClientDataConsumer struct { type OutputClientDataConsumer struct {
ctx context.Context ctx context.Context
jetstream nats.JetStreamContext jetstream nats.JetStreamContext
durable nats.SubOpt
topic string topic string
db storage.Database db storage.Database
stream types.StreamProvider stream types.StreamProvider
@ -52,7 +53,8 @@ func NewOutputClientDataConsumer(
return &OutputClientDataConsumer{ return &OutputClientDataConsumer{
ctx: process.Context(), ctx: process.Context(),
jetstream: js, jetstream: js,
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData), topic: cfg.Matrix.JetStream.Namespaced(jetstream.OutputClientData),
durable: nats.Durable(cfg.Matrix.JetStream.Namespaced("SyncAPIClientAPIConsumer")),
db: store, db: store,
notifier: notifier, notifier: notifier,
stream: stream, stream: stream,
@ -61,7 +63,7 @@ func NewOutputClientDataConsumer(
// Start consuming from room servers // Start consuming from room servers
func (s *OutputClientDataConsumer) Start() error { func (s *OutputClientDataConsumer) Start() error {
_, err := s.jetstream.Subscribe(s.topic, s.onMessage) _, err := s.jetstream.Subscribe(s.topic, s.onMessage, s.durable)
return err return err
} }

View file

@ -34,6 +34,7 @@ import (
type OutputReceiptEventConsumer struct { type OutputReceiptEventConsumer struct {
ctx context.Context ctx context.Context
jetstream nats.JetStreamContext jetstream nats.JetStreamContext
durable nats.SubOpt
topic string topic string
db storage.Database db storage.Database
stream types.StreamProvider stream types.StreamProvider
@ -53,7 +54,8 @@ func NewOutputReceiptEventConsumer(
return &OutputReceiptEventConsumer{ return &OutputReceiptEventConsumer{
ctx: process.Context(), ctx: process.Context(),
jetstream: js, jetstream: js,
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent), topic: cfg.Matrix.JetStream.Namespaced(jetstream.OutputReceiptEvent),
durable: nats.Durable(cfg.Matrix.JetStream.Namespaced("SyncAPIEDUServerReceiptConsumer")),
db: store, db: store,
notifier: notifier, notifier: notifier,
stream: stream, stream: stream,
@ -62,7 +64,7 @@ func NewOutputReceiptEventConsumer(
// Start consuming from EDU api // Start consuming from EDU api
func (s *OutputReceiptEventConsumer) Start() error { func (s *OutputReceiptEventConsumer) Start() error {
_, err := s.jetstream.Subscribe(s.topic, s.onMessage) _, err := s.jetstream.Subscribe(s.topic, s.onMessage, s.durable)
return err return err
} }

View file

@ -36,6 +36,7 @@ import (
type OutputSendToDeviceEventConsumer struct { type OutputSendToDeviceEventConsumer struct {
ctx context.Context ctx context.Context
jetstream nats.JetStreamContext jetstream nats.JetStreamContext
durable nats.SubOpt
topic string topic string
db storage.Database db storage.Database
serverName gomatrixserverlib.ServerName // our server name serverName gomatrixserverlib.ServerName // our server name
@ -56,7 +57,8 @@ func NewOutputSendToDeviceEventConsumer(
return &OutputSendToDeviceEventConsumer{ return &OutputSendToDeviceEventConsumer{
ctx: process.Context(), ctx: process.Context(),
jetstream: js, jetstream: js,
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent), topic: cfg.Matrix.JetStream.Namespaced(jetstream.OutputSendToDeviceEvent),
durable: nats.Durable(cfg.Matrix.JetStream.Namespaced("SyncAPIEDUServerSendToDeviceConsumer")),
db: store, db: store,
serverName: cfg.Matrix.ServerName, serverName: cfg.Matrix.ServerName,
notifier: notifier, notifier: notifier,
@ -66,7 +68,7 @@ func NewOutputSendToDeviceEventConsumer(
// Start consuming from EDU api // Start consuming from EDU api
func (s *OutputSendToDeviceEventConsumer) Start() error { func (s *OutputSendToDeviceEventConsumer) Start() error {
_, err := s.jetstream.Subscribe(s.topic, s.onMessage) _, err := s.jetstream.Subscribe(s.topic, s.onMessage, s.durable)
return err return err
} }

View file

@ -35,6 +35,7 @@ import (
type OutputTypingEventConsumer struct { type OutputTypingEventConsumer struct {
ctx context.Context ctx context.Context
jetstream nats.JetStreamContext jetstream nats.JetStreamContext
durable nats.SubOpt
topic string topic string
eduCache *cache.EDUCache eduCache *cache.EDUCache
stream types.StreamProvider stream types.StreamProvider
@ -55,7 +56,8 @@ func NewOutputTypingEventConsumer(
return &OutputTypingEventConsumer{ return &OutputTypingEventConsumer{
ctx: process.Context(), ctx: process.Context(),
jetstream: js, jetstream: js,
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent), topic: cfg.Matrix.JetStream.Namespaced(jetstream.OutputTypingEvent),
durable: nats.Durable(cfg.Matrix.JetStream.Namespaced("SyncAPIEDUServerTypingConsumer")),
eduCache: eduCache, eduCache: eduCache,
notifier: notifier, notifier: notifier,
stream: stream, stream: stream,
@ -64,7 +66,7 @@ func NewOutputTypingEventConsumer(
// Start consuming from EDU api // Start consuming from EDU api
func (s *OutputTypingEventConsumer) Start() error { func (s *OutputTypingEventConsumer) Start() error {
_, err := s.jetstream.Subscribe(s.topic, s.onMessage) _, err := s.jetstream.Subscribe(s.topic, s.onMessage, s.durable)
return err return err
} }

View file

@ -38,6 +38,7 @@ type OutputRoomEventConsumer struct {
cfg *config.SyncAPI cfg *config.SyncAPI
rsAPI api.RoomserverInternalAPI rsAPI api.RoomserverInternalAPI
jetstream nats.JetStreamContext jetstream nats.JetStreamContext
durable nats.SubOpt
topic string topic string
db storage.Database db storage.Database
pduStream types.StreamProvider pduStream types.StreamProvider
@ -60,7 +61,8 @@ func NewOutputRoomEventConsumer(
ctx: process.Context(), ctx: process.Context(),
cfg: cfg, cfg: cfg,
jetstream: js, jetstream: js,
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent), topic: cfg.Matrix.JetStream.Namespaced(jetstream.OutputRoomEvent),
durable: nats.Durable(cfg.Matrix.JetStream.Namespaced("SyncAPIRoomServerConsumer")),
db: store, db: store,
notifier: notifier, notifier: notifier,
pduStream: pduStream, pduStream: pduStream,
@ -71,7 +73,7 @@ func NewOutputRoomEventConsumer(
// Start consuming from room servers // Start consuming from room servers
func (s *OutputRoomEventConsumer) Start() error { func (s *OutputRoomEventConsumer) Start() error {
_, err := s.jetstream.Subscribe(s.topic, s.onMessage) _, err := s.jetstream.Subscribe(s.topic, s.onMessage, s.durable)
return err return err
} }