Moved user updates producer to clientapi/producers

This commit is contained in:
Brendan Abolivier 2017-07-11 12:06:05 +01:00
parent 29ccf85dcb
commit 588e642f5f
No known key found for this signature in database
GPG key ID: 8EF1500759F70623
4 changed files with 90 additions and 66 deletions

View file

@ -0,0 +1,77 @@
// Copyright 2017 Vector Creations Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package producers
import (
"encoding/json"
sarama "gopkg.in/Shopify/sarama.v1"
)
// UserUpdateProducer produces events related to user updates.
type UserUpdateProducer struct {
Topic string
Producer sarama.SyncProducer
}
// TODO: Move this struct to `common` so the components that consume the topic
// can use it when parsing incoming messages
type profileUpdate struct {
Updated string `json:"updated"` // Which attribute is updated (can be either `avatar_url` or `displayname`)
OldValue string `json:"old_value"` // The attribute's value before the update
NewValue string `json:"new_value"` // The attribute's value after the update
}
// NewUserUpdateProducer creates a new UserUpdateProducer
func NewUserUpdateProducer(kafkaURIs []string, topic string) (*UserUpdateProducer, error) {
producer, err := sarama.NewSyncProducer(kafkaURIs, nil)
if err != nil {
return nil, err
}
return &UserUpdateProducer{
Topic: topic,
Producer: producer,
}, nil
}
// SendUpdate sends an update using kafka to notify the roomserver of the
// profile update. Returns an error if the update failed to send.
func (p *UserUpdateProducer) SendUpdate(
userID string, updatedAttribute string, oldValue string, newValue string,
) error {
var update profileUpdate
var m sarama.ProducerMessage
m.Topic = string(p.Topic)
m.Key = sarama.StringEncoder(userID)
update = profileUpdate{
Updated: updatedAttribute,
OldValue: oldValue,
NewValue: newValue,
}
value, err := json.Marshal(update)
if err != nil {
return err
}
m.Value = sarama.ByteEncoder(value)
if _, _, err := p.Producer.SendMessage(&m); err != nil {
return err
}
return nil
}

View file

@ -15,19 +15,16 @@
package readers
import (
"encoding/json"
"fmt"
"net/http"
"strings"
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/util"
"github.com/matrix-org/dendrite/clientapi/producers"
sarama "gopkg.in/Shopify/sarama.v1"
"github.com/matrix-org/util"
)
type profileResponse struct {
@ -43,14 +40,6 @@ type displayName struct {
DisplayName string `json:"displayname"`
}
// TODO: Move this struct to `common` so the components that consume the topic
// can use it when parsing incoming messages
type profileUpdate struct {
Updated string `json:"updated"` // Which attribute is updated (can be either `avatar_url` or `displayname`)
OldValue string `json:"old_value"` // The attribute's value before the update
NewValue string `json:"new_value"` // The attribute's value after the update
}
// GetProfile implements GET /profile/{userID}
func GetProfile(
req *http.Request, accountDB *accounts.Database, userID string,
@ -97,7 +86,7 @@ func GetAvatarURL(
// SetAvatarURL implements PUT /profile/{userID}/avatar_url
func SetAvatarURL(
req *http.Request, accountDB *accounts.Database, userID string,
cfg config.Dendrite,
producer *producers.UserUpdateProducer,
) util.JSONResponse {
var r avatarURL
if resErr := httputil.UnmarshalJSONRequest(req, &r); resErr != nil {
@ -121,7 +110,7 @@ func SetAvatarURL(
return httputil.LogThenError(req, err)
}
if err := sendUpdate(userID, oldProfile, r.AvatarURL, "", cfg); err != nil {
if err := producer.SendUpdate(userID, "avatar_url", oldProfile.AvatarURL, r.AvatarURL); err != nil {
return httputil.LogThenError(req, err)
}
@ -152,7 +141,7 @@ func GetDisplayName(
// SetDisplayName implements PUT /profile/{userID}/displayname
func SetDisplayName(
req *http.Request, accountDB *accounts.Database, userID string,
cfg config.Dendrite,
producer *producers.UserUpdateProducer,
) util.JSONResponse {
var r displayName
if resErr := httputil.UnmarshalJSONRequest(req, &r); resErr != nil {
@ -176,7 +165,7 @@ func SetDisplayName(
return httputil.LogThenError(req, err)
}
if err := sendUpdate(userID, oldProfile, "", r.DisplayName, cfg); err != nil {
if err := producer.SendUpdate(userID, "displayname", oldProfile.DisplayName, r.DisplayName); err != nil {
return httputil.LogThenError(req, err)
}
@ -196,49 +185,3 @@ func getLocalPart(userID string) string {
// Return the part after the "@"
return strings.Split(username, "@")[1]
}
// Send an update using kafka to notify the roomserver of the profile update
// Returns an error if the update failed to send
func sendUpdate(userID string, oldProfile *authtypes.Profile, newAvatarURL string,
newDisplayName string, cfg config.Dendrite,
) error {
var update profileUpdate
var m sarama.ProducerMessage
m.Topic = string(cfg.Kafka.Topics.UserUpdates)
m.Key = sarama.StringEncoder(userID)
// Determining if the changed value is the avatar URL or the display name
if len(newAvatarURL) > 0 {
update = profileUpdate{
Updated: "avatar_url",
OldValue: oldProfile.AvatarURL,
NewValue: newAvatarURL,
}
} else if len(newDisplayName) > 0 {
update = profileUpdate{
Updated: "displayname",
OldValue: oldProfile.DisplayName,
NewValue: newDisplayName,
}
} else {
panic(fmt.Errorf("No update to send"))
}
value, err := json.Marshal(update)
if err != nil {
return err
}
m.Value = sarama.ByteEncoder(value)
producer, err := sarama.NewSyncProducer(cfg.Kafka.Addresses, nil)
if err != nil {
return err
}
if _, _, err := producer.SendMessage(&m); err != nil {
return err
}
return nil
}

View file

@ -46,6 +46,7 @@ func Setup(
deviceDB *devices.Database,
federation *gomatrixserverlib.FederationClient,
keyRing gomatrixserverlib.KeyRing,
userUpdateProducer *producers.UserUpdateProducer,
) {
apiMux := mux.NewRouter()
@ -178,7 +179,7 @@ func Setup(
r0mux.Handle("/profile/{userID}/avatar_url",
common.MakeAuthAPI("profile_avatar_url", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
vars := mux.Vars(req)
return readers.SetAvatarURL(req, accountDB, vars["userID"], cfg)
return readers.SetAvatarURL(req, accountDB, vars["userID"], userUpdateProducer)
}),
).Methods("PUT", "OPTIONS")
// Browsers use the OPTIONS HTTP method to check if the CORS policy allows
@ -194,7 +195,7 @@ func Setup(
r0mux.Handle("/profile/{userID}/displayname",
common.MakeAuthAPI("profile_displayname", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
vars := mux.Vars(req)
return readers.SetDisplayName(req, accountDB, vars["userID"], cfg)
return readers.SetDisplayName(req, accountDB, vars["userID"], userUpdateProducer)
}),
).Methods("PUT", "OPTIONS")
// Browsers use the OPTIONS HTTP method to check if the CORS policy allows

View file

@ -53,6 +53,9 @@ func main() {
roomserverProducer, err := producers.NewRoomserverProducer(
cfg.Kafka.Addresses, string(cfg.Kafka.Topics.InputRoomEvent),
)
userUpdateProducer, err := producers.NewUserUpdateProducer(
cfg.Kafka.Addresses, string(cfg.Kafka.Topics.UserUpdates),
)
if err != nil {
log.Panicf("Failed to setup kafka producers(%q): %s", cfg.Kafka.Addresses, err)
}
@ -86,7 +89,7 @@ func main() {
log.Info("Starting client API server on ", cfg.Listen.ClientAPI)
routing.Setup(
http.DefaultServeMux, http.DefaultClient, *cfg, roomserverProducer,
queryAPI, accountDB, deviceDB, federation, keyRing,
queryAPI, accountDB, deviceDB, federation, keyRing, userUpdateProducer,
)
log.Fatal(http.ListenAndServe(string(cfg.Listen.ClientAPI), nil))
}