From 18d30f91c8bca9ad86a4bf7993588e9dc713fd60 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 26 Jun 2020 14:34:01 +0100 Subject: [PATCH] Implement /sync `limited` and read timeline limit from stored filters We now fully handle `room.timeline.limit` filters (in-line + stored) and return the right value for `limited` syncs. --- syncapi/storage/postgres/filter_table.go | 10 +++---- .../postgres/output_room_events_table.go | 18 ++++++++---- syncapi/storage/shared/syncserver.go | 11 +++---- syncapi/storage/sqlite3/filter_table.go | 10 +++---- .../sqlite3/output_room_events_table.go | 17 +++++++---- syncapi/storage/tables/interface.go | 4 +-- syncapi/sync/request.go | 29 ++++++++++++++----- syncapi/sync/requestpool.go | 2 +- 8 files changed, 66 insertions(+), 35 deletions(-) diff --git a/syncapi/storage/postgres/filter_table.go b/syncapi/storage/postgres/filter_table.go index 3747caf7b..beeb864ba 100644 --- a/syncapi/storage/postgres/filter_table.go +++ b/syncapi/storage/postgres/filter_table.go @@ -25,7 +25,7 @@ import ( const filterSchema = ` -- Stores data about filters -CREATE TABLE IF NOT EXISTS account_filter ( +CREATE TABLE IF NOT EXISTS syncapi_filter ( -- The filter filter TEXT NOT NULL, -- The ID @@ -36,17 +36,17 @@ CREATE TABLE IF NOT EXISTS account_filter ( PRIMARY KEY(id, localpart) ); -CREATE INDEX IF NOT EXISTS account_filter_localpart ON account_filter(localpart); +CREATE INDEX IF NOT EXISTS syncapi_filter_localpart ON syncapi_filter(localpart); ` const selectFilterSQL = "" + - "SELECT filter FROM account_filter WHERE localpart = $1 AND id = $2" + "SELECT filter FROM syncapi_filter WHERE localpart = $1 AND id = $2" const selectFilterIDByContentSQL = "" + - "SELECT id FROM account_filter WHERE localpart = $1 AND filter = $2" + "SELECT id FROM syncapi_filter WHERE localpart = $1 AND filter = $2" const insertFilterSQL = "" + - "INSERT INTO account_filter (filter, id, localpart) VALUES ($1, DEFAULT, $2) RETURNING id" + "INSERT INTO syncapi_filter (filter, id, localpart) VALUES ($1, DEFAULT, $2) RETURNING id" type filterStatements struct { selectFilterStmt *sql.Stmt diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go index f01b2eabd..1d0e6eef4 100644 --- a/syncapi/storage/postgres/output_room_events_table.go +++ b/syncapi/storage/postgres/output_room_events_table.go @@ -301,21 +301,28 @@ func (s *outputRoomEventsStatements) SelectRecentEvents( ctx context.Context, txn *sql.Tx, roomID string, r types.Range, limit int, chronologicalOrder bool, onlySyncEvents bool, -) ([]types.StreamEvent, error) { +) ([]types.StreamEvent, bool, error) { var stmt *sql.Stmt if onlySyncEvents { stmt = sqlutil.TxStmt(txn, s.selectRecentEventsForSyncStmt) } else { stmt = sqlutil.TxStmt(txn, s.selectRecentEventsStmt) } - rows, err := stmt.QueryContext(ctx, roomID, r.Low(), r.High(), limit) + rows, err := stmt.QueryContext(ctx, roomID, r.Low(), r.High(), limit+1) if err != nil { - return nil, err + return nil, false, err } defer internal.CloseAndLogIfError(ctx, rows, "selectRecentEvents: rows.close() failed") events, err := rowsToStreamEvents(rows) if err != nil { - return nil, err + return nil, false, err + } + // we queried for 1 more than the limit, so if we returned one more mark limited=true + limited := false + if len(events) > limit { + limited = true + // re-slice the extra event out + events = events[:len(events)-1] } if chronologicalOrder { // The events need to be returned from oldest to latest, which isn't @@ -325,7 +332,8 @@ func (s *outputRoomEventsStatements) SelectRecentEvents( return events[i].StreamPosition < events[j].StreamPosition }) } - return events, nil + + return events, limited, nil } // selectEarlyEvents returns the earliest events in the given room, starting diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index 7a84dd151..01362ddd6 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -79,7 +79,7 @@ func (d *Database) GetEventsInStreamingRange( } if backwardOrdering { // When using backward ordering, we want the most recent events first. - if events, err = d.OutputEvents.SelectRecentEvents( + if events, _, err = d.OutputEvents.SelectRecentEvents( ctx, nil, roomID, r, limit, false, false, ); err != nil { return @@ -655,7 +655,8 @@ func (d *Database) getResponseWithPDUsForCompleteSync( // TODO: When filters are added, we may need to call this multiple times to get enough events. // See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316 var recentStreamEvents []types.StreamEvent - recentStreamEvents, err = d.OutputEvents.SelectRecentEvents( + var limited bool + recentStreamEvents, limited, err = d.OutputEvents.SelectRecentEvents( ctx, txn, roomID, r, numRecentEventsPerRoom, true, true, ) if err != nil { @@ -683,7 +684,7 @@ func (d *Database) getResponseWithPDUsForCompleteSync( jr := types.NewJoinResponse() jr.Timeline.PrevBatch = prevBatchStr jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) - jr.Timeline.Limited = true + jr.Timeline.Limited = limited jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(stateEvents, gomatrixserverlib.FormatSync) res.Rooms.Join[roomID] = *jr } @@ -789,7 +790,7 @@ func (d *Database) addRoomDeltaToResponse( // This is all "okay" assuming history_visibility == "shared" which it is by default. r.To = delta.membershipPos } - recentStreamEvents, err := d.OutputEvents.SelectRecentEvents( + recentStreamEvents, limited, err := d.OutputEvents.SelectRecentEvents( ctx, txn, delta.roomID, r, numRecentEventsPerRoom, true, true, ) @@ -809,7 +810,7 @@ func (d *Database) addRoomDeltaToResponse( jr.Timeline.PrevBatch = prevBatch.String() jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) - jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true + jr.Timeline.Limited = limited jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) res.Rooms.Join[delta.roomID] = *jr case gomatrixserverlib.Leave: diff --git a/syncapi/storage/sqlite3/filter_table.go b/syncapi/storage/sqlite3/filter_table.go index 4cd74e8f2..8b26759dc 100644 --- a/syncapi/storage/sqlite3/filter_table.go +++ b/syncapi/storage/sqlite3/filter_table.go @@ -26,7 +26,7 @@ import ( const filterSchema = ` -- Stores data about filters -CREATE TABLE IF NOT EXISTS account_filter ( +CREATE TABLE IF NOT EXISTS syncapi_filter ( -- The filter filter TEXT NOT NULL, -- The ID @@ -37,17 +37,17 @@ CREATE TABLE IF NOT EXISTS account_filter ( UNIQUE (id, localpart) ); -CREATE INDEX IF NOT EXISTS account_filter_localpart ON account_filter(localpart); +CREATE INDEX IF NOT EXISTS syncapi_filter_localpart ON syncapi_filter(localpart); ` const selectFilterSQL = "" + - "SELECT filter FROM account_filter WHERE localpart = $1 AND id = $2" + "SELECT filter FROM syncapi_filter WHERE localpart = $1 AND id = $2" const selectFilterIDByContentSQL = "" + - "SELECT id FROM account_filter WHERE localpart = $1 AND filter = $2" + "SELECT id FROM syncapi_filter WHERE localpart = $1 AND filter = $2" const insertFilterSQL = "" + - "INSERT INTO account_filter (filter, localpart) VALUES ($1, $2)" + "INSERT INTO syncapi_filter (filter, localpart) VALUES ($1, $2)" type filterStatements struct { selectFilterStmt *sql.Stmt diff --git a/syncapi/storage/sqlite3/output_room_events_table.go b/syncapi/storage/sqlite3/output_room_events_table.go index 367ab3c9a..2dbf8d6c1 100644 --- a/syncapi/storage/sqlite3/output_room_events_table.go +++ b/syncapi/storage/sqlite3/output_room_events_table.go @@ -311,7 +311,7 @@ func (s *outputRoomEventsStatements) SelectRecentEvents( ctx context.Context, txn *sql.Tx, roomID string, r types.Range, limit int, chronologicalOrder bool, onlySyncEvents bool, -) ([]types.StreamEvent, error) { +) ([]types.StreamEvent, bool, error) { var stmt *sql.Stmt if onlySyncEvents { stmt = sqlutil.TxStmt(txn, s.selectRecentEventsForSyncStmt) @@ -319,14 +319,21 @@ func (s *outputRoomEventsStatements) SelectRecentEvents( stmt = sqlutil.TxStmt(txn, s.selectRecentEventsStmt) } - rows, err := stmt.QueryContext(ctx, roomID, r.Low(), r.High(), limit) + rows, err := stmt.QueryContext(ctx, roomID, r.Low(), r.High(), limit+1) if err != nil { - return nil, err + return nil, false, err } defer internal.CloseAndLogIfError(ctx, rows, "selectRecentEvents: rows.close() failed") events, err := rowsToStreamEvents(rows) if err != nil { - return nil, err + return nil, false, err + } + // we queried for 1 more than the limit, so if we returned one more mark limited=true + limited := false + if len(events) > limit { + limited = true + // re-slice the extra event out + events = events[:len(events)-1] } if chronologicalOrder { // The events need to be returned from oldest to latest, which isn't @@ -336,7 +343,7 @@ func (s *outputRoomEventsStatements) SelectRecentEvents( return events[i].StreamPosition < events[j].StreamPosition }) } - return events, nil + return events, limited, nil } func (s *outputRoomEventsStatements) SelectEarlyEvents( diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go index 28a1786b3..4ac0be4ec 100644 --- a/syncapi/storage/tables/interface.go +++ b/syncapi/storage/tables/interface.go @@ -44,8 +44,8 @@ type Events interface { InsertEvent(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, addState, removeState []string, transactionID *api.TransactionID, excludeFromSync bool) (streamPos types.StreamPosition, err error) // SelectRecentEvents returns events between the two stream positions: exclusive of low and inclusive of high. // If onlySyncEvents has a value of true, only returns the events that aren't marked as to exclude from sync. - // Returns up to `limit` events. - SelectRecentEvents(ctx context.Context, txn *sql.Tx, roomID string, r types.Range, limit int, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, error) + // Returns up to `limit` events. Returns `limited=true` if there are more events in this range but we hit the `limit`. + SelectRecentEvents(ctx context.Context, txn *sql.Tx, roomID string, r types.Range, limit int, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error) // SelectEarlyEvents returns the earliest events in the given room. SelectEarlyEvents(ctx context.Context, txn *sql.Tx, roomID string, r types.Range, limit int) ([]types.StreamEvent, error) SelectEvents(ctx context.Context, txn *sql.Tx, eventIDs []string) ([]types.StreamEvent, error) diff --git a/syncapi/sync/request.go b/syncapi/sync/request.go index 5dd92c853..99633de66 100644 --- a/syncapi/sync/request.go +++ b/syncapi/sync/request.go @@ -21,8 +21,10 @@ import ( "strconv" "time" + "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" userapi "github.com/matrix-org/dendrite/userapi/api" + "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" log "github.com/sirupsen/logrus" ) @@ -49,7 +51,7 @@ type syncRequest struct { log *log.Entry } -func newSyncRequest(req *http.Request, device userapi.Device) (*syncRequest, error) { +func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Database) (*syncRequest, error) { timeout := getTimeout(req.URL.Query().Get("timeout")) fullState := req.URL.Query().Get("full_state") wantFullState := fullState != "" && fullState != "false" @@ -69,12 +71,25 @@ func newSyncRequest(req *http.Request, device userapi.Device) (*syncRequest, err timelineLimit := defaultTimelineLimit // TODO: read from stored filters too filterQuery := req.URL.Query().Get("filter") - if filterQuery != "" && filterQuery[0] == '{' { - // attempt to parse the timeline limit at least - var f filter - err := json.Unmarshal([]byte(filterQuery), &f) - if err == nil && f.Room.Timeline.Limit != nil { - timelineLimit = *f.Room.Timeline.Limit + if filterQuery != "" { + if filterQuery[0] == '{' { + // attempt to parse the timeline limit at least + var f filter + err := json.Unmarshal([]byte(filterQuery), &f) + if err == nil && f.Room.Timeline.Limit != nil { + timelineLimit = *f.Room.Timeline.Limit + } + } else { + // attempt to load the filter ID + localpart, _, err := gomatrixserverlib.SplitID('@', device.UserID) + if err != nil { + util.GetLogger(req.Context()).WithError(err).Error("gomatrixserverlib.SplitID failed") + return nil, err + } + f, err := syncDB.GetFilter(req.Context(), localpart, filterQuery) + if err == nil { + timelineLimit = f.Room.Timeline.Limit + } } } // TODO: Additional query params: set_presence, filter diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index 743c63a62..196d446a2 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -49,7 +49,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. var syncData *types.Response // Extract values from request - syncReq, err := newSyncRequest(req, *device) + syncReq, err := newSyncRequest(req, *device, rp.db) if err != nil { return util.JSONResponse{ Code: http.StatusBadRequest,