mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-10 16:33:11 -06:00
Switched to samara producer and now sending messages
This commit is contained in:
parent
9684fca51f
commit
bc17740d9c
|
|
@ -15,6 +15,7 @@
|
|||
package readers
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
|
@ -22,8 +23,10 @@ import (
|
|||
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
|
||||
"github.com/matrix-org/dendrite/clientapi/httputil"
|
||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||
// "github.com/matrix-org/dendrite/clientapi/producers"
|
||||
"github.com/matrix-org/dendrite/common/config"
|
||||
"github.com/matrix-org/util"
|
||||
|
||||
sarama "gopkg.in/Shopify/sarama.v1"
|
||||
)
|
||||
|
||||
type profileResponse struct {
|
||||
|
|
@ -39,6 +42,12 @@ type displayName struct {
|
|||
DisplayName string `json:"displayname"`
|
||||
}
|
||||
|
||||
type profileUpdate struct {
|
||||
Updated string `json:"updated"`
|
||||
OldValue string `json:"old_value"`
|
||||
NewValue string `json:"new_value"`
|
||||
}
|
||||
|
||||
// GetProfile implements GET /profile/{userID}
|
||||
func GetProfile(
|
||||
req *http.Request, accountDB *accounts.Database, userID string,
|
||||
|
|
@ -85,6 +94,7 @@ func GetAvatarURL(
|
|||
// SetAvatarURL implements PUT /profile/{userID}/avatar_url
|
||||
func SetAvatarURL(
|
||||
req *http.Request, accountDB *accounts.Database, userID string,
|
||||
cfg config.Dendrite,
|
||||
) util.JSONResponse {
|
||||
var r avatarURL
|
||||
if resErr := httputil.UnmarshalJSONRequest(req, &r); resErr != nil {
|
||||
|
|
@ -97,6 +107,10 @@ func SetAvatarURL(
|
|||
}
|
||||
}
|
||||
|
||||
if err := sendUpdate(userID, r.AvatarURL, "", accountDB, cfg); err != nil {
|
||||
return httputil.LogThenError(req, err)
|
||||
}
|
||||
|
||||
localpart := getLocalPart(userID)
|
||||
if err := accountDB.SetAvatarURL(localpart, r.AvatarURL); err != nil {
|
||||
return httputil.LogThenError(req, err)
|
||||
|
|
@ -128,6 +142,7 @@ func GetDisplayName(
|
|||
// SetDisplayName implements PUT /profile/{userID}/displayname
|
||||
func SetDisplayName(
|
||||
req *http.Request, accountDB *accounts.Database, userID string,
|
||||
cfg config.Dendrite,
|
||||
) util.JSONResponse {
|
||||
var r displayName
|
||||
if resErr := httputil.UnmarshalJSONRequest(req, &r); resErr != nil {
|
||||
|
|
@ -140,6 +155,10 @@ func SetDisplayName(
|
|||
}
|
||||
}
|
||||
|
||||
if err := sendUpdate(userID, "", r.DisplayName, accountDB, cfg); err != nil {
|
||||
return httputil.LogThenError(req, err)
|
||||
}
|
||||
|
||||
localpart := getLocalPart(userID)
|
||||
if err := accountDB.SetDisplayName(localpart, r.DisplayName); err != nil {
|
||||
return httputil.LogThenError(req, err)
|
||||
|
|
@ -160,3 +179,53 @@ func getLocalPart(userID string) string {
|
|||
// Return the part after the "@"
|
||||
return strings.Split(username, "@")[1]
|
||||
}
|
||||
|
||||
func sendUpdate(userID string, newAvatarURL string, newDisplayName string,
|
||||
accountDB *accounts.Database, cfg config.Dendrite,
|
||||
) error {
|
||||
var update profileUpdate
|
||||
var m sarama.ProducerMessage
|
||||
|
||||
m.Topic = string(cfg.Kafka.Topics.UserUpdates)
|
||||
m.Key = sarama.StringEncoder(userID)
|
||||
|
||||
localpart := getLocalPart(userID)
|
||||
profile, err := accountDB.GetProfileByLocalpart(localpart)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Determining if the changed value is the avatar URL or the display name
|
||||
if len(newAvatarURL) > 0 {
|
||||
update = profileUpdate{
|
||||
Updated: "avatar_url",
|
||||
OldValue: profile.AvatarURL,
|
||||
NewValue: newAvatarURL,
|
||||
}
|
||||
} else if len(newDisplayName) > 0 {
|
||||
update = profileUpdate{
|
||||
Updated: "displayname",
|
||||
OldValue: profile.DisplayName,
|
||||
NewValue: newDisplayName,
|
||||
}
|
||||
} else {
|
||||
panic(fmt.Errorf("No update to send"))
|
||||
}
|
||||
|
||||
value, err := json.Marshal(update)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
m.Value = sarama.ByteEncoder(value)
|
||||
|
||||
producer, err := sarama.NewSyncProducer(cfg.Kafka.Addresses, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, _, err := producer.SendMessage(&m); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -46,7 +46,6 @@ func Setup(
|
|||
deviceDB *devices.Database,
|
||||
federation *gomatrixserverlib.FederationClient,
|
||||
keyRing gomatrixserverlib.KeyRing,
|
||||
userUpdatesProducer *producers.RoomserverProducer,
|
||||
) {
|
||||
apiMux := mux.NewRouter()
|
||||
|
||||
|
|
@ -179,7 +178,7 @@ func Setup(
|
|||
r0mux.Handle("/profile/{userID}/avatar_url",
|
||||
common.MakeAuthAPI("profile_avatar_url", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
|
||||
vars := mux.Vars(req)
|
||||
return readers.SetAvatarURL(req, accountDB, vars["userID"])
|
||||
return readers.SetAvatarURL(req, accountDB, vars["userID"], cfg)
|
||||
}),
|
||||
).Methods("PUT", "OPTIONS")
|
||||
// Browsers use the OPTIONS HTTP method to check if the CORS policy allows
|
||||
|
|
@ -195,7 +194,7 @@ func Setup(
|
|||
r0mux.Handle("/profile/{userID}/displayname",
|
||||
common.MakeAuthAPI("profile_displayname", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
|
||||
vars := mux.Vars(req)
|
||||
return readers.SetDisplayName(req, accountDB, vars["userID"])
|
||||
return readers.SetDisplayName(req, accountDB, vars["userID"], cfg)
|
||||
}),
|
||||
).Methods("PUT", "OPTIONS")
|
||||
// Browsers use the OPTIONS HTTP method to check if the CORS policy allows
|
||||
|
|
|
|||
|
|
@ -53,9 +53,6 @@ func main() {
|
|||
roomserverProducer, err := producers.NewRoomserverProducer(
|
||||
cfg.Kafka.Addresses, string(cfg.Kafka.Topics.InputRoomEvent),
|
||||
)
|
||||
userUpdatesProducer, err := producers.NewRoomserverProducer(
|
||||
cfg.Kafka.Addresses, string(cfg.Kafka.Topics.UserUpdates),
|
||||
)
|
||||
if err != nil {
|
||||
log.Panicf("Failed to setup kafka producers(%q): %s", cfg.Kafka.Addresses, err)
|
||||
}
|
||||
|
|
@ -89,7 +86,7 @@ func main() {
|
|||
log.Info("Starting client API server on ", cfg.Listen.ClientAPI)
|
||||
routing.Setup(
|
||||
http.DefaultServeMux, http.DefaultClient, *cfg, roomserverProducer,
|
||||
queryAPI, accountDB, deviceDB, federation, keyRing, userUpdatesProducer,
|
||||
queryAPI, accountDB, deviceDB, federation, keyRing,
|
||||
)
|
||||
log.Fatal(http.ListenAndServe(string(cfg.Listen.ClientAPI), nil))
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue