mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-07 23:13:11 -06:00
Handle room transitions to 'leave' in incremental /sync requests
This commit is contained in:
parent
675759c192
commit
aa3737a2d2
|
|
@ -25,6 +25,12 @@ import (
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type stateDelta struct {
|
||||||
|
roomID string
|
||||||
|
stateEvents []gomatrixserverlib.Event
|
||||||
|
membership string
|
||||||
|
}
|
||||||
|
|
||||||
// SyncServerDatabase represents a sync server database
|
// SyncServerDatabase represents a sync server database
|
||||||
type SyncServerDatabase struct {
|
type SyncServerDatabase struct {
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
|
|
@ -115,67 +121,42 @@ func (d *SyncServerDatabase) SyncStreamPosition() (types.StreamPosition, error)
|
||||||
// IncrementalSync returns all the data needed in order to create an incremental sync response.
|
// IncrementalSync returns all the data needed in order to create an incremental sync response.
|
||||||
func (d *SyncServerDatabase) IncrementalSync(userID string, fromPos, toPos types.StreamPosition, numRecentEventsPerRoom int) (res *types.Response, returnErr error) {
|
func (d *SyncServerDatabase) IncrementalSync(userID string, fromPos, toPos types.StreamPosition, numRecentEventsPerRoom int) (res *types.Response, returnErr error) {
|
||||||
returnErr = runTransaction(d.db, func(txn *sql.Tx) error {
|
returnErr = runTransaction(d.db, func(txn *sql.Tx) error {
|
||||||
roomIDs, err := d.roomstate.SelectRoomIDsWithMembership(txn, userID, "join")
|
// Work out which rooms to return in the response. This is done by getting not only the currently
|
||||||
if err != nil {
|
// joined rooms, but also which rooms have membership transitions for this user between the 2 stream positions.
|
||||||
return err
|
// This works out what the 'state' key should be for each room as well as which membership block
|
||||||
}
|
// to put the room into.
|
||||||
|
deltas, err := d.getStateDeltas(txn, fromPos, toPos, userID)
|
||||||
state, err := d.events.StateBetween(txn, fromPos, toPos)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
res = types.NewResponse(toPos)
|
res = types.NewResponse(toPos)
|
||||||
|
for _, delta := range deltas {
|
||||||
// Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821
|
recentEvents, err := d.events.RecentEventsInRoom(txn, delta.roomID, fromPos, toPos, numRecentEventsPerRoom)
|
||||||
// - Get membership list changes for this user in this sync response
|
|
||||||
// - For each room which has membership list changes:
|
|
||||||
// * Check if the room is 'newly joined' (insufficient to just check for a join event because we allow dupe joins TODO).
|
|
||||||
// If it is, then we need to send the full room state down (and 'limited' is always true).
|
|
||||||
// * Check if user is still CURRENTLY invited to the room. If so, add room to 'invited' block.
|
|
||||||
// * TODO Check if the user is CURRENTLY left/banned. If so, add room to 'archived' block.
|
|
||||||
|
|
||||||
// work out which rooms transitioned to 'joined' between the 2 stream positions and add full state where needed.
|
|
||||||
for roomID, stateEvents := range state {
|
|
||||||
for _, ev := range stateEvents {
|
|
||||||
// TODO: Currently this will incorrectly add rooms which were ALREADY joined but they sent another no-op join event.
|
|
||||||
// We should be checking if the user was already joined at fromPos and not proceed if so. As a result of this,
|
|
||||||
// dupe join events will result in the entire room state coming down to the client again. This is added in
|
|
||||||
// the 'state' part of the response though, so is transparent modulo bandwidth concerns as it is not added to
|
|
||||||
// the timeline.
|
|
||||||
if ev.Type() == "m.room.member" && ev.StateKeyEquals(userID) {
|
|
||||||
var memberContent events.MemberContent
|
|
||||||
if err := json.Unmarshal(ev.Content(), &memberContent); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if memberContent.Membership != "join" {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
allState, err := d.roomstate.CurrentState(txn, roomID)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
state[roomID] = allState
|
delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents) // roll back
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, roomID := range roomIDs {
|
|
||||||
recentEvents, err := d.events.RecentEventsInRoom(txn, roomID, fromPos, toPos, numRecentEventsPerRoom)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
state[roomID] = removeDuplicates(state[roomID], recentEvents)
|
|
||||||
|
|
||||||
|
switch delta.membership {
|
||||||
|
case "join":
|
||||||
jr := types.NewJoinResponse()
|
jr := types.NewJoinResponse()
|
||||||
jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
|
jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
|
||||||
jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
|
jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
|
||||||
jr.State.Events = gomatrixserverlib.ToClientEvents(state[roomID], gomatrixserverlib.FormatSync)
|
jr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
|
||||||
res.Rooms.Join[roomID] = *jr
|
res.Rooms.Join[delta.roomID] = *jr
|
||||||
|
case "leave":
|
||||||
|
fallthrough // transitions to leave are the same as ban
|
||||||
|
case "ban":
|
||||||
|
lr := types.NewLeaveResponse()
|
||||||
|
lr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
|
||||||
|
lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
|
||||||
|
lr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
|
||||||
|
res.Rooms.Leave[delta.roomID] = *lr
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: This should be done in getStateDeltas
|
||||||
return d.addInvitesToResponse(txn, userID, res)
|
return d.addInvitesToResponse(txn, userID, res)
|
||||||
})
|
})
|
||||||
return
|
return
|
||||||
|
|
@ -242,6 +223,76 @@ func (d *SyncServerDatabase) addInvitesToResponse(txn *sql.Tx, userID string, re
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (d *SyncServerDatabase) getStateDeltas(txn *sql.Tx, fromPos, toPos types.StreamPosition, userID string) ([]stateDelta, error) {
|
||||||
|
// Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821
|
||||||
|
// - Get membership list changes for this user in this sync response
|
||||||
|
// - For each room which has membership list changes:
|
||||||
|
// * Check if the room is 'newly joined' (insufficient to just check for a join event because we allow dupe joins TODO).
|
||||||
|
// If it is, then we need to send the full room state down (and 'limited' is always true).
|
||||||
|
// * Check if user is still CURRENTLY invited to the room. If so, add room to 'invited' block.
|
||||||
|
// * Check if the user is CURRENTLY (TODO) left/banned. If so, add room to 'archived' block.
|
||||||
|
// - Get all CURRENTLY joined rooms, and add them to 'joined' block.
|
||||||
|
state, err := d.events.StateBetween(txn, fromPos, toPos)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add in currently joined rooms which weren't already added (because no state changes for these rooms)
|
||||||
|
joinedRoomIDs, err := d.roomstate.SelectRoomIDsWithMembership(txn, userID, "join")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
for _, joinedRoomID := range joinedRoomIDs {
|
||||||
|
if len(state[joinedRoomID]) > 0 {
|
||||||
|
continue // we already have a state delta
|
||||||
|
}
|
||||||
|
// explicitly state there is a 0 delta using an empty array so it marshals as []
|
||||||
|
state[joinedRoomID] = []gomatrixserverlib.Event{}
|
||||||
|
}
|
||||||
|
|
||||||
|
var deltas []stateDelta
|
||||||
|
for roomID, stateEvents := range state {
|
||||||
|
var added bool
|
||||||
|
for _, ev := range stateEvents {
|
||||||
|
// TODO: Currently this will incorrectly add rooms which were ALREADY joined but they sent another no-op join event.
|
||||||
|
// We should be checking if the user was already joined at fromPos and not proceed if so. As a result of this,
|
||||||
|
// dupe join events will result in the entire room state coming down to the client again. This is added in
|
||||||
|
// the 'state' part of the response though, so is transparent modulo bandwidth concerns as it is not added to
|
||||||
|
// the timeline.
|
||||||
|
if membership := getMembershipFromEvent(&ev, userID); membership != "" {
|
||||||
|
if membership == "join" {
|
||||||
|
// send full room state down instead of a delta
|
||||||
|
allState, err := d.roomstate.CurrentState(txn, roomID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
stateEvents = allState
|
||||||
|
}
|
||||||
|
|
||||||
|
deltas = append(deltas, stateDelta{
|
||||||
|
membership: membership,
|
||||||
|
stateEvents: stateEvents,
|
||||||
|
roomID: roomID,
|
||||||
|
})
|
||||||
|
added = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !added {
|
||||||
|
// We can hit this case if:
|
||||||
|
// - There is a 0 delta
|
||||||
|
// - There is a delta but it contains no membership changes
|
||||||
|
deltas = append(deltas, stateDelta{
|
||||||
|
membership: "join",
|
||||||
|
roomID: roomID,
|
||||||
|
stateEvents: stateEvents,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return deltas, nil
|
||||||
|
}
|
||||||
|
|
||||||
// There may be some overlap where events in stateEvents are already in recentEvents, so filter
|
// There may be some overlap where events in stateEvents are already in recentEvents, so filter
|
||||||
// them out so we don't include them twice in the /sync response. They should be in recentEvents
|
// them out so we don't include them twice in the /sync response. They should be in recentEvents
|
||||||
// only, so clients get to the correct state once they have rolled forward.
|
// only, so clients get to the correct state once they have rolled forward.
|
||||||
|
|
@ -267,6 +318,19 @@ func removeDuplicates(stateEvents, recentEvents []gomatrixserverlib.Event) []gom
|
||||||
return stateEvents
|
return stateEvents
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// getMembershipFromEvent returns the value of content.membership iff the event is a state event
|
||||||
|
// with type 'm.room.member' and state_key of userID. Otherwise, an empty string is returned.
|
||||||
|
func getMembershipFromEvent(ev *gomatrixserverlib.Event, userID string) string {
|
||||||
|
if ev.Type() == "m.room.member" && ev.StateKeyEquals(userID) {
|
||||||
|
var memberContent events.MemberContent
|
||||||
|
if err := json.Unmarshal(ev.Content(), &memberContent); err != nil {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
return memberContent.Membership
|
||||||
|
}
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
func runTransaction(db *sql.DB, fn func(txn *sql.Tx) error) (err error) {
|
func runTransaction(db *sql.DB, fn func(txn *sql.Tx) error) (err error) {
|
||||||
txn, err := db.Begin()
|
txn, err := db.Begin()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue