mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-28 09:13:09 -06:00
Attempt filtering on SQLite
This commit is contained in:
parent
2fdba755ef
commit
bc4be3284c
|
|
@ -19,7 +19,9 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
"sort"
|
"sort"
|
||||||
|
"strings"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/internal"
|
"github.com/matrix-org/dendrite/internal"
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
|
|
@ -61,16 +63,19 @@ const selectEventsSQL = "" +
|
||||||
const selectRecentEventsSQL = "" +
|
const selectRecentEventsSQL = "" +
|
||||||
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
|
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
|
||||||
" WHERE room_id = $1 AND id > $2 AND id <= $3" +
|
" WHERE room_id = $1 AND id > $2 AND id <= $3" +
|
||||||
|
" $FILTERS" +
|
||||||
" ORDER BY id DESC LIMIT $4"
|
" ORDER BY id DESC LIMIT $4"
|
||||||
|
|
||||||
const selectRecentEventsForSyncSQL = "" +
|
const selectRecentEventsForSyncSQL = "" +
|
||||||
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
|
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
|
||||||
" WHERE room_id = $1 AND id > $2 AND id <= $3 AND exclude_from_sync = FALSE" +
|
" WHERE room_id = $1 AND id > $2 AND id <= $3 AND exclude_from_sync = FALSE" +
|
||||||
|
" $FILTERS" +
|
||||||
" ORDER BY id DESC LIMIT $4"
|
" ORDER BY id DESC LIMIT $4"
|
||||||
|
|
||||||
const selectEarlyEventsSQL = "" +
|
const selectEarlyEventsSQL = "" +
|
||||||
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
|
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
|
||||||
" WHERE room_id = $1 AND id > $2 AND id <= $3" +
|
" WHERE room_id = $1 AND id > $2 AND id <= $3" +
|
||||||
|
" $FILTERS" +
|
||||||
" ORDER BY id ASC LIMIT $4"
|
" ORDER BY id ASC LIMIT $4"
|
||||||
|
|
||||||
const selectMaxEventIDSQL = "" +
|
const selectMaxEventIDSQL = "" +
|
||||||
|
|
@ -95,11 +100,7 @@ const selectStateInRangeSQL = "" +
|
||||||
" FROM syncapi_output_room_events" +
|
" FROM syncapi_output_room_events" +
|
||||||
" WHERE (id > $1 AND id <= $2)" + // old/new pos
|
" WHERE (id > $1 AND id <= $2)" + // old/new pos
|
||||||
" AND (add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL)" +
|
" AND (add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL)" +
|
||||||
/* " AND ( $3 IS NULL OR sender IN ($3) )" + // sender
|
" $FILTERS" +
|
||||||
" AND ( $4 IS NULL OR NOT(sender IN ($4)) )" + // not sender
|
|
||||||
" AND ( $5 IS NULL OR type IN ($5) )" + // type
|
|
||||||
" AND ( $6 IS NULL OR NOT(type IN ($6)) )" + // not type
|
|
||||||
" AND ( $7 IS NULL OR contains_url = $7)" + // contains URL? */
|
|
||||||
" ORDER BY id ASC" +
|
" ORDER BY id ASC" +
|
||||||
" LIMIT $8" // limit
|
" LIMIT $8" // limit
|
||||||
|
|
||||||
|
|
@ -112,10 +113,6 @@ type outputRoomEventsStatements struct {
|
||||||
insertEventStmt *sql.Stmt
|
insertEventStmt *sql.Stmt
|
||||||
selectEventsStmt *sql.Stmt
|
selectEventsStmt *sql.Stmt
|
||||||
selectMaxEventIDStmt *sql.Stmt
|
selectMaxEventIDStmt *sql.Stmt
|
||||||
selectRecentEventsStmt *sql.Stmt
|
|
||||||
selectRecentEventsForSyncStmt *sql.Stmt
|
|
||||||
selectEarlyEventsStmt *sql.Stmt
|
|
||||||
selectStateInRangeStmt *sql.Stmt
|
|
||||||
updateEventJSONStmt *sql.Stmt
|
updateEventJSONStmt *sql.Stmt
|
||||||
deleteEventsForRoomStmt *sql.Stmt
|
deleteEventsForRoomStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
@ -138,18 +135,6 @@ func NewSqliteEventsTable(db *sql.DB, streamID *streamIDStatements) (tables.Even
|
||||||
if s.selectMaxEventIDStmt, err = db.Prepare(selectMaxEventIDSQL); err != nil {
|
if s.selectMaxEventIDStmt, err = db.Prepare(selectMaxEventIDSQL); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if s.selectRecentEventsStmt, err = db.Prepare(selectRecentEventsSQL); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if s.selectRecentEventsForSyncStmt, err = db.Prepare(selectRecentEventsForSyncSQL); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if s.selectEarlyEventsStmt, err = db.Prepare(selectEarlyEventsSQL); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if s.selectStateInRangeStmt, err = db.Prepare(selectStateInRangeSQL); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if s.updateEventJSONStmt, err = db.Prepare(updateEventJSONSQL); err != nil {
|
if s.updateEventJSONStmt, err = db.Prepare(updateEventJSONSQL); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
@ -159,6 +144,50 @@ func NewSqliteEventsTable(db *sql.DB, streamID *streamIDStatements) (tables.Even
|
||||||
return s, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *outputRoomEventsStatements) prepareWithFilters(
|
||||||
|
query string, params []interface{},
|
||||||
|
senders, notsenders, types, nottypes []string, limit int,
|
||||||
|
) (*sql.Stmt, []interface{}, error) {
|
||||||
|
filters := ""
|
||||||
|
offset := len(params)
|
||||||
|
if count := len(senders); count > 0 {
|
||||||
|
filters += " AND sender IN " + sqlutil.QueryVariadicOffset(count, offset)
|
||||||
|
for _, v := range senders {
|
||||||
|
params = append(params, v)
|
||||||
|
offset++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if count := len(notsenders); count > 0 {
|
||||||
|
filters += " AND sender NOT IN " + sqlutil.QueryVariadicOffset(count, offset)
|
||||||
|
for _, v := range notsenders {
|
||||||
|
params = append(params, v)
|
||||||
|
offset++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if count := len(types); count > 0 {
|
||||||
|
filters += " AND type IN " + sqlutil.QueryVariadicOffset(count, offset)
|
||||||
|
for _, v := range types {
|
||||||
|
params = append(params, v)
|
||||||
|
offset++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if count := len(nottypes); count > 0 {
|
||||||
|
filters += " AND type NOT IN " + sqlutil.QueryVariadicOffset(count, offset)
|
||||||
|
for _, v := range nottypes {
|
||||||
|
params = append(params, v)
|
||||||
|
offset++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
params = append(params, limit)
|
||||||
|
query = strings.Replace(query, " $FILTERS", filters, 1)
|
||||||
|
// logrus.Infof("QUERY: %s", query)
|
||||||
|
stmt, err := s.db.Prepare(query)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, fmt.Errorf("s.db.Prepare: %w", err)
|
||||||
|
}
|
||||||
|
return stmt, params, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (s *outputRoomEventsStatements) UpdateEventJSON(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) error {
|
func (s *outputRoomEventsStatements) UpdateEventJSON(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) error {
|
||||||
headeredJSON, err := json.Marshal(event)
|
headeredJSON, err := json.Marshal(event)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -175,20 +204,20 @@ func (s *outputRoomEventsStatements) SelectStateInRange(
|
||||||
ctx context.Context, txn *sql.Tx, r types.Range,
|
ctx context.Context, txn *sql.Tx, r types.Range,
|
||||||
stateFilterPart *gomatrixserverlib.StateFilter,
|
stateFilterPart *gomatrixserverlib.StateFilter,
|
||||||
) (map[string]map[string]bool, map[string]types.StreamEvent, error) {
|
) (map[string]map[string]bool, map[string]types.StreamEvent, error) {
|
||||||
params := []interface{}{}
|
stmt, params, err := s.prepareWithFilters(
|
||||||
_ = params
|
selectStateInRangeSQL,
|
||||||
|
[]interface{}{
|
||||||
stmt := sqlutil.TxStmt(txn, s.selectStateInRangeStmt)
|
r.Low(), r.High(),
|
||||||
|
},
|
||||||
rows, err := stmt.QueryContext(
|
stateFilterPart.Senders, stateFilterPart.NotSenders,
|
||||||
ctx, r.Low(), r.High(),
|
stateFilterPart.Types, stateFilterPart.NotTypes,
|
||||||
/*pq.StringArray(stateFilterPart.Senders),
|
|
||||||
pq.StringArray(stateFilterPart.NotSenders),
|
|
||||||
pq.StringArray(filterConvertTypeWildcardToSQL(stateFilterPart.Types)),
|
|
||||||
pq.StringArray(filterConvertTypeWildcardToSQL(stateFilterPart.NotTypes)),
|
|
||||||
stateFilterPart.ContainsURL,*/
|
|
||||||
stateFilterPart.Limit,
|
stateFilterPart.Limit,
|
||||||
)
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, fmt.Errorf("s.prepareWithFilters: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
rows, err := stmt.QueryContext(ctx, params...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
@ -339,14 +368,27 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
|
||||||
roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter,
|
roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter,
|
||||||
chronologicalOrder bool, onlySyncEvents bool,
|
chronologicalOrder bool, onlySyncEvents bool,
|
||||||
) ([]types.StreamEvent, bool, error) {
|
) ([]types.StreamEvent, bool, error) {
|
||||||
var stmt *sql.Stmt
|
var query string
|
||||||
if onlySyncEvents {
|
if onlySyncEvents {
|
||||||
stmt = sqlutil.TxStmt(txn, s.selectRecentEventsForSyncStmt)
|
query = selectRecentEventsForSyncSQL
|
||||||
} else {
|
} else {
|
||||||
stmt = sqlutil.TxStmt(txn, s.selectRecentEventsStmt)
|
query = selectRecentEventsSQL
|
||||||
}
|
}
|
||||||
|
|
||||||
rows, err := stmt.QueryContext(ctx, roomID, r.Low(), r.High(), eventFilter.Limit+1)
|
stmt, params, err := s.prepareWithFilters(
|
||||||
|
query,
|
||||||
|
[]interface{}{
|
||||||
|
roomID, r.Low(), r.High(),
|
||||||
|
},
|
||||||
|
eventFilter.Senders, eventFilter.NotSenders,
|
||||||
|
eventFilter.Types, eventFilter.NotTypes,
|
||||||
|
eventFilter.Limit,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, false, fmt.Errorf("s.prepareWithFilters: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
rows, err := stmt.QueryContext(ctx, params...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, false, err
|
return nil, false, err
|
||||||
}
|
}
|
||||||
|
|
@ -381,8 +423,19 @@ func (s *outputRoomEventsStatements) SelectEarlyEvents(
|
||||||
ctx context.Context, txn *sql.Tx,
|
ctx context.Context, txn *sql.Tx,
|
||||||
roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter,
|
roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter,
|
||||||
) ([]types.StreamEvent, error) {
|
) ([]types.StreamEvent, error) {
|
||||||
stmt := sqlutil.TxStmt(txn, s.selectEarlyEventsStmt)
|
stmt, params, err := s.prepareWithFilters(
|
||||||
rows, err := stmt.QueryContext(ctx, roomID, r.Low(), r.High(), eventFilter.Limit)
|
selectEarlyEventsSQL,
|
||||||
|
[]interface{}{
|
||||||
|
roomID, r.Low(), r.High(),
|
||||||
|
},
|
||||||
|
eventFilter.Senders, eventFilter.NotSenders,
|
||||||
|
eventFilter.Types, eventFilter.NotTypes,
|
||||||
|
eventFilter.Limit,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("s.prepareWithFilters: %w", err)
|
||||||
|
}
|
||||||
|
rows, err := stmt.QueryContext(ctx, params...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue