diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go index c3df0d7ea..d84d0cfa2 100644 --- a/syncapi/storage/postgres/output_room_events_table.go +++ b/syncapi/storage/postgres/output_room_events_table.go @@ -164,13 +164,6 @@ const selectContextAfterEventSQL = "" + " AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" + " ORDER BY id ASC LIMIT $3" -const selectTopologicalEventSQL = "" + - "SELECT se.headered_event_json, st.topological_position, st.stream_position " + - " FROM syncapi_output_room_events_topology st " + - " JOIN syncapi_output_room_events se ON se.event_id = st.event_id " + - " WHERE st.room_id = $1 AND st.topological_position < $2 AND se.type = $3 " + - " ORDER BY st.topological_position DESC LIMIT 1" - type outputRoomEventsStatements struct { insertEventStmt *sql.Stmt selectEventsStmt *sql.Stmt @@ -185,7 +178,6 @@ type outputRoomEventsStatements struct { selectContextEventStmt *sql.Stmt selectContextBeforeEventStmt *sql.Stmt selectContextAfterEventStmt *sql.Stmt - selectTopologicalEventStmt *sql.Stmt } func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) { @@ -208,7 +200,6 @@ func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) { {&s.selectContextEventStmt, selectContextEventSQL}, {&s.selectContextBeforeEventStmt, selectContextBeforeEventSQL}, {&s.selectContextAfterEventStmt, selectContextAfterEventSQL}, - {&s.selectTopologicalEventStmt, selectTopologicalEventSQL}, }.Prepare(db) } @@ -589,32 +580,6 @@ func (s *outputRoomEventsStatements) SelectContextAfterEvent( return lastID, evts, rows.Err() } -// SelectTopologicalEvent selects an event before and including the given position by eventType and roomID. Returns the found event and the topology token. -// If not event was found, returns nil and sql.ErrNoRows. -func (s *outputRoomEventsStatements) SelectTopologicalEvent( - ctx context.Context, txn *sql.Tx, topologicalPosition int, eventType, roomID string, -) (*gomatrixserverlib.HeaderedEvent, types.TopologyToken, error) { - var ( - eventBytes []byte - token types.TopologyToken - ) - - err := sqlutil.TxStmtContext(ctx, txn, s.selectTopologicalEventStmt). - QueryRowContext(ctx, roomID, topologicalPosition, eventType). - Scan(&eventBytes, &token.Depth, &token.PDUPosition) - - if err != nil { - return nil, types.TopologyToken{}, err - } - - var res *gomatrixserverlib.HeaderedEvent - if err = json.Unmarshal(eventBytes, &res); err != nil { - return nil, types.TopologyToken{}, err - } - - return res, token, nil -} - func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) { var result []types.StreamEvent for rows.Next() { diff --git a/syncapi/storage/postgres/output_room_events_topology_table.go b/syncapi/storage/postgres/output_room_events_topology_table.go index a1fc9b2a3..6f8e55177 100644 --- a/syncapi/storage/postgres/output_room_events_topology_table.go +++ b/syncapi/storage/postgres/output_room_events_topology_table.go @@ -17,6 +17,7 @@ package postgres import ( "context" "database/sql" + "encoding/json" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/sqlutil" @@ -65,8 +66,8 @@ const selectPositionInTopologySQL = "" + "SELECT topological_position, stream_position FROM syncapi_output_room_events_topology" + " WHERE event_id = $1" - // Select the max topological position for the room, then sort by stream position and take the highest, - // returning both topological and stream positions. +// Select the max topological position for the room, then sort by stream position and take the highest, +// returning both topological and stream positions. const selectMaxPositionInTopologySQL = "" + "SELECT topological_position, stream_position FROM syncapi_output_room_events_topology" + " WHERE topological_position=(" + @@ -79,6 +80,13 @@ const selectStreamToTopologicalPositionAscSQL = "" + const selectStreamToTopologicalPositionDescSQL = "" + "SELECT topological_position FROM syncapi_output_room_events_topology WHERE room_id = $1 AND stream_position <= $2 ORDER BY topological_position DESC LIMIT 1;" +const selectTopologicalEventSQL = "" + + "SELECT se.headered_event_json, st.topological_position, st.stream_position " + + " FROM syncapi_output_room_events_topology st " + + " JOIN syncapi_output_room_events se ON se.event_id = st.event_id " + + " WHERE st.room_id = $1 AND st.topological_position <= $2 AND se.type = $3 " + + " ORDER BY st.topological_position DESC LIMIT 1" + type outputRoomEventsTopologyStatements struct { insertEventInTopologyStmt *sql.Stmt selectEventIDsInRangeASCStmt *sql.Stmt @@ -87,6 +95,7 @@ type outputRoomEventsTopologyStatements struct { selectMaxPositionInTopologyStmt *sql.Stmt selectStreamToTopologicalPositionAscStmt *sql.Stmt selectStreamToTopologicalPositionDescStmt *sql.Stmt + selectTopologicalEventStmt *sql.Stmt } func NewPostgresTopologyTable(db *sql.DB) (tables.Topology, error) { @@ -95,28 +104,16 @@ func NewPostgresTopologyTable(db *sql.DB) (tables.Topology, error) { if err != nil { return nil, err } - if s.insertEventInTopologyStmt, err = db.Prepare(insertEventInTopologySQL); err != nil { - return nil, err - } - if s.selectEventIDsInRangeASCStmt, err = db.Prepare(selectEventIDsInRangeASCSQL); err != nil { - return nil, err - } - if s.selectEventIDsInRangeDESCStmt, err = db.Prepare(selectEventIDsInRangeDESCSQL); err != nil { - return nil, err - } - if s.selectPositionInTopologyStmt, err = db.Prepare(selectPositionInTopologySQL); err != nil { - return nil, err - } - if s.selectMaxPositionInTopologyStmt, err = db.Prepare(selectMaxPositionInTopologySQL); err != nil { - return nil, err - } - if s.selectStreamToTopologicalPositionAscStmt, err = db.Prepare(selectStreamToTopologicalPositionAscSQL); err != nil { - return nil, err - } - if s.selectStreamToTopologicalPositionDescStmt, err = db.Prepare(selectStreamToTopologicalPositionDescSQL); err != nil { - return nil, err - } - return s, nil + return s, sqlutil.StatementList{ + {&s.insertEventInTopologyStmt, insertEventInTopologySQL}, + {&s.selectEventIDsInRangeASCStmt, selectEventIDsInRangeASCSQL}, + {&s.selectEventIDsInRangeDESCStmt, selectEventIDsInRangeDESCSQL}, + {&s.selectPositionInTopologyStmt, selectPositionInTopologySQL}, + {&s.selectMaxPositionInTopologyStmt, selectMaxPositionInTopologySQL}, + {&s.selectStreamToTopologicalPositionAscStmt, selectStreamToTopologicalPositionAscSQL}, + {&s.selectStreamToTopologicalPositionDescStmt, selectStreamToTopologicalPositionDescSQL}, + {&s.selectTopologicalEventStmt, selectTopologicalEventSQL}, + }.Prepare(db) } // InsertEventInTopology inserts the given event in the room's topology, based @@ -196,3 +193,29 @@ func (s *outputRoomEventsTopologyStatements) SelectMaxPositionInTopology( err = s.selectMaxPositionInTopologyStmt.QueryRowContext(ctx, roomID).Scan(&pos, &spos) return } + +// SelectTopologicalEvent selects an event before and including the given position by eventType and roomID. Returns the found event and the topology token. +// If not event was found, returns nil and sql.ErrNoRows. +func (s *outputRoomEventsTopologyStatements) SelectTopologicalEvent( + ctx context.Context, txn *sql.Tx, topologicalPosition int, eventType, roomID string, +) (*gomatrixserverlib.HeaderedEvent, types.TopologyToken, error) { + var ( + eventBytes []byte + token types.TopologyToken + ) + + err := sqlutil.TxStmtContext(ctx, txn, s.selectTopologicalEventStmt). + QueryRowContext(ctx, roomID, topologicalPosition, eventType). + Scan(&eventBytes, &token.Depth, &token.PDUPosition) + + if err != nil { + return nil, types.TopologyToken{}, err + } + + var res *gomatrixserverlib.HeaderedEvent + if err = json.Unmarshal(eventBytes, &res); err != nil { + return nil, types.TopologyToken{}, err + } + + return res, token, nil +} diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go index d2693adc4..9cfe7c070 100644 --- a/syncapi/storage/postgres/syncserver.go +++ b/syncapi/storage/postgres/syncserver.go @@ -46,10 +46,6 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) if err != nil { return nil, err } - topology, err := NewPostgresTopologyTable(d.db) - if err != nil { - return nil, err - } events, err := NewPostgresEventsTable(d.db) if err != nil { return nil, err @@ -66,6 +62,10 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) if err != nil { return nil, err } + topology, err := NewPostgresTopologyTable(d.db) + if err != nil { + return nil, err + } backwardExtremities, err := NewPostgresBackwardsExtremitiesTable(d.db) if err != nil { return nil, err diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index 9e48ea271..0b9d58698 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -1069,7 +1069,7 @@ func (d *Database) MaxStreamPositionForPresence(ctx context.Context) (types.Stre } func (d *Database) SelectTopologicalEvent(ctx context.Context, topologicalPosition int, eventType, roomID string) (*gomatrixserverlib.HeaderedEvent, types.TopologyToken, error) { - return d.OutputEvents.SelectTopologicalEvent(ctx, nil, topologicalPosition, eventType, roomID) + return d.Topology.SelectTopologicalEvent(ctx, nil, topologicalPosition, eventType, roomID) } func (d *Database) SelectMembershipForUser(ctx context.Context, roomID, userID string, pos int64) (membership string, topologicalPos int, err error) { diff --git a/syncapi/storage/sqlite3/output_room_events_table.go b/syncapi/storage/sqlite3/output_room_events_table.go index 1c619d0fe..f9961a9d1 100644 --- a/syncapi/storage/sqlite3/output_room_events_table.go +++ b/syncapi/storage/sqlite3/output_room_events_table.go @@ -114,13 +114,6 @@ const selectContextAfterEventSQL = "" + // WHEN, ORDER BY and LIMIT are appended by prepareWithFilters -const selectTopologicalEventSQL = "" + - "SELECT headered_event_json, topological_position, stream_position " + - " FROM syncapi_output_room_events_topology " + - " JOIN syncapi_output_room_events ON syncapi_output_room_events.event_id = syncapi_output_room_events_topology.event_id " + - " WHERE syncapi_output_room_events_topology.room_id = $1 AND topological_position < $2 AND type = $3 " + - " ORDER BY topological_position DESC LIMIT 1" - type outputRoomEventsStatements struct { db *sql.DB streamIDStatements *StreamIDStatements @@ -131,7 +124,6 @@ type outputRoomEventsStatements struct { selectContextEventStmt *sql.Stmt selectContextBeforeEventStmt *sql.Stmt selectContextAfterEventStmt *sql.Stmt - selectTopologicalEventStmt *sql.Stmt } func NewSqliteEventsTable(db *sql.DB, streamID *StreamIDStatements) (tables.Events, error) { @@ -151,7 +143,6 @@ func NewSqliteEventsTable(db *sql.DB, streamID *StreamIDStatements) (tables.Even {&s.selectContextEventStmt, selectContextEventSQL}, {&s.selectContextBeforeEventStmt, selectContextBeforeEventSQL}, {&s.selectContextAfterEventStmt, selectContextAfterEventSQL}, - {&s.selectTopologicalEventStmt, selectTopologicalEventSQL}, }.Prepare(db) } @@ -609,31 +600,6 @@ func (s *outputRoomEventsStatements) SelectContextAfterEvent( return lastID, evts, rows.Err() } -// SelectTopologicalEvent selects an event before and including the given position by eventType and roomID. Returns the found event and the topology token. -// If not event was found, returns nil and sql.ErrNoRows. -func (s *outputRoomEventsStatements) SelectTopologicalEvent( - ctx context.Context, txn *sql.Tx, topologicalPosition int, eventType, roomID string, -) (*gomatrixserverlib.HeaderedEvent, types.TopologyToken, error) { - var ( - eventBytes []byte - token types.TopologyToken - ) - err := sqlutil.TxStmtContext(ctx, txn, s.selectTopologicalEventStmt). - QueryRowContext(ctx, roomID, topologicalPosition, eventType). - Scan(&eventBytes, &token.Depth, &token.PDUPosition) - - if err != nil { - return nil, types.TopologyToken{}, err - } - - var res *gomatrixserverlib.HeaderedEvent - if err = json.Unmarshal(eventBytes, &res); err != nil { - return nil, types.TopologyToken{}, err - } - - return res, token, nil -} - func unmarshalStateIDs(addIDsJSON, delIDsJSON string) (addIDs []string, delIDs []string, err error) { if len(addIDsJSON) > 0 { if err = json.Unmarshal([]byte(addIDsJSON), &addIDs); err != nil { diff --git a/syncapi/storage/sqlite3/output_room_events_topology_table.go b/syncapi/storage/sqlite3/output_room_events_topology_table.go index b2fb77417..28c216979 100644 --- a/syncapi/storage/sqlite3/output_room_events_topology_table.go +++ b/syncapi/storage/sqlite3/output_room_events_topology_table.go @@ -17,6 +17,7 @@ package sqlite3 import ( "context" "database/sql" + "encoding/json" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/syncapi/storage/tables" @@ -71,6 +72,13 @@ const selectStreamToTopologicalPositionAscSQL = "" + const selectStreamToTopologicalPositionDescSQL = "" + "SELECT topological_position FROM syncapi_output_room_events_topology WHERE room_id = $1 AND stream_position <= $2 ORDER BY topological_position DESC LIMIT 1;" +const selectTopologicalEventSQL = "" + + "SELECT headered_event_json, topological_position, stream_position " + + " FROM syncapi_output_room_events_topology " + + " JOIN syncapi_output_room_events ON syncapi_output_room_events.event_id = syncapi_output_room_events_topology.event_id " + + " WHERE syncapi_output_room_events_topology.room_id = $1 AND topological_position <= $2 AND type = $3 " + + " ORDER BY topological_position DESC LIMIT 1" + type outputRoomEventsTopologyStatements struct { db *sql.DB insertEventInTopologyStmt *sql.Stmt @@ -80,6 +88,7 @@ type outputRoomEventsTopologyStatements struct { selectMaxPositionInTopologyStmt *sql.Stmt selectStreamToTopologicalPositionAscStmt *sql.Stmt selectStreamToTopologicalPositionDescStmt *sql.Stmt + selectTopologicalEventStmt *sql.Stmt } func NewSqliteTopologyTable(db *sql.DB) (tables.Topology, error) { @@ -90,28 +99,16 @@ func NewSqliteTopologyTable(db *sql.DB) (tables.Topology, error) { if err != nil { return nil, err } - if s.insertEventInTopologyStmt, err = db.Prepare(insertEventInTopologySQL); err != nil { - return nil, err - } - if s.selectEventIDsInRangeASCStmt, err = db.Prepare(selectEventIDsInRangeASCSQL); err != nil { - return nil, err - } - if s.selectEventIDsInRangeDESCStmt, err = db.Prepare(selectEventIDsInRangeDESCSQL); err != nil { - return nil, err - } - if s.selectPositionInTopologyStmt, err = db.Prepare(selectPositionInTopologySQL); err != nil { - return nil, err - } - if s.selectMaxPositionInTopologyStmt, err = db.Prepare(selectMaxPositionInTopologySQL); err != nil { - return nil, err - } - if s.selectStreamToTopologicalPositionAscStmt, err = db.Prepare(selectStreamToTopologicalPositionAscSQL); err != nil { - return nil, err - } - if s.selectStreamToTopologicalPositionDescStmt, err = db.Prepare(selectStreamToTopologicalPositionDescSQL); err != nil { - return nil, err - } - return s, nil + return s, sqlutil.StatementList{ + {&s.insertEventInTopologyStmt, insertEventInTopologySQL}, + {&s.selectEventIDsInRangeASCStmt, selectEventIDsInRangeASCSQL}, + {&s.selectEventIDsInRangeDESCStmt, selectEventIDsInRangeDESCSQL}, + {&s.selectPositionInTopologyStmt, selectPositionInTopologySQL}, + {&s.selectMaxPositionInTopologyStmt, selectMaxPositionInTopologySQL}, + {&s.selectStreamToTopologicalPositionAscStmt, selectStreamToTopologicalPositionAscSQL}, + {&s.selectStreamToTopologicalPositionDescStmt, selectStreamToTopologicalPositionDescSQL}, + {&s.selectTopologicalEventStmt, selectTopologicalEventSQL}, + }.Prepare(db) } // insertEventInTopology inserts the given event in the room's topology, based @@ -190,3 +187,28 @@ func (s *outputRoomEventsTopologyStatements) SelectMaxPositionInTopology( err = stmt.QueryRowContext(ctx, roomID).Scan(&pos, &spos) return } + +// SelectTopologicalEvent selects an event before and including the given position by eventType and roomID. Returns the found event and the topology token. +// If not event was found, returns nil and sql.ErrNoRows. +func (s *outputRoomEventsTopologyStatements) SelectTopologicalEvent( + ctx context.Context, txn *sql.Tx, topologicalPosition int, eventType, roomID string, +) (*gomatrixserverlib.HeaderedEvent, types.TopologyToken, error) { + var ( + eventBytes []byte + token types.TopologyToken + ) + err := sqlutil.TxStmtContext(ctx, txn, s.selectTopologicalEventStmt). + QueryRowContext(ctx, roomID, topologicalPosition, eventType). + Scan(&eventBytes, &token.Depth, &token.PDUPosition) + + if err != nil { + return nil, types.TopologyToken{}, err + } + + var res *gomatrixserverlib.HeaderedEvent + if err = json.Unmarshal(eventBytes, &res); err != nil { + return nil, types.TopologyToken{}, err + } + + return res, token, nil +} diff --git a/syncapi/storage/sqlite3/syncserver.go b/syncapi/storage/sqlite3/syncserver.go index 9f56e5e42..e08a0ba82 100644 --- a/syncapi/storage/sqlite3/syncserver.go +++ b/syncapi/storage/sqlite3/syncserver.go @@ -56,10 +56,6 @@ func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (er if err != nil { return err } - topology, err := NewSqliteTopologyTable(d.db) - if err != nil { - return err - } events, err := NewSqliteEventsTable(d.db, &d.streamID) if err != nil { return err @@ -76,6 +72,10 @@ func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (er if err != nil { return err } + topology, err := NewSqliteTopologyTable(d.db) + if err != nil { + return err + } bwExtrem, err := NewSqliteBackwardsExtremitiesTable(d.db) if err != nil { return err diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go index eb5e4b355..9f8062fc1 100644 --- a/syncapi/storage/tables/interface.go +++ b/syncapi/storage/tables/interface.go @@ -67,7 +67,6 @@ type Events interface { SelectContextEvent(ctx context.Context, txn *sql.Tx, roomID, eventID string) (int, gomatrixserverlib.HeaderedEvent, error) SelectContextBeforeEvent(ctx context.Context, txn *sql.Tx, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) ([]*gomatrixserverlib.HeaderedEvent, error) SelectContextAfterEvent(ctx context.Context, txn *sql.Tx, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error) - SelectTopologicalEvent(ctx context.Context, txn *sql.Tx, topologicalPosition int, eventType, roomID string) (*gomatrixserverlib.HeaderedEvent, types.TopologyToken, error) } // Topology keeps track of the depths and stream positions for all events. @@ -87,6 +86,7 @@ type Topology interface { SelectMaxPositionInTopology(ctx context.Context, txn *sql.Tx, roomID string) (depth types.StreamPosition, spos types.StreamPosition, err error) // SelectStreamToTopologicalPosition converts a stream position to a topological position by finding the nearest topological position in the room. SelectStreamToTopologicalPosition(ctx context.Context, txn *sql.Tx, roomID string, streamPos types.StreamPosition, forward bool) (topoPos types.StreamPosition, err error) + SelectTopologicalEvent(ctx context.Context, txn *sql.Tx, topologicalPosition int, eventType, roomID string) (*gomatrixserverlib.HeaderedEvent, types.TopologyToken, error) } type CurrentRoomState interface { diff --git a/syncapi/storage/tables/output_room_events_test.go b/syncapi/storage/tables/output_room_events_test.go index 8604cdebd..69bbd04c9 100644 --- a/syncapi/storage/tables/output_room_events_test.go +++ b/syncapi/storage/tables/output_room_events_test.go @@ -29,20 +29,12 @@ func newOutputRoomEventsTable(t *testing.T, dbType test.DBType) (tables.Events, var tab tables.Events switch dbType { case test.DBTypePostgres: - _, err = postgres.NewPostgresTopologyTable(db) // needed, since there is a join on it - if err != nil { - t.Fatalf("unable to create table: %s", err) - } tab, err = postgres.NewPostgresEventsTable(db) case test.DBTypeSQLite: var stream sqlite3.StreamIDStatements if err = stream.Prepare(db); err != nil { t.Fatalf("failed to prepare stream stmts: %s", err) } - _, err = sqlite3.NewSqliteTopologyTable(db) // needed, since there is a join on it - if err != nil { - t.Fatalf("unable to create table: %s", err) - } tab, err = sqlite3.NewSqliteEventsTable(db, &stream) } if err != nil { diff --git a/syncapi/storage/tables/topology_test.go b/syncapi/storage/tables/topology_test.go index f4f75bdf3..2aa479abe 100644 --- a/syncapi/storage/tables/topology_test.go +++ b/syncapi/storage/tables/topology_test.go @@ -28,8 +28,20 @@ func newTopologyTable(t *testing.T, dbType test.DBType) (tables.Topology, *sql.D var tab tables.Topology switch dbType { case test.DBTypePostgres: + _, err = postgres.NewPostgresEventsTable(db) // needed for SQL join + if err != nil { + t.Fatalf("unable to prepare events table") + } tab, err = postgres.NewPostgresTopologyTable(db) case test.DBTypeSQLite: + var stream sqlite3.StreamIDStatements + if err = stream.Prepare(db); err != nil { + t.Fatalf("failed to prepare stream stmts: %s", err) + } + _, err = sqlite3.NewSqliteEventsTable(db, &stream) // needed for SQL join + if err != nil { + t.Fatalf("unable to prepare events table") + } tab, err = sqlite3.NewSqliteTopologyTable(db) } if err != nil {