diff --git a/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/main.go b/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/main.go index badf2279d..5a1c94e19 100644 --- a/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/main.go @@ -392,7 +392,6 @@ func main() { } // Make sure alice sees it TODO: prev_batch - // TODO: Make sure bob sees it AND all the current room state testSyncServer(syncServerCmdChan, "@alice:localhost", "9", `{ "account_data": { "events": [] @@ -425,6 +424,46 @@ func main() { } }`) + // Make sure bob sees the room AND all the current room state TODO: history visibility + testSyncServer(syncServerCmdChan, "@bob:localhost", "9", `{ + "account_data": { + "events": [] + }, + "next_batch": "10", + "presence": { + "events": [] + }, + "rooms": { + "invite": {}, + "join": { + "!PjrbIMW2cIiaYF4t:localhost": { + "account_data": { + "events": [] + }, + "ephemeral": { + "events": [] + }, + "state": { + "events": [`+ + clientEventTestData[0]+","+ + clientEventTestData[1]+","+ + clientEventTestData[2]+","+ + clientEventTestData[3]+","+ + clientEventTestData[4]+","+ + clientEventTestData[8]+`] + }, + "timeline": { + "limited": false, + "prev_batch": "", + "events": [`+ + clientEventTestData[9]+`] + } + } + }, + "leave": {} + } + }`) + // $ curl -XPUT -d '{"msgtype":"m.text","body":"hello alice"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/1?access_token=@bob:localhost" if err := exe.WriteToTopic(inputTopic, canonicalJSONInput([]string{outputRoomEventTestData[10]})); err != nil { panic(err) @@ -469,6 +508,35 @@ func main() { // $ curl -XPUT -d '{"name":"A Different Custom Room Name"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.name?access_token=@alice:localhost" // $ curl -XPUT -d '{"msgtype":"m.text","body":"hello bob"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/2?access_token=@alice:localhost" // $ curl -XPUT -d '{"membership":"invite"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@charlie:localhost?access_token=@bob:localhost" + if err := exe.WriteToTopic(inputTopic, canonicalJSONInput(outputRoomEventTestData[11:14])); err != nil { + panic(err) + } + + // Make sure charlie sees the invite both with and without a ?since= token + // TODO: Invite state should include the invite event and the room name. + charlieInviteData := `{ + "account_data": { + "events": [] + }, + "next_batch": "14", + "presence": { + "events": [] + }, + "rooms": { + "invite": { + "!PjrbIMW2cIiaYF4t:localhost": { + "invite_state": { + "events": [] + } + } + }, + "join": {}, + "leave": {} + } + }` + testSyncServer(syncServerCmdChan, "@charlie:localhost", "7", charlieInviteData) + testSyncServer(syncServerCmdChan, "@charlie:localhost", "", charlieInviteData) + // $ curl -XPUT -d '{"membership":"join"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@charlie:localhost?access_token=@charlie:localhost" // $ curl -XPUT -d '{"msgtype":"m.text","body":"not charlie..."}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/3?access_token=@alice:localhost" // $ curl -XPUT -d '{"membership":"leave"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@charlie:localhost?access_token=@alice:localhost" diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go index 222b105c6..5a91966cf 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go @@ -64,7 +64,7 @@ const selectMaxIDSQL = "" + // In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id). const selectStateInRangeSQL = "" + "SELECT event_json, add_state_ids, remove_state_ids FROM output_room_events" + - " WHERE (id > $1 AND id < $2) AND (add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL)" + + " WHERE (id > $1 AND id <= $2) AND (add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL)" + " ORDER BY id ASC" type outputRoomEventsStatements struct { @@ -102,7 +102,7 @@ func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) { return } -// StateBetween returns the state events between the two given stream positions, exclusive of both. +// StateBetween returns the state events between the two given stream positions, exclusive of oldPos, inclusive of newPos. // Results are bucketed based on the room ID. If the same state is overwritten multiple times between the // two positions, only the most recent state is returned. func (s *outputRoomEventsStatements) StateBetween(txn *sql.Tx, oldPos, newPos types.StreamPosition) (map[string][]gomatrixserverlib.Event, error) { diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go index d0411c00c..92027c6af 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go @@ -16,8 +16,10 @@ package storage import ( "database/sql" + "encoding/json" // Import the postgres database driver. _ "github.com/lib/pq" + "github.com/matrix-org/dendrite/clientapi/events" "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" @@ -116,8 +118,7 @@ func (d *SyncServerDatabase) SyncStreamPosition() (types.StreamPosition, error) } // IncrementalSync returns all the data needed in order to create an incremental sync response. -func (d *SyncServerDatabase) IncrementalSync(userID string, fromPos, toPos types.StreamPosition, numRecentEventsPerRoom int) (data map[string]types.RoomData, returnErr error) { - data = make(map[string]types.RoomData) +func (d *SyncServerDatabase) IncrementalSync(userID string, fromPos, toPos types.StreamPosition, numRecentEventsPerRoom int) (res *types.Response, returnErr error) { returnErr = runTransaction(d.db, func(txn *sql.Tx) error { roomIDs, err := d.roomstate.SelectRoomIDsWithMembership(txn, userID, "join") if err != nil { @@ -129,41 +130,84 @@ func (d *SyncServerDatabase) IncrementalSync(userID string, fromPos, toPos types return err } + res = types.NewResponse(toPos) + + // Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821 + // - Get membership list changes for this user in this sync response + // - For each room which has membership list changes: + // * Check if the room is 'newly joined' (insufficient to just check for a join event because we allow dupe joins TODO). + // If it is, then we need to send the full room state down (and 'limited' is always true). + // * Check if user is still CURRENTLY invited to the room. If so, add room to 'invited' block. + // * TODO Check if the user is CURRENTLY left/banned. If so, add room to 'archived' block. + + // work out which rooms transitioned to 'joined' between the 2 stream positions and add full state where needed. + for roomID, stateEvents := range state { + for _, ev := range stateEvents { + // TODO: Currently this will incorrectly add rooms which were ALREADY joined but they sent another no-op join event. + // We should be checking if the user was already joined at fromPos and not proceed if so. As a result of this, + // dupe join events will result in the entire room state coming down to the client again. This is added in + // the 'state' part of the response though, so is transparent modulo bandwidth concerns as it is not added to + // the timeline. + if ev.Type() == "m.room.member" && ev.StateKeyEquals(userID) { + var memberContent events.MemberContent + if err := json.Unmarshal(ev.Content(), &memberContent); err != nil { + return err + } + if memberContent.Membership != "join" { + continue + } + + allState, err := d.roomstate.CurrentState(txn, roomID) + if err != nil { + return err + } + state[roomID] = allState + } + + } + } + for _, roomID := range roomIDs { recentEvents, err := d.events.RecentEventsInRoom(txn, roomID, fromPos, toPos, numRecentEventsPerRoom) if err != nil { return err } state[roomID] = removeDuplicates(state[roomID], recentEvents) - roomData := types.RoomData{ - State: state[roomID], - RecentEvents: recentEvents, - } - data[roomID] = roomData + + jr := types.NewJoinResponse() + jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) + jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true + jr.State.Events = gomatrixserverlib.ToClientEvents(state[roomID], gomatrixserverlib.FormatSync) + res.Rooms.Join[roomID] = *jr } - return nil + + return d.addInvitesToResponse(txn, userID, res) }) return } -// CompleteSync returns all the data needed in order to create a complete sync response. -func (d *SyncServerDatabase) CompleteSync(userID string, numRecentEventsPerRoom int) (pos types.StreamPosition, data map[string]types.RoomData, returnErr error) { - data = make(map[string]types.RoomData) +// CompleteSync a complete /sync API response for the given user. +func (d *SyncServerDatabase) CompleteSync(userID string, numRecentEventsPerRoom int) (res *types.Response, returnErr error) { // This needs to be all done in a transaction as we need to do multiple SELECTs, and we need to have // a consistent view of the database throughout. This includes extracting the sync stream position. + // This does have the unfortunate side-effect that all the matrixy logic resides in this function, + // but it's better to not hide the fact that this is being done in a transaction. returnErr = runTransaction(d.db, func(txn *sql.Tx) error { // Get the current stream position which we will base the sync response on. id, err := d.events.MaxID(txn) if err != nil { return err } - pos = types.StreamPosition(id) + pos := types.StreamPosition(id) // Extract room state and recent events for all rooms the user is joined to. roomIDs, err := d.roomstate.SelectRoomIDsWithMembership(txn, userID, "join") if err != nil { return err } + + // Build up a /sync response. Add joined rooms. + res = types.NewResponse(pos) for _, roomID := range roomIDs { stateEvents, err := d.roomstate.CurrentState(txn, roomID) if err != nil { @@ -177,17 +221,32 @@ func (d *SyncServerDatabase) CompleteSync(userID string, numRecentEventsPerRoom } stateEvents = removeDuplicates(stateEvents, recentEvents) - - data[roomID] = types.RoomData{ - State: stateEvents, - RecentEvents: recentEvents, - } + jr := types.NewJoinResponse() + jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) + jr.Timeline.Limited = true + jr.State.Events = gomatrixserverlib.ToClientEvents(stateEvents, gomatrixserverlib.FormatSync) + res.Rooms.Join[roomID] = *jr } - return nil + + return d.addInvitesToResponse(txn, userID, res) }) return } +func (d *SyncServerDatabase) addInvitesToResponse(txn *sql.Tx, userID string, res *types.Response) error { + // Add invites - TODO: This will break over federation as they won't be in the current state table according to Mark. + roomIDs, err := d.roomstate.SelectRoomIDsWithMembership(txn, userID, "invite") + if err != nil { + return err + } + for _, roomID := range roomIDs { + ir := types.NewInviteResponse() + // TODO: invite_state. The state won't be in the current state table in cases where you get invited over federation + res.Rooms.Invite[roomID] = *ir + } + return nil +} + // There may be some overlap where events in stateEvents are already in recentEvents, so filter // them out so we don't include them twice in the /sync response. They should be in recentEvents // only, so clients get to the correct state once they have rolled forward. diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go b/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go index eee117e76..8e9affb68 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go @@ -24,7 +24,6 @@ import ( "github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" - "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" ) @@ -64,7 +63,6 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request) util.JSONRespons // Fork off 2 goroutines: one to do the work, and one to serve as a timeout. // Whichever returns first is the one we will serve back to the client. - // TODO: Currently this means that cpu work is timed, which may not be what we want long term. timeoutChan := make(chan struct{}) timer := time.AfterFunc(syncReq.timeout, func() { close(timeoutChan) // signal that the timeout has expired @@ -72,8 +70,12 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request) util.JSONRespons done := make(chan util.JSONResponse) go func() { - syncData, err := rp.currentSyncForUser(*syncReq) + currentPos := rp.notifier.WaitForEvents(*syncReq) + // We stop the timer BEFORE calculating the response so the cpu work + // done to calculate the response is not timed. This stops us from + // doing lots of work then timing out and sending back an empty response. timer.Stop() + syncData, err := rp.currentSyncForUser(*syncReq, currentPos) var res util.JSONResponse if err != nil { res = httputil.LogThenError(req, err) @@ -98,39 +100,10 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request) util.JSONRespons } } -func (rp *RequestPool) currentSyncForUser(req syncRequest) (*types.Response, error) { - currentPos := rp.notifier.WaitForEvents(req) - - if req.since == types.StreamPosition(0) { - pos, data, err := rp.db.CompleteSync(req.userID, req.limit) - if err != nil { - return nil, err - } - res := types.NewResponse(pos) - for roomID, d := range data { - jr := types.NewJoinResponse() - jr.Timeline.Events = gomatrixserverlib.ToClientEvents(d.RecentEvents, gomatrixserverlib.FormatSync) - jr.Timeline.Limited = true - jr.State.Events = gomatrixserverlib.ToClientEvents(d.State, gomatrixserverlib.FormatSync) - res.Rooms.Join[roomID] = *jr - } - return res, nil - } - +func (rp *RequestPool) currentSyncForUser(req syncRequest, currentPos types.StreamPosition) (*types.Response, error) { // TODO: handle ignored users - - data, err := rp.db.IncrementalSync(req.userID, req.since, currentPos, req.limit) - if err != nil { - return nil, err + if req.since == types.StreamPosition(0) { + return rp.db.CompleteSync(req.userID, req.limit) } - - res := types.NewResponse(currentPos) - for roomID, d := range data { - jr := types.NewJoinResponse() - jr.Timeline.Events = gomatrixserverlib.ToClientEvents(d.RecentEvents, gomatrixserverlib.FormatSync) - jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true - jr.State.Events = gomatrixserverlib.ToClientEvents(d.State, gomatrixserverlib.FormatSync) - res.Rooms.Join[roomID] = *jr - } - return res, nil + return rp.db.IncrementalSync(req.userID, req.since, currentPos, req.limit) } diff --git a/src/github.com/matrix-org/dendrite/syncapi/types/types.go b/src/github.com/matrix-org/dendrite/syncapi/types/types.go index 8c112936c..a1c3e3c76 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/types/types.go +++ b/src/github.com/matrix-org/dendrite/syncapi/types/types.go @@ -28,12 +28,6 @@ func (sp StreamPosition) String() string { return strconv.FormatInt(int64(sp), 10) } -// RoomData represents the data for a room suitable for building a sync response from. -type RoomData struct { - State []gomatrixserverlib.Event - RecentEvents []gomatrixserverlib.Event -} - // Response represents a /sync API response. See https://matrix.org/docs/spec/client_server/r0.2.0.html#get-matrix-client-r0-sync type Response struct { NextBatch string `json:"next_batch"` @@ -103,7 +97,7 @@ func NewJoinResponse() *JoinResponse { // InviteResponse represents a /sync response for a room which is under the 'invite' key. type InviteResponse struct { InviteState struct { - Events []gomatrixserverlib.ClientEvent + Events []gomatrixserverlib.ClientEvent `json:"events"` } `json:"invite_state"` }