Include the stream position that a state event was added at in the current state tables

This commit is contained in:
Mark Haines 2017-06-07 16:13:56 +01:00
parent 59f4745ca5
commit 2c6e061688
2 changed files with 19 additions and 11 deletions

View file

@ -36,6 +36,9 @@ CREATE TABLE IF NOT EXISTS current_room_state (
-- The 'content.membership' value if this event is an m.room.member event. For other
-- events, this will be NULL.
membership TEXT,
-- The serial ID of the output_room_events table when this event became
-- part of the current state of the room.
added_at BIGINT,
-- Clobber based on 3-uple of room_id, type and state_key
CONSTRAINT room_state_unique UNIQUE (room_id, type, state_key)
);
@ -46,9 +49,10 @@ CREATE INDEX IF NOT EXISTS membership_idx ON current_room_state(type, state_key,
`
const upsertRoomStateSQL = "" +
"INSERT INTO current_room_state (room_id, event_id, type, state_key, event_json, membership) VALUES ($1, $2, $3, $4, $5, $6)" +
"INSERT INTO current_room_state (room_id, event_id, type, state_key, event_json, membership, added_at_id)" +
" VALUES ($1, $2, $3, $4, $5, $6, $7)" +
" ON CONFLICT ON CONSTRAINT room_state_unique" +
" DO UPDATE SET event_id = $2, event_json = $5, membership = $6"
" DO UPDATE SET event_id = $2, event_json = $5, membership = $6, added_at_id = $7"
const deleteRoomStateByEventIDSQL = "" +
"DELETE FROM current_room_state WHERE event_id = $1"
@ -63,7 +67,7 @@ const selectJoinedUsersSQL = "" +
"SELECT room_id, state_key FROM current_room_state WHERE type = 'm.room.member' AND membership = 'join'"
const selectEventsWithEventIDsSQL = "" +
"SELECT event_json FROM current_room_state WHERE event_id = ANY($1)"
"SELECT added_at, event_json FROM current_room_state WHERE event_id = ANY($1)"
type currentRoomStateStatements struct {
upsertRoomStateStmt *sql.Stmt
@ -157,20 +161,22 @@ func (s *currentRoomStateStatements) deleteRoomStateByEventID(txn *sql.Tx, event
return err
}
func (s *currentRoomStateStatements) upsertRoomState(txn *sql.Tx, event gomatrixserverlib.Event, membership *string) error {
func (s *currentRoomStateStatements) upsertRoomState(
txn *sql.Tx, event gomatrixserverlib.Event, membership *string, addedAt int64,
) error {
_, err := txn.Stmt(s.upsertRoomStateStmt).Exec(
event.RoomID(), event.EventID(), event.Type(), *event.StateKey(), event.JSON(), membership,
event.RoomID(), event.EventID(), event.Type(), *event.StateKey(), event.JSON(), membership, addedAt,
)
return err
}
func (s *currentRoomStateStatements) selectEventsWithEventIDs(txn *sql.Tx, eventIDs []string) ([]gomatrixserverlib.Event, error) {
func (s *currentRoomStateStatements) selectEventsWithEventIDs(txn *sql.Tx, eventIDs []string) ([]streamEvent, error) {
rows, err := txn.Stmt(s.selectEventsWithEventIDsStmt).Query(pq.StringArray(eventIDs))
if err != nil {
return nil, err
}
defer rows.Close()
return rowsToEvents(rows)
return rowsToStreamEvents(rows)
}
func rowsToEvents(rows *sql.Rows) ([]gomatrixserverlib.Event, error) {

View file

@ -107,12 +107,14 @@ func (d *SyncServerDatabase) WriteEvent(
return nil
}
return d.updateRoomState(txn, removeStateEventIDs, addStateEvents)
return d.updateRoomState(txn, removeStateEventIDs, addStateEvents, streamPos)
})
return
}
func (d *SyncServerDatabase) updateRoomState(txn *sql.Tx, removedEventIDs []string, addedEvents []gomatrixserverlib.Event) error {
func (d *SyncServerDatabase) updateRoomState(
txn *sql.Tx, removedEventIDs []string, addedEvents []gomatrixserverlib.Event, streamPos types.StreamPosition,
) error {
// remove first, then add, as we do not ever delete state, but do replace state which is a remove followed by an add.
for _, eventID := range removedEventIDs {
if err := d.roomstate.deleteRoomStateByEventID(txn, eventID); err != nil {
@ -133,7 +135,7 @@ func (d *SyncServerDatabase) updateRoomState(txn *sql.Tx, removedEventIDs []stri
}
membership = &memberContent.Membership
}
if err := d.roomstate.upsertRoomState(txn, event, membership); err != nil {
if err := d.roomstate.upsertRoomState(txn, event, membership, int64(streamPos)); err != nil {
return err
}
}
@ -362,7 +364,7 @@ func (d *SyncServerDatabase) fetchMissingStateEvents(txn *sql.Tx, eventIDs []str
// stream so probably happened before it.
// TOOD: What happens if we receive a state event from outside the
// timeline associated with an event in the middle of the timeline?
events = append(events, streamEvent{e, 0})
events = append(events, e)
}
return events, nil
}