From bc4be3284ca629bf9d1a55f6089c75c8a7f39314 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Tue, 19 Jan 2021 11:52:18 +0000 Subject: [PATCH] Attempt filtering on SQLite --- .../sqlite3/output_room_events_table.go | 145 ++++++++++++------ 1 file changed, 99 insertions(+), 46 deletions(-) diff --git a/syncapi/storage/sqlite3/output_room_events_table.go b/syncapi/storage/sqlite3/output_room_events_table.go index 7eb5f1010..02cdb4c02 100644 --- a/syncapi/storage/sqlite3/output_room_events_table.go +++ b/syncapi/storage/sqlite3/output_room_events_table.go @@ -19,7 +19,9 @@ import ( "context" "database/sql" "encoding/json" + "fmt" "sort" + "strings" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/roomserver/api" @@ -61,16 +63,19 @@ const selectEventsSQL = "" + const selectRecentEventsSQL = "" + "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" + " WHERE room_id = $1 AND id > $2 AND id <= $3" + + " $FILTERS" + " ORDER BY id DESC LIMIT $4" const selectRecentEventsForSyncSQL = "" + "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" + " WHERE room_id = $1 AND id > $2 AND id <= $3 AND exclude_from_sync = FALSE" + + " $FILTERS" + " ORDER BY id DESC LIMIT $4" const selectEarlyEventsSQL = "" + "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" + " WHERE room_id = $1 AND id > $2 AND id <= $3" + + " $FILTERS" + " ORDER BY id ASC LIMIT $4" const selectMaxEventIDSQL = "" + @@ -95,11 +100,7 @@ const selectStateInRangeSQL = "" + " FROM syncapi_output_room_events" + " WHERE (id > $1 AND id <= $2)" + // old/new pos " AND (add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL)" + - /* " AND ( $3 IS NULL OR sender IN ($3) )" + // sender - " AND ( $4 IS NULL OR NOT(sender IN ($4)) )" + // not sender - " AND ( $5 IS NULL OR type IN ($5) )" + // type - " AND ( $6 IS NULL OR NOT(type IN ($6)) )" + // not type - " AND ( $7 IS NULL OR contains_url = $7)" + // contains URL? */ + " $FILTERS" + " ORDER BY id ASC" + " LIMIT $8" // limit @@ -107,17 +108,13 @@ const deleteEventsForRoomSQL = "" + "DELETE FROM syncapi_output_room_events WHERE room_id = $1" type outputRoomEventsStatements struct { - db *sql.DB - streamIDStatements *streamIDStatements - insertEventStmt *sql.Stmt - selectEventsStmt *sql.Stmt - selectMaxEventIDStmt *sql.Stmt - selectRecentEventsStmt *sql.Stmt - selectRecentEventsForSyncStmt *sql.Stmt - selectEarlyEventsStmt *sql.Stmt - selectStateInRangeStmt *sql.Stmt - updateEventJSONStmt *sql.Stmt - deleteEventsForRoomStmt *sql.Stmt + db *sql.DB + streamIDStatements *streamIDStatements + insertEventStmt *sql.Stmt + selectEventsStmt *sql.Stmt + selectMaxEventIDStmt *sql.Stmt + updateEventJSONStmt *sql.Stmt + deleteEventsForRoomStmt *sql.Stmt } func NewSqliteEventsTable(db *sql.DB, streamID *streamIDStatements) (tables.Events, error) { @@ -138,18 +135,6 @@ func NewSqliteEventsTable(db *sql.DB, streamID *streamIDStatements) (tables.Even if s.selectMaxEventIDStmt, err = db.Prepare(selectMaxEventIDSQL); err != nil { return nil, err } - if s.selectRecentEventsStmt, err = db.Prepare(selectRecentEventsSQL); err != nil { - return nil, err - } - if s.selectRecentEventsForSyncStmt, err = db.Prepare(selectRecentEventsForSyncSQL); err != nil { - return nil, err - } - if s.selectEarlyEventsStmt, err = db.Prepare(selectEarlyEventsSQL); err != nil { - return nil, err - } - if s.selectStateInRangeStmt, err = db.Prepare(selectStateInRangeSQL); err != nil { - return nil, err - } if s.updateEventJSONStmt, err = db.Prepare(updateEventJSONSQL); err != nil { return nil, err } @@ -159,6 +144,50 @@ func NewSqliteEventsTable(db *sql.DB, streamID *streamIDStatements) (tables.Even return s, nil } +func (s *outputRoomEventsStatements) prepareWithFilters( + query string, params []interface{}, + senders, notsenders, types, nottypes []string, limit int, +) (*sql.Stmt, []interface{}, error) { + filters := "" + offset := len(params) + if count := len(senders); count > 0 { + filters += " AND sender IN " + sqlutil.QueryVariadicOffset(count, offset) + for _, v := range senders { + params = append(params, v) + offset++ + } + } + if count := len(notsenders); count > 0 { + filters += " AND sender NOT IN " + sqlutil.QueryVariadicOffset(count, offset) + for _, v := range notsenders { + params = append(params, v) + offset++ + } + } + if count := len(types); count > 0 { + filters += " AND type IN " + sqlutil.QueryVariadicOffset(count, offset) + for _, v := range types { + params = append(params, v) + offset++ + } + } + if count := len(nottypes); count > 0 { + filters += " AND type NOT IN " + sqlutil.QueryVariadicOffset(count, offset) + for _, v := range nottypes { + params = append(params, v) + offset++ + } + } + params = append(params, limit) + query = strings.Replace(query, " $FILTERS", filters, 1) + // logrus.Infof("QUERY: %s", query) + stmt, err := s.db.Prepare(query) + if err != nil { + return nil, nil, fmt.Errorf("s.db.Prepare: %w", err) + } + return stmt, params, nil +} + func (s *outputRoomEventsStatements) UpdateEventJSON(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) error { headeredJSON, err := json.Marshal(event) if err != nil { @@ -175,20 +204,20 @@ func (s *outputRoomEventsStatements) SelectStateInRange( ctx context.Context, txn *sql.Tx, r types.Range, stateFilterPart *gomatrixserverlib.StateFilter, ) (map[string]map[string]bool, map[string]types.StreamEvent, error) { - params := []interface{}{} - _ = params - - stmt := sqlutil.TxStmt(txn, s.selectStateInRangeStmt) - - rows, err := stmt.QueryContext( - ctx, r.Low(), r.High(), - /*pq.StringArray(stateFilterPart.Senders), - pq.StringArray(stateFilterPart.NotSenders), - pq.StringArray(filterConvertTypeWildcardToSQL(stateFilterPart.Types)), - pq.StringArray(filterConvertTypeWildcardToSQL(stateFilterPart.NotTypes)), - stateFilterPart.ContainsURL,*/ + stmt, params, err := s.prepareWithFilters( + selectStateInRangeSQL, + []interface{}{ + r.Low(), r.High(), + }, + stateFilterPart.Senders, stateFilterPart.NotSenders, + stateFilterPart.Types, stateFilterPart.NotTypes, stateFilterPart.Limit, ) + if err != nil { + return nil, nil, fmt.Errorf("s.prepareWithFilters: %w", err) + } + + rows, err := stmt.QueryContext(ctx, params...) if err != nil { return nil, nil, err } @@ -339,14 +368,27 @@ func (s *outputRoomEventsStatements) SelectRecentEvents( roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool, ) ([]types.StreamEvent, bool, error) { - var stmt *sql.Stmt + var query string if onlySyncEvents { - stmt = sqlutil.TxStmt(txn, s.selectRecentEventsForSyncStmt) + query = selectRecentEventsForSyncSQL } else { - stmt = sqlutil.TxStmt(txn, s.selectRecentEventsStmt) + query = selectRecentEventsSQL } - rows, err := stmt.QueryContext(ctx, roomID, r.Low(), r.High(), eventFilter.Limit+1) + stmt, params, err := s.prepareWithFilters( + query, + []interface{}{ + roomID, r.Low(), r.High(), + }, + eventFilter.Senders, eventFilter.NotSenders, + eventFilter.Types, eventFilter.NotTypes, + eventFilter.Limit, + ) + if err != nil { + return nil, false, fmt.Errorf("s.prepareWithFilters: %w", err) + } + + rows, err := stmt.QueryContext(ctx, params...) if err != nil { return nil, false, err } @@ -381,8 +423,19 @@ func (s *outputRoomEventsStatements) SelectEarlyEvents( ctx context.Context, txn *sql.Tx, roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, ) ([]types.StreamEvent, error) { - stmt := sqlutil.TxStmt(txn, s.selectEarlyEventsStmt) - rows, err := stmt.QueryContext(ctx, roomID, r.Low(), r.High(), eventFilter.Limit) + stmt, params, err := s.prepareWithFilters( + selectEarlyEventsSQL, + []interface{}{ + roomID, r.Low(), r.High(), + }, + eventFilter.Senders, eventFilter.NotSenders, + eventFilter.Types, eventFilter.NotTypes, + eventFilter.Limit, + ) + if err != nil { + return nil, fmt.Errorf("s.prepareWithFilters: %w", err) + } + rows, err := stmt.QueryContext(ctx, params...) if err != nil { return nil, err }