From 6a320d6ab6105f1030d42b974b0cc3d250adaa56 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Mon, 10 Jul 2017 11:08:48 +0100 Subject: [PATCH] Introduced new Kafka topic and producer --- .../matrix-org/dendrite/clientapi/readers/profile.go | 4 +++- .../matrix-org/dendrite/clientapi/routing/routing.go | 5 +++-- .../dendrite/cmd/dendrite-client-api-server/main.go | 5 ++++- src/github.com/matrix-org/dendrite/common/config/config.go | 2 ++ 4 files changed, 12 insertions(+), 4 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/clientapi/readers/profile.go b/src/github.com/matrix-org/dendrite/clientapi/readers/profile.go index 65e4dca10..57093e9df 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/readers/profile.go +++ b/src/github.com/matrix-org/dendrite/clientapi/readers/profile.go @@ -22,7 +22,7 @@ import ( "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" - // "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/util" ) @@ -70,6 +70,7 @@ func GetProfile( // AvatarURL implements GET and PUT /profile/{userID}/avatar_url func AvatarURL( req *http.Request, accountDB *accounts.Database, userID string, + userUpdatesProducer *producers.RoomserverProducer, ) util.JSONResponse { if req.Method == "GET" { localpart := getLocalPart(userID) @@ -120,6 +121,7 @@ func AvatarURL( // DisplayName implements GET and PUT /profile/{userID}/displayname func DisplayName( req *http.Request, accountDB *accounts.Database, userID string, + userUpdatesProducer *producers.RoomserverProducer, ) util.JSONResponse { if req.Method == "GET" { localpart := getLocalPart(userID) diff --git a/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go b/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go index ad41d798d..dd88258c5 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go +++ b/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go @@ -46,6 +46,7 @@ func Setup( deviceDB *devices.Database, federation *gomatrixserverlib.FederationClient, keyRing gomatrixserverlib.KeyRing, + userUpdatesProducer *producers.RoomserverProducer, ) { apiMux := mux.NewRouter() @@ -171,14 +172,14 @@ func Setup( r0mux.Handle("/profile/{userID}/avatar_url", common.MakeAPI("profile_avatar_url", func(req *http.Request) util.JSONResponse { vars := mux.Vars(req) - return readers.AvatarURL(req, accountDB, vars["userID"]) + return readers.AvatarURL(req, accountDB, vars["userID"], userUpdatesProducer) }), ) r0mux.Handle("/profile/{userID}/displayname", common.MakeAPI("profile_displayname", func(req *http.Request) util.JSONResponse { vars := mux.Vars(req) - return readers.DisplayName(req, accountDB, vars["userID"]) + return readers.DisplayName(req, accountDB, vars["userID"], userUpdatesProducer) }), ) diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go index 51472b759..c80dfdc84 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go @@ -53,6 +53,9 @@ func main() { roomserverProducer, err := producers.NewRoomserverProducer( cfg.Kafka.Addresses, string(cfg.Kafka.Topics.InputRoomEvent), ) + userUpdatesProducer, err := producers.NewRoomserverProducer( + cfg.Kafka.Addresses, string(cfg.Kafka.Topics.UserUpdates), + ) if err != nil { log.Panicf("Failed to setup kafka producers(%q): %s", cfg.Kafka.Addresses, err) } @@ -86,7 +89,7 @@ func main() { log.Info("Starting client API server on ", cfg.Listen.ClientAPI) routing.Setup( http.DefaultServeMux, http.DefaultClient, *cfg, roomserverProducer, - queryAPI, accountDB, deviceDB, federation, keyRing, + queryAPI, accountDB, deviceDB, federation, keyRing, userUpdatesProducer, ) log.Fatal(http.ListenAndServe(string(cfg.Listen.ClientAPI), nil)) } diff --git a/src/github.com/matrix-org/dendrite/common/config/config.go b/src/github.com/matrix-org/dendrite/common/config/config.go index 9a5a3dcc9..311312a9d 100644 --- a/src/github.com/matrix-org/dendrite/common/config/config.go +++ b/src/github.com/matrix-org/dendrite/common/config/config.go @@ -99,6 +99,8 @@ type Dendrite struct { InputRoomEvent Topic `yaml:"input_room_event"` // Topic for roomserver/api.OutputRoomEvent events. OutputRoomEvent Topic `yaml:"output_room_event"` + // Topic for user updates (profile, presence) + UserUpdates Topic `yaml:"user_updates"` } } `yaml:"kafka"`