Propagate profile update through rooms (#163)

* Use gomatrixserverlib function to split user ID

* Propagate profile update via m.room.member events

* Send profile data on room join

* Send profile data on room creation

* Rename variable

* Move membership update to roomserver consumer

* Improve iteration

* Move event update from client API server to sync API server

* Change the way buildMembershipEvents is called

* Forbid update of someone else's profile

* Use gomatrixserverlib method

* Fix depth and previous events not being set

* Fix wrong removal in latest commit

* Update all events instead of only memberships

* Handle case where there is no state key

* Fix test
This commit is contained in:
Brendan Abolivier 2017-07-25 16:10:59 +01:00 committed by Mark Haines
parent a904380e1b
commit 6d073dcf9f
13 changed files with 373 additions and 35 deletions

View file

@ -0,0 +1,23 @@
// Copyright 2017 Vector Creations Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package authtypes
// Membership represents the relationship between a user and a room they're a
// member of
type Membership struct {
Localpart string
RoomID string
EventID string
}

View file

@ -18,6 +18,7 @@ import (
"database/sql"
"github.com/lib/pq"
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
)
const membershipSchema = `
@ -38,23 +39,29 @@ CREATE TABLE IF NOT EXISTS memberships (
CREATE UNIQUE INDEX IF NOT EXISTS membership_event_id ON memberships(event_id);
`
const insertMembershipSQL = "" +
"INSERT INTO memberships(localpart, room_id, event_id) VALUES ($1, $2, $3)"
const insertMembershipSQL = `
INSERT INTO memberships(localpart, room_id, event_id) VALUES ($1, $2, $3)
ON CONFLICT (localpart, room_id) DO UPDATE SET event_id = EXCLUDED.event_id
`
const selectMembershipSQL = "" +
"SELECT * from memberships WHERE localpart = $1 AND room_id = $2"
const selectMembershipsByLocalpartSQL = "" +
"SELECT room_id FROM memberships WHERE localpart = $1"
"SELECT room_id, event_id FROM memberships WHERE localpart = $1"
const deleteMembershipsByEventIDsSQL = "" +
"DELETE FROM memberships WHERE event_id = ANY($1)"
const updateMembershipByEventIDSQL = "" +
"UPDATE memberships SET event_id = $2 WHERE event_id = $1"
type membershipStatements struct {
deleteMembershipsByEventIDsStmt *sql.Stmt
insertMembershipStmt *sql.Stmt
selectMembershipByEventIDStmt *sql.Stmt
selectMembershipsByLocalpartStmt *sql.Stmt
updateMembershipByEventIDStmt *sql.Stmt
}
func (s *membershipStatements) prepare(db *sql.DB) (err error) {
@ -71,6 +78,9 @@ func (s *membershipStatements) prepare(db *sql.DB) (err error) {
if s.selectMembershipsByLocalpartStmt, err = db.Prepare(selectMembershipsByLocalpartSQL); err != nil {
return
}
if s.updateMembershipByEventIDStmt, err = db.Prepare(updateMembershipByEventIDSQL); err != nil {
return
}
return
}
@ -83,3 +93,29 @@ func (s *membershipStatements) deleteMembershipsByEventIDs(eventIDs []string, tx
_, err = txn.Stmt(s.deleteMembershipsByEventIDsStmt).Exec(pq.StringArray(eventIDs))
return
}
func (s *membershipStatements) selectMembershipsByLocalpart(localpart string) (memberships []authtypes.Membership, err error) {
rows, err := s.selectMembershipsByLocalpartStmt.Query(localpart)
if err != nil {
return
}
memberships = []authtypes.Membership{}
defer rows.Close()
for rows.Next() {
var m authtypes.Membership
m.Localpart = localpart
if err := rows.Scan(&m.RoomID, &m.EventID); err != nil {
return nil, err
}
memberships = append(memberships, m)
}
return
}
func (s *membershipStatements) updateMembershipByEventID(oldEventID string, newEventID string) (err error) {
_, err = s.updateMembershipByEventIDStmt.Exec(oldEventID, newEventID)
return
}

View file

@ -151,6 +151,21 @@ func (d *Database) UpdateMemberships(eventsToAdd []gomatrixserverlib.Event, idsT
})
}
// GetMembershipsByLocalpart returns an array containing the IDs of all the rooms
// a user matching a given localpart is a member of
// If no membership match the given localpart, returns an empty array
// If there was an issue during the retrieval, returns the SQL error
func (d *Database) GetMembershipsByLocalpart(localpart string) (memberships []authtypes.Membership, err error) {
return d.memberships.selectMembershipsByLocalpart(localpart)
}
// UpdateMembership update the "join" membership event ID of a membership.
// This is useful in case of membership upgrade (e.g. profile update)
// If there was an issue during the update, returns the SQL error
func (d *Database) UpdateMembership(oldEventID string, newEventID string) error {
return d.memberships.updateMembershipByEventID(oldEventID, newEventID)
}
// newMembership will save a new membership in the database if the given state
// event is a "join" membership event
// If the event isn't a "join" membership event, does nothing

View file

@ -107,6 +107,11 @@ func (s *OutputRoomEvent) lookupStateEvents(
) ([]gomatrixserverlib.Event, error) {
// Fast path if there aren't any new state events.
if len(addsStateEventIDs) == 0 {
// If the event is a membership update (e.g. for a profile update), it won't
// show up in AddsStateEventIDs, so we need to add it manually
if event.Type() == "m.room.member" {
return []gomatrixserverlib.Event{event}, nil
}
return nil, nil
}

View file

@ -21,6 +21,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
)
@ -35,7 +36,11 @@ func Logout(
}
}
localpart := getLocalPart(device.UserID)
localpart, _, err := gomatrixserverlib.SplitID('@', device.UserID)
if err != nil {
return httputil.LogThenError(req, err)
}
if err := deviceDB.RemoveDevice(device.ID, localpart); err != nil {
return httputil.LogThenError(req, err)
}

View file

@ -17,12 +17,17 @@ package readers
import (
"fmt"
"net/http"
"strings"
"time"
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/clientapi/events"
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
)
@ -50,7 +55,11 @@ func GetProfile(
JSON: jsonerror.NotFound("Bad method"),
}
}
localpart := getLocalPart(userID)
localpart, _, err := gomatrixserverlib.SplitID('@', userID)
if err != nil {
return httputil.LogThenError(req, err)
}
profile, err := accountDB.GetProfileByLocalpart(localpart)
if err != nil {
return httputil.LogThenError(req, err)
@ -69,7 +78,11 @@ func GetProfile(
func GetAvatarURL(
req *http.Request, accountDB *accounts.Database, userID string,
) util.JSONResponse {
localpart := getLocalPart(userID)
localpart, _, err := gomatrixserverlib.SplitID('@', userID)
if err != nil {
return httputil.LogThenError(req, err)
}
profile, err := accountDB.GetProfileByLocalpart(localpart)
if err != nil {
return httputil.LogThenError(req, err)
@ -85,9 +98,19 @@ func GetAvatarURL(
// SetAvatarURL implements PUT /profile/{userID}/avatar_url
func SetAvatarURL(
req *http.Request, accountDB *accounts.Database, userID string,
producer *producers.UserUpdateProducer,
req *http.Request, accountDB *accounts.Database, device *authtypes.Device,
userID string, producer *producers.UserUpdateProducer, cfg *config.Dendrite,
rsProducer *producers.RoomserverProducer, queryAPI api.RoomserverQueryAPI,
) util.JSONResponse {
if userID != device.UserID {
return util.JSONResponse{
Code: 403,
JSON: jsonerror.Forbidden("userID does not match the current user"),
}
}
changedKey := "avatar_url"
var r avatarURL
if resErr := httputil.UnmarshalJSONRequest(req, &r); resErr != nil {
return *resErr
@ -99,18 +122,41 @@ func SetAvatarURL(
}
}
localpart := getLocalPart(userID)
localpart, _, err := gomatrixserverlib.SplitID('@', userID)
if err != nil {
return httputil.LogThenError(req, err)
}
oldProfile, err := accountDB.GetProfileByLocalpart(localpart)
if err != nil {
return httputil.LogThenError(req, err)
}
if err := accountDB.SetAvatarURL(localpart, r.AvatarURL); err != nil {
if err = accountDB.SetAvatarURL(localpart, r.AvatarURL); err != nil {
return httputil.LogThenError(req, err)
}
if err := producer.SendUpdate(userID, "avatar_url", oldProfile.AvatarURL, r.AvatarURL); err != nil {
memberships, err := accountDB.GetMembershipsByLocalpart(localpart)
if err != nil {
return httputil.LogThenError(req, err)
}
newProfile := authtypes.Profile{
Localpart: localpart,
DisplayName: oldProfile.DisplayName,
AvatarURL: r.AvatarURL,
}
events, err := buildMembershipEvents(memberships, accountDB, newProfile, userID, cfg, queryAPI)
if err != nil {
return httputil.LogThenError(req, err)
}
if err := rsProducer.SendEvents(events, cfg.Matrix.ServerName); err != nil {
return httputil.LogThenError(req, err)
}
if err := producer.SendUpdate(userID, changedKey, oldProfile.AvatarURL, r.AvatarURL); err != nil {
return httputil.LogThenError(req, err)
}
@ -124,7 +170,11 @@ func SetAvatarURL(
func GetDisplayName(
req *http.Request, accountDB *accounts.Database, userID string,
) util.JSONResponse {
localpart := getLocalPart(userID)
localpart, _, err := gomatrixserverlib.SplitID('@', userID)
if err != nil {
return httputil.LogThenError(req, err)
}
profile, err := accountDB.GetProfileByLocalpart(localpart)
if err != nil {
return httputil.LogThenError(req, err)
@ -140,9 +190,19 @@ func GetDisplayName(
// SetDisplayName implements PUT /profile/{userID}/displayname
func SetDisplayName(
req *http.Request, accountDB *accounts.Database, userID string,
producer *producers.UserUpdateProducer,
req *http.Request, accountDB *accounts.Database, device *authtypes.Device,
userID string, producer *producers.UserUpdateProducer, cfg *config.Dendrite,
rsProducer *producers.RoomserverProducer, queryAPI api.RoomserverQueryAPI,
) util.JSONResponse {
if userID != device.UserID {
return util.JSONResponse{
Code: 403,
JSON: jsonerror.Forbidden("userID does not match the current user"),
}
}
changedKey := "displayname"
var r displayName
if resErr := httputil.UnmarshalJSONRequest(req, &r); resErr != nil {
return *resErr
@ -154,18 +214,41 @@ func SetDisplayName(
}
}
localpart := getLocalPart(userID)
localpart, _, err := gomatrixserverlib.SplitID('@', userID)
if err != nil {
return httputil.LogThenError(req, err)
}
oldProfile, err := accountDB.GetProfileByLocalpart(localpart)
if err != nil {
return httputil.LogThenError(req, err)
}
if err := accountDB.SetDisplayName(localpart, r.DisplayName); err != nil {
if err = accountDB.SetDisplayName(localpart, r.DisplayName); err != nil {
return httputil.LogThenError(req, err)
}
if err := producer.SendUpdate(userID, "displayname", oldProfile.DisplayName, r.DisplayName); err != nil {
memberships, err := accountDB.GetMembershipsByLocalpart(localpart)
if err != nil {
return httputil.LogThenError(req, err)
}
newProfile := authtypes.Profile{
Localpart: localpart,
DisplayName: r.DisplayName,
AvatarURL: oldProfile.AvatarURL,
}
events, err := buildMembershipEvents(memberships, accountDB, newProfile, userID, cfg, queryAPI)
if err != nil {
return httputil.LogThenError(req, err)
}
if err := rsProducer.SendEvents(events, cfg.Matrix.ServerName); err != nil {
return httputil.LogThenError(req, err)
}
if err := producer.SendUpdate(userID, changedKey, oldProfile.DisplayName, r.DisplayName); err != nil {
return httputil.LogThenError(req, err)
}
@ -175,13 +258,71 @@ func SetDisplayName(
}
}
func getLocalPart(userID string) string {
if !strings.HasPrefix(userID, "@") {
panic(fmt.Errorf("Invalid user ID"))
func buildMembershipEvents(
memberships []authtypes.Membership, db *accounts.Database,
newProfile authtypes.Profile, userID string, cfg *config.Dendrite,
queryAPI api.RoomserverQueryAPI,
) ([]gomatrixserverlib.Event, error) {
evs := []gomatrixserverlib.Event{}
for _, membership := range memberships {
builder := gomatrixserverlib.EventBuilder{
Sender: userID,
RoomID: membership.RoomID,
Type: "m.room.member",
StateKey: &userID,
}
// Get the part before ":"
username := strings.Split(userID, ":")[0]
// Return the part after the "@"
return strings.Split(username, "@")[1]
content := events.MemberContent{
Membership: "join",
}
content.DisplayName = newProfile.DisplayName
content.AvatarURL = newProfile.AvatarURL
if err := builder.SetContent(content); err != nil {
return nil, err
}
eventsNeeded, err := gomatrixserverlib.StateNeededForEventBuilder(&builder)
if err != nil {
return nil, err
}
// Ask the roomserver for information about this room
queryReq := api.QueryLatestEventsAndStateRequest{
RoomID: membership.RoomID,
StateToFetch: eventsNeeded.Tuples(),
}
var queryRes api.QueryLatestEventsAndStateResponse
if queryErr := queryAPI.QueryLatestEventsAndState(&queryReq, &queryRes); queryErr != nil {
return nil, err
}
builder.Depth = queryRes.Depth
builder.PrevEvents = queryRes.LatestEvents
authEvents := gomatrixserverlib.NewAuthEvents(nil)
for i := range queryRes.StateEvents {
authEvents.AddEvent(&queryRes.StateEvents[i])
}
refs, err := eventsNeeded.AuthEventReferences(&authEvents)
if err != nil {
return nil, err
}
builder.AuthEvents = refs
eventID := fmt.Sprintf("$%s:%s", util.RandomString(16), cfg.Matrix.ServerName)
now := time.Now()
event, err := builder.Build(eventID, now, cfg.Matrix.ServerName, cfg.Matrix.KeyID, cfg.Matrix.PrivateKey)
if err != nil {
return nil, err
}
evs = append(evs, event)
}
return evs, nil
}

View file

@ -70,14 +70,14 @@ func Setup(
r0mux.Handle("/createRoom",
common.MakeAuthAPI("createRoom", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
return writers.CreateRoom(req, device, cfg, producer)
return writers.CreateRoom(req, device, cfg, producer, accountDB)
}),
)
r0mux.Handle("/join/{roomIDOrAlias}",
common.MakeAuthAPI("join", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
vars := mux.Vars(req)
return writers.JoinRoomByIDOrAlias(
req, device, vars["roomIDOrAlias"], cfg, federation, producer, queryAPI, keyRing,
req, device, vars["roomIDOrAlias"], cfg, federation, producer, queryAPI, keyRing, accountDB,
)
}),
)
@ -185,7 +185,7 @@ func Setup(
r0mux.Handle("/profile/{userID}/avatar_url",
common.MakeAuthAPI("profile_avatar_url", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
vars := mux.Vars(req)
return readers.SetAvatarURL(req, accountDB, vars["userID"], userUpdateProducer)
return readers.SetAvatarURL(req, accountDB, device, vars["userID"], userUpdateProducer, &cfg, producer, queryAPI)
}),
).Methods("PUT", "OPTIONS")
// Browsers use the OPTIONS HTTP method to check if the CORS policy allows
@ -201,7 +201,7 @@ func Setup(
r0mux.Handle("/profile/{userID}/displayname",
common.MakeAuthAPI("profile_displayname", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
vars := mux.Vars(req)
return readers.SetDisplayName(req, accountDB, vars["userID"], userUpdateProducer)
return readers.SetDisplayName(req, accountDB, device, vars["userID"], userUpdateProducer, &cfg, producer, queryAPI)
}),
).Methods("PUT", "OPTIONS")
// Browsers use the OPTIONS HTTP method to check if the CORS policy allows

View file

@ -23,6 +23,7 @@ import (
log "github.com/Sirupsen/logrus"
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/clientapi/events"
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
@ -84,15 +85,21 @@ type fledglingEvent struct {
}
// CreateRoom implements /createRoom
func CreateRoom(req *http.Request, device *authtypes.Device, cfg config.Dendrite, producer *producers.RoomserverProducer) util.JSONResponse {
func CreateRoom(req *http.Request, device *authtypes.Device,
cfg config.Dendrite, producer *producers.RoomserverProducer,
accountDB *accounts.Database,
) util.JSONResponse {
// TODO: Check room ID doesn't clash with an existing one, and we
// probably shouldn't be using pseudo-random strings, maybe GUIDs?
roomID := fmt.Sprintf("!%s:%s", util.RandomString(16), cfg.Matrix.ServerName)
return createRoom(req, device, cfg, roomID, producer)
return createRoom(req, device, cfg, roomID, producer, accountDB)
}
// createRoom implements /createRoom
func createRoom(req *http.Request, device *authtypes.Device, cfg config.Dendrite, roomID string, producer *producers.RoomserverProducer) util.JSONResponse {
func createRoom(req *http.Request, device *authtypes.Device,
cfg config.Dendrite, roomID string, producer *producers.RoomserverProducer,
accountDB *accounts.Database,
) util.JSONResponse {
logger := util.GetLogger(req.Context())
userID := device.UserID
var r createRoomRequest
@ -115,6 +122,22 @@ func createRoom(req *http.Request, device *authtypes.Device, cfg config.Dendrite
"roomID": roomID,
}).Info("Creating new room")
localpart, _, err := gomatrixserverlib.SplitID('@', userID)
if err != nil {
return httputil.LogThenError(req, err)
}
profile, err := accountDB.GetProfileByLocalpart(localpart)
if err != nil {
return httputil.LogThenError(req, err)
}
membershipContent := events.MemberContent{
Membership: "join",
DisplayName: profile.DisplayName,
AvatarURL: profile.AvatarURL,
}
var builtEvents []gomatrixserverlib.Event
// send events into the room in order of:
@ -137,7 +160,7 @@ func createRoom(req *http.Request, device *authtypes.Device, cfg config.Dendrite
// TODO: Synapse has txn/token ID on each event. Do we need to do this here?
eventsToMake := []fledglingEvent{
{"m.room.create", "", events.CreateContent{Creator: userID}},
{"m.room.member", userID, events.MemberContent{Membership: "join"}}, // TODO: Set avatar_url / displayname
{"m.room.member", userID, membershipContent},
{"m.room.power_levels", "", events.InitialPowerLevelsContent(userID)},
// TODO: m.room.canonical_alias
{"m.room.join_rules", "", events.JoinRulesContent{"public"}}, // FIXME: Allow this to be changed

View file

@ -21,6 +21,7 @@ import (
"time"
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/producers"
@ -42,13 +43,26 @@ func JoinRoomByIDOrAlias(
producer *producers.RoomserverProducer,
queryAPI api.RoomserverQueryAPI,
keyRing gomatrixserverlib.KeyRing,
accountDB *accounts.Database,
) util.JSONResponse {
var content map[string]interface{} // must be a JSON object
if resErr := httputil.UnmarshalJSONRequest(req, &content); resErr != nil {
return *resErr
}
localpart, _, err := gomatrixserverlib.SplitID('@', device.UserID)
if err != nil {
return httputil.LogThenError(req, err)
}
profile, err := accountDB.GetProfileByLocalpart(localpart)
if err != nil {
return httputil.LogThenError(req, err)
}
content["membership"] = "join"
content["displayname"] = profile.DisplayName
content["avatar_url"] = profile.AvatarURL
r := joinRoomReq{req, content, device.UserID, cfg, federation, producer, queryAPI, keyRing}

View file

@ -71,11 +71,11 @@ var outputRoomEventTestData = []string{
// $ curl -XPUT -d '{"membership":"invite"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@charlie:localhost?access_token=@bob:localhost"
`{"type":"new_room_event","new_room_event":{"event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}],["$wPepDhIla765Odre:localhost",{"sha256":"GqUhRiAkRvPrNBDyUxj+emRfK2P8j6iWtvsXDOUltiI"}]],"content":{"membership":"invite"},"depth":0,"event_id":"$zzLHVlHIWPrnE7DI:localhost","hashes":{"sha256":"LKk7tnYJAHsyffbi9CzfdP+TU4KQ5g6YTgYGKjJ7NxU"},"origin":"localhost","origin_server_ts":1494411709192,"prev_events":[["$4NBTdIwDxq5fDGpv:localhost",{"sha256":"EpqmxEoJP93Zb2Nt2fS95SJWTqqIutHm/Ne8OHqp6Ps"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@bob:localhost","signatures":{"localhost":{"ed25519:something":"GdUzkC+7YKl1XDi7kYuD39yi2L/+nv+YrecIQHS+0BLDQqnEj+iRXfNBuZfTk6lUBCJCHXZlk7MnEIjvWDlZCg"}},"state_key":"@charlie:localhost","type":"m.room.member"},"latest_event_ids":["$zzLHVlHIWPrnE7DI:localhost"],"adds_state_event_ids":["$zzLHVlHIWPrnE7DI:localhost"],"last_sent_event_id":"$4NBTdIwDxq5fDGpv:localhost"}}`,
// $ curl -XPUT -d '{"membership":"join"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@charlie:localhost?access_token=@charlie:localhost"
`{"type":"new_room_event","new_room_event":{"event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}],["$2O2DpHB37CuwwJOe:localhost",{"sha256":"ulaRD63dbCyolLTwvInIQpcrtU2c7ex/BHmhpLXAUoE"}],["$zzLHVlHIWPrnE7DI:localhost",{"sha256":"Jw28x9W+GoZYw7sEynsi1fcRzqRQiLddolOa/p26PV0"}]],"content":{"membership":"join"},"depth":0,"event_id":"$uJVKyzZi8ZX0kOd9:localhost","hashes":{"sha256":"9ZZs/Cg0ewpBiCB6iFXXYlmW8koFiesCNGFrOLDTolE"},"origin":"localhost","origin_server_ts":1494411745015,"prev_events":[["$zzLHVlHIWPrnE7DI:localhost",{"sha256":"Jw28x9W+GoZYw7sEynsi1fcRzqRQiLddolOa/p26PV0"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@charlie:localhost","signatures":{"localhost":{"ed25519:something":"+TM0gFPM/M3Ji2BjYuTUTgDyCOWlOq8aTMCxLg7EBvS62yPxJ558f13OWWTczUO5aRAt+PvXsMVM/bp8u6c8DQ"}},"state_key":"@charlie:localhost","type":"m.room.member"},"latest_event_ids":["$uJVKyzZi8ZX0kOd9:localhost"],"adds_state_event_ids":["$uJVKyzZi8ZX0kOd9:localhost"],"removes_state_event_ids":["$zzLHVlHIWPrnE7DI:localhost"],"last_sent_event_id":"$zzLHVlHIWPrnE7DI:localhost"}}`,
`{"type":"new_room_event","new_room_event":{"event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}],["$2O2DpHB37CuwwJOe:localhost",{"sha256":"ulaRD63dbCyolLTwvInIQpcrtU2c7ex/BHmhpLXAUoE"}],["$zzLHVlHIWPrnE7DI:localhost",{"sha256":"Jw28x9W+GoZYw7sEynsi1fcRzqRQiLddolOa/p26PV0"}]],"content":{"membership":"join"},"unsigned":{"prev_content":{"membership":"invite"},"prev_sender":"@bob:localhost","replaces_state":"$zzLHVlHIWPrnE7DI:localhost"},"depth":0,"event_id":"$uJVKyzZi8ZX0kOd9:localhost","hashes":{"sha256":"9ZZs/Cg0ewpBiCB6iFXXYlmW8koFiesCNGFrOLDTolE"},"origin":"localhost","origin_server_ts":1494411745015,"prev_events":[["$zzLHVlHIWPrnE7DI:localhost",{"sha256":"Jw28x9W+GoZYw7sEynsi1fcRzqRQiLddolOa/p26PV0"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@charlie:localhost","signatures":{"localhost":{"ed25519:something":"+TM0gFPM/M3Ji2BjYuTUTgDyCOWlOq8aTMCxLg7EBvS62yPxJ558f13OWWTczUO5aRAt+PvXsMVM/bp8u6c8DQ"}},"state_key":"@charlie:localhost","type":"m.room.member"},"latest_event_ids":["$uJVKyzZi8ZX0kOd9:localhost"],"adds_state_event_ids":["$uJVKyzZi8ZX0kOd9:localhost"],"removes_state_event_ids":["$zzLHVlHIWPrnE7DI:localhost"],"last_sent_event_id":"$zzLHVlHIWPrnE7DI:localhost"}}`,
// $ curl -XPUT -d '{"msgtype":"m.text","body":"not charlie..."}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/3?access_token=@alice:localhost"
`{"type":"new_room_event","new_room_event":{"event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$QTen1vksfcRTpUCk:localhost",{"sha256":"znwhbYzdueh0grYkUX4jgXmP9AjKphzyesMZWMiF4IY"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}]],"content":{"body":"not charlie...","msgtype":"m.text"},"depth":0,"event_id":"$Ixfn5WT9ocWTYxfy:localhost","hashes":{"sha256":"hRChdyMQ3AY4jvrPpI8PEX6Taux83Qo5hdSeHlhPxGo"},"origin":"localhost","origin_server_ts":1494411792737,"prev_events":[["$uJVKyzZi8ZX0kOd9:localhost",{"sha256":"BtesLFnHZOREQCeilFM+xvDU/Wdj+nyHMw7IGTh/9gU"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@alice:localhost","signatures":{"localhost":{"ed25519:something":"LC/Zqwu/XdqjmLdTOp/NQaFaE0niSAGgEpa39gCxsnsqEX80P7P5WDn/Kzx6rjWTnhIszrLsnoycqkXQT0Z4DQ"}},"type":"m.room.message"},"latest_event_ids":["$Ixfn5WT9ocWTYxfy:localhost"],"last_sent_event_id":"$uJVKyzZi8ZX0kOd9:localhost"}}`,
// $ curl -XPUT -d '{"membership":"leave"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@charlie:localhost?access_token=@alice:localhost"
`{"type":"new_room_event","new_room_event":{"event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$QTen1vksfcRTpUCk:localhost",{"sha256":"znwhbYzdueh0grYkUX4jgXmP9AjKphzyesMZWMiF4IY"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}],["$uJVKyzZi8ZX0kOd9:localhost",{"sha256":"BtesLFnHZOREQCeilFM+xvDU/Wdj+nyHMw7IGTh/9gU"}]],"content":{"membership":"leave"},"depth":0,"event_id":"$om1F4AI8tCYlHUSp:localhost","hashes":{"sha256":"7JVI0uCxSUyEqDJ+o36/zUIlIZkXVK/R6wkrZGvQXDE"},"origin":"localhost","origin_server_ts":1494411855278,"prev_events":[["$Ixfn5WT9ocWTYxfy:localhost",{"sha256":"hOoPIDQFvvNqQJzA5ggjoQi4v1BOELnhnmwU4UArDOY"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@alice:localhost","signatures":{"localhost":{"ed25519:something":"3sxoDLUPnKuDJgFgS3C647BbiXrozxhhxrZOlFP3KgJKzBYv/ht+Jd2V2iSZOvsv94wgRBf0A/lEcJRIqeLgDA"}},"state_key":"@charlie:localhost","type":"m.room.member"},"latest_event_ids":["$om1F4AI8tCYlHUSp:localhost"],"adds_state_event_ids":["$om1F4AI8tCYlHUSp:localhost"],"removes_state_event_ids":["$uJVKyzZi8ZX0kOd9:localhost"],"last_sent_event_id":"$Ixfn5WT9ocWTYxfy:localhost"}}`,
`{"type":"new_room_event","new_room_event":{"event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$QTen1vksfcRTpUCk:localhost",{"sha256":"znwhbYzdueh0grYkUX4jgXmP9AjKphzyesMZWMiF4IY"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}],["$uJVKyzZi8ZX0kOd9:localhost",{"sha256":"BtesLFnHZOREQCeilFM+xvDU/Wdj+nyHMw7IGTh/9gU"}]],"content":{"membership":"leave"},"unsigned":{"prev_content":{"membership":"join"},"prev_sender":"@charlie:localhost","replaces_state":"$uJVKyzZi8ZX0kOd9:localhost"},"depth":0,"event_id":"$om1F4AI8tCYlHUSp:localhost","hashes":{"sha256":"7JVI0uCxSUyEqDJ+o36/zUIlIZkXVK/R6wkrZGvQXDE"},"origin":"localhost","origin_server_ts":1494411855278,"prev_events":[["$Ixfn5WT9ocWTYxfy:localhost",{"sha256":"hOoPIDQFvvNqQJzA5ggjoQi4v1BOELnhnmwU4UArDOY"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@alice:localhost","signatures":{"localhost":{"ed25519:something":"3sxoDLUPnKuDJgFgS3C647BbiXrozxhhxrZOlFP3KgJKzBYv/ht+Jd2V2iSZOvsv94wgRBf0A/lEcJRIqeLgDA"}},"state_key":"@charlie:localhost","type":"m.room.member"},"latest_event_ids":["$om1F4AI8tCYlHUSp:localhost"],"adds_state_event_ids":["$om1F4AI8tCYlHUSp:localhost"],"removes_state_event_ids":["$uJVKyzZi8ZX0kOd9:localhost"],"last_sent_event_id":"$Ixfn5WT9ocWTYxfy:localhost"}}`,
// $ curl -XPUT -d '{"msgtype":"m.text","body":"why did you kick charlie"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/3?access_token=@bob:localhost"
`{"type":"new_room_event","new_room_event":{"event":{"auth_events":[["$xz0fUB8zNMTGFh1W:localhost",{"sha256":"F4tTLtltC6f2XKeXq4ZKpMZ5EpditaW+RYQSnYzq3lI"}],["$RWsxGlfPHAcijTgu:localhost",{"sha256":"1zc+86U9vLK1BvTJbeLuYpw9dZqvX2fr8rc3pOF69f8"}],["$wPepDhIla765Odre:localhost",{"sha256":"GqUhRiAkRvPrNBDyUxj+emRfK2P8j6iWtvsXDOUltiI"}]],"content":{"body":"why did you kick charlie","msgtype":"m.text"},"depth":0,"event_id":"$hgao5gTmr3r9TtK2:localhost","hashes":{"sha256":"Aa2ZCrvwjX5xhvkVqIOFUeEGqrnrQZjjNFiZRybjsPY"},"origin":"localhost","origin_server_ts":1494411912809,"prev_events":[["$om1F4AI8tCYlHUSp:localhost",{"sha256":"yVs+CW7AiJrJOYouL8xPIBrtIHAhnbxaegna8MxeCto"}]],"room_id":"!PjrbIMW2cIiaYF4t:localhost","sender":"@bob:localhost","signatures":{"localhost":{"ed25519:something":"sGkpbEXGsvAuCvE3wb5E9H5fjCVKpRdWNt6csj1bCB9Fmg4Rg4mvj3TAJ+91DjO8IPsgSxDKdqqRYF0OtcynBA"}},"type":"m.room.message"},"latest_event_ids":["$hgao5gTmr3r9TtK2:localhost"],"last_sent_event_id":"$om1F4AI8tCYlHUSp:localhost"}}`,
// $ curl -XPUT -d '{"name":"No Charlies"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.name?access_token=@alice:localhost"

View file

@ -35,6 +35,15 @@ type OutputRoomEvent struct {
db *storage.SyncServerDatabase
notifier *sync.Notifier
query api.RoomserverQueryAPI
serverName gomatrixserverlib.ServerName
keyID gomatrixserverlib.KeyID
privateKey []byte
}
type prevMembership struct {
PrevContent json.RawMessage `json:"prev_content"`
PrevID string `json:"replaces_state"`
UserID string `json:"prev_sender"`
}
// NewOutputRoomEvent creates a new OutputRoomEvent consumer. Call Start() to begin consuming from room servers.
@ -55,6 +64,9 @@ func NewOutputRoomEvent(cfg *config.Dendrite, n *sync.Notifier, store *storage.S
db: store,
notifier: n,
query: api.NewRoomserverQueryAPIHTTP(roomServerURL, nil),
serverName: cfg.Matrix.ServerName,
keyID: cfg.Matrix.KeyID,
privateKey: cfg.Matrix.PrivateKey,
}
consumer.ProcessMessage = s.onMessage
@ -101,6 +113,18 @@ func (s *OutputRoomEvent) onMessage(msg *sarama.ConsumerMessage) error {
}).Panicf("roomserver output log: state event lookup failure")
}
ev, err = s.updateStateEvent(ev, s.keyID, s.privateKey)
if err != nil {
return err
}
for i := range addsStateEvents {
addsStateEvents[i], err = s.updateStateEvent(addsStateEvents[i], s.keyID, s.privateKey)
if err != nil {
return err
}
}
syncStreamPos, err := s.db.WriteEvent(
&ev, addsStateEvents, output.NewRoomEvent.AddsStateEventIDs, output.NewRoomEvent.RemovesStateEventIDs,
)
@ -177,6 +201,35 @@ func (s *OutputRoomEvent) lookupStateEvents(
return result, nil
}
func (s *OutputRoomEvent) updateStateEvent(
event gomatrixserverlib.Event, keyID gomatrixserverlib.KeyID,
privateKey []byte,
) (gomatrixserverlib.Event, error) {
var stateKey string
if event.StateKey() == nil {
stateKey = ""
} else {
stateKey = *event.StateKey()
}
prevEvent, err := s.db.GetStateEvent(event.Type(), event.RoomID(), stateKey)
if err != nil {
return event, err
}
if prevEvent == nil {
return event, nil
}
prev := prevMembership{
PrevContent: prevEvent.Content(),
PrevID: prevEvent.EventID(),
UserID: prevEvent.Sender(),
}
return event.SetUnsigned(prev)
}
func missingEventsFrom(events []gomatrixserverlib.Event, required []string) []string {
have := map[string]bool{}
for _, event := range events {

View file

@ -66,6 +66,9 @@ const selectCurrentStateSQL = "" +
const selectJoinedUsersSQL = "" +
"SELECT room_id, state_key FROM current_room_state WHERE type = 'm.room.member' AND membership = 'join'"
const selectStateEventSQL = "" +
"SELECT event_json FROM current_room_state WHERE type = $1 AND room_id = $2 AND state_key = $3"
const selectEventsWithEventIDsSQL = "" +
"SELECT added_at, event_json FROM current_room_state WHERE event_id = ANY($1)"
@ -76,6 +79,7 @@ type currentRoomStateStatements struct {
selectCurrentStateStmt *sql.Stmt
selectJoinedUsersStmt *sql.Stmt
selectEventsWithEventIDsStmt *sql.Stmt
selectStateEventStmt *sql.Stmt
}
func (s *currentRoomStateStatements) prepare(db *sql.DB) (err error) {
@ -101,6 +105,9 @@ func (s *currentRoomStateStatements) prepare(db *sql.DB) (err error) {
if s.selectEventsWithEventIDsStmt, err = db.Prepare(selectEventsWithEventIDsSQL); err != nil {
return
}
if s.selectStateEventStmt, err = db.Prepare(selectStateEventSQL); err != nil {
return
}
return
}
@ -195,3 +202,12 @@ func rowsToEvents(rows *sql.Rows) ([]gomatrixserverlib.Event, error) {
}
return result, nil
}
func (s *currentRoomStateStatements) selectStateEvent(evType string, roomID string, stateKey string) (*gomatrixserverlib.Event, error) {
var res []byte
if err := s.selectStateEventStmt.QueryRow(evType, roomID, stateKey).Scan(&res); err == sql.ErrNoRows {
return nil, nil
}
ev, err := gomatrixserverlib.NewEventFromTrustedJSON(res, false)
return &ev, err
}

View file

@ -141,6 +141,13 @@ func (d *SyncServerDatabase) updateRoomState(
return nil
}
// GetStateEvent returns the Matrix state event of a given type for a given room with a given state key
// If no event could be found, returns nil
// If there was an issue during the retrieval, returns an error
func (d *SyncServerDatabase) GetStateEvent(evType string, roomID string, stateKey string) (*gomatrixserverlib.Event, error) {
return d.roomstate.selectStateEvent(evType, roomID, stateKey)
}
// PartitionOffsets implements common.PartitionStorer
func (d *SyncServerDatabase) PartitionOffsets(topic string) ([]common.PartitionOffset, error) {
return d.partitions.SelectPartitionOffsets(topic)