diff --git a/syncapi/storage/output_room_events_table.go b/syncapi/storage/output_room_events_table.go index 34632aedf..5bfd7dacf 100644 --- a/syncapi/storage/output_room_events_table.go +++ b/syncapi/storage/output_room_events_table.go @@ -17,6 +17,7 @@ package storage import ( "context" "database/sql" + "encoding/json" "sort" "github.com/matrix-org/dendrite/roomserver/api" @@ -43,6 +44,12 @@ CREATE TABLE IF NOT EXISTS syncapi_output_room_events ( room_id TEXT NOT NULL, -- The JSON for the event. Stored as TEXT because this should be valid UTF-8. event_json TEXT NOT NULL, + -- The event type e.g 'm.room.member' + type TEXT NOT NULL, + -- The 'sender' property of the event. + sender TEXT NOT NULL, + -- true if the event content contains a url key + contains_url BOOL NOT NULL, -- A list of event IDs which represent a delta of added/removed room state. This can be NULL -- if there is no delta. add_state_ids TEXT[], @@ -56,8 +63,8 @@ CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_id_idx ON syncapi_output_room_ev const insertEventSQL = "" + "INSERT INTO syncapi_output_room_events (" + - " room_id, event_id, event_json, add_state_ids, remove_state_ids, device_id, transaction_id" + - ") VALUES ($1, $2, $3, $4, $5, $6, $7) RETURNING id" + "room_id, event_id, event_json, type, sender, contains_url, add_state_ids, remove_state_ids, device_id, transaction_id" + + ") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) RETURNING id" const selectEventsSQL = "" + "SELECT id, event_json FROM syncapi_output_room_events WHERE event_id = ANY($1)" @@ -75,7 +82,13 @@ const selectStateInRangeSQL = "" + "SELECT id, event_json, add_state_ids, remove_state_ids" + " FROM syncapi_output_room_events" + " WHERE (id > $1 AND id <= $2) AND (add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL)" + - " ORDER BY id ASC" + " AND ( $3::text[] IS NULL OR sender = ANY($3) )" + + " AND ( $4::text[] IS NULL OR NOT(sender = ANY($4)) )" + + " AND ( $5::text[] IS NULL OR type LIKE ANY($5) )" + + " AND ( $6::text[] IS NULL OR NOT(type LIKE ANY($6)) )" + + " AND ( $7::bool IS NULL OR contains_url = $7 )" + + " ORDER BY id ASC" + + " LIMIT $8" type outputRoomEventsStatements struct { insertEventStmt *sql.Stmt @@ -113,10 +126,19 @@ func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) { // two positions, only the most recent state is returned. func (s *outputRoomEventsStatements) selectStateInRange( ctx context.Context, txn *sql.Tx, oldPos, newPos int64, + stateFilterPart *gomatrixserverlib.FilterPart, ) (map[string]map[string]bool, map[string]streamEvent, error) { stmt := common.TxStmt(txn, s.selectStateInRangeStmt) - rows, err := stmt.QueryContext(ctx, oldPos, newPos) + rows, err := stmt.QueryContext( + ctx, oldPos, newPos, + pq.StringArray(stateFilterPart.Senders), + pq.StringArray(stateFilterPart.NotSenders), + pq.StringArray(filterConvertTypeWildcardToSQL(stateFilterPart.Types)), + pq.StringArray(filterConvertTypeWildcardToSQL(stateFilterPart.NotTypes)), + stateFilterPart.ContainsURL, + stateFilterPart.Limit, + ) if err != nil { return nil, nil, err } @@ -205,12 +227,23 @@ func (s *outputRoomEventsStatements) insertEvent( txnID = &transactionID.TransactionID } + // Parse content as JSON and search for an "url" key + containsURL := false + var content map[string]interface{} + if json.Unmarshal(event.Content(), &content) != nil { + // Set containsURL = true if url is present + _, containsURL = content["url"] + } + stmt := common.TxStmt(txn, s.insertEventStmt) err = stmt.QueryRowContext( ctx, event.RoomID(), event.EventID(), event.JSON(), + event.Type(), + event.Sender(), + containsURL, pq.StringArray(addState), pq.StringArray(removeState), deviceID, diff --git a/syncapi/storage/syncserver.go b/syncapi/storage/syncserver.go index fb2cf39a3..55775aec1 100644 --- a/syncapi/storage/syncserver.go +++ b/syncapi/storage/syncserver.go @@ -760,7 +760,7 @@ func (d *SyncServerDatasource) getStateDeltas( var deltas []stateDelta // get all the state events ever between these two positions - stateNeeded, eventMap, err := d.events.selectStateInRange(ctx, txn, fromPos, toPos) + stateNeeded, eventMap, err := d.events.selectStateInRange(ctx, txn, fromPos, toPos, stateFilterPart) if err != nil { return nil, nil, err } @@ -846,7 +846,7 @@ func (d *SyncServerDatasource) getStateDeltasForFullStateSync( } // Get all the state events ever between these two positions - stateNeeded, eventMap, err := d.events.selectStateInRange(ctx, txn, fromPos, toPos) + stateNeeded, eventMap, err := d.events.selectStateInRange(ctx, txn, fromPos, toPos, stateFilterPart) if err != nil { return nil, nil, err }