From cc7117392f177cd01fcb1218130426b56dca81b4 Mon Sep 17 00:00:00 2001 From: Kegsay Date: Fri, 12 May 2017 16:56:17 +0100 Subject: [PATCH 1/3] Send the entire room state down when transitioning to 'join' on a /sync response (#100) This is only 'mostly' correct currently, because what should be no-op dupe joins will actually trigger the entire room state to be re-sent. Bizarrely, it's significantly easier to just do that than work out if we should, and there are no client-visible effects to doing so, so we just do it for now. --- .../cmd/syncserver-integration-tests/main.go | 41 ++++++++++++++++++- .../storage/output_room_events_table.go | 4 +- .../dendrite/syncapi/storage/syncserver.go | 37 +++++++++++++++++ .../dendrite/syncapi/sync/requestpool.go | 11 ++--- 4 files changed, 85 insertions(+), 8 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/main.go b/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/main.go index badf2279d..8d04c7345 100644 --- a/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/main.go @@ -392,7 +392,6 @@ func main() { } // Make sure alice sees it TODO: prev_batch - // TODO: Make sure bob sees it AND all the current room state testSyncServer(syncServerCmdChan, "@alice:localhost", "9", `{ "account_data": { "events": [] @@ -425,6 +424,46 @@ func main() { } }`) + // Make sure bob sees the room AND all the current room state TODO: history visibility + testSyncServer(syncServerCmdChan, "@bob:localhost", "9", `{ + "account_data": { + "events": [] + }, + "next_batch": "10", + "presence": { + "events": [] + }, + "rooms": { + "invite": {}, + "join": { + "!PjrbIMW2cIiaYF4t:localhost": { + "account_data": { + "events": [] + }, + "ephemeral": { + "events": [] + }, + "state": { + "events": [`+ + clientEventTestData[0]+","+ + clientEventTestData[1]+","+ + clientEventTestData[2]+","+ + clientEventTestData[3]+","+ + clientEventTestData[4]+","+ + clientEventTestData[8]+`] + }, + "timeline": { + "limited": false, + "prev_batch": "", + "events": [`+ + clientEventTestData[9]+`] + } + } + }, + "leave": {} + } + }`) + // $ curl -XPUT -d '{"msgtype":"m.text","body":"hello alice"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/1?access_token=@bob:localhost" if err := exe.WriteToTopic(inputTopic, canonicalJSONInput([]string{outputRoomEventTestData[10]})); err != nil { panic(err) diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go index 222b105c6..5a91966cf 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go @@ -64,7 +64,7 @@ const selectMaxIDSQL = "" + // In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id). const selectStateInRangeSQL = "" + "SELECT event_json, add_state_ids, remove_state_ids FROM output_room_events" + - " WHERE (id > $1 AND id < $2) AND (add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL)" + + " WHERE (id > $1 AND id <= $2) AND (add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL)" + " ORDER BY id ASC" type outputRoomEventsStatements struct { @@ -102,7 +102,7 @@ func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) { return } -// StateBetween returns the state events between the two given stream positions, exclusive of both. +// StateBetween returns the state events between the two given stream positions, exclusive of oldPos, inclusive of newPos. // Results are bucketed based on the room ID. If the same state is overwritten multiple times between the // two positions, only the most recent state is returned. func (s *outputRoomEventsStatements) StateBetween(txn *sql.Tx, oldPos, newPos types.StreamPosition) (map[string][]gomatrixserverlib.Event, error) { diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go index cc9fbdec0..47520ff2f 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go @@ -16,8 +16,10 @@ package storage import ( "database/sql" + "encoding/json" // Import the postgres database driver. _ "github.com/lib/pq" + "github.com/matrix-org/dendrite/clientapi/events" "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" @@ -124,6 +126,41 @@ func (d *SyncServerDatabase) IncrementalSync(userID string, fromPos, toPos types return err } + // Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821 + // - Get membership list changes for this user in this sync response + // - For each room which has membership list changes: + // * Check if the room is 'newly joined' (insufficient to just check for a join event because we allow dupe joins TODO). + // If it is, then we need to send the full room state down (and 'limited' is always true). + // * TODO Check if user is still CURRENTLY invited to the room. If so, add room to 'invited' block. + // * TODO Check if the user is CURRENTLY left/banned. If so, add room to 'archived' block. + + // work out which rooms transitioned to 'joined' between the 2 stream positions and add full state where needed. + for roomID, stateEvents := range state { + for _, ev := range stateEvents { + // TODO: Currently this will incorrectly add rooms which were ALREADY joined but they sent another no-op join event. + // We should be checking if the user was already joined at fromPos and not proceed if so. As a result of this, + // dupe join events will result in the entire room state coming down to the client again. This is added in + // the 'state' part of the response though, so is transparent modulo bandwidth concerns as it is not added to + // the timeline. + if ev.Type() == "m.room.member" && ev.StateKeyEquals(userID) { + var memberContent events.MemberContent + if err := json.Unmarshal(ev.Content(), &memberContent); err != nil { + return err + } + if memberContent.Membership != "join" { + continue + } + + allState, err := d.roomstate.CurrentState(txn, roomID) + if err != nil { + return err + } + state[roomID] = allState + } + + } + } + for _, roomID := range roomIDs { recentEvents, err := d.events.RecentEventsInRoom(txn, roomID, fromPos, toPos, numRecentEventsPerRoom) if err != nil { diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go b/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go index eee117e76..c9c415661 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go @@ -64,7 +64,6 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request) util.JSONRespons // Fork off 2 goroutines: one to do the work, and one to serve as a timeout. // Whichever returns first is the one we will serve back to the client. - // TODO: Currently this means that cpu work is timed, which may not be what we want long term. timeoutChan := make(chan struct{}) timer := time.AfterFunc(syncReq.timeout, func() { close(timeoutChan) // signal that the timeout has expired @@ -72,8 +71,12 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request) util.JSONRespons done := make(chan util.JSONResponse) go func() { - syncData, err := rp.currentSyncForUser(*syncReq) + currentPos := rp.notifier.WaitForEvents(*syncReq) + // We stop the timer BEFORE calculating the response so the cpu work + // done to calculate the response is not timed. This stops us from + // doing lots of work then timing out and sending back an empty response. timer.Stop() + syncData, err := rp.currentSyncForUser(*syncReq, currentPos) var res util.JSONResponse if err != nil { res = httputil.LogThenError(req, err) @@ -98,9 +101,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request) util.JSONRespons } } -func (rp *RequestPool) currentSyncForUser(req syncRequest) (*types.Response, error) { - currentPos := rp.notifier.WaitForEvents(req) - +func (rp *RequestPool) currentSyncForUser(req syncRequest, currentPos types.StreamPosition) (*types.Response, error) { if req.since == types.StreamPosition(0) { pos, data, err := rp.db.CompleteSync(req.userID, req.limit) if err != nil { From 94e1c6274508e5876f7f3b397064d32925c005cc Mon Sep 17 00:00:00 2001 From: Kegsay Date: Mon, 15 May 2017 15:18:08 +0100 Subject: [PATCH 2/3] Remove intermediary /sync response struct (#102) The logic required to populate the right bits of `RoomData` tends towards the complete `/sync` response struct, so just use the actual response struct and save the hassle of mapping between the two. It may not make much difference in its current form, but the next PR will make use of this. This PR has no functional changes. --- .../dendrite/syncapi/storage/syncserver.go | 39 +++++++++++-------- .../dendrite/syncapi/sync/requestpool.go | 34 ++-------------- .../dendrite/syncapi/types/types.go | 6 --- 3 files changed, 26 insertions(+), 53 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go index 47520ff2f..5d0bdaccf 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go @@ -113,8 +113,7 @@ func (d *SyncServerDatabase) SyncStreamPosition() (types.StreamPosition, error) } // IncrementalSync returns all the data needed in order to create an incremental sync response. -func (d *SyncServerDatabase) IncrementalSync(userID string, fromPos, toPos types.StreamPosition, numRecentEventsPerRoom int) (data map[string]types.RoomData, returnErr error) { - data = make(map[string]types.RoomData) +func (d *SyncServerDatabase) IncrementalSync(userID string, fromPos, toPos types.StreamPosition, numRecentEventsPerRoom int) (res *types.Response, returnErr error) { returnErr = runTransaction(d.db, func(txn *sql.Tx) error { roomIDs, err := d.roomstate.SelectRoomIDsWithMembership(txn, userID, "join") if err != nil { @@ -126,6 +125,8 @@ func (d *SyncServerDatabase) IncrementalSync(userID string, fromPos, toPos types return err } + res = types.NewResponse(toPos) + // Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821 // - Get membership list changes for this user in this sync response // - For each room which has membership list changes: @@ -167,35 +168,40 @@ func (d *SyncServerDatabase) IncrementalSync(userID string, fromPos, toPos types return err } state[roomID] = removeDuplicates(state[roomID], recentEvents) - roomData := types.RoomData{ - State: state[roomID], - RecentEvents: recentEvents, - } - data[roomID] = roomData + + jr := types.NewJoinResponse() + jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) + jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true + jr.State.Events = gomatrixserverlib.ToClientEvents(state[roomID], gomatrixserverlib.FormatSync) + res.Rooms.Join[roomID] = *jr } return nil }) return } -// CompleteSync returns all the data needed in order to create a complete sync response. -func (d *SyncServerDatabase) CompleteSync(userID string, numRecentEventsPerRoom int) (pos types.StreamPosition, data map[string]types.RoomData, returnErr error) { - data = make(map[string]types.RoomData) +// CompleteSync a complete /sync API response for the given user. +func (d *SyncServerDatabase) CompleteSync(userID string, numRecentEventsPerRoom int) (res *types.Response, returnErr error) { // This needs to be all done in a transaction as we need to do multiple SELECTs, and we need to have // a consistent view of the database throughout. This includes extracting the sync stream position. + // This does have the unfortunate side-effect that all the matrixy logic resides in this function, + // but it's better to not hide the fact that this is being done in a transaction. returnErr = runTransaction(d.db, func(txn *sql.Tx) error { // Get the current stream position which we will base the sync response on. id, err := d.events.MaxID(txn) if err != nil { return err } - pos = types.StreamPosition(id) + pos := types.StreamPosition(id) // Extract room state and recent events for all rooms the user is joined to. roomIDs, err := d.roomstate.SelectRoomIDsWithMembership(txn, userID, "join") if err != nil { return err } + + // Build up a /sync response + res = types.NewResponse(pos) for _, roomID := range roomIDs { stateEvents, err := d.roomstate.CurrentState(txn, roomID) if err != nil { @@ -209,12 +215,13 @@ func (d *SyncServerDatabase) CompleteSync(userID string, numRecentEventsPerRoom } stateEvents = removeDuplicates(stateEvents, recentEvents) - - data[roomID] = types.RoomData{ - State: stateEvents, - RecentEvents: recentEvents, - } + jr := types.NewJoinResponse() + jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) + jr.Timeline.Limited = true + jr.State.Events = gomatrixserverlib.ToClientEvents(stateEvents, gomatrixserverlib.FormatSync) + res.Rooms.Join[roomID] = *jr } + // TODO: Add invites! return nil }) return diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go b/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go index c9c415661..8e9affb68 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go @@ -24,7 +24,6 @@ import ( "github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" - "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" ) @@ -102,36 +101,9 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request) util.JSONRespons } func (rp *RequestPool) currentSyncForUser(req syncRequest, currentPos types.StreamPosition) (*types.Response, error) { - if req.since == types.StreamPosition(0) { - pos, data, err := rp.db.CompleteSync(req.userID, req.limit) - if err != nil { - return nil, err - } - res := types.NewResponse(pos) - for roomID, d := range data { - jr := types.NewJoinResponse() - jr.Timeline.Events = gomatrixserverlib.ToClientEvents(d.RecentEvents, gomatrixserverlib.FormatSync) - jr.Timeline.Limited = true - jr.State.Events = gomatrixserverlib.ToClientEvents(d.State, gomatrixserverlib.FormatSync) - res.Rooms.Join[roomID] = *jr - } - return res, nil - } - // TODO: handle ignored users - - data, err := rp.db.IncrementalSync(req.userID, req.since, currentPos, req.limit) - if err != nil { - return nil, err + if req.since == types.StreamPosition(0) { + return rp.db.CompleteSync(req.userID, req.limit) } - - res := types.NewResponse(currentPos) - for roomID, d := range data { - jr := types.NewJoinResponse() - jr.Timeline.Events = gomatrixserverlib.ToClientEvents(d.RecentEvents, gomatrixserverlib.FormatSync) - jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true - jr.State.Events = gomatrixserverlib.ToClientEvents(d.State, gomatrixserverlib.FormatSync) - res.Rooms.Join[roomID] = *jr - } - return res, nil + return rp.db.IncrementalSync(req.userID, req.since, currentPos, req.limit) } diff --git a/src/github.com/matrix-org/dendrite/syncapi/types/types.go b/src/github.com/matrix-org/dendrite/syncapi/types/types.go index 8c112936c..588737de2 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/types/types.go +++ b/src/github.com/matrix-org/dendrite/syncapi/types/types.go @@ -28,12 +28,6 @@ func (sp StreamPosition) String() string { return strconv.FormatInt(int64(sp), 10) } -// RoomData represents the data for a room suitable for building a sync response from. -type RoomData struct { - State []gomatrixserverlib.Event - RecentEvents []gomatrixserverlib.Event -} - // Response represents a /sync API response. See https://matrix.org/docs/spec/client_server/r0.2.0.html#get-matrix-client-r0-sync type Response struct { NextBatch string `json:"next_batch"` From 675759c192b90a76c7784cd45a0e8b8a8a5c046f Mon Sep 17 00:00:00 2001 From: Kegsay Date: Mon, 15 May 2017 17:41:54 +0100 Subject: [PATCH 3/3] Add invites to /sync responses (#103) --- .../cmd/syncserver-integration-tests/main.go | 29 +++++++++++++++++++ .../dendrite/syncapi/storage/syncserver.go | 25 ++++++++++++---- .../dendrite/syncapi/types/types.go | 2 +- 3 files changed, 50 insertions(+), 6 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/main.go b/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/main.go index 8d04c7345..5a1c94e19 100644 --- a/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/main.go @@ -508,6 +508,35 @@ func main() { // $ curl -XPUT -d '{"name":"A Different Custom Room Name"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.name?access_token=@alice:localhost" // $ curl -XPUT -d '{"msgtype":"m.text","body":"hello bob"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/2?access_token=@alice:localhost" // $ curl -XPUT -d '{"membership":"invite"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@charlie:localhost?access_token=@bob:localhost" + if err := exe.WriteToTopic(inputTopic, canonicalJSONInput(outputRoomEventTestData[11:14])); err != nil { + panic(err) + } + + // Make sure charlie sees the invite both with and without a ?since= token + // TODO: Invite state should include the invite event and the room name. + charlieInviteData := `{ + "account_data": { + "events": [] + }, + "next_batch": "14", + "presence": { + "events": [] + }, + "rooms": { + "invite": { + "!PjrbIMW2cIiaYF4t:localhost": { + "invite_state": { + "events": [] + } + } + }, + "join": {}, + "leave": {} + } + }` + testSyncServer(syncServerCmdChan, "@charlie:localhost", "7", charlieInviteData) + testSyncServer(syncServerCmdChan, "@charlie:localhost", "", charlieInviteData) + // $ curl -XPUT -d '{"membership":"join"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@charlie:localhost?access_token=@charlie:localhost" // $ curl -XPUT -d '{"msgtype":"m.text","body":"not charlie..."}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/3?access_token=@alice:localhost" // $ curl -XPUT -d '{"membership":"leave"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@charlie:localhost?access_token=@alice:localhost" diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go index 5d0bdaccf..83c431671 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go @@ -132,7 +132,7 @@ func (d *SyncServerDatabase) IncrementalSync(userID string, fromPos, toPos types // - For each room which has membership list changes: // * Check if the room is 'newly joined' (insufficient to just check for a join event because we allow dupe joins TODO). // If it is, then we need to send the full room state down (and 'limited' is always true). - // * TODO Check if user is still CURRENTLY invited to the room. If so, add room to 'invited' block. + // * Check if user is still CURRENTLY invited to the room. If so, add room to 'invited' block. // * TODO Check if the user is CURRENTLY left/banned. If so, add room to 'archived' block. // work out which rooms transitioned to 'joined' between the 2 stream positions and add full state where needed. @@ -175,7 +175,8 @@ func (d *SyncServerDatabase) IncrementalSync(userID string, fromPos, toPos types jr.State.Events = gomatrixserverlib.ToClientEvents(state[roomID], gomatrixserverlib.FormatSync) res.Rooms.Join[roomID] = *jr } - return nil + + return d.addInvitesToResponse(txn, userID, res) }) return } @@ -200,7 +201,7 @@ func (d *SyncServerDatabase) CompleteSync(userID string, numRecentEventsPerRoom return err } - // Build up a /sync response + // Build up a /sync response. Add joined rooms. res = types.NewResponse(pos) for _, roomID := range roomIDs { stateEvents, err := d.roomstate.CurrentState(txn, roomID) @@ -221,12 +222,26 @@ func (d *SyncServerDatabase) CompleteSync(userID string, numRecentEventsPerRoom jr.State.Events = gomatrixserverlib.ToClientEvents(stateEvents, gomatrixserverlib.FormatSync) res.Rooms.Join[roomID] = *jr } - // TODO: Add invites! - return nil + + return d.addInvitesToResponse(txn, userID, res) }) return } +func (d *SyncServerDatabase) addInvitesToResponse(txn *sql.Tx, userID string, res *types.Response) error { + // Add invites - TODO: This will break over federation as they won't be in the current state table according to Mark. + roomIDs, err := d.roomstate.SelectRoomIDsWithMembership(txn, userID, "invite") + if err != nil { + return err + } + for _, roomID := range roomIDs { + ir := types.NewInviteResponse() + // TODO: invite_state. The state won't be in the current state table in cases where you get invited over federation + res.Rooms.Invite[roomID] = *ir + } + return nil +} + // There may be some overlap where events in stateEvents are already in recentEvents, so filter // them out so we don't include them twice in the /sync response. They should be in recentEvents // only, so clients get to the correct state once they have rolled forward. diff --git a/src/github.com/matrix-org/dendrite/syncapi/types/types.go b/src/github.com/matrix-org/dendrite/syncapi/types/types.go index 588737de2..a1c3e3c76 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/types/types.go +++ b/src/github.com/matrix-org/dendrite/syncapi/types/types.go @@ -97,7 +97,7 @@ func NewJoinResponse() *JoinResponse { // InviteResponse represents a /sync response for a room which is under the 'invite' key. type InviteResponse struct { InviteState struct { - Events []gomatrixserverlib.ClientEvent + Events []gomatrixserverlib.ClientEvent `json:"events"` } `json:"invite_state"` }