diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go index b9883546a..f9790a0b1 100644 --- a/appservice/consumers/roomserver.go +++ b/appservice/consumers/roomserver.go @@ -53,7 +53,7 @@ func NewOutputRoomEventConsumer( consumer := internal.ContinualConsumer{ Process: process, ComponentName: "appservice/roomserver", - Topic: jetstream.OutputRoomEvent, + Topic: cfg.Global.JetStream.TopicFor(jetstream.OutputRoomEvent), Consumer: kafkaConsumer, PartitionStore: appserviceDB, } diff --git a/clientapi/clientapi.go b/clientapi/clientapi.go index 9303eb649..ffab1337d 100644 --- a/clientapi/clientapi.go +++ b/clientapi/clientapi.go @@ -53,7 +53,7 @@ func AddPublicRoutes( syncProducer := &producers.SyncAPIProducer{ Producer: producer, - Topic: jetstream.OutputClientData, + Topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData), } routing.Setup( diff --git a/eduserver/eduserver.go b/eduserver/eduserver.go index ee9f707af..2e8ef1893 100644 --- a/eduserver/eduserver.go +++ b/eduserver/eduserver.go @@ -48,9 +48,9 @@ func NewInternalAPI( Cache: eduCache, UserAPI: userAPI, Producer: producer, - OutputTypingEventTopic: jetstream.OutputTypingEvent, - OutputSendToDeviceEventTopic: jetstream.OutputSendToDeviceEvent, - OutputReceiptEventTopic: jetstream.OutputReceiptEvent, + OutputTypingEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent), + OutputSendToDeviceEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent), + OutputReceiptEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent), ServerName: cfg.Matrix.ServerName, } } diff --git a/federationsender/consumers/eduserver.go b/federationsender/consumers/eduserver.go index 2e12da120..63a367f52 100644 --- a/federationsender/consumers/eduserver.go +++ b/federationsender/consumers/eduserver.go @@ -56,29 +56,29 @@ func NewOutputEDUConsumer( typingConsumer: &internal.ContinualConsumer{ Process: process, ComponentName: "eduserver/typing", - Topic: jetstream.OutputTypingEvent, + Topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent), Consumer: kafkaConsumer, PartitionStore: store, }, sendToDeviceConsumer: &internal.ContinualConsumer{ Process: process, ComponentName: "eduserver/sendtodevice", - Topic: jetstream.OutputSendToDeviceEvent, + Topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent), Consumer: kafkaConsumer, PartitionStore: store, }, receiptConsumer: &internal.ContinualConsumer{ Process: process, ComponentName: "eduserver/receipt", - Topic: jetstream.OutputReceiptEvent, + Topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent), Consumer: kafkaConsumer, PartitionStore: store, }, queues: queues, db: store, ServerName: cfg.Matrix.ServerName, - TypingTopic: jetstream.OutputTypingEvent, - SendToDeviceTopic: jetstream.OutputSendToDeviceEvent, + TypingTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent), + SendToDeviceTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent), } c.typingConsumer.ProcessMessage = c.onTypingEvent c.sendToDeviceConsumer.ProcessMessage = c.onSendToDeviceEvent diff --git a/federationsender/consumers/keychange.go b/federationsender/consumers/keychange.go index 70e2983c5..4226d4922 100644 --- a/federationsender/consumers/keychange.go +++ b/federationsender/consumers/keychange.go @@ -54,7 +54,7 @@ func NewKeyChangeConsumer( consumer: &internal.ContinualConsumer{ Process: process, ComponentName: "federationsender/keychange", - Topic: jetstream.OutputKeyChangeEvent, + Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent)), Consumer: kafkaConsumer, PartitionStore: store, }, diff --git a/federationsender/consumers/roomserver.go b/federationsender/consumers/roomserver.go index 231bf462c..b145bc506 100644 --- a/federationsender/consumers/roomserver.go +++ b/federationsender/consumers/roomserver.go @@ -53,7 +53,7 @@ func NewOutputRoomEventConsumer( consumer := internal.ContinualConsumer{ Process: process, ComponentName: "federationsender/roomserver", - Topic: jetstream.OutputRoomEvent, + Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent)), Consumer: kafkaConsumer, PartitionStore: store, } diff --git a/keyserver/keyserver.go b/keyserver/keyserver.go index 3464e0d0c..ed345dcb8 100644 --- a/keyserver/keyserver.go +++ b/keyserver/keyserver.go @@ -45,7 +45,7 @@ func NewInternalAPI( logrus.WithError(err).Panicf("failed to connect to key server database") } keyChangeProducer := &producers.KeyChange{ - Topic: jetstream.OutputKeyChangeEvent, + Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent)), Producer: producer, DB: db, } diff --git a/roomserver/roomserver.go b/roomserver/roomserver.go index fcb802dab..192a056b8 100644 --- a/roomserver/roomserver.go +++ b/roomserver/roomserver.go @@ -54,7 +54,7 @@ func NewInternalAPI( } return internal.NewRoomserverAPI( - cfg, roomserverDB, producer, jetstream.OutputRoomEvent, + cfg, roomserverDB, producer, string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent)), base.Caches, keyRing, perspectiveServerNames, ) } diff --git a/roomserver/roomserver_test.go b/roomserver/roomserver_test.go index 5b85d7c26..335d2a4ca 100644 --- a/roomserver/roomserver_test.go +++ b/roomserver/roomserver_test.go @@ -166,7 +166,7 @@ func mustCreateRoomserverAPI(t *testing.T) (api.RoomserverInternalAPI, *dummyPro ConnectionString: roomserverDBFileURI, } dp := &dummyProducer{ - topic: jetstream.OutputRoomEvent, + topic: cfg.Global.JetStream.TopicFor(jetstream.OutputRoomEvent), } cache, err := caching.NewInMemoryLRUCache(false) if err != nil { @@ -181,7 +181,7 @@ func mustCreateRoomserverAPI(t *testing.T) (api.RoomserverInternalAPI, *dummyPro logrus.WithError(err).Panicf("failed to connect to room server db") } return internal.NewRoomserverAPI( - &cfg.RoomServer, roomserverDB, dp, jetstream.OutputRoomEvent, + &cfg.RoomServer, roomserverDB, dp, string(cfg.Global.JetStream.TopicFor(jetstream.OutputRoomEvent)), base.Caches, &test.NopJSONVerifier{}, nil, ), dp } diff --git a/syncapi/consumers/clientapi.go b/syncapi/consumers/clientapi.go index 55736925a..a50e9c8f9 100644 --- a/syncapi/consumers/clientapi.go +++ b/syncapi/consumers/clientapi.go @@ -51,7 +51,7 @@ func NewOutputClientDataConsumer( consumer := internal.ContinualConsumer{ Process: process, ComponentName: "syncapi/clientapi", - Topic: jetstream.OutputClientData, + Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData)), Consumer: kafkaConsumer, PartitionStore: store, } diff --git a/syncapi/consumers/eduserver_receipts.go b/syncapi/consumers/eduserver_receipts.go index d6805d177..2b25985f9 100644 --- a/syncapi/consumers/eduserver_receipts.go +++ b/syncapi/consumers/eduserver_receipts.go @@ -53,7 +53,7 @@ func NewOutputReceiptEventConsumer( consumer := internal.ContinualConsumer{ Process: process, ComponentName: "syncapi/eduserver/receipt", - Topic: jetstream.OutputReceiptEvent, + Topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent), Consumer: kafkaConsumer, PartitionStore: store, } diff --git a/syncapi/consumers/eduserver_sendtodevice.go b/syncapi/consumers/eduserver_sendtodevice.go index 701634f04..4de8eeb77 100644 --- a/syncapi/consumers/eduserver_sendtodevice.go +++ b/syncapi/consumers/eduserver_sendtodevice.go @@ -56,7 +56,7 @@ func NewOutputSendToDeviceEventConsumer( consumer := internal.ContinualConsumer{ Process: process, ComponentName: "syncapi/eduserver/sendtodevice", - Topic: jetstream.OutputSendToDeviceEvent, + Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent)), Consumer: kafkaConsumer, PartitionStore: store, } diff --git a/syncapi/consumers/eduserver_typing.go b/syncapi/consumers/eduserver_typing.go index 790b52588..343f777e1 100644 --- a/syncapi/consumers/eduserver_typing.go +++ b/syncapi/consumers/eduserver_typing.go @@ -54,7 +54,7 @@ func NewOutputTypingEventConsumer( consumer := internal.ContinualConsumer{ Process: process, ComponentName: "syncapi/eduserver/typing", - Topic: jetstream.OutputTypingEvent, + Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent)), Consumer: kafkaConsumer, PartitionStore: store, } diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index 99475d012..7ddf2c092 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -59,7 +59,7 @@ func NewOutputRoomEventConsumer( consumer := internal.ContinualConsumer{ Process: process, ComponentName: "syncapi/roomserver", - Topic: jetstream.OutputRoomEvent, + Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent)), Consumer: kafkaConsumer, PartitionStore: store, } diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index f6195c24d..16e222cb0 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -65,7 +65,7 @@ func AddPublicRoutes( requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI, streams, notifier) keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer( - process, cfg.Matrix.ServerName, jetstream.OutputKeyChangeEvent, + process, cfg.Matrix.ServerName, string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent)), consumer, keyAPI, rsAPI, syncDB, notifier, streams.DeviceListStreamProvider, ) if err = keyChangeConsumer.Start(); err != nil {