diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go index 531f2941b..3533b7b62 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go @@ -36,6 +36,9 @@ CREATE TABLE IF NOT EXISTS current_room_state ( -- The 'content.membership' value if this event is an m.room.member event. For other -- events, this will be NULL. membership TEXT, + -- The serial ID of the output_room_events table when this event became + -- part of the current state of the room. + added_at BIGINT, -- Clobber based on 3-uple of room_id, type and state_key CONSTRAINT room_state_unique UNIQUE (room_id, type, state_key) ); @@ -46,9 +49,10 @@ CREATE INDEX IF NOT EXISTS membership_idx ON current_room_state(type, state_key, ` const upsertRoomStateSQL = "" + - "INSERT INTO current_room_state (room_id, event_id, type, state_key, event_json, membership) VALUES ($1, $2, $3, $4, $5, $6)" + + "INSERT INTO current_room_state (room_id, event_id, type, state_key, event_json, membership, added_at_id)" + + " VALUES ($1, $2, $3, $4, $5, $6, $7)" + " ON CONFLICT ON CONSTRAINT room_state_unique" + - " DO UPDATE SET event_id = $2, event_json = $5, membership = $6" + " DO UPDATE SET event_id = $2, event_json = $5, membership = $6, added_at_id = $7" const deleteRoomStateByEventIDSQL = "" + "DELETE FROM current_room_state WHERE event_id = $1" @@ -63,7 +67,7 @@ const selectJoinedUsersSQL = "" + "SELECT room_id, state_key FROM current_room_state WHERE type = 'm.room.member' AND membership = 'join'" const selectEventsWithEventIDsSQL = "" + - "SELECT event_json FROM current_room_state WHERE event_id = ANY($1)" + "SELECT added_at, event_json FROM current_room_state WHERE event_id = ANY($1)" type currentRoomStateStatements struct { upsertRoomStateStmt *sql.Stmt @@ -157,20 +161,22 @@ func (s *currentRoomStateStatements) deleteRoomStateByEventID(txn *sql.Tx, event return err } -func (s *currentRoomStateStatements) upsertRoomState(txn *sql.Tx, event gomatrixserverlib.Event, membership *string) error { +func (s *currentRoomStateStatements) upsertRoomState( + txn *sql.Tx, event gomatrixserverlib.Event, membership *string, addedAt int64, +) error { _, err := txn.Stmt(s.upsertRoomStateStmt).Exec( - event.RoomID(), event.EventID(), event.Type(), *event.StateKey(), event.JSON(), membership, + event.RoomID(), event.EventID(), event.Type(), *event.StateKey(), event.JSON(), membership, addedAt, ) return err } -func (s *currentRoomStateStatements) selectEventsWithEventIDs(txn *sql.Tx, eventIDs []string) ([]gomatrixserverlib.Event, error) { +func (s *currentRoomStateStatements) selectEventsWithEventIDs(txn *sql.Tx, eventIDs []string) ([]streamEvent, error) { rows, err := txn.Stmt(s.selectEventsWithEventIDsStmt).Query(pq.StringArray(eventIDs)) if err != nil { return nil, err } defer rows.Close() - return rowsToEvents(rows) + return rowsToStreamEvents(rows) } func rowsToEvents(rows *sql.Rows) ([]gomatrixserverlib.Event, error) { diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go index b97cd548e..6eb0408d1 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go @@ -107,12 +107,14 @@ func (d *SyncServerDatabase) WriteEvent( return nil } - return d.updateRoomState(txn, removeStateEventIDs, addStateEvents) + return d.updateRoomState(txn, removeStateEventIDs, addStateEvents, streamPos) }) return } -func (d *SyncServerDatabase) updateRoomState(txn *sql.Tx, removedEventIDs []string, addedEvents []gomatrixserverlib.Event) error { +func (d *SyncServerDatabase) updateRoomState( + txn *sql.Tx, removedEventIDs []string, addedEvents []gomatrixserverlib.Event, streamPos types.StreamPosition, +) error { // remove first, then add, as we do not ever delete state, but do replace state which is a remove followed by an add. for _, eventID := range removedEventIDs { if err := d.roomstate.deleteRoomStateByEventID(txn, eventID); err != nil { @@ -133,7 +135,7 @@ func (d *SyncServerDatabase) updateRoomState(txn *sql.Tx, removedEventIDs []stri } membership = &memberContent.Membership } - if err := d.roomstate.upsertRoomState(txn, event, membership); err != nil { + if err := d.roomstate.upsertRoomState(txn, event, membership, int64(streamPos)); err != nil { return err } } @@ -362,7 +364,7 @@ func (d *SyncServerDatabase) fetchMissingStateEvents(txn *sql.Tx, eventIDs []str // stream so probably happened before it. // TOOD: What happens if we receive a state event from outside the // timeline associated with an event in the middle of the timeline? - events = append(events, streamEvent{e, 0}) + events = append(events, e) } return events, nil }