diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go index 07d0b1fbc..ae8389a9c 100644 --- a/syncapi/storage/postgres/output_room_events_table.go +++ b/syncapi/storage/postgres/output_room_events_table.go @@ -102,6 +102,10 @@ const selectMaxEventIDSQL = "" + const updateEventJSONSQL = "" + "UPDATE syncapi_output_room_events SET headered_event_json=$1 WHERE event_id=$2" +const selectCountStateChangesInRangeSQL = "" + + "SELECT COUNT(*), MIN(id), MAX(id) FROM syncapi_output_room_events" + + " WHERE (id > $1 AND id <= $2) AND (cardinality (add_state_ids) > 0 OR cardinality (remove_state_ids) > 0)" + // In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id). const selectStateInRangeSQL = "" + "SELECT id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids" + @@ -119,15 +123,16 @@ const deleteEventsForRoomSQL = "" + "DELETE FROM syncapi_output_room_events WHERE room_id = $1" type outputRoomEventsStatements struct { - insertEventStmt *sql.Stmt - selectEventsStmt *sql.Stmt - selectMaxEventIDStmt *sql.Stmt - selectRecentEventsStmt *sql.Stmt - selectRecentEventsForSyncStmt *sql.Stmt - selectEarlyEventsStmt *sql.Stmt - selectStateInRangeStmt *sql.Stmt - updateEventJSONStmt *sql.Stmt - deleteEventsForRoomStmt *sql.Stmt + insertEventStmt *sql.Stmt + selectEventsStmt *sql.Stmt + selectMaxEventIDStmt *sql.Stmt + selectRecentEventsStmt *sql.Stmt + selectRecentEventsForSyncStmt *sql.Stmt + selectEarlyEventsStmt *sql.Stmt + selectStateInRangeStmt *sql.Stmt + updateEventJSONStmt *sql.Stmt + deleteEventsForRoomStmt *sql.Stmt + selectCountStateChangesInRangeStmt *sql.Stmt } func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) { @@ -163,6 +168,9 @@ func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) { if s.deleteEventsForRoomStmt, err = db.Prepare(deleteEventsForRoomSQL); err != nil { return nil, err } + if s.selectCountStateChangesInRangeStmt, err = db.Prepare(selectCountStateChangesInRangeSQL); err != nil { + return nil, err + } return s, nil } @@ -446,3 +454,17 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) { } return result, rows.Err() } + +// SelectCountStateChangesInRange returns how many state changes there were +// and which positions they happened between. +func (s *outputRoomEventsStatements) SelectCountStateChangesInRange( + ctx context.Context, txn *sql.Tx, r types.Range, +) (int, types.Range, error) { + stmt := sqlutil.TxStmt(txn, s.selectCountStateChangesInRangeStmt) + var count int + var from, to types.StreamPosition + if err := stmt.QueryRowContext(ctx, r.From, r.To).Scan(&count, &from, &to); err != nil { + return 0, types.Range{}, err + } + return count, types.Range{From: from, To: to}, nil +} diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index 128aaa5b7..c3468d575 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -1182,8 +1182,17 @@ func (d *Database) getStateDeltas( // - Get all CURRENTLY joined rooms, and add them to 'joined' block. var deltas []stateDelta + // work out if any state changed, and if so, within which ranges + count, nr, err := d.OutputEvents.SelectCountStateChangesInRange(ctx, txn, r) + if err != nil { + return nil, nil, err + } + if count == 0 { + return nil, nil, nil + } + // get all the state events ever (i.e. for all available rooms) between these two positions - stateNeeded, eventMap, err := d.OutputEvents.SelectStateInRange(ctx, txn, r, stateFilter) + stateNeeded, eventMap, err := d.OutputEvents.SelectStateInRange(ctx, txn, nr, stateFilter) if err != nil { return nil, nil, err } diff --git a/syncapi/storage/sqlite3/output_room_events_table.go b/syncapi/storage/sqlite3/output_room_events_table.go index edbd36fb1..303143ddc 100644 --- a/syncapi/storage/sqlite3/output_room_events_table.go +++ b/syncapi/storage/sqlite3/output_room_events_table.go @@ -90,6 +90,11 @@ const updateEventJSONSQL = "" + $7 = stateFilterPart.ContainsURL, $8 = stateFilterPart.Limit, */ + +const selectCountStateChangesInRangeSQL = "" + + "SELECT COUNT(*), MIN(id), MAX(id) FROM syncapi_output_room_events" + + " WHERE (id > $1 AND id <= $2) AND ((add_state_ids IS NULL OR add_state_ids = '') OR (remove_state_ids IS NULL OR remove_state_ids = ''))" + const selectStateInRangeSQL = "" + "SELECT id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids" + " FROM syncapi_output_room_events" + @@ -107,17 +112,18 @@ const deleteEventsForRoomSQL = "" + "DELETE FROM syncapi_output_room_events WHERE room_id = $1" type outputRoomEventsStatements struct { - db *sql.DB - streamIDStatements *streamIDStatements - insertEventStmt *sql.Stmt - selectEventsStmt *sql.Stmt - selectMaxEventIDStmt *sql.Stmt - selectRecentEventsStmt *sql.Stmt - selectRecentEventsForSyncStmt *sql.Stmt - selectEarlyEventsStmt *sql.Stmt - selectStateInRangeStmt *sql.Stmt - updateEventJSONStmt *sql.Stmt - deleteEventsForRoomStmt *sql.Stmt + db *sql.DB + streamIDStatements *streamIDStatements + insertEventStmt *sql.Stmt + selectEventsStmt *sql.Stmt + selectMaxEventIDStmt *sql.Stmt + selectRecentEventsStmt *sql.Stmt + selectRecentEventsForSyncStmt *sql.Stmt + selectEarlyEventsStmt *sql.Stmt + selectStateInRangeStmt *sql.Stmt + updateEventJSONStmt *sql.Stmt + deleteEventsForRoomStmt *sql.Stmt + selectCountStateChangesInRangeStmt *sql.Stmt } func NewSqliteEventsTable(db *sql.DB, streamID *streamIDStatements) (tables.Events, error) { @@ -156,6 +162,9 @@ func NewSqliteEventsTable(db *sql.DB, streamID *streamIDStatements) (tables.Even if s.deleteEventsForRoomStmt, err = db.Prepare(deleteEventsForRoomSQL); err != nil { return nil, err } + if s.selectCountStateChangesInRangeStmt, err = db.Prepare(selectCountStateChangesInRangeSQL); err != nil { + return nil, err + } return s, nil } @@ -475,3 +484,17 @@ func unmarshalStateIDs(addIDsJSON, delIDsJSON string) (addIDs []string, delIDs [ } return } + +// SelectCountStateChangesInRange returns how many state changes there were +// and which positions they happened between. +func (s *outputRoomEventsStatements) SelectCountStateChangesInRange( + ctx context.Context, txn *sql.Tx, r types.Range, +) (int, types.Range, error) { + stmt := sqlutil.TxStmt(txn, s.selectCountStateChangesInRangeStmt) + var count int + var from, to types.StreamPosition + if err := stmt.QueryRowContext(ctx, r.From, r.To).Scan(&count, &from, &to); err != nil { + return 0, types.Range{}, err + } + return count, types.Range{From: from, To: to}, nil +} diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go index 1e654da56..01bbb1d46 100644 --- a/syncapi/storage/tables/interface.go +++ b/syncapi/storage/tables/interface.go @@ -51,6 +51,7 @@ type Peeks interface { type Events interface { SelectStateInRange(ctx context.Context, txn *sql.Tx, r types.Range, stateFilter *gomatrixserverlib.StateFilter) (map[string]map[string]bool, map[string]types.StreamEvent, error) + SelectCountStateChangesInRange(ctx context.Context, txn *sql.Tx, r types.Range) (int, types.Range, error) SelectMaxEventID(ctx context.Context, txn *sql.Tx) (id int64, err error) InsertEvent(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, addState, removeState []string, transactionID *api.TransactionID, excludeFromSync bool) (streamPos types.StreamPosition, err error) // SelectRecentEvents returns events between the two stream positions: exclusive of low and inclusive of high.