From 355ab5eedf3636915bb77a53c6f164ca1e860eef Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Tue, 11 Jul 2017 14:14:06 +0100 Subject: [PATCH] Notify profile update (#153) * Profile retrieval * Saving avatar (without propagating it) * Saving display name (without propagating it) * Getters for display name and avatar URL * Doc'd * Introduced new Kafka topic and producer * Updated config with new kafka topic * Switched to samara producer and now sending messages * Doc'd * Put kafka update after the database insert * Doc'd profileUpdate structure * Removed unused parameter * Moved user updates producer to clientapi/producers --- dendrite-config.yaml | 1 + .../clientapi/producers/userupdate.go | 77 +++++++++++++++++++ .../dendrite/clientapi/readers/profile.go | 26 +++++++ .../dendrite/clientapi/routing/routing.go | 5 +- .../cmd/dendrite-client-api-server/main.go | 5 +- .../dendrite/common/config/config.go | 2 + 6 files changed, 113 insertions(+), 3 deletions(-) create mode 100644 src/github.com/matrix-org/dendrite/clientapi/producers/userupdate.go diff --git a/dendrite-config.yaml b/dendrite-config.yaml index 0fc8e8baa..0a4402fc2 100644 --- a/dendrite-config.yaml +++ b/dendrite-config.yaml @@ -56,6 +56,7 @@ kafka: topics: input_room_event: roomserverInput output_room_event: roomserverOutput + user_updates: userUpdates # The postgres connection configs for connecting to the databases e.g a postgres:// URI database: diff --git a/src/github.com/matrix-org/dendrite/clientapi/producers/userupdate.go b/src/github.com/matrix-org/dendrite/clientapi/producers/userupdate.go new file mode 100644 index 000000000..f76be0d75 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/clientapi/producers/userupdate.go @@ -0,0 +1,77 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package producers + +import ( + "encoding/json" + + sarama "gopkg.in/Shopify/sarama.v1" +) + +// UserUpdateProducer produces events related to user updates. +type UserUpdateProducer struct { + Topic string + Producer sarama.SyncProducer +} + +// TODO: Move this struct to `common` so the components that consume the topic +// can use it when parsing incoming messages +type profileUpdate struct { + Updated string `json:"updated"` // Which attribute is updated (can be either `avatar_url` or `displayname`) + OldValue string `json:"old_value"` // The attribute's value before the update + NewValue string `json:"new_value"` // The attribute's value after the update +} + +// NewUserUpdateProducer creates a new UserUpdateProducer +func NewUserUpdateProducer(kafkaURIs []string, topic string) (*UserUpdateProducer, error) { + producer, err := sarama.NewSyncProducer(kafkaURIs, nil) + if err != nil { + return nil, err + } + return &UserUpdateProducer{ + Topic: topic, + Producer: producer, + }, nil +} + +// SendUpdate sends an update using kafka to notify the roomserver of the +// profile update. Returns an error if the update failed to send. +func (p *UserUpdateProducer) SendUpdate( + userID string, updatedAttribute string, oldValue string, newValue string, +) error { + var update profileUpdate + var m sarama.ProducerMessage + + m.Topic = string(p.Topic) + m.Key = sarama.StringEncoder(userID) + + update = profileUpdate{ + Updated: updatedAttribute, + OldValue: oldValue, + NewValue: newValue, + } + + value, err := json.Marshal(update) + if err != nil { + return err + } + m.Value = sarama.ByteEncoder(value) + + if _, _, err := p.Producer.SendMessage(&m); err != nil { + return err + } + + return nil +} diff --git a/src/github.com/matrix-org/dendrite/clientapi/readers/profile.go b/src/github.com/matrix-org/dendrite/clientapi/readers/profile.go index 65fa9a062..dcdb14b44 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/readers/profile.go +++ b/src/github.com/matrix-org/dendrite/clientapi/readers/profile.go @@ -22,6 +22,8 @@ import ( "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" + "github.com/matrix-org/dendrite/clientapi/producers" + "github.com/matrix-org/util" ) @@ -84,6 +86,7 @@ func GetAvatarURL( // SetAvatarURL implements PUT /profile/{userID}/avatar_url func SetAvatarURL( req *http.Request, accountDB *accounts.Database, userID string, + producer *producers.UserUpdateProducer, ) util.JSONResponse { var r avatarURL if resErr := httputil.UnmarshalJSONRequest(req, &r); resErr != nil { @@ -97,9 +100,20 @@ func SetAvatarURL( } localpart := getLocalPart(userID) + + oldProfile, err := accountDB.GetProfileByLocalpart(localpart) + if err != nil { + return httputil.LogThenError(req, err) + } + if err := accountDB.SetAvatarURL(localpart, r.AvatarURL); err != nil { return httputil.LogThenError(req, err) } + + if err := producer.SendUpdate(userID, "avatar_url", oldProfile.AvatarURL, r.AvatarURL); err != nil { + return httputil.LogThenError(req, err) + } + return util.JSONResponse{ Code: 200, JSON: struct{}{}, @@ -127,6 +141,7 @@ func GetDisplayName( // SetDisplayName implements PUT /profile/{userID}/displayname func SetDisplayName( req *http.Request, accountDB *accounts.Database, userID string, + producer *producers.UserUpdateProducer, ) util.JSONResponse { var r displayName if resErr := httputil.UnmarshalJSONRequest(req, &r); resErr != nil { @@ -140,9 +155,20 @@ func SetDisplayName( } localpart := getLocalPart(userID) + + oldProfile, err := accountDB.GetProfileByLocalpart(localpart) + if err != nil { + return httputil.LogThenError(req, err) + } + if err := accountDB.SetDisplayName(localpart, r.DisplayName); err != nil { return httputil.LogThenError(req, err) } + + if err := producer.SendUpdate(userID, "displayname", oldProfile.DisplayName, r.DisplayName); err != nil { + return httputil.LogThenError(req, err) + } + return util.JSONResponse{ Code: 200, JSON: struct{}{}, diff --git a/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go b/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go index 3482344d3..4d5556d9f 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go +++ b/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go @@ -46,6 +46,7 @@ func Setup( deviceDB *devices.Database, federation *gomatrixserverlib.FederationClient, keyRing gomatrixserverlib.KeyRing, + userUpdateProducer *producers.UserUpdateProducer, ) { apiMux := mux.NewRouter() @@ -178,7 +179,7 @@ func Setup( r0mux.Handle("/profile/{userID}/avatar_url", common.MakeAuthAPI("profile_avatar_url", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse { vars := mux.Vars(req) - return readers.SetAvatarURL(req, accountDB, vars["userID"]) + return readers.SetAvatarURL(req, accountDB, vars["userID"], userUpdateProducer) }), ).Methods("PUT", "OPTIONS") // Browsers use the OPTIONS HTTP method to check if the CORS policy allows @@ -194,7 +195,7 @@ func Setup( r0mux.Handle("/profile/{userID}/displayname", common.MakeAuthAPI("profile_displayname", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse { vars := mux.Vars(req) - return readers.SetDisplayName(req, accountDB, vars["userID"]) + return readers.SetDisplayName(req, accountDB, vars["userID"], userUpdateProducer) }), ).Methods("PUT", "OPTIONS") // Browsers use the OPTIONS HTTP method to check if the CORS policy allows diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go index 51472b759..a64cc9a07 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go @@ -53,6 +53,9 @@ func main() { roomserverProducer, err := producers.NewRoomserverProducer( cfg.Kafka.Addresses, string(cfg.Kafka.Topics.InputRoomEvent), ) + userUpdateProducer, err := producers.NewUserUpdateProducer( + cfg.Kafka.Addresses, string(cfg.Kafka.Topics.UserUpdates), + ) if err != nil { log.Panicf("Failed to setup kafka producers(%q): %s", cfg.Kafka.Addresses, err) } @@ -86,7 +89,7 @@ func main() { log.Info("Starting client API server on ", cfg.Listen.ClientAPI) routing.Setup( http.DefaultServeMux, http.DefaultClient, *cfg, roomserverProducer, - queryAPI, accountDB, deviceDB, federation, keyRing, + queryAPI, accountDB, deviceDB, federation, keyRing, userUpdateProducer, ) log.Fatal(http.ListenAndServe(string(cfg.Listen.ClientAPI), nil)) } diff --git a/src/github.com/matrix-org/dendrite/common/config/config.go b/src/github.com/matrix-org/dendrite/common/config/config.go index 9a5a3dcc9..311312a9d 100644 --- a/src/github.com/matrix-org/dendrite/common/config/config.go +++ b/src/github.com/matrix-org/dendrite/common/config/config.go @@ -99,6 +99,8 @@ type Dendrite struct { InputRoomEvent Topic `yaml:"input_room_event"` // Topic for roomserver/api.OutputRoomEvent events. OutputRoomEvent Topic `yaml:"output_room_event"` + // Topic for user updates (profile, presence) + UserUpdates Topic `yaml:"user_updates"` } } `yaml:"kafka"`