diff --git a/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/main.go b/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/main.go index 121f8f3cc..80aaf2427 100644 --- a/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/syncserver-integration-tests/main.go @@ -547,14 +547,15 @@ func main() { // $ curl -XPUT -d '{"membership":"join"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@charlie:localhost?access_token=@charlie:localhost" // $ curl -XPUT -d '{"msgtype":"m.text","body":"not charlie..."}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/3?access_token=@alice:localhost" // $ curl -XPUT -d '{"membership":"leave"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.member/@charlie:localhost?access_token=@alice:localhost" - writeToRoomServerLog(i14StateCharlieJoin, i15AliceMsg, i16StateAliceKickCharlie) + // $ curl -XPUT -d '{"msgtype":"m.text","body":"why did you kick charlie"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/3?access_token=@bob:localhost" + writeToRoomServerLog(i14StateCharlieJoin, i15AliceMsg, i16StateAliceKickCharlie, i17BobMsg) // Check transitions to leave work testSyncServer(syncServerCmdChan, "@charlie:localhost", "15", `{ "account_data": { "events": [] }, - "next_batch": "17", + "next_batch": "18", "presence": { "events": [] }, @@ -586,7 +587,7 @@ func main() { "account_data": { "events": [] }, - "next_batch": "17", + "next_batch": "18", "presence": { "events": [] }, @@ -611,9 +612,8 @@ func main() { } }`) - // $ curl -XPUT -d '{"msgtype":"m.text","body":"why did you kick charlie"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/send/m.room.message/3?access_token=@bob:localhost" // $ curl -XPUT -d '{"name":"No Charlies"}' "http://localhost:8009/_matrix/client/r0/rooms/%21PjrbIMW2cIiaYF4t:localhost/state/m.room.name?access_token=@alice:localhost" - writeToRoomServerLog(i17BobMsg, i18StateAliceRoomName) + writeToRoomServerLog(i18StateAliceRoomName) // Check that users don't see state changes in rooms after they have left testSyncServer(syncServerCmdChan, "@charlie:localhost", "17", `{ diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go index 5a91966cf..6196fa75a 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go @@ -50,30 +50,26 @@ const insertEventSQL = "" + "INSERT INTO output_room_events (room_id, event_id, event_json, add_state_ids, remove_state_ids) VALUES ($1, $2, $3, $4, $5) RETURNING id" const selectEventsSQL = "" + - "SELECT event_json FROM output_room_events WHERE event_id = ANY($1)" - -const selectEventsInRangeSQL = "" + - "SELECT event_json FROM output_room_events WHERE id > $1 AND id <= $2" + "SELECT id, event_json FROM output_room_events WHERE event_id = ANY($1)" const selectRecentEventsSQL = "" + - "SELECT event_json FROM output_room_events WHERE room_id = $1 AND id > $2 AND id <= $3 ORDER BY id DESC LIMIT $4" + "SELECT id, event_json FROM output_room_events WHERE room_id = $1 AND id > $2 AND id <= $3 ORDER BY id DESC LIMIT $4" const selectMaxIDSQL = "" + "SELECT MAX(id) FROM output_room_events" // In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id). const selectStateInRangeSQL = "" + - "SELECT event_json, add_state_ids, remove_state_ids FROM output_room_events" + + "SELECT id, event_json, add_state_ids, remove_state_ids FROM output_room_events" + " WHERE (id > $1 AND id <= $2) AND (add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL)" + " ORDER BY id ASC" type outputRoomEventsStatements struct { - insertEventStmt *sql.Stmt - selectEventsStmt *sql.Stmt - selectMaxIDStmt *sql.Stmt - selectEventsInRangeStmt *sql.Stmt - selectRecentEventsStmt *sql.Stmt - selectStateInRangeStmt *sql.Stmt + insertEventStmt *sql.Stmt + selectEventsStmt *sql.Stmt + selectMaxIDStmt *sql.Stmt + selectRecentEventsStmt *sql.Stmt + selectStateInRangeStmt *sql.Stmt } func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) { @@ -90,9 +86,6 @@ func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) { if s.selectMaxIDStmt, err = db.Prepare(selectMaxIDSQL); err != nil { return } - if s.selectEventsInRangeStmt, err = db.Prepare(selectEventsInRangeSQL); err != nil { - return - } if s.selectRecentEventsStmt, err = db.Prepare(selectRecentEventsSQL); err != nil { return } @@ -105,7 +98,7 @@ func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) { // StateBetween returns the state events between the two given stream positions, exclusive of oldPos, inclusive of newPos. // Results are bucketed based on the room ID. If the same state is overwritten multiple times between the // two positions, only the most recent state is returned. -func (s *outputRoomEventsStatements) StateBetween(txn *sql.Tx, oldPos, newPos types.StreamPosition) (map[string][]gomatrixserverlib.Event, error) { +func (s *outputRoomEventsStatements) StateBetween(txn *sql.Tx, oldPos, newPos types.StreamPosition) (map[string][]streamEvent, error) { rows, err := txn.Stmt(s.selectStateInRangeStmt).Query(oldPos, newPos) if err != nil { return nil, err @@ -115,18 +108,19 @@ func (s *outputRoomEventsStatements) StateBetween(txn *sql.Tx, oldPos, newPos ty // - For each room ID, build up an array of event IDs which represents cumulative adds/removes // For each room, map cumulative event IDs to events and return. This may need to a batch SELECT based on event ID // if they aren't in the event ID cache. We don't handle state deletion yet. - eventIDToEvent := make(map[string]gomatrixserverlib.Event) + eventIDToEvent := make(map[string]streamEvent) // RoomID => A set (map[string]bool) of state event IDs which are between the two positions stateNeeded := make(map[string]map[string]bool) for rows.Next() { var ( + streamPos int64 eventBytes []byte addIDs pq.StringArray delIDs pq.StringArray ) - if err := rows.Scan(&eventBytes, &addIDs, &delIDs); err != nil { + if err := rows.Scan(&streamPos, &eventBytes, &addIDs, &delIDs); err != nil { return nil, err } // Sanity check for deleted state and whine if we see it. We don't need to do anything @@ -157,7 +151,7 @@ func (s *outputRoomEventsStatements) StateBetween(txn *sql.Tx, oldPos, newPos ty } stateNeeded[ev.RoomID()] = needSet - eventIDToEvent[ev.EventID()] = ev + eventIDToEvent[ev.EventID()] = streamEvent{ev, types.StreamPosition(streamPos)} } return s.fetchStateEvents(txn, stateNeeded, eventIDToEvent) @@ -165,8 +159,8 @@ func (s *outputRoomEventsStatements) StateBetween(txn *sql.Tx, oldPos, newPos ty // fetchStateEvents converts the set of event IDs into a set of events. It will fetch any which are missing from the database. // Returns a map of room ID to list of events. -func (s *outputRoomEventsStatements) fetchStateEvents(txn *sql.Tx, roomIDToEventIDSet map[string]map[string]bool, eventIDToEvent map[string]gomatrixserverlib.Event) (map[string][]gomatrixserverlib.Event, error) { - stateBetween := make(map[string][]gomatrixserverlib.Event) +func (s *outputRoomEventsStatements) fetchStateEvents(txn *sql.Tx, roomIDToEventIDSet map[string]map[string]bool, eventIDToEvent map[string]streamEvent) (map[string][]streamEvent, error) { + stateBetween := make(map[string][]streamEvent) missingEvents := make(map[string][]string) for roomID, ids := range roomIDToEventIDSet { events := stateBetween[roomID] @@ -232,7 +226,7 @@ func (s *outputRoomEventsStatements) InsertEvent(txn *sql.Tx, event *gomatrixser } // RecentEventsInRoom returns the most recent events in the given room, up to a maximum of 'limit'. -func (s *outputRoomEventsStatements) RecentEventsInRoom(txn *sql.Tx, roomID string, fromPos, toPos types.StreamPosition, limit int) ([]gomatrixserverlib.Event, error) { +func (s *outputRoomEventsStatements) RecentEventsInRoom(txn *sql.Tx, roomID string, fromPos, toPos types.StreamPosition, limit int) ([]streamEvent, error) { rows, err := s.selectRecentEventsStmt.Query(roomID, fromPos, toPos, limit) if err != nil { return nil, err @@ -249,7 +243,7 @@ func (s *outputRoomEventsStatements) RecentEventsInRoom(txn *sql.Tx, roomID stri // Events returns the events for the given event IDs. Returns an error if any one of the event IDs given are missing // from the database. -func (s *outputRoomEventsStatements) Events(txn *sql.Tx, eventIDs []string) ([]gomatrixserverlib.Event, error) { +func (s *outputRoomEventsStatements) Events(txn *sql.Tx, eventIDs []string) ([]streamEvent, error) { rows, err := txn.Stmt(s.selectEventsStmt).Query(pq.StringArray(eventIDs)) if err != nil { return nil, err @@ -266,11 +260,14 @@ func (s *outputRoomEventsStatements) Events(txn *sql.Tx, eventIDs []string) ([]g return result, nil } -func rowsToEvents(rows *sql.Rows) ([]gomatrixserverlib.Event, error) { - var result []gomatrixserverlib.Event +func rowsToEvents(rows *sql.Rows) ([]streamEvent, error) { + var result []streamEvent for rows.Next() { - var eventBytes []byte - if err := rows.Scan(&eventBytes); err != nil { + var ( + streamPos int64 + eventBytes []byte + ) + if err := rows.Scan(&streamPos, &eventBytes); err != nil { return nil, err } // TODO: Handle redacted events @@ -278,12 +275,12 @@ func rowsToEvents(rows *sql.Rows) ([]gomatrixserverlib.Event, error) { if err != nil { return nil, err } - result = append(result, ev) + result = append(result, streamEvent{ev, types.StreamPosition(streamPos)}) } return result, nil } -func reverseEvents(input []gomatrixserverlib.Event) (output []gomatrixserverlib.Event) { +func reverseEvents(input []streamEvent) (output []streamEvent) { for i := len(input) - 1; i >= 0; i-- { output = append(output, input[i]) } diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go index fb1a5c16e..326d12c40 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go @@ -29,6 +29,15 @@ type stateDelta struct { roomID string stateEvents []gomatrixserverlib.Event membership string + // The stream position of the latest membership event for this user, if applicable. + // Can be 0 if there is no membership event in this delta. + membershipPos types.StreamPosition +} + +// Same as gomatrixserverlib.Event but also has the stream position for this event. +type streamEvent struct { + gomatrixserverlib.Event + streamPosition types.StreamPosition } // SyncServerDatabase represents a sync server database @@ -99,7 +108,7 @@ func (d *SyncServerDatabase) WriteEvent(ev *gomatrixserverlib.Event, addStateEve if err != nil { return err } - return d.roomstate.UpdateRoomState(txn, added, removeStateEventIDs) + return d.roomstate.UpdateRoomState(txn, streamEventsToEvents(added), removeStateEventIDs) }) return } @@ -137,10 +146,21 @@ func (d *SyncServerDatabase) IncrementalSync(userID string, fromPos, toPos types res = types.NewResponse(toPos) for _, delta := range deltas { - recentEvents, err := d.events.RecentEventsInRoom(txn, delta.roomID, fromPos, toPos, numRecentEventsPerRoom) + endPos := toPos + if delta.membershipPos > 0 && delta.membership == "leave" { + // make sure we don't leak recent events after the leave event. + // TODO: History visibility makes this somewhat complex to handle correctly. For example: + // TODO: This doesn't work for join -> leave in a single /sync request (see events prior to join). + // TODO: This will fail on join -> leave -> sensitive msg -> join -> leave + // in a single /sync request + // This is all "okay" assuming history_visibility == "shared" which it is by default. + endPos = delta.membershipPos + } + recentStreamEvents, err := d.events.RecentEventsInRoom(txn, delta.roomID, fromPos, endPos, numRecentEventsPerRoom) if err != nil { return err } + recentEvents := streamEventsToEvents(recentStreamEvents) delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents) // roll back switch delta.membership { @@ -198,10 +218,11 @@ func (d *SyncServerDatabase) CompleteSync(userID string, numRecentEventsPerRoom } // TODO: When filters are added, we may need to call this multiple times to get enough events. // See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316 - recentEvents, err := d.events.RecentEventsInRoom(txn, roomID, types.StreamPosition(0), pos, numRecentEventsPerRoom) + recentStreamEvents, err := d.events.RecentEventsInRoom(txn, roomID, types.StreamPosition(0), pos, numRecentEventsPerRoom) if err != nil { return err } + recentEvents := streamEventsToEvents(recentStreamEvents) stateEvents = removeDuplicates(stateEvents, recentEvents) jr := types.NewJoinResponse() @@ -246,14 +267,14 @@ func (d *SyncServerDatabase) getStateDeltas(txn *sql.Tx, fromPos, toPos types.St if err != nil { return nil, err } - for roomID, stateEvents := range state { - for _, ev := range stateEvents { + for roomID, stateStreamEvents := range state { + for _, ev := range stateStreamEvents { // TODO: Currently this will incorrectly add rooms which were ALREADY joined but they sent another no-op join event. // We should be checking if the user was already joined at fromPos and not proceed if so. As a result of this, // dupe join events will result in the entire room state coming down to the client again. This is added in // the 'state' part of the response though, so is transparent modulo bandwidth concerns as it is not added to // the timeline. - if membership := getMembershipFromEvent(&ev, userID); membership != "" { + if membership := getMembershipFromEvent(&ev.Event, userID); membership != "" { if membership == "join" { // send full room state down instead of a delta var allState []gomatrixserverlib.Event @@ -261,14 +282,19 @@ func (d *SyncServerDatabase) getStateDeltas(txn *sql.Tx, fromPos, toPos types.St if err != nil { return nil, err } - state[roomID] = allState + s := make([]streamEvent, len(allState)) + for i := 0; i < len(s); i++ { + s[i] = streamEvent{allState[i], types.StreamPosition(0)} + } + state[roomID] = s continue // we'll add this room in when we do joined rooms } deltas = append(deltas, stateDelta{ - membership: membership, - stateEvents: stateEvents, - roomID: roomID, + membership: membership, + membershipPos: ev.streamPosition, + stateEvents: streamEventsToEvents(stateStreamEvents), + roomID: roomID, }) break } @@ -283,7 +309,7 @@ func (d *SyncServerDatabase) getStateDeltas(txn *sql.Tx, fromPos, toPos types.St for _, joinedRoomID := range joinedRoomIDs { deltas = append(deltas, stateDelta{ membership: "join", - stateEvents: state[joinedRoomID], + stateEvents: streamEventsToEvents(state[joinedRoomID]), roomID: joinedRoomID, }) } @@ -291,6 +317,14 @@ func (d *SyncServerDatabase) getStateDeltas(txn *sql.Tx, fromPos, toPos types.St return deltas, nil } +func streamEventsToEvents(in []streamEvent) []gomatrixserverlib.Event { + out := make([]gomatrixserverlib.Event, len(in)) + for i := 0; i < len(in); i++ { + out[i] = in[i].Event + } + return out +} + // There may be some overlap where events in stateEvents are already in recentEvents, so filter // them out so we don't include them twice in the /sync response. They should be in recentEvents // only, so clients get to the correct state once they have rolled forward.