diff --git a/src/github.com/matrix-org/dendrite/clientapi/readers/profile.go b/src/github.com/matrix-org/dendrite/clientapi/readers/profile.go index 23e3f8634..cddd1ffc6 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/readers/profile.go +++ b/src/github.com/matrix-org/dendrite/clientapi/readers/profile.go @@ -15,6 +15,7 @@ package readers import ( + "encoding/json" "fmt" "net/http" "strings" @@ -22,8 +23,10 @@ import ( "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" - // "github.com/matrix-org/dendrite/clientapi/producers" + "github.com/matrix-org/dendrite/common/config" "github.com/matrix-org/util" + + sarama "gopkg.in/Shopify/sarama.v1" ) type profileResponse struct { @@ -39,6 +42,12 @@ type displayName struct { DisplayName string `json:"displayname"` } +type profileUpdate struct { + Updated string `json:"updated"` + OldValue string `json:"old_value"` + NewValue string `json:"new_value"` +} + // GetProfile implements GET /profile/{userID} func GetProfile( req *http.Request, accountDB *accounts.Database, userID string, @@ -85,6 +94,7 @@ func GetAvatarURL( // SetAvatarURL implements PUT /profile/{userID}/avatar_url func SetAvatarURL( req *http.Request, accountDB *accounts.Database, userID string, + cfg config.Dendrite, ) util.JSONResponse { var r avatarURL if resErr := httputil.UnmarshalJSONRequest(req, &r); resErr != nil { @@ -97,6 +107,10 @@ func SetAvatarURL( } } + if err := sendUpdate(userID, r.AvatarURL, "", accountDB, cfg); err != nil { + return httputil.LogThenError(req, err) + } + localpart := getLocalPart(userID) if err := accountDB.SetAvatarURL(localpart, r.AvatarURL); err != nil { return httputil.LogThenError(req, err) @@ -128,6 +142,7 @@ func GetDisplayName( // SetDisplayName implements PUT /profile/{userID}/displayname func SetDisplayName( req *http.Request, accountDB *accounts.Database, userID string, + cfg config.Dendrite, ) util.JSONResponse { var r displayName if resErr := httputil.UnmarshalJSONRequest(req, &r); resErr != nil { @@ -140,6 +155,10 @@ func SetDisplayName( } } + if err := sendUpdate(userID, "", r.DisplayName, accountDB, cfg); err != nil { + return httputil.LogThenError(req, err) + } + localpart := getLocalPart(userID) if err := accountDB.SetDisplayName(localpart, r.DisplayName); err != nil { return httputil.LogThenError(req, err) @@ -160,3 +179,53 @@ func getLocalPart(userID string) string { // Return the part after the "@" return strings.Split(username, "@")[1] } + +func sendUpdate(userID string, newAvatarURL string, newDisplayName string, + accountDB *accounts.Database, cfg config.Dendrite, +) error { + var update profileUpdate + var m sarama.ProducerMessage + + m.Topic = string(cfg.Kafka.Topics.UserUpdates) + m.Key = sarama.StringEncoder(userID) + + localpart := getLocalPart(userID) + profile, err := accountDB.GetProfileByLocalpart(localpart) + if err != nil { + return err + } + + // Determining if the changed value is the avatar URL or the display name + if len(newAvatarURL) > 0 { + update = profileUpdate{ + Updated: "avatar_url", + OldValue: profile.AvatarURL, + NewValue: newAvatarURL, + } + } else if len(newDisplayName) > 0 { + update = profileUpdate{ + Updated: "displayname", + OldValue: profile.DisplayName, + NewValue: newDisplayName, + } + } else { + panic(fmt.Errorf("No update to send")) + } + + value, err := json.Marshal(update) + if err != nil { + return err + } + m.Value = sarama.ByteEncoder(value) + + producer, err := sarama.NewSyncProducer(cfg.Kafka.Addresses, nil) + if err != nil { + return err + } + + if _, _, err := producer.SendMessage(&m); err != nil { + return err + } + + return nil +} diff --git a/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go b/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go index ec607b82a..dad7fdfee 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go +++ b/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go @@ -46,7 +46,6 @@ func Setup( deviceDB *devices.Database, federation *gomatrixserverlib.FederationClient, keyRing gomatrixserverlib.KeyRing, - userUpdatesProducer *producers.RoomserverProducer, ) { apiMux := mux.NewRouter() @@ -179,7 +178,7 @@ func Setup( r0mux.Handle("/profile/{userID}/avatar_url", common.MakeAuthAPI("profile_avatar_url", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse { vars := mux.Vars(req) - return readers.SetAvatarURL(req, accountDB, vars["userID"]) + return readers.SetAvatarURL(req, accountDB, vars["userID"], cfg) }), ).Methods("PUT", "OPTIONS") // Browsers use the OPTIONS HTTP method to check if the CORS policy allows @@ -195,7 +194,7 @@ func Setup( r0mux.Handle("/profile/{userID}/displayname", common.MakeAuthAPI("profile_displayname", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse { vars := mux.Vars(req) - return readers.SetDisplayName(req, accountDB, vars["userID"]) + return readers.SetDisplayName(req, accountDB, vars["userID"], cfg) }), ).Methods("PUT", "OPTIONS") // Browsers use the OPTIONS HTTP method to check if the CORS policy allows diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go index c80dfdc84..51472b759 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go @@ -53,9 +53,6 @@ func main() { roomserverProducer, err := producers.NewRoomserverProducer( cfg.Kafka.Addresses, string(cfg.Kafka.Topics.InputRoomEvent), ) - userUpdatesProducer, err := producers.NewRoomserverProducer( - cfg.Kafka.Addresses, string(cfg.Kafka.Topics.UserUpdates), - ) if err != nil { log.Panicf("Failed to setup kafka producers(%q): %s", cfg.Kafka.Addresses, err) } @@ -89,7 +86,7 @@ func main() { log.Info("Starting client API server on ", cfg.Listen.ClientAPI) routing.Setup( http.DefaultServeMux, http.DefaultClient, *cfg, roomserverProducer, - queryAPI, accountDB, deviceDB, federation, keyRing, userUpdatesProducer, + queryAPI, accountDB, deviceDB, federation, keyRing, ) log.Fatal(http.ListenAndServe(string(cfg.Listen.ClientAPI), nil)) }