diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go index ddef27383..0a58235ae 100644 --- a/syncapi/storage/postgres/output_room_events_table.go +++ b/syncapi/storage/postgres/output_room_events_table.go @@ -166,6 +166,13 @@ const selectContextAfterEventSQL = "" + " AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" + " ORDER BY id ASC LIMIT $3" +const selectTopologicalEventSQL = "" + + "SELECT se.headered_event_json, st.topological_position, st.stream_position " + + " FROM syncapi_output_room_events_topology st " + + " JOIN syncapi_output_room_events se ON se.event_id = st.event_id " + + " WHERE st.room_id = $1 AND st.topological_position < $2 AND se.type = $3 " + + " ORDER BY st.topological_position DESC LIMIT 1" + type outputRoomEventsStatements struct { insertEventStmt *sql.Stmt selectEventsStmt *sql.Stmt @@ -180,6 +187,7 @@ type outputRoomEventsStatements struct { selectContextEventStmt *sql.Stmt selectContextBeforeEventStmt *sql.Stmt selectContextAfterEventStmt *sql.Stmt + selectTopologicalEventStmt *sql.Stmt } func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) { @@ -202,6 +210,7 @@ func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) { {&s.selectContextEventStmt, selectContextEventSQL}, {&s.selectContextBeforeEventStmt, selectContextBeforeEventSQL}, {&s.selectContextAfterEventStmt, selectContextAfterEventSQL}, + {&s.selectTopologicalEventStmt, selectTopologicalEventSQL}, }.Prepare(db) } @@ -591,6 +600,32 @@ func (s *outputRoomEventsStatements) SelectContextAfterEvent( return lastID, evts, rows.Err() } +// SelectTopologicalEvent selects an event before and including the given position by eventType and roomID. Returns the found event and the topology token. +// If not event was found, returns nil and sql.ErrNoRows. +func (s *outputRoomEventsStatements) SelectTopologicalEvent( + ctx context.Context, txn *sql.Tx, topologicalPosition int, eventType, roomID string, +) (*gomatrixserverlib.HeaderedEvent, types.TopologyToken, error) { + var ( + eventBytes []byte + token types.TopologyToken + ) + + err := sqlutil.TxStmtContext(ctx, txn, s.selectTopologicalEventStmt). + QueryRowContext(ctx, roomID, topologicalPosition, eventType). + Scan(&eventBytes, &token.Depth, &token.PDUPosition) + + if err != nil { + return nil, types.TopologyToken{}, err + } + + var res *gomatrixserverlib.HeaderedEvent + if err = json.Unmarshal(eventBytes, &res); err != nil { + return nil, types.TopologyToken{}, err + } + + return res, token, nil +} + func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) { var result []types.StreamEvent for rows.Next() { diff --git a/syncapi/storage/postgres/output_room_events_topology_table.go b/syncapi/storage/postgres/output_room_events_topology_table.go index 3d1b9bc1a..a1fc9b2a3 100644 --- a/syncapi/storage/postgres/output_room_events_topology_table.go +++ b/syncapi/storage/postgres/output_room_events_topology_table.go @@ -65,8 +65,8 @@ const selectPositionInTopologySQL = "" + "SELECT topological_position, stream_position FROM syncapi_output_room_events_topology" + " WHERE event_id = $1" -// Select the max topological position for the room, then sort by stream position and take the highest, -// returning both topological and stream positions. + // Select the max topological position for the room, then sort by stream position and take the highest, + // returning both topological and stream positions. const selectMaxPositionInTopologySQL = "" + "SELECT topological_position, stream_position FROM syncapi_output_room_events_topology" + " WHERE topological_position=(" + @@ -95,15 +95,28 @@ func NewPostgresTopologyTable(db *sql.DB) (tables.Topology, error) { if err != nil { return nil, err } - return s, sqlutil.StatementList{ - {&s.insertEventInTopologyStmt, insertEventInTopologySQL}, - {&s.selectEventIDsInRangeASCStmt, selectEventIDsInRangeASCSQL}, - {&s.selectEventIDsInRangeDESCStmt, selectEventIDsInRangeDESCSQL}, - {&s.selectPositionInTopologyStmt, selectPositionInTopologySQL}, - {&s.selectMaxPositionInTopologyStmt, selectMaxPositionInTopologySQL}, - {&s.selectStreamToTopologicalPositionAscStmt, selectStreamToTopologicalPositionAscSQL}, - {&s.selectStreamToTopologicalPositionDescStmt, selectStreamToTopologicalPositionDescSQL}, - }.Prepare(db) + if s.insertEventInTopologyStmt, err = db.Prepare(insertEventInTopologySQL); err != nil { + return nil, err + } + if s.selectEventIDsInRangeASCStmt, err = db.Prepare(selectEventIDsInRangeASCSQL); err != nil { + return nil, err + } + if s.selectEventIDsInRangeDESCStmt, err = db.Prepare(selectEventIDsInRangeDESCSQL); err != nil { + return nil, err + } + if s.selectPositionInTopologyStmt, err = db.Prepare(selectPositionInTopologySQL); err != nil { + return nil, err + } + if s.selectMaxPositionInTopologyStmt, err = db.Prepare(selectMaxPositionInTopologySQL); err != nil { + return nil, err + } + if s.selectStreamToTopologicalPositionAscStmt, err = db.Prepare(selectStreamToTopologicalPositionAscSQL); err != nil { + return nil, err + } + if s.selectStreamToTopologicalPositionDescStmt, err = db.Prepare(selectStreamToTopologicalPositionDescSQL); err != nil { + return nil, err + } + return s, nil } // InsertEventInTopology inserts the given event in the room's topology, based diff --git a/syncapi/storage/sqlite3/output_room_events_table.go b/syncapi/storage/sqlite3/output_room_events_table.go index b3dcb44cb..978385593 100644 --- a/syncapi/storage/sqlite3/output_room_events_table.go +++ b/syncapi/storage/sqlite3/output_room_events_table.go @@ -115,6 +115,13 @@ const selectContextAfterEventSQL = "" + // WHEN, ORDER BY and LIMIT are appended by prepareWithFilters +const selectTopologicalEventSQL = "" + + "SELECT headered_event_json, topological_position, stream_position " + + " FROM syncapi_output_room_events_topology " + + " JOIN syncapi_output_room_events ON syncapi_output_room_events.event_id = syncapi_output_room_events_topology.event_id " + + " WHERE syncapi_output_room_events_topology.room_id = $1 AND topological_position < $2 AND type = $3 " + + " ORDER BY topological_position DESC LIMIT 1" + type outputRoomEventsStatements struct { db *sql.DB streamIDStatements *StreamIDStatements @@ -125,6 +132,7 @@ type outputRoomEventsStatements struct { selectContextEventStmt *sql.Stmt selectContextBeforeEventStmt *sql.Stmt selectContextAfterEventStmt *sql.Stmt + selectTopologicalEventStmt *sql.Stmt } func NewSqliteEventsTable(db *sql.DB, streamID *StreamIDStatements) (tables.Events, error) { @@ -144,6 +152,7 @@ func NewSqliteEventsTable(db *sql.DB, streamID *StreamIDStatements) (tables.Even {&s.selectContextEventStmt, selectContextEventSQL}, {&s.selectContextBeforeEventStmt, selectContextBeforeEventSQL}, {&s.selectContextAfterEventStmt, selectContextAfterEventSQL}, + {&s.selectTopologicalEventStmt, selectTopologicalEventSQL}, }.Prepare(db) } @@ -613,6 +622,31 @@ func (s *outputRoomEventsStatements) SelectContextAfterEvent( return lastID, evts, rows.Err() } +// SelectTopologicalEvent selects an event before and including the given position by eventType and roomID. Returns the found event and the topology token. +// If not event was found, returns nil and sql.ErrNoRows. +func (s *outputRoomEventsStatements) SelectTopologicalEvent( + ctx context.Context, txn *sql.Tx, topologicalPosition int, eventType, roomID string, +) (*gomatrixserverlib.HeaderedEvent, types.TopologyToken, error) { + var ( + eventBytes []byte + token types.TopologyToken + ) + err := sqlutil.TxStmtContext(ctx, txn, s.selectTopologicalEventStmt). + QueryRowContext(ctx, roomID, topologicalPosition, eventType). + Scan(&eventBytes, &token.Depth, &token.PDUPosition) + + if err != nil { + return nil, types.TopologyToken{}, err + } + + var res *gomatrixserverlib.HeaderedEvent + if err = json.Unmarshal(eventBytes, &res); err != nil { + return nil, types.TopologyToken{}, err + } + + return res, token, nil +} + func unmarshalStateIDs(addIDsJSON, delIDsJSON string) (addIDs []string, delIDs []string, err error) { if len(addIDsJSON) > 0 { if err = json.Unmarshal([]byte(addIDsJSON), &addIDs); err != nil { diff --git a/syncapi/storage/sqlite3/output_room_events_topology_table.go b/syncapi/storage/sqlite3/output_room_events_topology_table.go index 2576bf5a4..3ea317601 100644 --- a/syncapi/storage/sqlite3/output_room_events_topology_table.go +++ b/syncapi/storage/sqlite3/output_room_events_topology_table.go @@ -90,15 +90,28 @@ func NewSqliteTopologyTable(db *sql.DB) (tables.Topology, error) { if err != nil { return nil, err } - return s, sqlutil.StatementList{ - {&s.insertEventInTopologyStmt, insertEventInTopologySQL}, - {&s.selectEventIDsInRangeASCStmt, selectEventIDsInRangeASCSQL}, - {&s.selectEventIDsInRangeDESCStmt, selectEventIDsInRangeDESCSQL}, - {&s.selectPositionInTopologyStmt, selectPositionInTopologySQL}, - {&s.selectMaxPositionInTopologyStmt, selectMaxPositionInTopologySQL}, - {&s.selectStreamToTopologicalPositionAscStmt, selectStreamToTopologicalPositionAscSQL}, - {&s.selectStreamToTopologicalPositionDescStmt, selectStreamToTopologicalPositionDescSQL}, - }.Prepare(db) + if s.insertEventInTopologyStmt, err = db.Prepare(insertEventInTopologySQL); err != nil { + return nil, err + } + if s.selectEventIDsInRangeASCStmt, err = db.Prepare(selectEventIDsInRangeASCSQL); err != nil { + return nil, err + } + if s.selectEventIDsInRangeDESCStmt, err = db.Prepare(selectEventIDsInRangeDESCSQL); err != nil { + return nil, err + } + if s.selectPositionInTopologyStmt, err = db.Prepare(selectPositionInTopologySQL); err != nil { + return nil, err + } + if s.selectMaxPositionInTopologyStmt, err = db.Prepare(selectMaxPositionInTopologySQL); err != nil { + return nil, err + } + if s.selectStreamToTopologicalPositionAscStmt, err = db.Prepare(selectStreamToTopologicalPositionAscSQL); err != nil { + return nil, err + } + if s.selectStreamToTopologicalPositionDescStmt, err = db.Prepare(selectStreamToTopologicalPositionDescSQL); err != nil { + return nil, err + } + return s, nil } // insertEventInTopology inserts the given event in the room's topology, based @@ -177,3 +190,28 @@ func (s *outputRoomEventsTopologyStatements) SelectMaxPositionInTopology( err = stmt.QueryRowContext(ctx, roomID).Scan(&pos, &spos) return } + +// SelectTopologicalEvent selects an event before and including the given position by eventType and roomID. Returns the found event and the topology token. +// If not event was found, returns nil and sql.ErrNoRows. +func (s *outputRoomEventsTopologyStatements) SelectTopologicalEvent( + ctx context.Context, txn *sql.Tx, topologicalPosition int, eventType, roomID string, +) (*gomatrixserverlib.HeaderedEvent, types.TopologyToken, error) { + var ( + eventBytes []byte + token types.TopologyToken + ) + err := sqlutil.TxStmtContext(ctx, txn, s.selectTopologicalEventStmt). + QueryRowContext(ctx, roomID, topologicalPosition, eventType). + Scan(&eventBytes, &token.Depth, &token.PDUPosition) + + if err != nil { + return nil, types.TopologyToken{}, err + } + + var res *gomatrixserverlib.HeaderedEvent + if err = json.Unmarshal(eventBytes, &res); err != nil { + return nil, types.TopologyToken{}, err + } + + return res, token, nil +} diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go index 3a99ac797..99e2e0274 100644 --- a/syncapi/storage/tables/interface.go +++ b/syncapi/storage/tables/interface.go @@ -74,6 +74,7 @@ type Events interface { SelectContextEvent(ctx context.Context, txn *sql.Tx, roomID, eventID string) (int, gomatrixserverlib.HeaderedEvent, error) SelectContextBeforeEvent(ctx context.Context, txn *sql.Tx, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) ([]*gomatrixserverlib.HeaderedEvent, error) SelectContextAfterEvent(ctx context.Context, txn *sql.Tx, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error) + SelectTopologicalEvent(ctx context.Context, txn *sql.Tx, topologicalPosition int, eventType, roomID string) (*gomatrixserverlib.HeaderedEvent, types.TopologyToken, error) } // Topology keeps track of the depths and stream positions for all events. diff --git a/syncapi/storage/tables/output_room_events_test.go b/syncapi/storage/tables/output_room_events_test.go index bdb17ae20..1c26f8fe4 100644 --- a/syncapi/storage/tables/output_room_events_test.go +++ b/syncapi/storage/tables/output_room_events_test.go @@ -29,12 +29,20 @@ func newOutputRoomEventsTable(t *testing.T, dbType test.DBType) (tables.Events, var tab tables.Events switch dbType { case test.DBTypePostgres: + _, err = postgres.NewPostgresTopologyTable(db) // needed, since there is a join on it + if err != nil { + t.Fatalf("unable to create table: %s", err) + } tab, err = postgres.NewPostgresEventsTable(db) case test.DBTypeSQLite: var stream sqlite3.StreamIDStatements if err = stream.Prepare(db); err != nil { t.Fatalf("failed to prepare stream stmts: %s", err) } + _, err = sqlite3.NewSqliteTopologyTable(db) // needed, since there is a join on it + if err != nil { + t.Fatalf("unable to create table: %s", err) + } tab, err = sqlite3.NewSqliteEventsTable(db, &stream) } if err != nil { diff --git a/syncapi/storage/tables/topology_test.go b/syncapi/storage/tables/topology_test.go index 2aa479abe..f4f75bdf3 100644 --- a/syncapi/storage/tables/topology_test.go +++ b/syncapi/storage/tables/topology_test.go @@ -28,20 +28,8 @@ func newTopologyTable(t *testing.T, dbType test.DBType) (tables.Topology, *sql.D var tab tables.Topology switch dbType { case test.DBTypePostgres: - _, err = postgres.NewPostgresEventsTable(db) // needed for SQL join - if err != nil { - t.Fatalf("unable to prepare events table") - } tab, err = postgres.NewPostgresTopologyTable(db) case test.DBTypeSQLite: - var stream sqlite3.StreamIDStatements - if err = stream.Prepare(db); err != nil { - t.Fatalf("failed to prepare stream stmts: %s", err) - } - _, err = sqlite3.NewSqliteEventsTable(db, &stream) // needed for SQL join - if err != nil { - t.Fatalf("unable to prepare events table") - } tab, err = sqlite3.NewSqliteTopologyTable(db) } if err != nil {