Add postgres impl

This commit is contained in:
Kegan Dougal 2020-04-30 19:26:21 +01:00
parent 3b02d0f05a
commit 5cbcfe6f4f
5 changed files with 51 additions and 35 deletions

View file

@ -94,6 +94,9 @@ const selectEarlyEventsSQL = "" +
" WHERE room_id = $1 AND id > $2 AND id <= $3" + " WHERE room_id = $1 AND id > $2 AND id <= $3" +
" ORDER BY id ASC LIMIT $4" " ORDER BY id ASC LIMIT $4"
const selectStreamPositionForEventIDSQL = "" +
"SELECT id FROM syncapi_output_room_events WHERE event_id = $1"
const selectMaxEventIDSQL = "" + const selectMaxEventIDSQL = "" +
"SELECT MAX(id) FROM syncapi_output_room_events" "SELECT MAX(id) FROM syncapi_output_room_events"
@ -111,13 +114,14 @@ const selectStateInRangeSQL = "" +
" LIMIT $8" " LIMIT $8"
type outputRoomEventsStatements struct { type outputRoomEventsStatements struct {
insertEventStmt *sql.Stmt insertEventStmt *sql.Stmt
selectEventsStmt *sql.Stmt selectEventsStmt *sql.Stmt
selectMaxEventIDStmt *sql.Stmt selectMaxEventIDStmt *sql.Stmt
selectRecentEventsStmt *sql.Stmt selectRecentEventsStmt *sql.Stmt
selectRecentEventsForSyncStmt *sql.Stmt selectRecentEventsForSyncStmt *sql.Stmt
selectEarlyEventsStmt *sql.Stmt selectEarlyEventsStmt *sql.Stmt
selectStateInRangeStmt *sql.Stmt selectStateInRangeStmt *sql.Stmt
selectStreamPositionForEventIDStmt *sql.Stmt
} }
func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) { func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) {
@ -146,9 +150,18 @@ func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) {
if s.selectStateInRangeStmt, err = db.Prepare(selectStateInRangeSQL); err != nil { if s.selectStateInRangeStmt, err = db.Prepare(selectStateInRangeSQL); err != nil {
return return
} }
if s.selectStreamPositionForEventIDStmt, err = db.Prepare(selectStreamPositionForEventIDSQL); err != nil {
return
}
return return
} }
func (s *outputRoomEventsStatements) selectStreamPositionForEventID(ctx context.Context, eventID string) (types.StreamPosition, error) {
var id int64
err := s.selectStreamPositionForEventIDStmt.QueryRowContext(ctx, eventID).Scan(&id)
return types.StreamPosition(id), err
}
// selectStateInRange returns the state events between the two given PDU stream positions, exclusive of oldPos, inclusive of newPos. // selectStateInRange returns the state events between the two given PDU stream positions, exclusive of oldPos, inclusive of newPos.
// Results are bucketed based on the room ID. If the same state is overwritten multiple times between the // Results are bucketed based on the room ID. If the same state is overwritten multiple times between the
// two positions, only the most recent state is returned. // two positions, only the most recent state is returned.

View file

@ -32,35 +32,40 @@ CREATE TABLE IF NOT EXISTS syncapi_output_room_events_topology (
-- The place of the event in the room's topology. This can usually be determined -- The place of the event in the room's topology. This can usually be determined
-- from the event's depth. -- from the event's depth.
topological_position BIGINT NOT NULL, topological_position BIGINT NOT NULL,
stream_position BIGINT NOT NULL,
-- The 'room_id' key for the event. -- The 'room_id' key for the event.
room_id TEXT NOT NULL room_id TEXT NOT NULL
); );
-- The topological order will be used in events selection and ordering -- The topological order will be used in events selection and ordering
CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_topological_position_idx ON syncapi_output_room_events_topology(topological_position, room_id); CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_topological_position_idx ON syncapi_output_room_events_topology(topological_position, stream_position, room_id);
` `
const insertEventInTopologySQL = "" + const insertEventInTopologySQL = "" +
"INSERT INTO syncapi_output_room_events_topology (event_id, topological_position, room_id)" + "INSERT INTO syncapi_output_room_events_topology (event_id, topological_position, room_id, stream_position)" +
" VALUES ($1, $2, $3)" + " VALUES ($1, $2, $3, $4)" +
" ON CONFLICT (topological_position, room_id) DO UPDATE SET event_id = $1" " ON CONFLICT (topological_position, stream_position, room_id) DO UPDATE SET event_id = $1"
const selectEventIDsInRangeASCSQL = "" + const selectEventIDsInRangeASCSQL = "" +
"SELECT event_id FROM syncapi_output_room_events_topology" + "SELECT event_id FROM syncapi_output_room_events_topology" +
" WHERE room_id = $1 AND topological_position > $2 AND topological_position <= $3" + " WHERE room_id = $1 AND" +
" ORDER BY topological_position ASC LIMIT $4" "(topological_position > $2 AND topological_position < $3) OR" +
"(topological_position = $4 AND stream_position <= $5)" +
" ORDER BY topological_position ASC, stream_position ASC LIMIT $6"
const selectEventIDsInRangeDESCSQL = "" + const selectEventIDsInRangeDESCSQL = "" +
"SELECT event_id FROM syncapi_output_room_events_topology" + "SELECT event_id FROM syncapi_output_room_events_topology" +
" WHERE room_id = $1 AND topological_position > $2 AND topological_position <= $3" + " WHERE room_id = $1 AND" +
" ORDER BY topological_position DESC LIMIT $4" "(topological_position > $2 AND topological_position < $3) OR" +
"(topological_position = $4 AND stream_position <= $5)" +
" ORDER BY topological_position DESC, stream_position DESC LIMIT $6"
const selectPositionInTopologySQL = "" + const selectPositionInTopologySQL = "" +
"SELECT topological_position FROM syncapi_output_room_events_topology" + "SELECT topological_position FROM syncapi_output_room_events_topology" +
" WHERE event_id = $1" " WHERE event_id = $1"
const selectMaxPositionInTopologySQL = "" + const selectMaxPositionInTopologySQL = "" +
"SELECT MAX(topological_position) FROM syncapi_output_room_events_topology" + "SELECT MAX(topological_position), stream_position FROM syncapi_output_room_events_topology" +
" WHERE room_id = $1" " WHERE room_id = $1 ORDER BY stream_position DESC"
const selectEventIDsFromPositionSQL = "" + const selectEventIDsFromPositionSQL = "" +
"SELECT event_id FROM syncapi_output_room_events_topology" + "SELECT event_id FROM syncapi_output_room_events_topology" +
@ -104,10 +109,10 @@ func (s *outputRoomEventsTopologyStatements) prepare(db *sql.DB) (err error) {
// insertEventInTopology inserts the given event in the room's topology, based // insertEventInTopology inserts the given event in the room's topology, based
// on the event's depth. // on the event's depth.
func (s *outputRoomEventsTopologyStatements) insertEventInTopology( func (s *outputRoomEventsTopologyStatements) insertEventInTopology(
ctx context.Context, event *gomatrixserverlib.HeaderedEvent, ctx context.Context, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition,
) (err error) { ) (err error) {
_, err = s.insertEventInTopologyStmt.ExecContext( _, err = s.insertEventInTopologyStmt.ExecContext(
ctx, event.EventID(), event.Depth(), event.RoomID(), ctx, event.EventID(), event.Depth(), event.RoomID(), pos,
) )
return return
} }
@ -116,7 +121,7 @@ func (s *outputRoomEventsTopologyStatements) insertEventInTopology(
// given range in a given room's topological order. // given range in a given room's topological order.
// Returns an empty slice if no events match the given range. // Returns an empty slice if no events match the given range.
func (s *outputRoomEventsTopologyStatements) selectEventIDsInRange( func (s *outputRoomEventsTopologyStatements) selectEventIDsInRange(
ctx context.Context, roomID string, fromPos, toPos types.StreamPosition, ctx context.Context, roomID string, fromPos, toPos, toMicroPos types.StreamPosition,
limit int, chronologicalOrder bool, limit int, chronologicalOrder bool,
) (eventIDs []string, err error) { ) (eventIDs []string, err error) {
// Decide on the selection's order according to whether chronological order // Decide on the selection's order according to whether chronological order
@ -129,7 +134,7 @@ func (s *outputRoomEventsTopologyStatements) selectEventIDsInRange(
} }
// Query the event IDs. // Query the event IDs.
rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, limit) rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, toPos, toMicroPos, limit)
if err == sql.ErrNoRows { if err == sql.ErrNoRows {
// If no event matched the request, return an empty slice. // If no event matched the request, return an empty slice.
return []string{}, nil return []string{}, nil
@ -161,8 +166,8 @@ func (s *outputRoomEventsTopologyStatements) selectPositionInTopology(
func (s *outputRoomEventsTopologyStatements) selectMaxPositionInTopology( func (s *outputRoomEventsTopologyStatements) selectMaxPositionInTopology(
ctx context.Context, roomID string, ctx context.Context, roomID string,
) (pos types.StreamPosition, err error) { ) (pos types.StreamPosition, spos types.StreamPosition, err error) {
err = s.selectMaxPositionInTopologyStmt.QueryRowContext(ctx, roomID).Scan(&pos) err = s.selectMaxPositionInTopologyStmt.QueryRowContext(ctx, roomID).Scan(&pos, &spos)
return return
} }

View file

@ -159,7 +159,7 @@ func (d *SyncServerDatasource) WriteEvent(
} }
pduPosition = pos pduPosition = pos
if err = d.topology.insertEventInTopology(ctx, ev); err != nil { if err = d.topology.insertEventInTopology(ctx, ev, pos); err != nil {
return err return err
} }
@ -240,12 +240,13 @@ func (d *SyncServerDatasource) GetEventsInRange(
if from.Type == types.PaginationTokenTypeTopology { if from.Type == types.PaginationTokenTypeTopology {
// Determine the backward and forward limit, i.e. the upper and lower // Determine the backward and forward limit, i.e. the upper and lower
// limits to the selection in the room's topology, from the direction. // limits to the selection in the room's topology, from the direction.
var backwardLimit, forwardLimit types.StreamPosition var backwardLimit, forwardLimit, forwardMicroLimit types.StreamPosition
if backwardOrdering { if backwardOrdering {
// Backward ordering is antichronological (latest event to oldest // Backward ordering is antichronological (latest event to oldest
// one). // one).
backwardLimit = to.PDUPosition backwardLimit = to.PDUPosition
forwardLimit = from.PDUPosition forwardLimit = from.PDUPosition
forwardMicroLimit = from.EDUTypingPosition
} else { } else {
// Forward ordering is chronological (oldest event to latest one). // Forward ordering is chronological (oldest event to latest one).
backwardLimit = from.PDUPosition backwardLimit = from.PDUPosition
@ -255,7 +256,7 @@ func (d *SyncServerDatasource) GetEventsInRange(
// Select the event IDs from the defined range. // Select the event IDs from the defined range.
var eIDs []string var eIDs []string
eIDs, err = d.topology.selectEventIDsInRange( eIDs, err = d.topology.selectEventIDsInRange(
ctx, roomID, backwardLimit, forwardLimit, limit, !backwardOrdering, ctx, roomID, backwardLimit, forwardLimit, forwardMicroLimit, limit, !backwardOrdering,
) )
if err != nil { if err != nil {
return return
@ -302,8 +303,7 @@ func (d *SyncServerDatasource) BackwardExtremitiesForRoom(
func (d *SyncServerDatasource) MaxTopologicalPosition( func (d *SyncServerDatasource) MaxTopologicalPosition(
ctx context.Context, roomID string, ctx context.Context, roomID string,
) (depth types.StreamPosition, stream types.StreamPosition, err error) { ) (depth types.StreamPosition, stream types.StreamPosition, err error) {
depth, err = d.topology.selectMaxPositionInTopology(ctx, roomID) return d.topology.selectMaxPositionInTopology(ctx, roomID)
return
} }
func (d *SyncServerDatasource) EventsAtTopologicalPosition( func (d *SyncServerDatasource) EventsAtTopologicalPosition(
@ -324,7 +324,7 @@ func (d *SyncServerDatasource) EventPositionInTopology(
if err != nil { if err != nil {
return return
} }
//stream, err = d.events.selectStreamPositionForEventID(ctx, eventID) stream, err = d.events.selectStreamPositionForEventID(ctx, eventID)
return return
} }

View file

@ -43,14 +43,14 @@ const insertEventInTopologySQL = "" +
" ON CONFLICT DO NOTHING" " ON CONFLICT DO NOTHING"
const selectEventIDsInRangeASCSQL = "" + const selectEventIDsInRangeASCSQL = "" +
"SELECT event_id, topological_position, stream_position FROM syncapi_output_room_events_topology" + "SELECT event_id FROM syncapi_output_room_events_topology" +
" WHERE room_id = $1 AND" + " WHERE room_id = $1 AND" +
"(topological_position > $2 AND topological_position < $3) OR" + "(topological_position > $2 AND topological_position < $3) OR" +
"(topological_position = $4 AND stream_position <= $5)" + "(topological_position = $4 AND stream_position <= $5)" +
" ORDER BY topological_position ASC, stream_position ASC LIMIT $6" " ORDER BY topological_position ASC, stream_position ASC LIMIT $6"
const selectEventIDsInRangeDESCSQL = "" + const selectEventIDsInRangeDESCSQL = "" +
"SELECT event_id, topological_position, stream_position FROM syncapi_output_room_events_topology" + "SELECT event_id FROM syncapi_output_room_events_topology" +
" WHERE room_id = $1 AND" + " WHERE room_id = $1 AND" +
"(topological_position > $2 AND topological_position < $3) OR" + "(topological_position > $2 AND topological_position < $3) OR" +
"(topological_position = $4 AND stream_position <= $5)" + "(topological_position = $4 AND stream_position <= $5)" +
@ -143,10 +143,8 @@ func (s *outputRoomEventsTopologyStatements) selectEventIDsInRange(
// Return the IDs. // Return the IDs.
var eventID string var eventID string
var streamPos int64
var topoPos int64
for rows.Next() { for rows.Next() {
if err = rows.Scan(&eventID, &topoPos, &streamPos); err != nil { if err = rows.Scan(&eventID); err != nil {
return return
} }
eventIDs = append(eventIDs, eventID) eventIDs = append(eventIDs, eventID)

View file

@ -182,7 +182,7 @@ func TestSyncResponse(t *testing.T) {
// limit set to 5 // limit set to 5
return db.CompleteSync(ctx, testUserIDA, 5) return db.CompleteSync(ctx, testUserIDA, 5)
}, },
// want the last 5 events, NOT the last 10. // want the last 5 events
WantTimeline: events[len(events)-5:], WantTimeline: events[len(events)-5:],
// want all state for the room // want all state for the room
WantState: state, WantState: state,