mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-17 03:43:11 -06:00
Add filter support to selectStateInRange
This commit is contained in:
parent
b2a17bd615
commit
ed521ba517
|
|
@ -17,6 +17,7 @@ package storage
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
"encoding/json"
|
||||||
"sort"
|
"sort"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
|
|
@ -43,6 +44,12 @@ CREATE TABLE IF NOT EXISTS syncapi_output_room_events (
|
||||||
room_id TEXT NOT NULL,
|
room_id TEXT NOT NULL,
|
||||||
-- The JSON for the event. Stored as TEXT because this should be valid UTF-8.
|
-- The JSON for the event. Stored as TEXT because this should be valid UTF-8.
|
||||||
event_json TEXT NOT NULL,
|
event_json TEXT NOT NULL,
|
||||||
|
-- The event type e.g 'm.room.member'
|
||||||
|
type TEXT NOT NULL,
|
||||||
|
-- The 'sender' property of the event.
|
||||||
|
sender TEXT NOT NULL,
|
||||||
|
-- true if the event content contains a url key
|
||||||
|
contains_url BOOL NOT NULL,
|
||||||
-- A list of event IDs which represent a delta of added/removed room state. This can be NULL
|
-- A list of event IDs which represent a delta of added/removed room state. This can be NULL
|
||||||
-- if there is no delta.
|
-- if there is no delta.
|
||||||
add_state_ids TEXT[],
|
add_state_ids TEXT[],
|
||||||
|
|
@ -56,8 +63,8 @@ CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_id_idx ON syncapi_output_room_ev
|
||||||
|
|
||||||
const insertEventSQL = "" +
|
const insertEventSQL = "" +
|
||||||
"INSERT INTO syncapi_output_room_events (" +
|
"INSERT INTO syncapi_output_room_events (" +
|
||||||
" room_id, event_id, event_json, add_state_ids, remove_state_ids, device_id, transaction_id" +
|
"room_id, event_id, event_json, type, sender, contains_url, add_state_ids, remove_state_ids, device_id, transaction_id" +
|
||||||
") VALUES ($1, $2, $3, $4, $5, $6, $7) RETURNING id"
|
") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) RETURNING id"
|
||||||
|
|
||||||
const selectEventsSQL = "" +
|
const selectEventsSQL = "" +
|
||||||
"SELECT id, event_json FROM syncapi_output_room_events WHERE event_id = ANY($1)"
|
"SELECT id, event_json FROM syncapi_output_room_events WHERE event_id = ANY($1)"
|
||||||
|
|
@ -75,7 +82,13 @@ const selectStateInRangeSQL = "" +
|
||||||
"SELECT id, event_json, add_state_ids, remove_state_ids" +
|
"SELECT id, event_json, add_state_ids, remove_state_ids" +
|
||||||
" FROM syncapi_output_room_events" +
|
" FROM syncapi_output_room_events" +
|
||||||
" WHERE (id > $1 AND id <= $2) AND (add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL)" +
|
" WHERE (id > $1 AND id <= $2) AND (add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL)" +
|
||||||
" ORDER BY id ASC"
|
" AND ( $3::text[] IS NULL OR sender = ANY($3) )" +
|
||||||
|
" AND ( $4::text[] IS NULL OR NOT(sender = ANY($4)) )" +
|
||||||
|
" AND ( $5::text[] IS NULL OR type LIKE ANY($5) )" +
|
||||||
|
" AND ( $6::text[] IS NULL OR NOT(type LIKE ANY($6)) )" +
|
||||||
|
" AND ( $7::bool IS NULL OR contains_url = $7 )" +
|
||||||
|
" ORDER BY id ASC" +
|
||||||
|
" LIMIT $8"
|
||||||
|
|
||||||
type outputRoomEventsStatements struct {
|
type outputRoomEventsStatements struct {
|
||||||
insertEventStmt *sql.Stmt
|
insertEventStmt *sql.Stmt
|
||||||
|
|
@ -113,10 +126,19 @@ func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) {
|
||||||
// two positions, only the most recent state is returned.
|
// two positions, only the most recent state is returned.
|
||||||
func (s *outputRoomEventsStatements) selectStateInRange(
|
func (s *outputRoomEventsStatements) selectStateInRange(
|
||||||
ctx context.Context, txn *sql.Tx, oldPos, newPos int64,
|
ctx context.Context, txn *sql.Tx, oldPos, newPos int64,
|
||||||
|
stateFilterPart *gomatrixserverlib.FilterPart,
|
||||||
) (map[string]map[string]bool, map[string]streamEvent, error) {
|
) (map[string]map[string]bool, map[string]streamEvent, error) {
|
||||||
stmt := common.TxStmt(txn, s.selectStateInRangeStmt)
|
stmt := common.TxStmt(txn, s.selectStateInRangeStmt)
|
||||||
|
|
||||||
rows, err := stmt.QueryContext(ctx, oldPos, newPos)
|
rows, err := stmt.QueryContext(
|
||||||
|
ctx, oldPos, newPos,
|
||||||
|
pq.StringArray(stateFilterPart.Senders),
|
||||||
|
pq.StringArray(stateFilterPart.NotSenders),
|
||||||
|
pq.StringArray(filterConvertTypeWildcardToSQL(stateFilterPart.Types)),
|
||||||
|
pq.StringArray(filterConvertTypeWildcardToSQL(stateFilterPart.NotTypes)),
|
||||||
|
stateFilterPart.ContainsURL,
|
||||||
|
stateFilterPart.Limit,
|
||||||
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
@ -205,12 +227,23 @@ func (s *outputRoomEventsStatements) insertEvent(
|
||||||
txnID = &transactionID.TransactionID
|
txnID = &transactionID.TransactionID
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Parse content as JSON and search for an "url" key
|
||||||
|
containsURL := false
|
||||||
|
var content map[string]interface{}
|
||||||
|
if json.Unmarshal(event.Content(), &content) != nil {
|
||||||
|
// Set containsURL = true if url is present
|
||||||
|
_, containsURL = content["url"]
|
||||||
|
}
|
||||||
|
|
||||||
stmt := common.TxStmt(txn, s.insertEventStmt)
|
stmt := common.TxStmt(txn, s.insertEventStmt)
|
||||||
err = stmt.QueryRowContext(
|
err = stmt.QueryRowContext(
|
||||||
ctx,
|
ctx,
|
||||||
event.RoomID(),
|
event.RoomID(),
|
||||||
event.EventID(),
|
event.EventID(),
|
||||||
event.JSON(),
|
event.JSON(),
|
||||||
|
event.Type(),
|
||||||
|
event.Sender(),
|
||||||
|
containsURL,
|
||||||
pq.StringArray(addState),
|
pq.StringArray(addState),
|
||||||
pq.StringArray(removeState),
|
pq.StringArray(removeState),
|
||||||
deviceID,
|
deviceID,
|
||||||
|
|
|
||||||
|
|
@ -760,7 +760,7 @@ func (d *SyncServerDatasource) getStateDeltas(
|
||||||
var deltas []stateDelta
|
var deltas []stateDelta
|
||||||
|
|
||||||
// get all the state events ever between these two positions
|
// get all the state events ever between these two positions
|
||||||
stateNeeded, eventMap, err := d.events.selectStateInRange(ctx, txn, fromPos, toPos)
|
stateNeeded, eventMap, err := d.events.selectStateInRange(ctx, txn, fromPos, toPos, stateFilterPart)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
@ -846,7 +846,7 @@ func (d *SyncServerDatasource) getStateDeltasForFullStateSync(
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get all the state events ever between these two positions
|
// Get all the state events ever between these two positions
|
||||||
stateNeeded, eventMap, err := d.events.selectStateInRange(ctx, txn, fromPos, toPos)
|
stateNeeded, eventMap, err := d.events.selectStateInRange(ctx, txn, fromPos, toPos, stateFilterPart)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue