diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go index 83c431671..d5fdf2532 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go @@ -25,6 +25,12 @@ import ( "github.com/matrix-org/gomatrixserverlib" ) +type stateDelta struct { + roomID string + stateEvents []gomatrixserverlib.Event + membership string +} + // SyncServerDatabase represents a sync server database type SyncServerDatabase struct { db *sql.DB @@ -115,67 +121,42 @@ func (d *SyncServerDatabase) SyncStreamPosition() (types.StreamPosition, error) // IncrementalSync returns all the data needed in order to create an incremental sync response. func (d *SyncServerDatabase) IncrementalSync(userID string, fromPos, toPos types.StreamPosition, numRecentEventsPerRoom int) (res *types.Response, returnErr error) { returnErr = runTransaction(d.db, func(txn *sql.Tx) error { - roomIDs, err := d.roomstate.SelectRoomIDsWithMembership(txn, userID, "join") - if err != nil { - return err - } - - state, err := d.events.StateBetween(txn, fromPos, toPos) + // Work out which rooms to return in the response. This is done by getting not only the currently + // joined rooms, but also which rooms have membership transitions for this user between the 2 stream positions. + // This works out what the 'state' key should be for each room as well as which membership block + // to put the room into. + deltas, err := d.getStateDeltas(txn, fromPos, toPos, userID) if err != nil { return err } res = types.NewResponse(toPos) - - // Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821 - // - Get membership list changes for this user in this sync response - // - For each room which has membership list changes: - // * Check if the room is 'newly joined' (insufficient to just check for a join event because we allow dupe joins TODO). - // If it is, then we need to send the full room state down (and 'limited' is always true). - // * Check if user is still CURRENTLY invited to the room. If so, add room to 'invited' block. - // * TODO Check if the user is CURRENTLY left/banned. If so, add room to 'archived' block. - - // work out which rooms transitioned to 'joined' between the 2 stream positions and add full state where needed. - for roomID, stateEvents := range state { - for _, ev := range stateEvents { - // TODO: Currently this will incorrectly add rooms which were ALREADY joined but they sent another no-op join event. - // We should be checking if the user was already joined at fromPos and not proceed if so. As a result of this, - // dupe join events will result in the entire room state coming down to the client again. This is added in - // the 'state' part of the response though, so is transparent modulo bandwidth concerns as it is not added to - // the timeline. - if ev.Type() == "m.room.member" && ev.StateKeyEquals(userID) { - var memberContent events.MemberContent - if err := json.Unmarshal(ev.Content(), &memberContent); err != nil { - return err - } - if memberContent.Membership != "join" { - continue - } - - allState, err := d.roomstate.CurrentState(txn, roomID) - if err != nil { - return err - } - state[roomID] = allState - } - - } - } - - for _, roomID := range roomIDs { - recentEvents, err := d.events.RecentEventsInRoom(txn, roomID, fromPos, toPos, numRecentEventsPerRoom) + for _, delta := range deltas { + recentEvents, err := d.events.RecentEventsInRoom(txn, delta.roomID, fromPos, toPos, numRecentEventsPerRoom) if err != nil { return err } - state[roomID] = removeDuplicates(state[roomID], recentEvents) + delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents) // roll back - jr := types.NewJoinResponse() - jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) - jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true - jr.State.Events = gomatrixserverlib.ToClientEvents(state[roomID], gomatrixserverlib.FormatSync) - res.Rooms.Join[roomID] = *jr + switch delta.membership { + case "join": + jr := types.NewJoinResponse() + jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) + jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true + jr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) + res.Rooms.Join[delta.roomID] = *jr + case "leave": + fallthrough // transitions to leave are the same as ban + case "ban": + lr := types.NewLeaveResponse() + lr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) + lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true + lr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) + res.Rooms.Leave[delta.roomID] = *lr + } } + // TODO: This should be done in getStateDeltas return d.addInvitesToResponse(txn, userID, res) }) return @@ -242,6 +223,76 @@ func (d *SyncServerDatabase) addInvitesToResponse(txn *sql.Tx, userID string, re return nil } +func (d *SyncServerDatabase) getStateDeltas(txn *sql.Tx, fromPos, toPos types.StreamPosition, userID string) ([]stateDelta, error) { + // Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821 + // - Get membership list changes for this user in this sync response + // - For each room which has membership list changes: + // * Check if the room is 'newly joined' (insufficient to just check for a join event because we allow dupe joins TODO). + // If it is, then we need to send the full room state down (and 'limited' is always true). + // * Check if user is still CURRENTLY invited to the room. If so, add room to 'invited' block. + // * Check if the user is CURRENTLY (TODO) left/banned. If so, add room to 'archived' block. + // - Get all CURRENTLY joined rooms, and add them to 'joined' block. + state, err := d.events.StateBetween(txn, fromPos, toPos) + if err != nil { + return nil, err + } + + // Add in currently joined rooms which weren't already added (because no state changes for these rooms) + joinedRoomIDs, err := d.roomstate.SelectRoomIDsWithMembership(txn, userID, "join") + if err != nil { + return nil, err + } + for _, joinedRoomID := range joinedRoomIDs { + if len(state[joinedRoomID]) > 0 { + continue // we already have a state delta + } + // explicitly state there is a 0 delta using an empty array so it marshals as [] + state[joinedRoomID] = []gomatrixserverlib.Event{} + } + + var deltas []stateDelta + for roomID, stateEvents := range state { + var added bool + for _, ev := range stateEvents { + // TODO: Currently this will incorrectly add rooms which were ALREADY joined but they sent another no-op join event. + // We should be checking if the user was already joined at fromPos and not proceed if so. As a result of this, + // dupe join events will result in the entire room state coming down to the client again. This is added in + // the 'state' part of the response though, so is transparent modulo bandwidth concerns as it is not added to + // the timeline. + if membership := getMembershipFromEvent(&ev, userID); membership != "" { + if membership == "join" { + // send full room state down instead of a delta + allState, err := d.roomstate.CurrentState(txn, roomID) + if err != nil { + return nil, err + } + stateEvents = allState + } + + deltas = append(deltas, stateDelta{ + membership: membership, + stateEvents: stateEvents, + roomID: roomID, + }) + added = true + break + } + } + if !added { + // We can hit this case if: + // - There is a 0 delta + // - There is a delta but it contains no membership changes + deltas = append(deltas, stateDelta{ + membership: "join", + roomID: roomID, + stateEvents: stateEvents, + }) + } + } + + return deltas, nil +} + // There may be some overlap where events in stateEvents are already in recentEvents, so filter // them out so we don't include them twice in the /sync response. They should be in recentEvents // only, so clients get to the correct state once they have rolled forward. @@ -267,6 +318,19 @@ func removeDuplicates(stateEvents, recentEvents []gomatrixserverlib.Event) []gom return stateEvents } +// getMembershipFromEvent returns the value of content.membership iff the event is a state event +// with type 'm.room.member' and state_key of userID. Otherwise, an empty string is returned. +func getMembershipFromEvent(ev *gomatrixserverlib.Event, userID string) string { + if ev.Type() == "m.room.member" && ev.StateKeyEquals(userID) { + var memberContent events.MemberContent + if err := json.Unmarshal(ev.Content(), &memberContent); err != nil { + return "" + } + return memberContent.Membership + } + return "" +} + func runTransaction(db *sql.DB, fn func(txn *sql.Tx) error) (err error) { txn, err := db.Begin() if err != nil {