From 588e642f5f69e1feadfadcb8caa49698e44bb8fb Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Tue, 11 Jul 2017 12:06:05 +0100 Subject: [PATCH] Moved user updates producer to clientapi/producers --- .../clientapi/producers/userupdate.go | 77 +++++++++++++++++++ .../dendrite/clientapi/readers/profile.go | 69 ++--------------- .../dendrite/clientapi/routing/routing.go | 5 +- .../cmd/dendrite-client-api-server/main.go | 5 +- 4 files changed, 90 insertions(+), 66 deletions(-) create mode 100644 src/github.com/matrix-org/dendrite/clientapi/producers/userupdate.go diff --git a/src/github.com/matrix-org/dendrite/clientapi/producers/userupdate.go b/src/github.com/matrix-org/dendrite/clientapi/producers/userupdate.go new file mode 100644 index 000000000..f76be0d75 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/clientapi/producers/userupdate.go @@ -0,0 +1,77 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package producers + +import ( + "encoding/json" + + sarama "gopkg.in/Shopify/sarama.v1" +) + +// UserUpdateProducer produces events related to user updates. +type UserUpdateProducer struct { + Topic string + Producer sarama.SyncProducer +} + +// TODO: Move this struct to `common` so the components that consume the topic +// can use it when parsing incoming messages +type profileUpdate struct { + Updated string `json:"updated"` // Which attribute is updated (can be either `avatar_url` or `displayname`) + OldValue string `json:"old_value"` // The attribute's value before the update + NewValue string `json:"new_value"` // The attribute's value after the update +} + +// NewUserUpdateProducer creates a new UserUpdateProducer +func NewUserUpdateProducer(kafkaURIs []string, topic string) (*UserUpdateProducer, error) { + producer, err := sarama.NewSyncProducer(kafkaURIs, nil) + if err != nil { + return nil, err + } + return &UserUpdateProducer{ + Topic: topic, + Producer: producer, + }, nil +} + +// SendUpdate sends an update using kafka to notify the roomserver of the +// profile update. Returns an error if the update failed to send. +func (p *UserUpdateProducer) SendUpdate( + userID string, updatedAttribute string, oldValue string, newValue string, +) error { + var update profileUpdate + var m sarama.ProducerMessage + + m.Topic = string(p.Topic) + m.Key = sarama.StringEncoder(userID) + + update = profileUpdate{ + Updated: updatedAttribute, + OldValue: oldValue, + NewValue: newValue, + } + + value, err := json.Marshal(update) + if err != nil { + return err + } + m.Value = sarama.ByteEncoder(value) + + if _, _, err := p.Producer.SendMessage(&m); err != nil { + return err + } + + return nil +} diff --git a/src/github.com/matrix-org/dendrite/clientapi/readers/profile.go b/src/github.com/matrix-org/dendrite/clientapi/readers/profile.go index 091531e66..dcdb14b44 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/readers/profile.go +++ b/src/github.com/matrix-org/dendrite/clientapi/readers/profile.go @@ -15,19 +15,16 @@ package readers import ( - "encoding/json" "fmt" "net/http" "strings" - "github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" - "github.com/matrix-org/dendrite/common/config" - "github.com/matrix-org/util" + "github.com/matrix-org/dendrite/clientapi/producers" - sarama "gopkg.in/Shopify/sarama.v1" + "github.com/matrix-org/util" ) type profileResponse struct { @@ -43,14 +40,6 @@ type displayName struct { DisplayName string `json:"displayname"` } -// TODO: Move this struct to `common` so the components that consume the topic -// can use it when parsing incoming messages -type profileUpdate struct { - Updated string `json:"updated"` // Which attribute is updated (can be either `avatar_url` or `displayname`) - OldValue string `json:"old_value"` // The attribute's value before the update - NewValue string `json:"new_value"` // The attribute's value after the update -} - // GetProfile implements GET /profile/{userID} func GetProfile( req *http.Request, accountDB *accounts.Database, userID string, @@ -97,7 +86,7 @@ func GetAvatarURL( // SetAvatarURL implements PUT /profile/{userID}/avatar_url func SetAvatarURL( req *http.Request, accountDB *accounts.Database, userID string, - cfg config.Dendrite, + producer *producers.UserUpdateProducer, ) util.JSONResponse { var r avatarURL if resErr := httputil.UnmarshalJSONRequest(req, &r); resErr != nil { @@ -121,7 +110,7 @@ func SetAvatarURL( return httputil.LogThenError(req, err) } - if err := sendUpdate(userID, oldProfile, r.AvatarURL, "", cfg); err != nil { + if err := producer.SendUpdate(userID, "avatar_url", oldProfile.AvatarURL, r.AvatarURL); err != nil { return httputil.LogThenError(req, err) } @@ -152,7 +141,7 @@ func GetDisplayName( // SetDisplayName implements PUT /profile/{userID}/displayname func SetDisplayName( req *http.Request, accountDB *accounts.Database, userID string, - cfg config.Dendrite, + producer *producers.UserUpdateProducer, ) util.JSONResponse { var r displayName if resErr := httputil.UnmarshalJSONRequest(req, &r); resErr != nil { @@ -176,7 +165,7 @@ func SetDisplayName( return httputil.LogThenError(req, err) } - if err := sendUpdate(userID, oldProfile, "", r.DisplayName, cfg); err != nil { + if err := producer.SendUpdate(userID, "displayname", oldProfile.DisplayName, r.DisplayName); err != nil { return httputil.LogThenError(req, err) } @@ -196,49 +185,3 @@ func getLocalPart(userID string) string { // Return the part after the "@" return strings.Split(username, "@")[1] } - -// Send an update using kafka to notify the roomserver of the profile update -// Returns an error if the update failed to send -func sendUpdate(userID string, oldProfile *authtypes.Profile, newAvatarURL string, - newDisplayName string, cfg config.Dendrite, -) error { - var update profileUpdate - var m sarama.ProducerMessage - - m.Topic = string(cfg.Kafka.Topics.UserUpdates) - m.Key = sarama.StringEncoder(userID) - - // Determining if the changed value is the avatar URL or the display name - if len(newAvatarURL) > 0 { - update = profileUpdate{ - Updated: "avatar_url", - OldValue: oldProfile.AvatarURL, - NewValue: newAvatarURL, - } - } else if len(newDisplayName) > 0 { - update = profileUpdate{ - Updated: "displayname", - OldValue: oldProfile.DisplayName, - NewValue: newDisplayName, - } - } else { - panic(fmt.Errorf("No update to send")) - } - - value, err := json.Marshal(update) - if err != nil { - return err - } - m.Value = sarama.ByteEncoder(value) - - producer, err := sarama.NewSyncProducer(cfg.Kafka.Addresses, nil) - if err != nil { - return err - } - - if _, _, err := producer.SendMessage(&m); err != nil { - return err - } - - return nil -} diff --git a/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go b/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go index dad7fdfee..4d5556d9f 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go +++ b/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go @@ -46,6 +46,7 @@ func Setup( deviceDB *devices.Database, federation *gomatrixserverlib.FederationClient, keyRing gomatrixserverlib.KeyRing, + userUpdateProducer *producers.UserUpdateProducer, ) { apiMux := mux.NewRouter() @@ -178,7 +179,7 @@ func Setup( r0mux.Handle("/profile/{userID}/avatar_url", common.MakeAuthAPI("profile_avatar_url", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse { vars := mux.Vars(req) - return readers.SetAvatarURL(req, accountDB, vars["userID"], cfg) + return readers.SetAvatarURL(req, accountDB, vars["userID"], userUpdateProducer) }), ).Methods("PUT", "OPTIONS") // Browsers use the OPTIONS HTTP method to check if the CORS policy allows @@ -194,7 +195,7 @@ func Setup( r0mux.Handle("/profile/{userID}/displayname", common.MakeAuthAPI("profile_displayname", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse { vars := mux.Vars(req) - return readers.SetDisplayName(req, accountDB, vars["userID"], cfg) + return readers.SetDisplayName(req, accountDB, vars["userID"], userUpdateProducer) }), ).Methods("PUT", "OPTIONS") // Browsers use the OPTIONS HTTP method to check if the CORS policy allows diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go index 51472b759..a64cc9a07 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go @@ -53,6 +53,9 @@ func main() { roomserverProducer, err := producers.NewRoomserverProducer( cfg.Kafka.Addresses, string(cfg.Kafka.Topics.InputRoomEvent), ) + userUpdateProducer, err := producers.NewUserUpdateProducer( + cfg.Kafka.Addresses, string(cfg.Kafka.Topics.UserUpdates), + ) if err != nil { log.Panicf("Failed to setup kafka producers(%q): %s", cfg.Kafka.Addresses, err) } @@ -86,7 +89,7 @@ func main() { log.Info("Starting client API server on ", cfg.Listen.ClientAPI) routing.Setup( http.DefaultServeMux, http.DefaultClient, *cfg, roomserverProducer, - queryAPI, accountDB, deviceDB, federation, keyRing, + queryAPI, accountDB, deviceDB, federation, keyRing, userUpdateProducer, ) log.Fatal(http.ListenAndServe(string(cfg.Listen.ClientAPI), nil)) }