diff --git a/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/main.go b/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/main.go index b0e36c425..bb75b1dfc 100644 --- a/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/main.go @@ -54,6 +54,7 @@ var ( ) const inputTopic = "syncserverInput" +const clientTopic = "clientapiOutput" var exe = test.KafkaExecutor{ ZookeeperURI: zookeeperURI, @@ -134,6 +135,7 @@ func startSyncServer() (*exec.Cmd, chan error) { cfg.Matrix.ServerName = "localhost" cfg.Listen.SyncAPI = config.Address(syncserverAddr) cfg.Kafka.Topics.OutputRoomEvent = config.Topic(inputTopic) + cfg.Kafka.Topics.OutputClientData = config.Topic(clientTopic) if err := test.WriteConfig(cfg, dir); err != nil { panic(err) @@ -177,6 +179,10 @@ func prepareKafka() { if err := exe.CreateTopic(inputTopic); err != nil { panic(err) } + exe.DeleteTopic(clientTopic) + if err := exe.CreateTopic(clientTopic); err != nil { + panic(err) + } } func testSyncServer(syncServerCmdChan chan error, userID, since, want string) { diff --git a/src/github.com/matrix-org/dendrite/common/config/config.go b/src/github.com/matrix-org/dendrite/common/config/config.go index 50f820124..324561f68 100644 --- a/src/github.com/matrix-org/dendrite/common/config/config.go +++ b/src/github.com/matrix-org/dendrite/common/config/config.go @@ -300,6 +300,7 @@ func (config *Dendrite) check() error { checkNotZero("kafka.addresses", int64(len(config.Kafka.Addresses))) checkNotEmpty("kafka.topics.output_room_event", string(config.Kafka.Topics.OutputRoomEvent)) + checkNotEmpty("kafka.topics.output_client_data", string(config.Kafka.Topics.OutputClientData)) checkNotEmpty("database.account", string(config.Database.Account)) checkNotEmpty("database.device", string(config.Database.Device)) checkNotEmpty("database.server_key", string(config.Database.ServerKey))