Revert "Move queries to topology table"

This reverts commit 4f0d41be9c.
This commit is contained in:
Till Faelligen 2022-07-22 08:59:35 +02:00
parent bbbf1036b7
commit 7cd8f40b48
No known key found for this signature in database
GPG key ID: 3DF82D8AB9211D4E
7 changed files with 149 additions and 32 deletions

View file

@ -166,6 +166,13 @@ const selectContextAfterEventSQL = "" +
" AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" + " AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" +
" ORDER BY id ASC LIMIT $3" " ORDER BY id ASC LIMIT $3"
const selectTopologicalEventSQL = "" +
"SELECT se.headered_event_json, st.topological_position, st.stream_position " +
" FROM syncapi_output_room_events_topology st " +
" JOIN syncapi_output_room_events se ON se.event_id = st.event_id " +
" WHERE st.room_id = $1 AND st.topological_position < $2 AND se.type = $3 " +
" ORDER BY st.topological_position DESC LIMIT 1"
type outputRoomEventsStatements struct { type outputRoomEventsStatements struct {
insertEventStmt *sql.Stmt insertEventStmt *sql.Stmt
selectEventsStmt *sql.Stmt selectEventsStmt *sql.Stmt
@ -180,6 +187,7 @@ type outputRoomEventsStatements struct {
selectContextEventStmt *sql.Stmt selectContextEventStmt *sql.Stmt
selectContextBeforeEventStmt *sql.Stmt selectContextBeforeEventStmt *sql.Stmt
selectContextAfterEventStmt *sql.Stmt selectContextAfterEventStmt *sql.Stmt
selectTopologicalEventStmt *sql.Stmt
} }
func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) { func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
@ -202,6 +210,7 @@ func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
{&s.selectContextEventStmt, selectContextEventSQL}, {&s.selectContextEventStmt, selectContextEventSQL},
{&s.selectContextBeforeEventStmt, selectContextBeforeEventSQL}, {&s.selectContextBeforeEventStmt, selectContextBeforeEventSQL},
{&s.selectContextAfterEventStmt, selectContextAfterEventSQL}, {&s.selectContextAfterEventStmt, selectContextAfterEventSQL},
{&s.selectTopologicalEventStmt, selectTopologicalEventSQL},
}.Prepare(db) }.Prepare(db)
} }
@ -591,6 +600,32 @@ func (s *outputRoomEventsStatements) SelectContextAfterEvent(
return lastID, evts, rows.Err() return lastID, evts, rows.Err()
} }
// SelectTopologicalEvent selects an event before and including the given position by eventType and roomID. Returns the found event and the topology token.
// If not event was found, returns nil and sql.ErrNoRows.
func (s *outputRoomEventsStatements) SelectTopologicalEvent(
ctx context.Context, txn *sql.Tx, topologicalPosition int, eventType, roomID string,
) (*gomatrixserverlib.HeaderedEvent, types.TopologyToken, error) {
var (
eventBytes []byte
token types.TopologyToken
)
err := sqlutil.TxStmtContext(ctx, txn, s.selectTopologicalEventStmt).
QueryRowContext(ctx, roomID, topologicalPosition, eventType).
Scan(&eventBytes, &token.Depth, &token.PDUPosition)
if err != nil {
return nil, types.TopologyToken{}, err
}
var res *gomatrixserverlib.HeaderedEvent
if err = json.Unmarshal(eventBytes, &res); err != nil {
return nil, types.TopologyToken{}, err
}
return res, token, nil
}
func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) { func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
var result []types.StreamEvent var result []types.StreamEvent
for rows.Next() { for rows.Next() {

View file

@ -65,8 +65,8 @@ const selectPositionInTopologySQL = "" +
"SELECT topological_position, stream_position FROM syncapi_output_room_events_topology" + "SELECT topological_position, stream_position FROM syncapi_output_room_events_topology" +
" WHERE event_id = $1" " WHERE event_id = $1"
// Select the max topological position for the room, then sort by stream position and take the highest, // Select the max topological position for the room, then sort by stream position and take the highest,
// returning both topological and stream positions. // returning both topological and stream positions.
const selectMaxPositionInTopologySQL = "" + const selectMaxPositionInTopologySQL = "" +
"SELECT topological_position, stream_position FROM syncapi_output_room_events_topology" + "SELECT topological_position, stream_position FROM syncapi_output_room_events_topology" +
" WHERE topological_position=(" + " WHERE topological_position=(" +
@ -95,15 +95,28 @@ func NewPostgresTopologyTable(db *sql.DB) (tables.Topology, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
return s, sqlutil.StatementList{ if s.insertEventInTopologyStmt, err = db.Prepare(insertEventInTopologySQL); err != nil {
{&s.insertEventInTopologyStmt, insertEventInTopologySQL}, return nil, err
{&s.selectEventIDsInRangeASCStmt, selectEventIDsInRangeASCSQL}, }
{&s.selectEventIDsInRangeDESCStmt, selectEventIDsInRangeDESCSQL}, if s.selectEventIDsInRangeASCStmt, err = db.Prepare(selectEventIDsInRangeASCSQL); err != nil {
{&s.selectPositionInTopologyStmt, selectPositionInTopologySQL}, return nil, err
{&s.selectMaxPositionInTopologyStmt, selectMaxPositionInTopologySQL}, }
{&s.selectStreamToTopologicalPositionAscStmt, selectStreamToTopologicalPositionAscSQL}, if s.selectEventIDsInRangeDESCStmt, err = db.Prepare(selectEventIDsInRangeDESCSQL); err != nil {
{&s.selectStreamToTopologicalPositionDescStmt, selectStreamToTopologicalPositionDescSQL}, return nil, err
}.Prepare(db) }
if s.selectPositionInTopologyStmt, err = db.Prepare(selectPositionInTopologySQL); err != nil {
return nil, err
}
if s.selectMaxPositionInTopologyStmt, err = db.Prepare(selectMaxPositionInTopologySQL); err != nil {
return nil, err
}
if s.selectStreamToTopologicalPositionAscStmt, err = db.Prepare(selectStreamToTopologicalPositionAscSQL); err != nil {
return nil, err
}
if s.selectStreamToTopologicalPositionDescStmt, err = db.Prepare(selectStreamToTopologicalPositionDescSQL); err != nil {
return nil, err
}
return s, nil
} }
// InsertEventInTopology inserts the given event in the room's topology, based // InsertEventInTopology inserts the given event in the room's topology, based

View file

@ -115,6 +115,13 @@ const selectContextAfterEventSQL = "" +
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters // WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
const selectTopologicalEventSQL = "" +
"SELECT headered_event_json, topological_position, stream_position " +
" FROM syncapi_output_room_events_topology " +
" JOIN syncapi_output_room_events ON syncapi_output_room_events.event_id = syncapi_output_room_events_topology.event_id " +
" WHERE syncapi_output_room_events_topology.room_id = $1 AND topological_position < $2 AND type = $3 " +
" ORDER BY topological_position DESC LIMIT 1"
type outputRoomEventsStatements struct { type outputRoomEventsStatements struct {
db *sql.DB db *sql.DB
streamIDStatements *StreamIDStatements streamIDStatements *StreamIDStatements
@ -125,6 +132,7 @@ type outputRoomEventsStatements struct {
selectContextEventStmt *sql.Stmt selectContextEventStmt *sql.Stmt
selectContextBeforeEventStmt *sql.Stmt selectContextBeforeEventStmt *sql.Stmt
selectContextAfterEventStmt *sql.Stmt selectContextAfterEventStmt *sql.Stmt
selectTopologicalEventStmt *sql.Stmt
} }
func NewSqliteEventsTable(db *sql.DB, streamID *StreamIDStatements) (tables.Events, error) { func NewSqliteEventsTable(db *sql.DB, streamID *StreamIDStatements) (tables.Events, error) {
@ -144,6 +152,7 @@ func NewSqliteEventsTable(db *sql.DB, streamID *StreamIDStatements) (tables.Even
{&s.selectContextEventStmt, selectContextEventSQL}, {&s.selectContextEventStmt, selectContextEventSQL},
{&s.selectContextBeforeEventStmt, selectContextBeforeEventSQL}, {&s.selectContextBeforeEventStmt, selectContextBeforeEventSQL},
{&s.selectContextAfterEventStmt, selectContextAfterEventSQL}, {&s.selectContextAfterEventStmt, selectContextAfterEventSQL},
{&s.selectTopologicalEventStmt, selectTopologicalEventSQL},
}.Prepare(db) }.Prepare(db)
} }
@ -613,6 +622,31 @@ func (s *outputRoomEventsStatements) SelectContextAfterEvent(
return lastID, evts, rows.Err() return lastID, evts, rows.Err()
} }
// SelectTopologicalEvent selects an event before and including the given position by eventType and roomID. Returns the found event and the topology token.
// If not event was found, returns nil and sql.ErrNoRows.
func (s *outputRoomEventsStatements) SelectTopologicalEvent(
ctx context.Context, txn *sql.Tx, topologicalPosition int, eventType, roomID string,
) (*gomatrixserverlib.HeaderedEvent, types.TopologyToken, error) {
var (
eventBytes []byte
token types.TopologyToken
)
err := sqlutil.TxStmtContext(ctx, txn, s.selectTopologicalEventStmt).
QueryRowContext(ctx, roomID, topologicalPosition, eventType).
Scan(&eventBytes, &token.Depth, &token.PDUPosition)
if err != nil {
return nil, types.TopologyToken{}, err
}
var res *gomatrixserverlib.HeaderedEvent
if err = json.Unmarshal(eventBytes, &res); err != nil {
return nil, types.TopologyToken{}, err
}
return res, token, nil
}
func unmarshalStateIDs(addIDsJSON, delIDsJSON string) (addIDs []string, delIDs []string, err error) { func unmarshalStateIDs(addIDsJSON, delIDsJSON string) (addIDs []string, delIDs []string, err error) {
if len(addIDsJSON) > 0 { if len(addIDsJSON) > 0 {
if err = json.Unmarshal([]byte(addIDsJSON), &addIDs); err != nil { if err = json.Unmarshal([]byte(addIDsJSON), &addIDs); err != nil {

View file

@ -90,15 +90,28 @@ func NewSqliteTopologyTable(db *sql.DB) (tables.Topology, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
return s, sqlutil.StatementList{ if s.insertEventInTopologyStmt, err = db.Prepare(insertEventInTopologySQL); err != nil {
{&s.insertEventInTopologyStmt, insertEventInTopologySQL}, return nil, err
{&s.selectEventIDsInRangeASCStmt, selectEventIDsInRangeASCSQL}, }
{&s.selectEventIDsInRangeDESCStmt, selectEventIDsInRangeDESCSQL}, if s.selectEventIDsInRangeASCStmt, err = db.Prepare(selectEventIDsInRangeASCSQL); err != nil {
{&s.selectPositionInTopologyStmt, selectPositionInTopologySQL}, return nil, err
{&s.selectMaxPositionInTopologyStmt, selectMaxPositionInTopologySQL}, }
{&s.selectStreamToTopologicalPositionAscStmt, selectStreamToTopologicalPositionAscSQL}, if s.selectEventIDsInRangeDESCStmt, err = db.Prepare(selectEventIDsInRangeDESCSQL); err != nil {
{&s.selectStreamToTopologicalPositionDescStmt, selectStreamToTopologicalPositionDescSQL}, return nil, err
}.Prepare(db) }
if s.selectPositionInTopologyStmt, err = db.Prepare(selectPositionInTopologySQL); err != nil {
return nil, err
}
if s.selectMaxPositionInTopologyStmt, err = db.Prepare(selectMaxPositionInTopologySQL); err != nil {
return nil, err
}
if s.selectStreamToTopologicalPositionAscStmt, err = db.Prepare(selectStreamToTopologicalPositionAscSQL); err != nil {
return nil, err
}
if s.selectStreamToTopologicalPositionDescStmt, err = db.Prepare(selectStreamToTopologicalPositionDescSQL); err != nil {
return nil, err
}
return s, nil
} }
// insertEventInTopology inserts the given event in the room's topology, based // insertEventInTopology inserts the given event in the room's topology, based
@ -177,3 +190,28 @@ func (s *outputRoomEventsTopologyStatements) SelectMaxPositionInTopology(
err = stmt.QueryRowContext(ctx, roomID).Scan(&pos, &spos) err = stmt.QueryRowContext(ctx, roomID).Scan(&pos, &spos)
return return
} }
// SelectTopologicalEvent selects an event before and including the given position by eventType and roomID. Returns the found event and the topology token.
// If not event was found, returns nil and sql.ErrNoRows.
func (s *outputRoomEventsTopologyStatements) SelectTopologicalEvent(
ctx context.Context, txn *sql.Tx, topologicalPosition int, eventType, roomID string,
) (*gomatrixserverlib.HeaderedEvent, types.TopologyToken, error) {
var (
eventBytes []byte
token types.TopologyToken
)
err := sqlutil.TxStmtContext(ctx, txn, s.selectTopologicalEventStmt).
QueryRowContext(ctx, roomID, topologicalPosition, eventType).
Scan(&eventBytes, &token.Depth, &token.PDUPosition)
if err != nil {
return nil, types.TopologyToken{}, err
}
var res *gomatrixserverlib.HeaderedEvent
if err = json.Unmarshal(eventBytes, &res); err != nil {
return nil, types.TopologyToken{}, err
}
return res, token, nil
}

View file

@ -74,6 +74,7 @@ type Events interface {
SelectContextEvent(ctx context.Context, txn *sql.Tx, roomID, eventID string) (int, gomatrixserverlib.HeaderedEvent, error) SelectContextEvent(ctx context.Context, txn *sql.Tx, roomID, eventID string) (int, gomatrixserverlib.HeaderedEvent, error)
SelectContextBeforeEvent(ctx context.Context, txn *sql.Tx, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) ([]*gomatrixserverlib.HeaderedEvent, error) SelectContextBeforeEvent(ctx context.Context, txn *sql.Tx, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) ([]*gomatrixserverlib.HeaderedEvent, error)
SelectContextAfterEvent(ctx context.Context, txn *sql.Tx, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error) SelectContextAfterEvent(ctx context.Context, txn *sql.Tx, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error)
SelectTopologicalEvent(ctx context.Context, txn *sql.Tx, topologicalPosition int, eventType, roomID string) (*gomatrixserverlib.HeaderedEvent, types.TopologyToken, error)
} }
// Topology keeps track of the depths and stream positions for all events. // Topology keeps track of the depths and stream positions for all events.

View file

@ -29,12 +29,20 @@ func newOutputRoomEventsTable(t *testing.T, dbType test.DBType) (tables.Events,
var tab tables.Events var tab tables.Events
switch dbType { switch dbType {
case test.DBTypePostgres: case test.DBTypePostgres:
_, err = postgres.NewPostgresTopologyTable(db) // needed, since there is a join on it
if err != nil {
t.Fatalf("unable to create table: %s", err)
}
tab, err = postgres.NewPostgresEventsTable(db) tab, err = postgres.NewPostgresEventsTable(db)
case test.DBTypeSQLite: case test.DBTypeSQLite:
var stream sqlite3.StreamIDStatements var stream sqlite3.StreamIDStatements
if err = stream.Prepare(db); err != nil { if err = stream.Prepare(db); err != nil {
t.Fatalf("failed to prepare stream stmts: %s", err) t.Fatalf("failed to prepare stream stmts: %s", err)
} }
_, err = sqlite3.NewSqliteTopologyTable(db) // needed, since there is a join on it
if err != nil {
t.Fatalf("unable to create table: %s", err)
}
tab, err = sqlite3.NewSqliteEventsTable(db, &stream) tab, err = sqlite3.NewSqliteEventsTable(db, &stream)
} }
if err != nil { if err != nil {

View file

@ -28,20 +28,8 @@ func newTopologyTable(t *testing.T, dbType test.DBType) (tables.Topology, *sql.D
var tab tables.Topology var tab tables.Topology
switch dbType { switch dbType {
case test.DBTypePostgres: case test.DBTypePostgres:
_, err = postgres.NewPostgresEventsTable(db) // needed for SQL join
if err != nil {
t.Fatalf("unable to prepare events table")
}
tab, err = postgres.NewPostgresTopologyTable(db) tab, err = postgres.NewPostgresTopologyTable(db)
case test.DBTypeSQLite: case test.DBTypeSQLite:
var stream sqlite3.StreamIDStatements
if err = stream.Prepare(db); err != nil {
t.Fatalf("failed to prepare stream stmts: %s", err)
}
_, err = sqlite3.NewSqliteEventsTable(db, &stream) // needed for SQL join
if err != nil {
t.Fatalf("unable to prepare events table")
}
tab, err = sqlite3.NewSqliteTopologyTable(db) tab, err = sqlite3.NewSqliteTopologyTable(db)
} }
if err != nil { if err != nil {