Update queries, close statements when using SQLite

This commit is contained in:
Till Faelligen 2022-10-19 12:28:34 +02:00
parent 615e426ae6
commit 015876ed76
No known key found for this signature in database
GPG key ID: 3DF82D8AB9211D4E
2 changed files with 88 additions and 34 deletions

View file

@ -28,8 +28,9 @@ import (
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
"github.com/lib/pq" "github.com/lib/pq"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/dendrite/internal/sqlutil"
) )
const outputRoomEventsSchema = ` const outputRoomEventsSchema = `
@ -133,7 +134,7 @@ const updateEventJSONSQL = "" +
"UPDATE syncapi_output_room_events SET headered_event_json=$1 WHERE event_id=$2" "UPDATE syncapi_output_room_events SET headered_event_json=$1 WHERE event_id=$2"
// In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id). // In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id).
const selectStateInRangeSQL = "" + const selectStateInRangeFilteredSQL = "" +
"SELECT event_id, id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids, history_visibility" + "SELECT event_id, id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids, history_visibility" +
" FROM syncapi_output_room_events" + " FROM syncapi_output_room_events" +
" WHERE (id > $1 AND id <= $2) AND (add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL)" + " WHERE (id > $1 AND id <= $2) AND (add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL)" +
@ -146,6 +147,15 @@ const selectStateInRangeSQL = "" +
" ORDER BY id ASC" + " ORDER BY id ASC" +
" LIMIT $9" " LIMIT $9"
// In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id).
const selectStateInRangeSQL = "" +
"SELECT event_id, id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids, history_visibility" +
" FROM syncapi_output_room_events" +
" WHERE (id > $1 AND id <= $2) AND (add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL)" +
" AND room_id = ANY($3)" +
" ORDER BY id ASC" +
" LIMIT $4"
const deleteEventsForRoomSQL = "" + const deleteEventsForRoomSQL = "" +
"DELETE FROM syncapi_output_room_events WHERE room_id = $1" "DELETE FROM syncapi_output_room_events WHERE room_id = $1"
@ -171,20 +181,21 @@ const selectContextAfterEventSQL = "" +
const selectSearchSQL = "SELECT id, event_id, headered_event_json FROM syncapi_output_room_events WHERE id > $1 AND type = ANY($2) ORDER BY id ASC LIMIT $3" const selectSearchSQL = "SELECT id, event_id, headered_event_json FROM syncapi_output_room_events WHERE id > $1 AND type = ANY($2) ORDER BY id ASC LIMIT $3"
type outputRoomEventsStatements struct { type outputRoomEventsStatements struct {
insertEventStmt *sql.Stmt insertEventStmt *sql.Stmt
selectEventsStmt *sql.Stmt selectEventsStmt *sql.Stmt
selectEventsWitFilterStmt *sql.Stmt selectEventsWitFilterStmt *sql.Stmt
selectMaxEventIDStmt *sql.Stmt selectMaxEventIDStmt *sql.Stmt
selectRecentEventsStmt *sql.Stmt selectRecentEventsStmt *sql.Stmt
selectRecentEventsForSyncStmt *sql.Stmt selectRecentEventsForSyncStmt *sql.Stmt
selectEarlyEventsStmt *sql.Stmt selectEarlyEventsStmt *sql.Stmt
selectStateInRangeStmt *sql.Stmt selectStateInRangeFilteredStmt *sql.Stmt
updateEventJSONStmt *sql.Stmt selectStateInRangeStmt *sql.Stmt
deleteEventsForRoomStmt *sql.Stmt updateEventJSONStmt *sql.Stmt
selectContextEventStmt *sql.Stmt deleteEventsForRoomStmt *sql.Stmt
selectContextBeforeEventStmt *sql.Stmt selectContextEventStmt *sql.Stmt
selectContextAfterEventStmt *sql.Stmt selectContextBeforeEventStmt *sql.Stmt
selectSearchStmt *sql.Stmt selectContextAfterEventStmt *sql.Stmt
selectSearchStmt *sql.Stmt
} }
func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) { func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
@ -214,6 +225,7 @@ func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
{&s.selectRecentEventsStmt, selectRecentEventsSQL}, {&s.selectRecentEventsStmt, selectRecentEventsSQL},
{&s.selectRecentEventsForSyncStmt, selectRecentEventsForSyncSQL}, {&s.selectRecentEventsForSyncStmt, selectRecentEventsForSyncSQL},
{&s.selectEarlyEventsStmt, selectEarlyEventsSQL}, {&s.selectEarlyEventsStmt, selectEarlyEventsSQL},
{&s.selectStateInRangeFilteredStmt, selectStateInRangeFilteredSQL},
{&s.selectStateInRangeStmt, selectStateInRangeSQL}, {&s.selectStateInRangeStmt, selectStateInRangeSQL},
{&s.updateEventJSONStmt, updateEventJSONSQL}, {&s.updateEventJSONStmt, updateEventJSONSQL},
{&s.deleteEventsForRoomStmt, deleteEventsForRoomSQL}, {&s.deleteEventsForRoomStmt, deleteEventsForRoomSQL},
@ -240,17 +252,28 @@ func (s *outputRoomEventsStatements) SelectStateInRange(
ctx context.Context, txn *sql.Tx, r types.Range, ctx context.Context, txn *sql.Tx, r types.Range,
stateFilter *gomatrixserverlib.StateFilter, roomIDs []string, stateFilter *gomatrixserverlib.StateFilter, roomIDs []string,
) (map[string]map[string]bool, map[string]types.StreamEvent, error) { ) (map[string]map[string]bool, map[string]types.StreamEvent, error) {
stmt := sqlutil.TxStmt(txn, s.selectStateInRangeStmt) var rows *sql.Rows
senders, notSenders := getSendersStateFilterFilter(stateFilter) var err error
rows, err := stmt.QueryContext( if stateFilter != nil {
ctx, r.Low(), r.High(), pq.StringArray(roomIDs), stmt := sqlutil.TxStmt(txn, s.selectStateInRangeFilteredStmt)
pq.StringArray(senders), senders, notSenders := getSendersStateFilterFilter(stateFilter)
pq.StringArray(notSenders), rows, err = stmt.QueryContext(
pq.StringArray(filterConvertTypeWildcardToSQL(stateFilter.Types)), ctx, r.Low(), r.High(), pq.StringArray(roomIDs),
pq.StringArray(filterConvertTypeWildcardToSQL(stateFilter.NotTypes)), pq.StringArray(senders),
stateFilter.ContainsURL, pq.StringArray(notSenders),
stateFilter.Limit, pq.StringArray(filterConvertTypeWildcardToSQL(stateFilter.Types)),
) pq.StringArray(filterConvertTypeWildcardToSQL(stateFilter.NotTypes)),
stateFilter.ContainsURL,
stateFilter.Limit,
)
} else {
stmt := sqlutil.TxStmt(txn, s.selectStateInRangeStmt)
rows, err = stmt.QueryContext(
ctx, r.Low(), r.High(), pq.StringArray(roomIDs),
r.High()-r.Low(),
)
}
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }

View file

@ -29,8 +29,9 @@ import (
"github.com/matrix-org/dendrite/syncapi/storage/tables" "github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/dendrite/internal/sqlutil"
) )
const outputRoomEventsSchema = ` const outputRoomEventsSchema = `
@ -189,21 +190,36 @@ func (s *outputRoomEventsStatements) SelectStateInRange(
for _, roomID := range roomIDs { for _, roomID := range roomIDs {
inputParams = append(inputParams, roomID) inputParams = append(inputParams, roomID)
} }
stmt, params, err := prepareWithFilters( var (
s.db, txn, stmtSQL, inputParams, stmt *sql.Stmt
stateFilter.Senders, stateFilter.NotSenders, params []any
stateFilter.Types, stateFilter.NotTypes, err error
nil, stateFilter.ContainsURL, stateFilter.Limit, FilterOrderAsc,
) )
if stateFilter != nil {
stmt, params, err = prepareWithFilters(
s.db, txn, stmtSQL, inputParams,
stateFilter.Senders, stateFilter.NotSenders,
stateFilter.Types, stateFilter.NotTypes,
nil, stateFilter.ContainsURL, stateFilter.Limit, FilterOrderAsc,
)
} else {
stmt, params, err = prepareWithFilters(
s.db, txn, stmtSQL, inputParams,
nil, nil,
nil, nil,
nil, nil, int(r.High()-r.Low()), FilterOrderAsc,
)
}
if err != nil { if err != nil {
return nil, nil, fmt.Errorf("s.prepareWithFilters: %w", err) return nil, nil, fmt.Errorf("s.prepareWithFilters: %w", err)
} }
defer internal.CloseAndLogIfError(ctx, stmt, "selectStateInRange: stmt.close() failed")
rows, err := stmt.QueryContext(ctx, params...) rows, err := stmt.QueryContext(ctx, params...)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
defer rows.Close() // nolint: errcheck defer internal.CloseAndLogIfError(ctx, rows, "selectStateInRange: rows.close() failed")
// Fetch all the state change events for all rooms between the two positions then loop each event and: // Fetch all the state change events for all rooms between the two positions then loop each event and:
// - Keep a cache of the event by ID (99% of state change events are for the event itself) // - Keep a cache of the event by ID (99% of state change events are for the event itself)
// - For each room ID, build up an array of event IDs which represents cumulative adds/removes // - For each room ID, build up an array of event IDs which represents cumulative adds/removes
@ -269,6 +285,7 @@ func (s *outputRoomEventsStatements) SelectMaxEventID(
) (id int64, err error) { ) (id int64, err error) {
var nullableID sql.NullInt64 var nullableID sql.NullInt64
stmt := sqlutil.TxStmt(txn, s.selectMaxEventIDStmt) stmt := sqlutil.TxStmt(txn, s.selectMaxEventIDStmt)
defer internal.CloseAndLogIfError(ctx, stmt, "SelectMaxEventID: stmt.close() failed")
err = stmt.QueryRowContext(ctx).Scan(&nullableID) err = stmt.QueryRowContext(ctx).Scan(&nullableID)
if nullableID.Valid { if nullableID.Valid {
id = nullableID.Int64 id = nullableID.Int64
@ -323,6 +340,7 @@ func (s *outputRoomEventsStatements) InsertEvent(
return 0, err return 0, err
} }
insertStmt := sqlutil.TxStmt(txn, s.insertEventStmt) insertStmt := sqlutil.TxStmt(txn, s.insertEventStmt)
defer internal.CloseAndLogIfError(ctx, insertStmt, "InsertEvent: stmt.close() failed")
_, err = insertStmt.ExecContext( _, err = insertStmt.ExecContext(
ctx, ctx,
streamPos, streamPos,
@ -367,6 +385,7 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
if err != nil { if err != nil {
return nil, false, fmt.Errorf("s.prepareWithFilters: %w", err) return nil, false, fmt.Errorf("s.prepareWithFilters: %w", err)
} }
defer internal.CloseAndLogIfError(ctx, stmt, "selectRecentEvents: stmt.close() failed")
rows, err := stmt.QueryContext(ctx, params...) rows, err := stmt.QueryContext(ctx, params...)
if err != nil { if err != nil {
@ -415,6 +434,8 @@ func (s *outputRoomEventsStatements) SelectEarlyEvents(
if err != nil { if err != nil {
return nil, fmt.Errorf("s.prepareWithFilters: %w", err) return nil, fmt.Errorf("s.prepareWithFilters: %w", err)
} }
defer internal.CloseAndLogIfError(ctx, stmt, "SelectEarlyEvents: stmt.close() failed")
rows, err := stmt.QueryContext(ctx, params...) rows, err := stmt.QueryContext(ctx, params...)
if err != nil { if err != nil {
return nil, err return nil, err
@ -456,6 +477,8 @@ func (s *outputRoomEventsStatements) SelectEvents(
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer internal.CloseAndLogIfError(ctx, stmt, "SelectEvents: stmt.close() failed")
rows, err := stmt.QueryContext(ctx, params...) rows, err := stmt.QueryContext(ctx, params...)
if err != nil { if err != nil {
return nil, err return nil, err
@ -558,6 +581,10 @@ func (s *outputRoomEventsStatements) SelectContextBeforeEvent(
filter.Types, filter.NotTypes, filter.Types, filter.NotTypes,
nil, filter.ContainsURL, filter.Limit, FilterOrderDesc, nil, filter.ContainsURL, filter.Limit, FilterOrderDesc,
) )
if err != nil {
return
}
defer internal.CloseAndLogIfError(ctx, stmt, "SelectContextBeforeEvent: stmt.close() failed")
rows, err := stmt.QueryContext(ctx, params...) rows, err := stmt.QueryContext(ctx, params...)
if err != nil { if err != nil {
@ -596,6 +623,10 @@ func (s *outputRoomEventsStatements) SelectContextAfterEvent(
filter.Types, filter.NotTypes, filter.Types, filter.NotTypes,
nil, filter.ContainsURL, filter.Limit, FilterOrderAsc, nil, filter.ContainsURL, filter.Limit, FilterOrderAsc,
) )
if err != nil {
return
}
defer internal.CloseAndLogIfError(ctx, stmt, "SelectContextAfterEvent: stmt.close() failed")
rows, err := stmt.QueryContext(ctx, params...) rows, err := stmt.QueryContext(ctx, params...)
if err != nil { if err != nil {