From 8e59b392ea1f0e391d83b5735bc4384ba2f8a481 Mon Sep 17 00:00:00 2001 From: Till Faelligen Date: Mon, 21 Feb 2022 11:10:49 +0100 Subject: [PATCH] Apply RoomEventFilter when getting events --- syncapi/routing/context.go | 18 +++++---- syncapi/storage/interface.go | 4 +- .../postgres/output_room_events_table.go | 34 ++++++++++++++--- syncapi/storage/shared/syncserver.go | 8 ++-- .../sqlite3/current_room_state_table.go | 3 +- .../sqlite3/output_room_events_table.go | 38 +++++++++++++++---- syncapi/storage/tables/interface.go | 4 +- 7 files changed, 79 insertions(+), 30 deletions(-) diff --git a/syncapi/routing/context.go b/syncapi/routing/context.go index 0b97a774c..6deb25690 100644 --- a/syncapi/routing/context.go +++ b/syncapi/routing/context.go @@ -30,11 +30,11 @@ import ( ) type ContextRespsonse struct { - End string `json:"end,omitempty"` + End string `json:"end"` Event gomatrixserverlib.ClientEvent `json:"event"` EventsAfter []gomatrixserverlib.ClientEvent `json:"events_after,omitempty"` EventsBefore []gomatrixserverlib.ClientEvent `json:"events_before,omitempty"` - Start string `json:"start,omitempty"` + Start string `json:"start"` State []gomatrixserverlib.ClientEvent `json:"state"` } @@ -70,7 +70,7 @@ func Context( } stateFilter := gomatrixserverlib.StateFilter{ - Limit: filter.Limit, + Limit: 100, NotSenders: filter.NotSenders, NotTypes: filter.NotTypes, Senders: filter.Senders, @@ -104,13 +104,13 @@ func Context( return jsonerror.InternalServerError() } - eventsBefore, err := syncDB.SelectContextBeforeEvent(ctx, id, roomID, filter.Limit/2) + eventsBefore, err := syncDB.SelectContextBeforeEvent(ctx, id, roomID, filter) if err != nil && err != sql.ErrNoRows { logrus.WithError(err).Error("unable to fetch before events") return jsonerror.InternalServerError() } - _, eventsAfter, err := syncDB.SelectContextAfterEvent(ctx, id, roomID, filter.Limit/2) + _, eventsAfter, err := syncDB.SelectContextAfterEvent(ctx, id, roomID, filter) if err != nil && err != sql.ErrNoRows { logrus.WithError(err).Error("unable to fetch after events") return jsonerror.InternalServerError() @@ -121,14 +121,16 @@ func Context( newState := applyLazyLoadMembers(filter, eventsAfterClient, eventsBeforeClient, state) response := ContextRespsonse{ - End: "end", Event: gomatrixserverlib.HeaderedToClientEvent(&requestedEvent, gomatrixserverlib.FormatAll), EventsAfter: eventsAfterClient, EventsBefore: eventsBeforeClient, - Start: "start", State: gomatrixserverlib.HeaderedToClientEvents(newState, gomatrixserverlib.FormatAll), } + if len(response.State) > filter.Limit { + response.State = response.State[len(response.State)-filter.Limit:] + } + return util.JSONResponse{ Code: http.StatusOK, JSON: response, @@ -139,7 +141,7 @@ func applyLazyLoadMembers(filter *gomatrixserverlib.RoomEventFilter, eventsAfter if filter == nil || !filter.LazyLoadMembers { return state } - allEvents := append(eventsAfter, eventsBefore...) + allEvents := append(eventsBefore, eventsAfter...) x := make(map[string]bool) // get members who actually send an event for _, e := range allEvents { diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index aa99dbbf4..c6fa05b17 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -139,7 +139,7 @@ type Database interface { GetRoomReceipts(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) ([]eduAPI.OutputReceiptEvent, error) SelectContextEvent(ctx context.Context, roomID, eventID string) (int, gomatrixserverlib.HeaderedEvent, error) - SelectContextBeforeEvent(ctx context.Context, id int, roomID string, limit int) ([]*gomatrixserverlib.HeaderedEvent, error) - SelectContextAfterEvent(ctx context.Context, id int, roomID string, limit int) (int, []*gomatrixserverlib.HeaderedEvent, error) + SelectContextBeforeEvent(ctx context.Context, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) ([]*gomatrixserverlib.HeaderedEvent, error) + SelectContextAfterEvent(ctx context.Context, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error) SelectEventIDsAfter(ctx context.Context, roomID string, id int) ([]string, error) } diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go index 57d889b3f..5d101412a 100644 --- a/syncapi/storage/postgres/output_room_events_table.go +++ b/syncapi/storage/postgres/output_room_events_table.go @@ -134,10 +134,20 @@ const selectContextEventSQL = "" + "SELECT id, headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND event_id = $2" const selectContextBeforeEventSQL = "" + - "SELECT headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND id < $2 ORDER BY id DESC LIMIT $3" + "SELECT headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND id < $2" + + " AND ( $4::text[] IS NULL OR sender = ANY($4) )" + + " AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" + + " AND ( $6::text[] IS NULL OR type LIKE ANY($6) )" + + " AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" + + " ORDER BY id DESC LIMIT $3" const selectContextAfterEventSQL = "" + - "SELECT id, headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND id > $2 ORDER BY id ASC LIMIT $3" + "SELECT id, headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND id > $2" + + " AND ( $4::text[] IS NULL OR sender = ANY($4) )" + + " AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" + + " AND ( $6::text[] IS NULL OR type LIKE ANY($6) )" + + " AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" + + " ORDER BY id ASC LIMIT $3" const selectEventIDsAfterSQL = "" + "SELECT event_id FROM syncapi_output_room_events WHERE room_id = $1 AND id > $2" @@ -454,9 +464,15 @@ func (s *outputRoomEventsStatements) SelectContextEvent(ctx context.Context, txn } func (s *outputRoomEventsStatements) SelectContextBeforeEvent( - ctx context.Context, txn *sql.Tx, id int, roomID string, limit int, + ctx context.Context, txn *sql.Tx, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter, ) (evts []*gomatrixserverlib.HeaderedEvent, err error) { - rows, err := sqlutil.TxStmt(txn, s.selectContextBeforeEventStmt).QueryContext(ctx, roomID, id, limit) + rows, err := sqlutil.TxStmt(txn, s.selectContextBeforeEventStmt).QueryContext( + ctx, roomID, id, filter.Limit, + pq.StringArray(filter.Senders), + pq.StringArray(filter.NotSenders), + pq.StringArray(filterConvertTypeWildcardToSQL(filter.Types)), + pq.StringArray(filterConvertTypeWildcardToSQL(filter.NotTypes)), + ) if err != nil { return } @@ -480,9 +496,15 @@ func (s *outputRoomEventsStatements) SelectContextBeforeEvent( } func (s *outputRoomEventsStatements) SelectContextAfterEvent( - ctx context.Context, txn *sql.Tx, id int, roomID string, limit int, + ctx context.Context, txn *sql.Tx, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter, ) (lastID int, evts []*gomatrixserverlib.HeaderedEvent, err error) { - rows, err := sqlutil.TxStmt(txn, s.selectContextAfterEventStmt).QueryContext(ctx, roomID, id, limit) + rows, err := sqlutil.TxStmt(txn, s.selectContextAfterEventStmt).QueryContext( + ctx, roomID, id, filter.Limit, + pq.StringArray(filter.Senders), + pq.StringArray(filter.NotSenders), + pq.StringArray(filterConvertTypeWildcardToSQL(filter.Types)), + pq.StringArray(filterConvertTypeWildcardToSQL(filter.NotTypes)), + ) if err != nil { return } diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index b24f2a94b..2edba77ab 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -960,11 +960,11 @@ func (s *Database) SelectContextEvent(ctx context.Context, roomID, eventID strin return s.OutputEvents.SelectContextEvent(ctx, nil, roomID, eventID) } -func (s *Database) SelectContextBeforeEvent(ctx context.Context, id int, roomID string, limit int) ([]*gomatrixserverlib.HeaderedEvent, error) { - return s.OutputEvents.SelectContextBeforeEvent(ctx, nil, id, roomID, limit) +func (s *Database) SelectContextBeforeEvent(ctx context.Context, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) ([]*gomatrixserverlib.HeaderedEvent, error) { + return s.OutputEvents.SelectContextBeforeEvent(ctx, nil, id, roomID, filter) } -func (s *Database) SelectContextAfterEvent(ctx context.Context, id int, roomID string, limit int) (int, []*gomatrixserverlib.HeaderedEvent, error) { - return s.OutputEvents.SelectContextAfterEvent(ctx, nil, id, roomID, limit) +func (s *Database) SelectContextAfterEvent(ctx context.Context, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error) { + return s.OutputEvents.SelectContextAfterEvent(ctx, nil, id, roomID, filter) } func (s *Database) SelectEventIDsAfter(ctx context.Context, roomID string, id int) ([]string, error) { diff --git a/syncapi/storage/sqlite3/current_room_state_table.go b/syncapi/storage/sqlite3/current_room_state_table.go index 4fbbf45cf..c91ca6923 100644 --- a/syncapi/storage/sqlite3/current_room_state_table.go +++ b/syncapi/storage/sqlite3/current_room_state_table.go @@ -68,7 +68,8 @@ const selectRoomIDsWithMembershipSQL = "" + const selectCurrentStateSQL = "" + "SELECT event_id, headered_event_json FROM syncapi_current_room_state WHERE room_id = $1" - // WHEN, ORDER BY and LIMIT will be added by prepareWithFilter + +// WHEN, ORDER BY and LIMIT will be added by prepareWithFilter const selectJoinedUsersSQL = "" + "SELECT room_id, state_key FROM syncapi_current_room_state WHERE type = 'm.room.member' AND membership = 'join'" diff --git a/syncapi/storage/sqlite3/output_room_events_table.go b/syncapi/storage/sqlite3/output_room_events_table.go index 0a635dfb5..d2d57bac9 100644 --- a/syncapi/storage/sqlite3/output_room_events_table.go +++ b/syncapi/storage/sqlite3/output_room_events_table.go @@ -29,6 +29,7 @@ import ( "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/gomatrixserverlib" + "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus" ) @@ -98,10 +99,10 @@ const selectContextEventSQL = "" + "SELECT id, headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND event_id = $2" const selectContextBeforeEventSQL = "" + - "SELECT headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND id < $2 ORDER BY id DESC LIMIT $3" + "SELECT headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND id < $2" const selectContextAfterEventSQL = "" + - "SELECT id, headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND id > $2 ORDER BY id ASC LIMIT $3" + "SELECT id, headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND id > $2" const selectEventIDsAfterSQL = "" + "SELECT event_id FROM syncapi_output_room_events WHERE room_id = $1 AND id > $2" @@ -493,9 +494,19 @@ func (s *outputRoomEventsStatements) SelectContextEvent( } func (s *outputRoomEventsStatements) SelectContextBeforeEvent( - ctx context.Context, txn *sql.Tx, id int, roomID string, limit int, + ctx context.Context, txn *sql.Tx, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter, ) (evts []*gomatrixserverlib.HeaderedEvent, err error) { - rows, err := sqlutil.TxStmt(txn, s.selectContextBeforeEventStmt).QueryContext(ctx, roomID, id, limit) + stmt, params, err := prepareWithFilters( + s.db, txn, selectContextBeforeEventSQL, + []interface{}{ + roomID, id, + }, + filter.Senders, filter.NotSenders, + filter.Types, filter.NotTypes, + nil, filter.Limit, FilterOrderDesc, + ) + + rows, err := stmt.QueryContext(ctx, params...) if err != nil { return } @@ -519,9 +530,22 @@ func (s *outputRoomEventsStatements) SelectContextBeforeEvent( } func (s *outputRoomEventsStatements) SelectContextAfterEvent( - ctx context.Context, txn *sql.Tx, id int, roomID string, limit int, + ctx context.Context, txn *sql.Tx, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter, ) (lastID int, evts []*gomatrixserverlib.HeaderedEvent, err error) { - rows, err := sqlutil.TxStmt(txn, s.selectContextAfterEventStmt).QueryContext(ctx, roomID, id, limit) + logrus.Debugf("getting events after: %+v", filter) + stmt, params, err := prepareWithFilters( + s.db, txn, selectContextAfterEventSQL, + []interface{}{ + roomID, id, + }, + filter.Senders, filter.NotSenders, + filter.Types, filter.NotTypes, + nil, filter.Limit, FilterOrderAsc, + ) + + logrus.Debugf("params, %+v", params) + + rows, err := stmt.QueryContext(ctx, params...) if err != nil { return } @@ -540,7 +564,7 @@ func (s *outputRoomEventsStatements) SelectContextAfterEvent( } evts = append(evts, evt) } - + logrus.Debugf("returning events: %+v", evts) return lastID, evts, rows.Err() } diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go index 5c5ffcd1a..48626ec13 100644 --- a/syncapi/storage/tables/interface.go +++ b/syncapi/storage/tables/interface.go @@ -65,8 +65,8 @@ type Events interface { DeleteEventsForRoom(ctx context.Context, txn *sql.Tx, roomID string) (err error) SelectContextEvent(ctx context.Context, txn *sql.Tx, roomID, eventID string) (int, gomatrixserverlib.HeaderedEvent, error) - SelectContextAfterEvent(ctx context.Context, txn *sql.Tx, id int, roomID string, limit int) (int, []*gomatrixserverlib.HeaderedEvent, error) - SelectContextBeforeEvent(ctx context.Context, txn *sql.Tx, id int, roomID string, limit int) ([]*gomatrixserverlib.HeaderedEvent, error) + SelectContextBeforeEvent(ctx context.Context, txn *sql.Tx, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) ([]*gomatrixserverlib.HeaderedEvent, error) + SelectContextAfterEvent(ctx context.Context, txn *sql.Tx, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error) SelectEventIDsAfter(ctx context.Context, roomID string, id int) ([]string, error) }