diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go index 7d739e647..8aea5c347 100644 --- a/appservice/consumers/roomserver.go +++ b/appservice/consumers/roomserver.go @@ -55,8 +55,8 @@ func NewOutputRoomEventConsumer( return &OutputRoomEventConsumer{ ctx: process.Context(), jetstream: js, - durable: nats.Durable(cfg.Global.JetStream.Namespaced("AppserviceRoomserverConsumer")), - topic: cfg.Global.JetStream.Namespaced(jetstream.OutputRoomEvent), + durable: cfg.Global.JetStream.Durable("AppserviceRoomserverConsumer"), + topic: cfg.Global.JetStream.TopicFor(jetstream.OutputRoomEvent), asDB: appserviceDB, rsAPI: rsAPI, serverName: string(cfg.Global.ServerName), diff --git a/federationapi/consumers/eduserver.go b/federationapi/consumers/eduserver.go index 06ce2de77..c3e5b4d49 100644 --- a/federationapi/consumers/eduserver.go +++ b/federationapi/consumers/eduserver.go @@ -57,10 +57,10 @@ func NewOutputEDUConsumer( queues: queues, db: store, ServerName: cfg.Matrix.ServerName, - durable: nats.Durable(cfg.Matrix.JetStream.Namespaced("FederationAPIEDUServerConsumer")), - typingTopic: cfg.Matrix.JetStream.Namespaced(jetstream.OutputTypingEvent), - sendToDeviceTopic: cfg.Matrix.JetStream.Namespaced(jetstream.OutputSendToDeviceEvent), - receiptTopic: cfg.Matrix.JetStream.Namespaced(jetstream.OutputReceiptEvent), + durable: cfg.Matrix.JetStream.Durable("FederationAPIEDUServerConsumer"), + typingTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent), + sendToDeviceTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent), + receiptTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent), } } diff --git a/federationapi/consumers/keychange.go b/federationapi/consumers/keychange.go index 764907194..8231fcf44 100644 --- a/federationapi/consumers/keychange.go +++ b/federationapi/consumers/keychange.go @@ -57,7 +57,7 @@ func NewKeyChangeConsumer( consumer: &internal.ContinualConsumer{ Process: process, ComponentName: "federationapi/keychange", - Topic: string(cfg.Matrix.JetStream.Namespaced(jetstream.OutputKeyChangeEvent)), + Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent)), Consumer: kafkaConsumer, PartitionStore: store, }, diff --git a/federationapi/consumers/roomserver.go b/federationapi/consumers/roomserver.go index ca13078de..632adae34 100644 --- a/federationapi/consumers/roomserver.go +++ b/federationapi/consumers/roomserver.go @@ -59,8 +59,8 @@ func NewOutputRoomEventConsumer( db: store, queues: queues, rsAPI: rsAPI, - durable: nats.Durable(cfg.Matrix.JetStream.Namespaced("FederationAPIRoomServerConsumer")), - topic: cfg.Matrix.JetStream.Namespaced(jetstream.OutputRoomEvent), + durable: cfg.Matrix.JetStream.Durable("FederationAPIRoomServerConsumer"), + topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent), } } diff --git a/roomserver/internal/api.go b/roomserver/internal/api.go index 8abc32897..cf2e59c61 100644 --- a/roomserver/internal/api.go +++ b/roomserver/internal/api.go @@ -67,7 +67,7 @@ func NewRoomserverAPI( InputRoomEventTopic: inputRoomEventTopic, OutputRoomEventTopic: outputRoomEventTopic, JetStream: consumer, - Durable: cfg.Matrix.JetStream.Namespaced("RoomserverInputConsumer"), + Durable: cfg.Matrix.JetStream.Durable("RoomserverInputConsumer"), ServerName: cfg.Matrix.ServerName, ACLs: serverACLs, }, diff --git a/setup/config/config_jetstream.go b/setup/config/config_jetstream.go index b04c5fc1e..e618e63e5 100644 --- a/setup/config/config_jetstream.go +++ b/setup/config/config_jetstream.go @@ -2,6 +2,8 @@ package config import ( "fmt" + + "github.com/nats-io/nats.go" ) type JetStream struct { @@ -19,10 +21,14 @@ type JetStream struct { InMemory bool `yaml:"in_memory"` } -func (c *JetStream) Namespaced(name string) string { +func (c *JetStream) TopicFor(name string) string { return fmt.Sprintf("%s%s", c.TopicPrefix, name) } +func (c *JetStream) Durable(name string) nats.SubOpt { + return nats.Durable(name) +} + func (c *JetStream) Defaults(generate bool) { c.Addresses = []string{} c.TopicPrefix = "Dendrite" diff --git a/syncapi/consumers/clientapi.go b/syncapi/consumers/clientapi.go index 33c4c4fd5..1ec9beb04 100644 --- a/syncapi/consumers/clientapi.go +++ b/syncapi/consumers/clientapi.go @@ -53,8 +53,8 @@ func NewOutputClientDataConsumer( return &OutputClientDataConsumer{ ctx: process.Context(), jetstream: js, - topic: cfg.Matrix.JetStream.Namespaced(jetstream.OutputClientData), - durable: nats.Durable(cfg.Matrix.JetStream.Namespaced("SyncAPIClientAPIConsumer")), + topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData), + durable: cfg.Matrix.JetStream.Durable("SyncAPIClientAPIConsumer"), db: store, notifier: notifier, stream: stream, diff --git a/syncapi/consumers/eduserver_receipts.go b/syncapi/consumers/eduserver_receipts.go index 57b2fb8ed..57d69d6fb 100644 --- a/syncapi/consumers/eduserver_receipts.go +++ b/syncapi/consumers/eduserver_receipts.go @@ -54,8 +54,8 @@ func NewOutputReceiptEventConsumer( return &OutputReceiptEventConsumer{ ctx: process.Context(), jetstream: js, - topic: cfg.Matrix.JetStream.Namespaced(jetstream.OutputReceiptEvent), - durable: nats.Durable(cfg.Matrix.JetStream.Namespaced("SyncAPIEDUServerReceiptConsumer")), + topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent), + durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerReceiptConsumer"), db: store, notifier: notifier, stream: stream, diff --git a/syncapi/consumers/eduserver_sendtodevice.go b/syncapi/consumers/eduserver_sendtodevice.go index 6e7954e06..54e689fa1 100644 --- a/syncapi/consumers/eduserver_sendtodevice.go +++ b/syncapi/consumers/eduserver_sendtodevice.go @@ -57,8 +57,8 @@ func NewOutputSendToDeviceEventConsumer( return &OutputSendToDeviceEventConsumer{ ctx: process.Context(), jetstream: js, - topic: cfg.Matrix.JetStream.Namespaced(jetstream.OutputSendToDeviceEvent), - durable: nats.Durable(cfg.Matrix.JetStream.Namespaced("SyncAPIEDUServerSendToDeviceConsumer")), + topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent), + durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerSendToDeviceConsumer"), db: store, serverName: cfg.Matrix.ServerName, notifier: notifier, diff --git a/syncapi/consumers/eduserver_typing.go b/syncapi/consumers/eduserver_typing.go index 030353b34..de2f6f950 100644 --- a/syncapi/consumers/eduserver_typing.go +++ b/syncapi/consumers/eduserver_typing.go @@ -56,8 +56,8 @@ func NewOutputTypingEventConsumer( return &OutputTypingEventConsumer{ ctx: process.Context(), jetstream: js, - topic: cfg.Matrix.JetStream.Namespaced(jetstream.OutputTypingEvent), - durable: nats.Durable(cfg.Matrix.JetStream.Namespaced("SyncAPIEDUServerTypingConsumer")), + topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent), + durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerTypingConsumer"), eduCache: eduCache, notifier: notifier, stream: stream, diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index 305bbca8f..6b3ebe53e 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -61,8 +61,8 @@ func NewOutputRoomEventConsumer( ctx: process.Context(), cfg: cfg, jetstream: js, - topic: cfg.Matrix.JetStream.Namespaced(jetstream.OutputRoomEvent), - durable: nats.Durable(cfg.Matrix.JetStream.Namespaced("SyncAPIRoomServerConsumer")), + topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent), + durable: cfg.Matrix.JetStream.Durable("SyncAPIRoomServerConsumer"), db: store, notifier: notifier, pduStream: pduStream,