From c73a0d9a07130efb7295c4830f505993b37f3283 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Fri, 7 Jan 2022 16:18:01 +0000 Subject: [PATCH] Use named NATS durable consumers --- appservice/consumers/roomserver.go | 6 ++++-- federationapi/consumers/eduserver.go | 14 ++++++++------ federationapi/consumers/keychange.go | 2 +- federationapi/consumers/roomserver.go | 6 ++++-- roomserver/internal/api.go | 1 + roomserver/internal/input/input.go | 3 +++ setup/config/config_jetstream.go | 2 +- syncapi/consumers/clientapi.go | 6 ++++-- syncapi/consumers/eduserver_receipts.go | 6 ++++-- syncapi/consumers/eduserver_sendtodevice.go | 6 ++++-- syncapi/consumers/eduserver_typing.go | 6 ++++-- syncapi/consumers/roomserver.go | 6 ++++-- 12 files changed, 42 insertions(+), 22 deletions(-) diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go index 139b57247..7d739e647 100644 --- a/appservice/consumers/roomserver.go +++ b/appservice/consumers/roomserver.go @@ -34,6 +34,7 @@ import ( type OutputRoomEventConsumer struct { ctx context.Context jetstream nats.JetStreamContext + durable nats.SubOpt topic string asDB storage.Database rsAPI api.RoomserverInternalAPI @@ -54,7 +55,8 @@ func NewOutputRoomEventConsumer( return &OutputRoomEventConsumer{ ctx: process.Context(), jetstream: js, - topic: cfg.Global.JetStream.TopicFor(jetstream.OutputRoomEvent), + durable: nats.Durable(cfg.Global.JetStream.Namespaced("AppserviceRoomserverConsumer")), + topic: cfg.Global.JetStream.Namespaced(jetstream.OutputRoomEvent), asDB: appserviceDB, rsAPI: rsAPI, serverName: string(cfg.Global.ServerName), @@ -64,7 +66,7 @@ func NewOutputRoomEventConsumer( // Start consuming from room servers func (s *OutputRoomEventConsumer) Start() error { - _, err := s.jetstream.Subscribe(s.topic, s.onMessage) + _, err := s.jetstream.Subscribe(s.topic, s.onMessage, s.durable) return err } diff --git a/federationapi/consumers/eduserver.go b/federationapi/consumers/eduserver.go index 9e52acefc..06ce2de77 100644 --- a/federationapi/consumers/eduserver.go +++ b/federationapi/consumers/eduserver.go @@ -34,6 +34,7 @@ import ( type OutputEDUConsumer struct { ctx context.Context jetstream nats.JetStreamContext + durable nats.SubOpt db storage.Database queues *queue.OutgoingQueues ServerName gomatrixserverlib.ServerName @@ -56,21 +57,22 @@ func NewOutputEDUConsumer( queues: queues, db: store, ServerName: cfg.Matrix.ServerName, - typingTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent), - sendToDeviceTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent), - receiptTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent), + durable: nats.Durable(cfg.Matrix.JetStream.Namespaced("FederationAPIEDUServerConsumer")), + typingTopic: cfg.Matrix.JetStream.Namespaced(jetstream.OutputTypingEvent), + sendToDeviceTopic: cfg.Matrix.JetStream.Namespaced(jetstream.OutputSendToDeviceEvent), + receiptTopic: cfg.Matrix.JetStream.Namespaced(jetstream.OutputReceiptEvent), } } // Start consuming from EDU servers func (t *OutputEDUConsumer) Start() error { - if _, err := t.jetstream.Subscribe(t.typingTopic, t.onTypingEvent); err != nil { + if _, err := t.jetstream.Subscribe(t.typingTopic, t.onTypingEvent, t.durable); err != nil { return err } - if _, err := t.jetstream.Subscribe(t.sendToDeviceTopic, t.onSendToDeviceEvent); err != nil { + if _, err := t.jetstream.Subscribe(t.sendToDeviceTopic, t.onSendToDeviceEvent, t.durable); err != nil { return err } - if _, err := t.jetstream.Subscribe(t.receiptTopic, t.onReceiptEvent); err != nil { + if _, err := t.jetstream.Subscribe(t.receiptTopic, t.onReceiptEvent, t.durable); err != nil { return err } return nil diff --git a/federationapi/consumers/keychange.go b/federationapi/consumers/keychange.go index 8231fcf44..764907194 100644 --- a/federationapi/consumers/keychange.go +++ b/federationapi/consumers/keychange.go @@ -57,7 +57,7 @@ func NewKeyChangeConsumer( consumer: &internal.ContinualConsumer{ Process: process, ComponentName: "federationapi/keychange", - Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent)), + Topic: string(cfg.Matrix.JetStream.Namespaced(jetstream.OutputKeyChangeEvent)), Consumer: kafkaConsumer, PartitionStore: store, }, diff --git a/federationapi/consumers/roomserver.go b/federationapi/consumers/roomserver.go index 12410bb7c..ca13078de 100644 --- a/federationapi/consumers/roomserver.go +++ b/federationapi/consumers/roomserver.go @@ -37,6 +37,7 @@ type OutputRoomEventConsumer struct { cfg *config.FederationAPI rsAPI api.RoomserverInternalAPI jetstream nats.JetStreamContext + durable nats.SubOpt db storage.Database queues *queue.OutgoingQueues topic string @@ -58,13 +59,14 @@ func NewOutputRoomEventConsumer( db: store, queues: queues, rsAPI: rsAPI, - topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent), + durable: nats.Durable(cfg.Matrix.JetStream.Namespaced("FederationAPIRoomServerConsumer")), + topic: cfg.Matrix.JetStream.Namespaced(jetstream.OutputRoomEvent), } } // Start consuming from room servers func (s *OutputRoomEventConsumer) Start() error { - _, err := s.jetstream.Subscribe(s.topic, s.onMessage) + _, err := s.jetstream.Subscribe(s.topic, s.onMessage, s.durable) return err } diff --git a/roomserver/internal/api.go b/roomserver/internal/api.go index e370f7e44..8abc32897 100644 --- a/roomserver/internal/api.go +++ b/roomserver/internal/api.go @@ -67,6 +67,7 @@ func NewRoomserverAPI( InputRoomEventTopic: inputRoomEventTopic, OutputRoomEventTopic: outputRoomEventTopic, JetStream: consumer, + Durable: cfg.Matrix.JetStream.Namespaced("RoomserverInputConsumer"), ServerName: cfg.Matrix.ServerName, ACLs: serverACLs, }, diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index dbff5fdda..57e51055a 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -43,6 +43,7 @@ var keyContentFields = map[string]string{ type Inputer struct { DB storage.Database JetStream nats.JetStreamContext + Durable nats.SubOpt ServerName gomatrixserverlib.ServerName ACLs *acls.ServerACLs InputRoomEventTopic string @@ -85,6 +86,8 @@ func (r *Inputer) Start() error { // or nak them within a certain amount of time. This stops that from // happening, so we don't end up doing a lot of unnecessary duplicate work. nats.MaxDeliver(0), + // Use a durable named consumer. + r.Durable, ) return err } diff --git a/setup/config/config_jetstream.go b/setup/config/config_jetstream.go index 0bd848992..b04c5fc1e 100644 --- a/setup/config/config_jetstream.go +++ b/setup/config/config_jetstream.go @@ -19,7 +19,7 @@ type JetStream struct { InMemory bool `yaml:"in_memory"` } -func (c *JetStream) TopicFor(name string) string { +func (c *JetStream) Namespaced(name string) string { return fmt.Sprintf("%s%s", c.TopicPrefix, name) } diff --git a/syncapi/consumers/clientapi.go b/syncapi/consumers/clientapi.go index 85710cdd1..33c4c4fd5 100644 --- a/syncapi/consumers/clientapi.go +++ b/syncapi/consumers/clientapi.go @@ -34,6 +34,7 @@ import ( type OutputClientDataConsumer struct { ctx context.Context jetstream nats.JetStreamContext + durable nats.SubOpt topic string db storage.Database stream types.StreamProvider @@ -52,7 +53,8 @@ func NewOutputClientDataConsumer( return &OutputClientDataConsumer{ ctx: process.Context(), jetstream: js, - topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData), + topic: cfg.Matrix.JetStream.Namespaced(jetstream.OutputClientData), + durable: nats.Durable(cfg.Matrix.JetStream.Namespaced("SyncAPIClientAPIConsumer")), db: store, notifier: notifier, stream: stream, @@ -61,7 +63,7 @@ func NewOutputClientDataConsumer( // Start consuming from room servers func (s *OutputClientDataConsumer) Start() error { - _, err := s.jetstream.Subscribe(s.topic, s.onMessage) + _, err := s.jetstream.Subscribe(s.topic, s.onMessage, s.durable) return err } diff --git a/syncapi/consumers/eduserver_receipts.go b/syncapi/consumers/eduserver_receipts.go index 582e1d649..57b2fb8ed 100644 --- a/syncapi/consumers/eduserver_receipts.go +++ b/syncapi/consumers/eduserver_receipts.go @@ -34,6 +34,7 @@ import ( type OutputReceiptEventConsumer struct { ctx context.Context jetstream nats.JetStreamContext + durable nats.SubOpt topic string db storage.Database stream types.StreamProvider @@ -53,7 +54,8 @@ func NewOutputReceiptEventConsumer( return &OutputReceiptEventConsumer{ ctx: process.Context(), jetstream: js, - topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent), + topic: cfg.Matrix.JetStream.Namespaced(jetstream.OutputReceiptEvent), + durable: nats.Durable(cfg.Matrix.JetStream.Namespaced("SyncAPIEDUServerReceiptConsumer")), db: store, notifier: notifier, stream: stream, @@ -62,7 +64,7 @@ func NewOutputReceiptEventConsumer( // Start consuming from EDU api func (s *OutputReceiptEventConsumer) Start() error { - _, err := s.jetstream.Subscribe(s.topic, s.onMessage) + _, err := s.jetstream.Subscribe(s.topic, s.onMessage, s.durable) return err } diff --git a/syncapi/consumers/eduserver_sendtodevice.go b/syncapi/consumers/eduserver_sendtodevice.go index 6579c3030..6e7954e06 100644 --- a/syncapi/consumers/eduserver_sendtodevice.go +++ b/syncapi/consumers/eduserver_sendtodevice.go @@ -36,6 +36,7 @@ import ( type OutputSendToDeviceEventConsumer struct { ctx context.Context jetstream nats.JetStreamContext + durable nats.SubOpt topic string db storage.Database serverName gomatrixserverlib.ServerName // our server name @@ -56,7 +57,8 @@ func NewOutputSendToDeviceEventConsumer( return &OutputSendToDeviceEventConsumer{ ctx: process.Context(), jetstream: js, - topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent), + topic: cfg.Matrix.JetStream.Namespaced(jetstream.OutputSendToDeviceEvent), + durable: nats.Durable(cfg.Matrix.JetStream.Namespaced("SyncAPIEDUServerSendToDeviceConsumer")), db: store, serverName: cfg.Matrix.ServerName, notifier: notifier, @@ -66,7 +68,7 @@ func NewOutputSendToDeviceEventConsumer( // Start consuming from EDU api func (s *OutputSendToDeviceEventConsumer) Start() error { - _, err := s.jetstream.Subscribe(s.topic, s.onMessage) + _, err := s.jetstream.Subscribe(s.topic, s.onMessage, s.durable) return err } diff --git a/syncapi/consumers/eduserver_typing.go b/syncapi/consumers/eduserver_typing.go index 487befe88..030353b34 100644 --- a/syncapi/consumers/eduserver_typing.go +++ b/syncapi/consumers/eduserver_typing.go @@ -35,6 +35,7 @@ import ( type OutputTypingEventConsumer struct { ctx context.Context jetstream nats.JetStreamContext + durable nats.SubOpt topic string eduCache *cache.EDUCache stream types.StreamProvider @@ -55,7 +56,8 @@ func NewOutputTypingEventConsumer( return &OutputTypingEventConsumer{ ctx: process.Context(), jetstream: js, - topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent), + topic: cfg.Matrix.JetStream.Namespaced(jetstream.OutputTypingEvent), + durable: nats.Durable(cfg.Matrix.JetStream.Namespaced("SyncAPIEDUServerTypingConsumer")), eduCache: eduCache, notifier: notifier, stream: stream, @@ -64,7 +66,7 @@ func NewOutputTypingEventConsumer( // Start consuming from EDU api func (s *OutputTypingEventConsumer) Start() error { - _, err := s.jetstream.Subscribe(s.topic, s.onMessage) + _, err := s.jetstream.Subscribe(s.topic, s.onMessage, s.durable) return err } diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index 5b008e3d5..305bbca8f 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -38,6 +38,7 @@ type OutputRoomEventConsumer struct { cfg *config.SyncAPI rsAPI api.RoomserverInternalAPI jetstream nats.JetStreamContext + durable nats.SubOpt topic string db storage.Database pduStream types.StreamProvider @@ -60,7 +61,8 @@ func NewOutputRoomEventConsumer( ctx: process.Context(), cfg: cfg, jetstream: js, - topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent), + topic: cfg.Matrix.JetStream.Namespaced(jetstream.OutputRoomEvent), + durable: nats.Durable(cfg.Matrix.JetStream.Namespaced("SyncAPIRoomServerConsumer")), db: store, notifier: notifier, pduStream: pduStream, @@ -71,7 +73,7 @@ func NewOutputRoomEventConsumer( // Start consuming from room servers func (s *OutputRoomEventConsumer) Start() error { - _, err := s.jetstream.Subscribe(s.topic, s.onMessage) + _, err := s.jetstream.Subscribe(s.topic, s.onMessage, s.durable) return err }