Try to not hit the database more than needed to get state deltas

This commit is contained in:
Neil Alexander 2020-12-18 18:14:03 +00:00
parent e50273cfe6
commit b9262eab70
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
4 changed files with 76 additions and 21 deletions

View file

@ -102,6 +102,10 @@ const selectMaxEventIDSQL = "" +
const updateEventJSONSQL = "" +
"UPDATE syncapi_output_room_events SET headered_event_json=$1 WHERE event_id=$2"
const selectCountStateChangesInRangeSQL = "" +
"SELECT COUNT(*), MIN(id), MAX(id) FROM syncapi_output_room_events" +
" WHERE (id > $1 AND id <= $2) AND (cardinality (add_state_ids) > 0 OR cardinality (remove_state_ids) > 0)"
// In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id).
const selectStateInRangeSQL = "" +
"SELECT id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids" +
@ -128,6 +132,7 @@ type outputRoomEventsStatements struct {
selectStateInRangeStmt *sql.Stmt
updateEventJSONStmt *sql.Stmt
deleteEventsForRoomStmt *sql.Stmt
selectCountStateChangesInRangeStmt *sql.Stmt
}
func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
@ -163,6 +168,9 @@ func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
if s.deleteEventsForRoomStmt, err = db.Prepare(deleteEventsForRoomSQL); err != nil {
return nil, err
}
if s.selectCountStateChangesInRangeStmt, err = db.Prepare(selectCountStateChangesInRangeSQL); err != nil {
return nil, err
}
return s, nil
}
@ -446,3 +454,17 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
}
return result, rows.Err()
}
// SelectCountStateChangesInRange returns how many state changes there were
// and which positions they happened between.
func (s *outputRoomEventsStatements) SelectCountStateChangesInRange(
ctx context.Context, txn *sql.Tx, r types.Range,
) (int, types.Range, error) {
stmt := sqlutil.TxStmt(txn, s.selectCountStateChangesInRangeStmt)
var count int
var from, to types.StreamPosition
if err := stmt.QueryRowContext(ctx, r.From, r.To).Scan(&count, &from, &to); err != nil {
return 0, types.Range{}, err
}
return count, types.Range{From: from, To: to}, nil
}

View file

@ -1182,8 +1182,17 @@ func (d *Database) getStateDeltas(
// - Get all CURRENTLY joined rooms, and add them to 'joined' block.
var deltas []stateDelta
// work out if any state changed, and if so, within which ranges
count, nr, err := d.OutputEvents.SelectCountStateChangesInRange(ctx, txn, r)
if err != nil {
return nil, nil, err
}
if count == 0 {
return nil, nil, nil
}
// get all the state events ever (i.e. for all available rooms) between these two positions
stateNeeded, eventMap, err := d.OutputEvents.SelectStateInRange(ctx, txn, r, stateFilter)
stateNeeded, eventMap, err := d.OutputEvents.SelectStateInRange(ctx, txn, nr, stateFilter)
if err != nil {
return nil, nil, err
}

View file

@ -90,6 +90,11 @@ const updateEventJSONSQL = "" +
$7 = stateFilterPart.ContainsURL,
$8 = stateFilterPart.Limit,
*/
const selectCountStateChangesInRangeSQL = "" +
"SELECT COUNT(*), MIN(id), MAX(id) FROM syncapi_output_room_events" +
" WHERE (id > $1 AND id <= $2) AND ((add_state_ids IS NULL OR add_state_ids = '') OR (remove_state_ids IS NULL OR remove_state_ids = ''))"
const selectStateInRangeSQL = "" +
"SELECT id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids" +
" FROM syncapi_output_room_events" +
@ -118,6 +123,7 @@ type outputRoomEventsStatements struct {
selectStateInRangeStmt *sql.Stmt
updateEventJSONStmt *sql.Stmt
deleteEventsForRoomStmt *sql.Stmt
selectCountStateChangesInRangeStmt *sql.Stmt
}
func NewSqliteEventsTable(db *sql.DB, streamID *streamIDStatements) (tables.Events, error) {
@ -156,6 +162,9 @@ func NewSqliteEventsTable(db *sql.DB, streamID *streamIDStatements) (tables.Even
if s.deleteEventsForRoomStmt, err = db.Prepare(deleteEventsForRoomSQL); err != nil {
return nil, err
}
if s.selectCountStateChangesInRangeStmt, err = db.Prepare(selectCountStateChangesInRangeSQL); err != nil {
return nil, err
}
return s, nil
}
@ -475,3 +484,17 @@ func unmarshalStateIDs(addIDsJSON, delIDsJSON string) (addIDs []string, delIDs [
}
return
}
// SelectCountStateChangesInRange returns how many state changes there were
// and which positions they happened between.
func (s *outputRoomEventsStatements) SelectCountStateChangesInRange(
ctx context.Context, txn *sql.Tx, r types.Range,
) (int, types.Range, error) {
stmt := sqlutil.TxStmt(txn, s.selectCountStateChangesInRangeStmt)
var count int
var from, to types.StreamPosition
if err := stmt.QueryRowContext(ctx, r.From, r.To).Scan(&count, &from, &to); err != nil {
return 0, types.Range{}, err
}
return count, types.Range{From: from, To: to}, nil
}

View file

@ -51,6 +51,7 @@ type Peeks interface {
type Events interface {
SelectStateInRange(ctx context.Context, txn *sql.Tx, r types.Range, stateFilter *gomatrixserverlib.StateFilter) (map[string]map[string]bool, map[string]types.StreamEvent, error)
SelectCountStateChangesInRange(ctx context.Context, txn *sql.Tx, r types.Range) (int, types.Range, error)
SelectMaxEventID(ctx context.Context, txn *sql.Tx) (id int64, err error)
InsertEvent(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, addState, removeState []string, transactionID *api.TransactionID, excludeFromSync bool) (streamPos types.StreamPosition, err error)
// SelectRecentEvents returns events between the two stream positions: exclusive of low and inclusive of high.