mirror of
https://github.com/matrix-org/dendrite.git
synced 2024-11-27 08:41:57 -06:00
parent
9ac699b77a
commit
5f9996b1c5
|
@ -53,7 +53,7 @@ func NewOutputRoomEventConsumer(
|
||||||
consumer := internal.ContinualConsumer{
|
consumer := internal.ContinualConsumer{
|
||||||
Process: process,
|
Process: process,
|
||||||
ComponentName: "appservice/roomserver",
|
ComponentName: "appservice/roomserver",
|
||||||
Topic: jetstream.OutputRoomEvent,
|
Topic: cfg.Global.JetStream.TopicFor(jetstream.OutputRoomEvent),
|
||||||
Consumer: kafkaConsumer,
|
Consumer: kafkaConsumer,
|
||||||
PartitionStore: appserviceDB,
|
PartitionStore: appserviceDB,
|
||||||
}
|
}
|
||||||
|
|
|
@ -53,7 +53,7 @@ func AddPublicRoutes(
|
||||||
|
|
||||||
syncProducer := &producers.SyncAPIProducer{
|
syncProducer := &producers.SyncAPIProducer{
|
||||||
Producer: producer,
|
Producer: producer,
|
||||||
Topic: jetstream.OutputClientData,
|
Topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData),
|
||||||
}
|
}
|
||||||
|
|
||||||
routing.Setup(
|
routing.Setup(
|
||||||
|
|
|
@ -48,9 +48,9 @@ func NewInternalAPI(
|
||||||
Cache: eduCache,
|
Cache: eduCache,
|
||||||
UserAPI: userAPI,
|
UserAPI: userAPI,
|
||||||
Producer: producer,
|
Producer: producer,
|
||||||
OutputTypingEventTopic: jetstream.OutputTypingEvent,
|
OutputTypingEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent),
|
||||||
OutputSendToDeviceEventTopic: jetstream.OutputSendToDeviceEvent,
|
OutputSendToDeviceEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent),
|
||||||
OutputReceiptEventTopic: jetstream.OutputReceiptEvent,
|
OutputReceiptEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent),
|
||||||
ServerName: cfg.Matrix.ServerName,
|
ServerName: cfg.Matrix.ServerName,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -56,29 +56,29 @@ func NewOutputEDUConsumer(
|
||||||
typingConsumer: &internal.ContinualConsumer{
|
typingConsumer: &internal.ContinualConsumer{
|
||||||
Process: process,
|
Process: process,
|
||||||
ComponentName: "eduserver/typing",
|
ComponentName: "eduserver/typing",
|
||||||
Topic: jetstream.OutputTypingEvent,
|
Topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent),
|
||||||
Consumer: kafkaConsumer,
|
Consumer: kafkaConsumer,
|
||||||
PartitionStore: store,
|
PartitionStore: store,
|
||||||
},
|
},
|
||||||
sendToDeviceConsumer: &internal.ContinualConsumer{
|
sendToDeviceConsumer: &internal.ContinualConsumer{
|
||||||
Process: process,
|
Process: process,
|
||||||
ComponentName: "eduserver/sendtodevice",
|
ComponentName: "eduserver/sendtodevice",
|
||||||
Topic: jetstream.OutputSendToDeviceEvent,
|
Topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent),
|
||||||
Consumer: kafkaConsumer,
|
Consumer: kafkaConsumer,
|
||||||
PartitionStore: store,
|
PartitionStore: store,
|
||||||
},
|
},
|
||||||
receiptConsumer: &internal.ContinualConsumer{
|
receiptConsumer: &internal.ContinualConsumer{
|
||||||
Process: process,
|
Process: process,
|
||||||
ComponentName: "eduserver/receipt",
|
ComponentName: "eduserver/receipt",
|
||||||
Topic: jetstream.OutputReceiptEvent,
|
Topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent),
|
||||||
Consumer: kafkaConsumer,
|
Consumer: kafkaConsumer,
|
||||||
PartitionStore: store,
|
PartitionStore: store,
|
||||||
},
|
},
|
||||||
queues: queues,
|
queues: queues,
|
||||||
db: store,
|
db: store,
|
||||||
ServerName: cfg.Matrix.ServerName,
|
ServerName: cfg.Matrix.ServerName,
|
||||||
TypingTopic: jetstream.OutputTypingEvent,
|
TypingTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent),
|
||||||
SendToDeviceTopic: jetstream.OutputSendToDeviceEvent,
|
SendToDeviceTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent),
|
||||||
}
|
}
|
||||||
c.typingConsumer.ProcessMessage = c.onTypingEvent
|
c.typingConsumer.ProcessMessage = c.onTypingEvent
|
||||||
c.sendToDeviceConsumer.ProcessMessage = c.onSendToDeviceEvent
|
c.sendToDeviceConsumer.ProcessMessage = c.onSendToDeviceEvent
|
||||||
|
|
|
@ -54,7 +54,7 @@ func NewKeyChangeConsumer(
|
||||||
consumer: &internal.ContinualConsumer{
|
consumer: &internal.ContinualConsumer{
|
||||||
Process: process,
|
Process: process,
|
||||||
ComponentName: "federationsender/keychange",
|
ComponentName: "federationsender/keychange",
|
||||||
Topic: jetstream.OutputKeyChangeEvent,
|
Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent)),
|
||||||
Consumer: kafkaConsumer,
|
Consumer: kafkaConsumer,
|
||||||
PartitionStore: store,
|
PartitionStore: store,
|
||||||
},
|
},
|
||||||
|
|
|
@ -53,7 +53,7 @@ func NewOutputRoomEventConsumer(
|
||||||
consumer := internal.ContinualConsumer{
|
consumer := internal.ContinualConsumer{
|
||||||
Process: process,
|
Process: process,
|
||||||
ComponentName: "federationsender/roomserver",
|
ComponentName: "federationsender/roomserver",
|
||||||
Topic: jetstream.OutputRoomEvent,
|
Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent)),
|
||||||
Consumer: kafkaConsumer,
|
Consumer: kafkaConsumer,
|
||||||
PartitionStore: store,
|
PartitionStore: store,
|
||||||
}
|
}
|
||||||
|
|
|
@ -45,7 +45,7 @@ func NewInternalAPI(
|
||||||
logrus.WithError(err).Panicf("failed to connect to key server database")
|
logrus.WithError(err).Panicf("failed to connect to key server database")
|
||||||
}
|
}
|
||||||
keyChangeProducer := &producers.KeyChange{
|
keyChangeProducer := &producers.KeyChange{
|
||||||
Topic: jetstream.OutputKeyChangeEvent,
|
Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent)),
|
||||||
Producer: producer,
|
Producer: producer,
|
||||||
DB: db,
|
DB: db,
|
||||||
}
|
}
|
||||||
|
|
|
@ -54,7 +54,7 @@ func NewInternalAPI(
|
||||||
}
|
}
|
||||||
|
|
||||||
return internal.NewRoomserverAPI(
|
return internal.NewRoomserverAPI(
|
||||||
cfg, roomserverDB, producer, jetstream.OutputRoomEvent,
|
cfg, roomserverDB, producer, string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent)),
|
||||||
base.Caches, keyRing, perspectiveServerNames,
|
base.Caches, keyRing, perspectiveServerNames,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
|
@ -166,7 +166,7 @@ func mustCreateRoomserverAPI(t *testing.T) (api.RoomserverInternalAPI, *dummyPro
|
||||||
ConnectionString: roomserverDBFileURI,
|
ConnectionString: roomserverDBFileURI,
|
||||||
}
|
}
|
||||||
dp := &dummyProducer{
|
dp := &dummyProducer{
|
||||||
topic: jetstream.OutputRoomEvent,
|
topic: cfg.Global.JetStream.TopicFor(jetstream.OutputRoomEvent),
|
||||||
}
|
}
|
||||||
cache, err := caching.NewInMemoryLRUCache(false)
|
cache, err := caching.NewInMemoryLRUCache(false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -181,7 +181,7 @@ func mustCreateRoomserverAPI(t *testing.T) (api.RoomserverInternalAPI, *dummyPro
|
||||||
logrus.WithError(err).Panicf("failed to connect to room server db")
|
logrus.WithError(err).Panicf("failed to connect to room server db")
|
||||||
}
|
}
|
||||||
return internal.NewRoomserverAPI(
|
return internal.NewRoomserverAPI(
|
||||||
&cfg.RoomServer, roomserverDB, dp, jetstream.OutputRoomEvent,
|
&cfg.RoomServer, roomserverDB, dp, string(cfg.Global.JetStream.TopicFor(jetstream.OutputRoomEvent)),
|
||||||
base.Caches, &test.NopJSONVerifier{}, nil,
|
base.Caches, &test.NopJSONVerifier{}, nil,
|
||||||
), dp
|
), dp
|
||||||
}
|
}
|
||||||
|
|
|
@ -51,7 +51,7 @@ func NewOutputClientDataConsumer(
|
||||||
consumer := internal.ContinualConsumer{
|
consumer := internal.ContinualConsumer{
|
||||||
Process: process,
|
Process: process,
|
||||||
ComponentName: "syncapi/clientapi",
|
ComponentName: "syncapi/clientapi",
|
||||||
Topic: jetstream.OutputClientData,
|
Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData)),
|
||||||
Consumer: kafkaConsumer,
|
Consumer: kafkaConsumer,
|
||||||
PartitionStore: store,
|
PartitionStore: store,
|
||||||
}
|
}
|
||||||
|
|
|
@ -53,7 +53,7 @@ func NewOutputReceiptEventConsumer(
|
||||||
consumer := internal.ContinualConsumer{
|
consumer := internal.ContinualConsumer{
|
||||||
Process: process,
|
Process: process,
|
||||||
ComponentName: "syncapi/eduserver/receipt",
|
ComponentName: "syncapi/eduserver/receipt",
|
||||||
Topic: jetstream.OutputReceiptEvent,
|
Topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent),
|
||||||
Consumer: kafkaConsumer,
|
Consumer: kafkaConsumer,
|
||||||
PartitionStore: store,
|
PartitionStore: store,
|
||||||
}
|
}
|
||||||
|
|
|
@ -56,7 +56,7 @@ func NewOutputSendToDeviceEventConsumer(
|
||||||
consumer := internal.ContinualConsumer{
|
consumer := internal.ContinualConsumer{
|
||||||
Process: process,
|
Process: process,
|
||||||
ComponentName: "syncapi/eduserver/sendtodevice",
|
ComponentName: "syncapi/eduserver/sendtodevice",
|
||||||
Topic: jetstream.OutputSendToDeviceEvent,
|
Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent)),
|
||||||
Consumer: kafkaConsumer,
|
Consumer: kafkaConsumer,
|
||||||
PartitionStore: store,
|
PartitionStore: store,
|
||||||
}
|
}
|
||||||
|
|
|
@ -54,7 +54,7 @@ func NewOutputTypingEventConsumer(
|
||||||
consumer := internal.ContinualConsumer{
|
consumer := internal.ContinualConsumer{
|
||||||
Process: process,
|
Process: process,
|
||||||
ComponentName: "syncapi/eduserver/typing",
|
ComponentName: "syncapi/eduserver/typing",
|
||||||
Topic: jetstream.OutputTypingEvent,
|
Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent)),
|
||||||
Consumer: kafkaConsumer,
|
Consumer: kafkaConsumer,
|
||||||
PartitionStore: store,
|
PartitionStore: store,
|
||||||
}
|
}
|
||||||
|
|
|
@ -59,7 +59,7 @@ func NewOutputRoomEventConsumer(
|
||||||
consumer := internal.ContinualConsumer{
|
consumer := internal.ContinualConsumer{
|
||||||
Process: process,
|
Process: process,
|
||||||
ComponentName: "syncapi/roomserver",
|
ComponentName: "syncapi/roomserver",
|
||||||
Topic: jetstream.OutputRoomEvent,
|
Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent)),
|
||||||
Consumer: kafkaConsumer,
|
Consumer: kafkaConsumer,
|
||||||
PartitionStore: store,
|
PartitionStore: store,
|
||||||
}
|
}
|
||||||
|
|
|
@ -65,7 +65,7 @@ func AddPublicRoutes(
|
||||||
requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI, streams, notifier)
|
requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI, streams, notifier)
|
||||||
|
|
||||||
keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer(
|
keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer(
|
||||||
process, cfg.Matrix.ServerName, jetstream.OutputKeyChangeEvent,
|
process, cfg.Matrix.ServerName, string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent)),
|
||||||
consumer, keyAPI, rsAPI, syncDB, notifier, streams.DeviceListStreamProvider,
|
consumer, keyAPI, rsAPI, syncDB, notifier, streams.DeviceListStreamProvider,
|
||||||
)
|
)
|
||||||
if err = keyChangeConsumer.Start(); err != nil {
|
if err = keyChangeConsumer.Start(); err != nil {
|
||||||
|
|
Loading…
Reference in a new issue