mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-13 18:03:10 -06:00
Put kafka update after the database insert
This commit is contained in:
parent
0fee1035eb
commit
0443034722
|
|
@ -20,6 +20,7 @@ import (
|
||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
||||||
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
|
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
|
||||||
"github.com/matrix-org/dendrite/clientapi/httputil"
|
"github.com/matrix-org/dendrite/clientapi/httputil"
|
||||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||||
|
|
@ -107,14 +108,21 @@ func SetAvatarURL(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := sendUpdate(userID, r.AvatarURL, "", accountDB, cfg); err != nil {
|
localpart := getLocalPart(userID)
|
||||||
|
|
||||||
|
oldProfile, err := accountDB.GetProfileByLocalpart(localpart)
|
||||||
|
if err != nil {
|
||||||
return httputil.LogThenError(req, err)
|
return httputil.LogThenError(req, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
localpart := getLocalPart(userID)
|
|
||||||
if err := accountDB.SetAvatarURL(localpart, r.AvatarURL); err != nil {
|
if err := accountDB.SetAvatarURL(localpart, r.AvatarURL); err != nil {
|
||||||
return httputil.LogThenError(req, err)
|
return httputil.LogThenError(req, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := sendUpdate(userID, oldProfile, r.AvatarURL, "", accountDB, cfg); err != nil {
|
||||||
|
return httputil.LogThenError(req, err)
|
||||||
|
}
|
||||||
|
|
||||||
return util.JSONResponse{
|
return util.JSONResponse{
|
||||||
Code: 200,
|
Code: 200,
|
||||||
JSON: struct{}{},
|
JSON: struct{}{},
|
||||||
|
|
@ -155,14 +163,21 @@ func SetDisplayName(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := sendUpdate(userID, "", r.DisplayName, accountDB, cfg); err != nil {
|
localpart := getLocalPart(userID)
|
||||||
|
|
||||||
|
oldProfile, err := accountDB.GetProfileByLocalpart(localpart)
|
||||||
|
if err != nil {
|
||||||
return httputil.LogThenError(req, err)
|
return httputil.LogThenError(req, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
localpart := getLocalPart(userID)
|
|
||||||
if err := accountDB.SetDisplayName(localpart, r.DisplayName); err != nil {
|
if err := accountDB.SetDisplayName(localpart, r.DisplayName); err != nil {
|
||||||
return httputil.LogThenError(req, err)
|
return httputil.LogThenError(req, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := sendUpdate(userID, oldProfile, "", r.DisplayName, accountDB, cfg); err != nil {
|
||||||
|
return httputil.LogThenError(req, err)
|
||||||
|
}
|
||||||
|
|
||||||
return util.JSONResponse{
|
return util.JSONResponse{
|
||||||
Code: 200,
|
Code: 200,
|
||||||
JSON: struct{}{},
|
JSON: struct{}{},
|
||||||
|
|
@ -182,8 +197,8 @@ func getLocalPart(userID string) string {
|
||||||
|
|
||||||
// Send an update using kafka to notify the roomserver of the profile update
|
// Send an update using kafka to notify the roomserver of the profile update
|
||||||
// Returns an error if the update failed to send
|
// Returns an error if the update failed to send
|
||||||
func sendUpdate(userID string, newAvatarURL string, newDisplayName string,
|
func sendUpdate(userID string, oldProfile *authtypes.Profile, newAvatarURL string,
|
||||||
accountDB *accounts.Database, cfg config.Dendrite,
|
newDisplayName string, accountDB *accounts.Database, cfg config.Dendrite,
|
||||||
) error {
|
) error {
|
||||||
var update profileUpdate
|
var update profileUpdate
|
||||||
var m sarama.ProducerMessage
|
var m sarama.ProducerMessage
|
||||||
|
|
@ -191,23 +206,17 @@ func sendUpdate(userID string, newAvatarURL string, newDisplayName string,
|
||||||
m.Topic = string(cfg.Kafka.Topics.UserUpdates)
|
m.Topic = string(cfg.Kafka.Topics.UserUpdates)
|
||||||
m.Key = sarama.StringEncoder(userID)
|
m.Key = sarama.StringEncoder(userID)
|
||||||
|
|
||||||
localpart := getLocalPart(userID)
|
|
||||||
profile, err := accountDB.GetProfileByLocalpart(localpart)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Determining if the changed value is the avatar URL or the display name
|
// Determining if the changed value is the avatar URL or the display name
|
||||||
if len(newAvatarURL) > 0 {
|
if len(newAvatarURL) > 0 {
|
||||||
update = profileUpdate{
|
update = profileUpdate{
|
||||||
Updated: "avatar_url",
|
Updated: "avatar_url",
|
||||||
OldValue: profile.AvatarURL,
|
OldValue: oldProfile.AvatarURL,
|
||||||
NewValue: newAvatarURL,
|
NewValue: newAvatarURL,
|
||||||
}
|
}
|
||||||
} else if len(newDisplayName) > 0 {
|
} else if len(newDisplayName) > 0 {
|
||||||
update = profileUpdate{
|
update = profileUpdate{
|
||||||
Updated: "displayname",
|
Updated: "displayname",
|
||||||
OldValue: profile.DisplayName,
|
OldValue: oldProfile.DisplayName,
|
||||||
NewValue: newDisplayName,
|
NewValue: newDisplayName,
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue