diff --git a/src/github.com/matrix-org/dendrite/clientapi/clientapi.go b/src/github.com/matrix-org/dendrite/clientapi/clientapi.go index e4374b76d..9f17edea1 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/clientapi.go +++ b/src/github.com/matrix-org/dendrite/clientapi/clientapi.go @@ -48,10 +48,11 @@ func main() { } cfg := config.ClientAPI{ - ServerName: "localhost", - KeyID: "ed25519:something", - PrivateKey: privKey, - KafkaProducerURIs: []string{"localhost:9092"}, + ServerName: "localhost", + KeyID: "ed25519:something", + PrivateKey: privKey, + KafkaProducerURIs: []string{"localhost:9092"}, + ClientAPIOutputTopic: "clientapiOutput", } log.Info("Starting clientapi") diff --git a/src/github.com/matrix-org/dendrite/clientapi/config/config.go b/src/github.com/matrix-org/dendrite/clientapi/config/config.go index 9779b26ea..001da837b 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/config/config.go +++ b/src/github.com/matrix-org/dendrite/clientapi/config/config.go @@ -13,4 +13,6 @@ type ClientAPI struct { KeyID string // A list of URIs to send events to. These kafka logs should be consumed by a Room Server. KafkaProducerURIs []string + // The topic for events which are written to the logs. + ClientAPIOutputTopic string } diff --git a/src/github.com/matrix-org/dendrite/clientapi/writers/createroom.go b/src/github.com/matrix-org/dendrite/clientapi/writers/createroom.go index cf78ac7d7..9f7f7efd9 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/writers/createroom.go +++ b/src/github.com/matrix-org/dendrite/clientapi/writers/createroom.go @@ -181,7 +181,7 @@ func createRoom(req *http.Request, cfg config.ClientAPI, roomID string, producer } // send events to the room server - msgs, err := eventsToMessages(builtEvents) + msgs, err := eventsToMessages(builtEvents, cfg.ClientAPIOutputTopic) if err != nil { return util.ErrorResponse(err) } @@ -252,7 +252,7 @@ func authEventsFromStateNeeded(eventsNeeded gomatrixserverlib.StateNeeded, return } -func eventsToMessages(events []*gomatrixserverlib.Event) ([]*sarama.ProducerMessage, error) { +func eventsToMessages(events []*gomatrixserverlib.Event, topic string) ([]*sarama.ProducerMessage, error) { msgs := make([]*sarama.ProducerMessage, len(events)) for i, e := range events { var m sarama.ProducerMessage @@ -273,7 +273,7 @@ func eventsToMessages(events []*gomatrixserverlib.Event) ([]*sarama.ProducerMess if err != nil { return nil, err } - m.Topic = "clientapiOutput" // TODO: Make this customisable like roomserver is? + m.Topic = topic m.Key = sarama.StringEncoder(e.EventID()) m.Value = sarama.ByteEncoder(value) msgs[i] = &m