From a86c9f7a0ce2da7f33e8006c92babe512877fe4d Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Tue, 19 Jan 2021 10:12:19 +0000 Subject: [PATCH] Try to use request filter --- syncapi/notifier/notifier_test.go | 1 - syncapi/routing/messages.go | 2 +- syncapi/storage/interface.go | 4 ++-- .../postgres/output_room_events_table.go | 2 +- syncapi/storage/shared/syncserver.go | 4 ++-- .../sqlite3/output_room_events_table.go | 2 +- syncapi/storage/tables/interface.go | 2 +- syncapi/streams/stream_pdu.go | 15 ++++++--------- syncapi/sync/request.go | 18 ++++++------------ syncapi/types/provider.go | 3 +-- 10 files changed, 21 insertions(+), 32 deletions(-) diff --git a/syncapi/notifier/notifier_test.go b/syncapi/notifier/notifier_test.go index 8b9425e37..1401fc676 100644 --- a/syncapi/notifier/notifier_test.go +++ b/syncapi/notifier/notifier_test.go @@ -367,7 +367,6 @@ func newTestSyncRequest(userID, deviceID string, since types.StreamingToken) typ Timeout: 1 * time.Minute, Since: since, WantFullState: false, - Limit: 20, Log: util.GetLogger(context.TODO()), Context: context.TODO(), } diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go index c59910d8f..ba739148d 100644 --- a/syncapi/routing/messages.go +++ b/syncapi/routing/messages.go @@ -235,7 +235,7 @@ func (r *messagesReq) retrieveEvents() ( clientEvents []gomatrixserverlib.ClientEvent, start, end types.TopologyToken, err error, ) { - eventFilter := gomatrixserverlib.DefaultEventFilter() + eventFilter := gomatrixserverlib.DefaultRoomEventFilter() eventFilter.Limit = r.limit // Retrieve the events from the local database. diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index 9d9c9fad7..22d801617 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -40,7 +40,7 @@ type Database interface { GetStateDeltas(ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter) ([]types.StateDelta, []string, error) RoomIDsWithMembership(ctx context.Context, userID string, membership string) ([]string, error) - RecentEvents(ctx context.Context, roomID string, r types.Range, eventFilter *gomatrixserverlib.EventFilter, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error) + RecentEvents(ctx context.Context, roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error) GetBackwardTopologyPos(ctx context.Context, events []types.StreamEvent) (types.TopologyToken, error) PositionInTopology(ctx context.Context, eventID string) (pos types.StreamPosition, spos types.StreamPosition, err error) @@ -105,7 +105,7 @@ type Database interface { // Returns an error if there was a problem communicating with the database. DeletePeeks(ctx context.Context, RoomID, UserID string) (types.StreamPosition, error) // GetEventsInStreamingRange retrieves all of the events on a given ordering using the given extremities and limit. - GetEventsInStreamingRange(ctx context.Context, from, to *types.StreamingToken, roomID string, eventFilter *gomatrixserverlib.EventFilter, backwardOrdering bool) (events []types.StreamEvent, err error) + GetEventsInStreamingRange(ctx context.Context, from, to *types.StreamingToken, roomID string, eventFilter *gomatrixserverlib.RoomEventFilter, backwardOrdering bool) (events []types.StreamEvent, err error) // GetEventsInTopologicalRange retrieves all of the events on a given ordering using the given extremities and limit. GetEventsInTopologicalRange(ctx context.Context, from, to *types.TopologyToken, roomID string, limit int, backwardOrdering bool) (events []types.StreamEvent, err error) // EventPositionInTopology returns the depth and stream position of the given event. diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go index c48cc6fbb..d262f0be4 100644 --- a/syncapi/storage/postgres/output_room_events_table.go +++ b/syncapi/storage/postgres/output_room_events_table.go @@ -330,7 +330,7 @@ func (s *outputRoomEventsStatements) InsertEvent( // from sync. func (s *outputRoomEventsStatements) SelectRecentEvents( ctx context.Context, txn *sql.Tx, - roomID string, r types.Range, eventFilter *gomatrixserverlib.EventFilter, + roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool, ) ([]types.StreamEvent, bool, error) { var stmt *sql.Stmt diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index 555b17043..a73582873 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -110,7 +110,7 @@ func (d *Database) RoomIDsWithMembership(ctx context.Context, userID string, mem return d.CurrentRoomState.SelectRoomIDsWithMembership(ctx, nil, userID, membership) } -func (d *Database) RecentEvents(ctx context.Context, roomID string, r types.Range, eventFilter *gomatrixserverlib.EventFilter, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error) { +func (d *Database) RecentEvents(ctx context.Context, roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error) { return d.OutputEvents.SelectRecentEvents(ctx, nil, roomID, r, eventFilter, chronologicalOrder, onlySyncEvents) } @@ -151,7 +151,7 @@ func (d *Database) Events(ctx context.Context, eventIDs []string) ([]*gomatrixse func (d *Database) GetEventsInStreamingRange( ctx context.Context, from, to *types.StreamingToken, - roomID string, eventFilter *gomatrixserverlib.EventFilter, + roomID string, eventFilter *gomatrixserverlib.RoomEventFilter, backwardOrdering bool, ) (events []types.StreamEvent, err error) { r := types.Range{ diff --git a/syncapi/storage/sqlite3/output_room_events_table.go b/syncapi/storage/sqlite3/output_room_events_table.go index 7218cf420..2c932cd3c 100644 --- a/syncapi/storage/sqlite3/output_room_events_table.go +++ b/syncapi/storage/sqlite3/output_room_events_table.go @@ -336,7 +336,7 @@ func (s *outputRoomEventsStatements) InsertEvent( func (s *outputRoomEventsStatements) SelectRecentEvents( ctx context.Context, txn *sql.Tx, - roomID string, r types.Range, eventFilter *gomatrixserverlib.EventFilter, + roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool, ) ([]types.StreamEvent, bool, error) { var stmt *sql.Stmt diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go index 2ee4fb469..0012b186d 100644 --- a/syncapi/storage/tables/interface.go +++ b/syncapi/storage/tables/interface.go @@ -56,7 +56,7 @@ type Events interface { // SelectRecentEvents returns events between the two stream positions: exclusive of low and inclusive of high. // If onlySyncEvents has a value of true, only returns the events that aren't marked as to exclude from sync. // Returns up to `limit` events. Returns `limited=true` if there are more events in this range but we hit the `limit`. - SelectRecentEvents(ctx context.Context, txn *sql.Tx, roomID string, r types.Range, eventFilter *gomatrixserverlib.EventFilter, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error) + SelectRecentEvents(ctx context.Context, txn *sql.Tx, roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error) // SelectEarlyEvents returns the earliest events in the given room. SelectEarlyEvents(ctx context.Context, txn *sql.Tx, roomID string, r types.Range, limit int) ([]types.StreamEvent, error) SelectEvents(ctx context.Context, txn *sql.Tx, eventIDs []string) ([]types.StreamEvent, error) diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index c81937f35..d6d7ff444 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -48,9 +48,8 @@ func (p *PDUStreamProvider) CompleteSync( return from } - stateFilter := gomatrixserverlib.DefaultStateFilter() // TODO: use filter provided in request - eventFilter := gomatrixserverlib.DefaultEventFilter() - eventFilter.Limit = req.Limit + stateFilter := req.Filter.Room.State + eventFilter := req.Filter.Room.Timeline // Build up a /sync response. Add joined rooms. for _, roomID := range joinedRoomIDs { @@ -106,10 +105,8 @@ func (p *PDUStreamProvider) IncrementalSync( var stateDeltas []types.StateDelta var joinedRooms []string - // TODO: use filter provided in request - stateFilter := gomatrixserverlib.DefaultStateFilter() - eventFilter := gomatrixserverlib.DefaultEventFilter() - eventFilter.Limit = req.Limit + stateFilter := req.Filter.Room.State + eventFilter := req.Filter.Room.Timeline if req.WantFullState { if stateDeltas, joinedRooms, err = p.DB.GetStateDeltasForFullStateSync(ctx, req.Device, r, req.Device.UserID, &stateFilter); err != nil { @@ -142,7 +139,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( device *userapi.Device, r types.Range, delta types.StateDelta, - eventFilter *gomatrixserverlib.EventFilter, + eventFilter *gomatrixserverlib.RoomEventFilter, res *types.Response, ) error { if delta.MembershipPos > 0 && delta.Membership == gomatrixserverlib.Leave { @@ -213,7 +210,7 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync( roomID string, r types.Range, stateFilter *gomatrixserverlib.StateFilter, - eventFilter *gomatrixserverlib.EventFilter, + eventFilter *gomatrixserverlib.RoomEventFilter, device *userapi.Device, ) (jr *types.JoinResponse, err error) { var stateEvents []*gomatrixserverlib.HeaderedEvent diff --git a/syncapi/sync/request.go b/syncapi/sync/request.go index 5f89ffc33..06224610c 100644 --- a/syncapi/sync/request.go +++ b/syncapi/sync/request.go @@ -16,6 +16,7 @@ package sync import ( "encoding/json" + "fmt" "net/http" "strconv" "time" @@ -51,16 +52,14 @@ func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Dat return nil, err } } - timelineLimit := DefaultTimelineLimit // TODO: read from stored filters too + filter := gomatrixserverlib.DefaultFilter() filterQuery := req.URL.Query().Get("filter") if filterQuery != "" { if filterQuery[0] == '{' { // attempt to parse the timeline limit at least - var f filter - err := json.Unmarshal([]byte(filterQuery), &f) - if err == nil && f.Room.Timeline.Limit != nil { - timelineLimit = *f.Room.Timeline.Limit + if err := json.Unmarshal([]byte(filterQuery), &filter); err != nil { + return nil, fmt.Errorf("json.Unmarshal: %w", err) } } else { // attempt to load the filter ID @@ -71,21 +70,17 @@ func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Dat } f, err := syncDB.GetFilter(req.Context(), localpart, filterQuery) if err == nil { - timelineLimit = f.Room.Timeline.Limit + filter = *f } } } - filter := gomatrixserverlib.DefaultEventFilter() - filter.Limit = timelineLimit - // TODO: Additional query params: set_presence, filter - logger := util.GetLogger(req.Context()).WithFields(logrus.Fields{ "user_id": device.UserID, "device_id": device.ID, "since": since, "timeout": timeout, - "limit": timelineLimit, + "limit": filter.Room.Timeline.Limit, }) return &types.SyncRequest{ @@ -96,7 +91,6 @@ func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Dat Filter: filter, // Since: since, // Timeout: timeout, // - Limit: timelineLimit, // Rooms: make(map[string]string), // Populated by the PDU stream WantFullState: wantFullState, // }, nil diff --git a/syncapi/types/provider.go b/syncapi/types/provider.go index 24b453a80..93ed12661 100644 --- a/syncapi/types/provider.go +++ b/syncapi/types/provider.go @@ -14,9 +14,8 @@ type SyncRequest struct { Log *logrus.Entry Device *userapi.Device Response *Response - Filter gomatrixserverlib.EventFilter + Filter gomatrixserverlib.Filter Since StreamingToken - Limit int Timeout time.Duration WantFullState bool