From f5a4e4a339b6f94ec215778dca22204adaa893d1 Mon Sep 17 00:00:00 2001 From: Till Faelligen Date: Sat, 24 Jul 2021 12:17:42 +0200 Subject: [PATCH] Remove unneeded TopicFor --- appservice/consumers/roomserver.go | 2 +- clientapi/clientapi.go | 2 +- eduserver/eduserver.go | 6 +++--- federationsender/consumers/eduserver.go | 10 +++++----- federationsender/consumers/keychange.go | 2 +- federationsender/consumers/roomserver.go | 2 +- keyserver/keyserver.go | 2 +- roomserver/roomserver.go | 2 +- roomserver/roomserver_test.go | 4 ++-- syncapi/consumers/clientapi.go | 2 +- syncapi/consumers/eduserver_receipts.go | 2 +- syncapi/consumers/eduserver_sendtodevice.go | 2 +- syncapi/consumers/eduserver_typing.go | 2 +- syncapi/consumers/roomserver.go | 2 +- syncapi/syncapi.go | 2 +- 15 files changed, 22 insertions(+), 22 deletions(-) diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go index f9790a0b1..b9883546a 100644 --- a/appservice/consumers/roomserver.go +++ b/appservice/consumers/roomserver.go @@ -53,7 +53,7 @@ func NewOutputRoomEventConsumer( consumer := internal.ContinualConsumer{ Process: process, ComponentName: "appservice/roomserver", - Topic: cfg.Global.JetStream.TopicFor(jetstream.OutputRoomEvent), + Topic: jetstream.OutputRoomEvent, Consumer: kafkaConsumer, PartitionStore: appserviceDB, } diff --git a/clientapi/clientapi.go b/clientapi/clientapi.go index ffab1337d..9303eb649 100644 --- a/clientapi/clientapi.go +++ b/clientapi/clientapi.go @@ -53,7 +53,7 @@ func AddPublicRoutes( syncProducer := &producers.SyncAPIProducer{ Producer: producer, - Topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData), + Topic: jetstream.OutputClientData, } routing.Setup( diff --git a/eduserver/eduserver.go b/eduserver/eduserver.go index 2e8ef1893..ee9f707af 100644 --- a/eduserver/eduserver.go +++ b/eduserver/eduserver.go @@ -48,9 +48,9 @@ func NewInternalAPI( Cache: eduCache, UserAPI: userAPI, Producer: producer, - OutputTypingEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent), - OutputSendToDeviceEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent), - OutputReceiptEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent), + OutputTypingEventTopic: jetstream.OutputTypingEvent, + OutputSendToDeviceEventTopic: jetstream.OutputSendToDeviceEvent, + OutputReceiptEventTopic: jetstream.OutputReceiptEvent, ServerName: cfg.Matrix.ServerName, } } diff --git a/federationsender/consumers/eduserver.go b/federationsender/consumers/eduserver.go index 63a367f52..2e12da120 100644 --- a/federationsender/consumers/eduserver.go +++ b/federationsender/consumers/eduserver.go @@ -56,29 +56,29 @@ func NewOutputEDUConsumer( typingConsumer: &internal.ContinualConsumer{ Process: process, ComponentName: "eduserver/typing", - Topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent), + Topic: jetstream.OutputTypingEvent, Consumer: kafkaConsumer, PartitionStore: store, }, sendToDeviceConsumer: &internal.ContinualConsumer{ Process: process, ComponentName: "eduserver/sendtodevice", - Topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent), + Topic: jetstream.OutputSendToDeviceEvent, Consumer: kafkaConsumer, PartitionStore: store, }, receiptConsumer: &internal.ContinualConsumer{ Process: process, ComponentName: "eduserver/receipt", - Topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent), + Topic: jetstream.OutputReceiptEvent, Consumer: kafkaConsumer, PartitionStore: store, }, queues: queues, db: store, ServerName: cfg.Matrix.ServerName, - TypingTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent), - SendToDeviceTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent), + TypingTopic: jetstream.OutputTypingEvent, + SendToDeviceTopic: jetstream.OutputSendToDeviceEvent, } c.typingConsumer.ProcessMessage = c.onTypingEvent c.sendToDeviceConsumer.ProcessMessage = c.onSendToDeviceEvent diff --git a/federationsender/consumers/keychange.go b/federationsender/consumers/keychange.go index 4226d4922..70e2983c5 100644 --- a/federationsender/consumers/keychange.go +++ b/federationsender/consumers/keychange.go @@ -54,7 +54,7 @@ func NewKeyChangeConsumer( consumer: &internal.ContinualConsumer{ Process: process, ComponentName: "federationsender/keychange", - Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent)), + Topic: jetstream.OutputKeyChangeEvent, Consumer: kafkaConsumer, PartitionStore: store, }, diff --git a/federationsender/consumers/roomserver.go b/federationsender/consumers/roomserver.go index b145bc506..231bf462c 100644 --- a/federationsender/consumers/roomserver.go +++ b/federationsender/consumers/roomserver.go @@ -53,7 +53,7 @@ func NewOutputRoomEventConsumer( consumer := internal.ContinualConsumer{ Process: process, ComponentName: "federationsender/roomserver", - Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent)), + Topic: jetstream.OutputRoomEvent, Consumer: kafkaConsumer, PartitionStore: store, } diff --git a/keyserver/keyserver.go b/keyserver/keyserver.go index ed345dcb8..3464e0d0c 100644 --- a/keyserver/keyserver.go +++ b/keyserver/keyserver.go @@ -45,7 +45,7 @@ func NewInternalAPI( logrus.WithError(err).Panicf("failed to connect to key server database") } keyChangeProducer := &producers.KeyChange{ - Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent)), + Topic: jetstream.OutputKeyChangeEvent, Producer: producer, DB: db, } diff --git a/roomserver/roomserver.go b/roomserver/roomserver.go index 192a056b8..fcb802dab 100644 --- a/roomserver/roomserver.go +++ b/roomserver/roomserver.go @@ -54,7 +54,7 @@ func NewInternalAPI( } return internal.NewRoomserverAPI( - cfg, roomserverDB, producer, string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent)), + cfg, roomserverDB, producer, jetstream.OutputRoomEvent, base.Caches, keyRing, perspectiveServerNames, ) } diff --git a/roomserver/roomserver_test.go b/roomserver/roomserver_test.go index 335d2a4ca..5b85d7c26 100644 --- a/roomserver/roomserver_test.go +++ b/roomserver/roomserver_test.go @@ -166,7 +166,7 @@ func mustCreateRoomserverAPI(t *testing.T) (api.RoomserverInternalAPI, *dummyPro ConnectionString: roomserverDBFileURI, } dp := &dummyProducer{ - topic: cfg.Global.JetStream.TopicFor(jetstream.OutputRoomEvent), + topic: jetstream.OutputRoomEvent, } cache, err := caching.NewInMemoryLRUCache(false) if err != nil { @@ -181,7 +181,7 @@ func mustCreateRoomserverAPI(t *testing.T) (api.RoomserverInternalAPI, *dummyPro logrus.WithError(err).Panicf("failed to connect to room server db") } return internal.NewRoomserverAPI( - &cfg.RoomServer, roomserverDB, dp, string(cfg.Global.JetStream.TopicFor(jetstream.OutputRoomEvent)), + &cfg.RoomServer, roomserverDB, dp, jetstream.OutputRoomEvent, base.Caches, &test.NopJSONVerifier{}, nil, ), dp } diff --git a/syncapi/consumers/clientapi.go b/syncapi/consumers/clientapi.go index a50e9c8f9..55736925a 100644 --- a/syncapi/consumers/clientapi.go +++ b/syncapi/consumers/clientapi.go @@ -51,7 +51,7 @@ func NewOutputClientDataConsumer( consumer := internal.ContinualConsumer{ Process: process, ComponentName: "syncapi/clientapi", - Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData)), + Topic: jetstream.OutputClientData, Consumer: kafkaConsumer, PartitionStore: store, } diff --git a/syncapi/consumers/eduserver_receipts.go b/syncapi/consumers/eduserver_receipts.go index 2b25985f9..d6805d177 100644 --- a/syncapi/consumers/eduserver_receipts.go +++ b/syncapi/consumers/eduserver_receipts.go @@ -53,7 +53,7 @@ func NewOutputReceiptEventConsumer( consumer := internal.ContinualConsumer{ Process: process, ComponentName: "syncapi/eduserver/receipt", - Topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent), + Topic: jetstream.OutputReceiptEvent, Consumer: kafkaConsumer, PartitionStore: store, } diff --git a/syncapi/consumers/eduserver_sendtodevice.go b/syncapi/consumers/eduserver_sendtodevice.go index 4de8eeb77..701634f04 100644 --- a/syncapi/consumers/eduserver_sendtodevice.go +++ b/syncapi/consumers/eduserver_sendtodevice.go @@ -56,7 +56,7 @@ func NewOutputSendToDeviceEventConsumer( consumer := internal.ContinualConsumer{ Process: process, ComponentName: "syncapi/eduserver/sendtodevice", - Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent)), + Topic: jetstream.OutputSendToDeviceEvent, Consumer: kafkaConsumer, PartitionStore: store, } diff --git a/syncapi/consumers/eduserver_typing.go b/syncapi/consumers/eduserver_typing.go index 343f777e1..790b52588 100644 --- a/syncapi/consumers/eduserver_typing.go +++ b/syncapi/consumers/eduserver_typing.go @@ -54,7 +54,7 @@ func NewOutputTypingEventConsumer( consumer := internal.ContinualConsumer{ Process: process, ComponentName: "syncapi/eduserver/typing", - Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent)), + Topic: jetstream.OutputTypingEvent, Consumer: kafkaConsumer, PartitionStore: store, } diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index 7ddf2c092..99475d012 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -59,7 +59,7 @@ func NewOutputRoomEventConsumer( consumer := internal.ContinualConsumer{ Process: process, ComponentName: "syncapi/roomserver", - Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent)), + Topic: jetstream.OutputRoomEvent, Consumer: kafkaConsumer, PartitionStore: store, } diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index 16e222cb0..f6195c24d 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -65,7 +65,7 @@ func AddPublicRoutes( requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI, streams, notifier) keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer( - process, cfg.Matrix.ServerName, string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent)), + process, cfg.Matrix.ServerName, jetstream.OutputKeyChangeEvent, consumer, keyAPI, rsAPI, syncDB, notifier, streams.DeviceListStreamProvider, ) if err = keyChangeConsumer.Start(); err != nil {