diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go index d262f0be4..06eb58e37 100644 --- a/syncapi/storage/postgres/output_room_events_table.go +++ b/syncapi/storage/postgres/output_room_events_table.go @@ -382,10 +382,17 @@ func (s *outputRoomEventsStatements) SelectRecentEvents( // from a given position, up to a maximum of 'limit'. func (s *outputRoomEventsStatements) SelectEarlyEvents( ctx context.Context, txn *sql.Tx, - roomID string, r types.Range, limit int, + roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, ) ([]types.StreamEvent, error) { stmt := sqlutil.TxStmt(txn, s.selectEarlyEventsStmt) - rows, err := stmt.QueryContext(ctx, roomID, r.Low(), r.High(), limit) + rows, err := stmt.QueryContext( + ctx, roomID, r.Low(), r.High(), + pq.StringArray(eventFilter.Senders), + pq.StringArray(eventFilter.NotSenders), + pq.StringArray(filterConvertTypeWildcardToSQL(eventFilter.Types)), + pq.StringArray(filterConvertTypeWildcardToSQL(eventFilter.NotTypes)), + eventFilter.Limit, + ) if err != nil { return nil, err } diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index a73582873..9df076935 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -169,7 +169,7 @@ func (d *Database) GetEventsInStreamingRange( } else { // When using forward ordering, we want the least recent events first. if events, err = d.OutputEvents.SelectEarlyEvents( - ctx, nil, roomID, r, eventFilter.Limit, // TODO: filter here too + ctx, nil, roomID, r, eventFilter, ); err != nil { return } diff --git a/syncapi/storage/sqlite3/output_room_events_table.go b/syncapi/storage/sqlite3/output_room_events_table.go index 2c932cd3c..7eb5f1010 100644 --- a/syncapi/storage/sqlite3/output_room_events_table.go +++ b/syncapi/storage/sqlite3/output_room_events_table.go @@ -379,10 +379,10 @@ func (s *outputRoomEventsStatements) SelectRecentEvents( func (s *outputRoomEventsStatements) SelectEarlyEvents( ctx context.Context, txn *sql.Tx, - roomID string, r types.Range, limit int, + roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, ) ([]types.StreamEvent, error) { stmt := sqlutil.TxStmt(txn, s.selectEarlyEventsStmt) - rows, err := stmt.QueryContext(ctx, roomID, r.Low(), r.High(), limit) + rows, err := stmt.QueryContext(ctx, roomID, r.Low(), r.High(), eventFilter.Limit) if err != nil { return nil, err } diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go index 0012b186d..739676770 100644 --- a/syncapi/storage/tables/interface.go +++ b/syncapi/storage/tables/interface.go @@ -58,7 +58,7 @@ type Events interface { // Returns up to `limit` events. Returns `limited=true` if there are more events in this range but we hit the `limit`. SelectRecentEvents(ctx context.Context, txn *sql.Tx, roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error) // SelectEarlyEvents returns the earliest events in the given room. - SelectEarlyEvents(ctx context.Context, txn *sql.Tx, roomID string, r types.Range, limit int) ([]types.StreamEvent, error) + SelectEarlyEvents(ctx context.Context, txn *sql.Tx, roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter) ([]types.StreamEvent, error) SelectEvents(ctx context.Context, txn *sql.Tx, eventIDs []string) ([]types.StreamEvent, error) UpdateEventJSON(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) error // DeleteEventsForRoom removes all event information for a room. This should only be done when removing the room entirely.