From 9e75165dc9df26db566bde034d08475cb9c4f067 Mon Sep 17 00:00:00 2001 From: Anant Prakash Date: Tue, 31 Jul 2018 19:29:47 +0530 Subject: [PATCH] Integrate the typing server component, create kafka topic --- dendrite-config.yaml | 1 + .../dendrite/cmd/dendrite-monolith-server/main.go | 10 +++++----- .../matrix-org/dendrite/common/config/config.go | 3 +++ .../matrix-org/dendrite/common/config/config_test.go | 1 + .../matrix-org/dendrite/common/test/config.go | 1 + .../matrix-org/dendrite/typingserver/typingserver.go | 12 ++++++++++-- 6 files changed, 21 insertions(+), 7 deletions(-) diff --git a/dendrite-config.yaml b/dendrite-config.yaml index a838c1bbf..d26e74777 100644 --- a/dendrite-config.yaml +++ b/dendrite-config.yaml @@ -86,6 +86,7 @@ kafka: topics: output_room_event: roomserverOutput output_client_data: clientapiOutput + output_typing_event: typingServerOutput user_updates: userUpdates # The postgres connection configs for connecting to the databases e.g a postgres:// URI diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go index c66231285..b1ad0910b 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go @@ -18,20 +18,20 @@ import ( "flag" "net/http" - "github.com/matrix-org/dendrite/common/keydb" - "github.com/matrix-org/dendrite/common/transactions" - "github.com/matrix-org/dendrite/typingserver" - "github.com/matrix-org/dendrite/appservice" "github.com/matrix-org/dendrite/clientapi" "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common/basecomponent" + "github.com/matrix-org/dendrite/common/keydb" + "github.com/matrix-org/dendrite/common/transactions" "github.com/matrix-org/dendrite/federationapi" "github.com/matrix-org/dendrite/federationsender" "github.com/matrix-org/dendrite/mediaapi" "github.com/matrix-org/dendrite/publicroomsapi" "github.com/matrix-org/dendrite/roomserver" "github.com/matrix-org/dendrite/syncapi" + "github.com/matrix-org/dendrite/typingserver" + "github.com/matrix-org/dendrite/typingserver/cache" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/sirupsen/logrus" @@ -56,7 +56,7 @@ func main() { keyRing := keydb.CreateKeyRing(federation.Client, keyDB) alias, input, query := roomserver.SetupRoomServerComponent(base) - typingInputAPI := typingserver.SetupTypingServerComponent(base) + typingInputAPI := typingserver.SetupTypingServerComponent(base, cache.NewTypingCache()) clientapi.SetupClientAPIComponent( base, deviceDB, accountDB, diff --git a/src/github.com/matrix-org/dendrite/common/config/config.go b/src/github.com/matrix-org/dendrite/common/config/config.go index f901e01f5..16e50aead 100644 --- a/src/github.com/matrix-org/dendrite/common/config/config.go +++ b/src/github.com/matrix-org/dendrite/common/config/config.go @@ -134,6 +134,8 @@ type Dendrite struct { OutputRoomEvent Topic `yaml:"output_room_event"` // Topic for sending account data from client API to sync API OutputClientData Topic `yaml:"output_client_data"` + // Topic for typingserver/api.OutputTypingEvent events. + OutputTypingEvent Topic `yaml:"output_typing_event"` // Topic for user updates (profile, presence) UserUpdates Topic `yaml:"user_updates"` } @@ -527,6 +529,7 @@ func (config *Dendrite) checkKafka(configErrs *configErrors, monolithic bool) { } checkNotEmpty(configErrs, "kafka.topics.output_room_event", string(config.Kafka.Topics.OutputRoomEvent)) checkNotEmpty(configErrs, "kafka.topics.output_client_data", string(config.Kafka.Topics.OutputClientData)) + checkNotEmpty(configErrs, "kafka.topics.output_typing_event", string(config.Kafka.Topics.OutputTypingEvent)) checkNotEmpty(configErrs, "kafka.topics.user_updates", string(config.Kafka.Topics.UserUpdates)) } diff --git a/src/github.com/matrix-org/dendrite/common/config/config_test.go b/src/github.com/matrix-org/dendrite/common/config/config_test.go index e91e03d6c..acc4dbd12 100644 --- a/src/github.com/matrix-org/dendrite/common/config/config_test.go +++ b/src/github.com/matrix-org/dendrite/common/config/config_test.go @@ -45,6 +45,7 @@ kafka: topics: output_room_event: output.room output_client_data: output.client + output_typing_event: output.typing user_updates: output.user database: media_api: "postgresql:///media_api" diff --git a/src/github.com/matrix-org/dendrite/common/test/config.go b/src/github.com/matrix-org/dendrite/common/test/config.go index 2c023b9ad..08a1b398f 100644 --- a/src/github.com/matrix-org/dendrite/common/test/config.go +++ b/src/github.com/matrix-org/dendrite/common/test/config.go @@ -83,6 +83,7 @@ func MakeConfig(configDir, kafkaURI, database, host string, startPort int) (*con // Make this configurable somehow? cfg.Kafka.Topics.OutputRoomEvent = "test.room.output" cfg.Kafka.Topics.OutputClientData = "test.clientapi.output" + cfg.Kafka.Topics.OutputTypingEvent = "test.typing.output" cfg.Kafka.Topics.UserUpdates = "test.user.output" // TODO: Use different databases for the different schemas. diff --git a/src/github.com/matrix-org/dendrite/typingserver/typingserver.go b/src/github.com/matrix-org/dendrite/typingserver/typingserver.go index d611d677d..6acb3cfb4 100644 --- a/src/github.com/matrix-org/dendrite/typingserver/typingserver.go +++ b/src/github.com/matrix-org/dendrite/typingserver/typingserver.go @@ -15,6 +15,8 @@ package typingserver import ( "github.com/matrix-org/dendrite/common/basecomponent" "github.com/matrix-org/dendrite/typingserver/api" + "github.com/matrix-org/dendrite/typingserver/cache" + "github.com/matrix-org/dendrite/typingserver/input" ) // SetupTypingServerComponent sets up and registers HTTP handlers for the @@ -23,7 +25,13 @@ import ( // APIs directly instead of having to use HTTP. func SetupTypingServerComponent( base *basecomponent.BaseDendrite, + typingCache *cache.TypingCache, ) api.TypingServerInputAPI { - // TODO: implement typing server - return base.CreateHTTPTypingServerAPIs() + inputAPI := &input.TypingServerInputAPI{ + Cache: typingCache, + Producer: base.KafkaProducer, + OutputTypingEventTopic: string(base.Cfg.Kafka.Topics.OutputTypingEvent), + } + + return inputAPI }