Implement /sync limited and read timeline limit from stored filters

We now fully handle `room.timeline.limit` filters (in-line + stored) and
return the right value for `limited` syncs.
This commit is contained in:
Kegan Dougal 2020-06-26 14:34:01 +01:00
parent 79cf546d92
commit 18d30f91c8
8 changed files with 66 additions and 35 deletions

View file

@ -25,7 +25,7 @@ import (
const filterSchema = ` const filterSchema = `
-- Stores data about filters -- Stores data about filters
CREATE TABLE IF NOT EXISTS account_filter ( CREATE TABLE IF NOT EXISTS syncapi_filter (
-- The filter -- The filter
filter TEXT NOT NULL, filter TEXT NOT NULL,
-- The ID -- The ID
@ -36,17 +36,17 @@ CREATE TABLE IF NOT EXISTS account_filter (
PRIMARY KEY(id, localpart) PRIMARY KEY(id, localpart)
); );
CREATE INDEX IF NOT EXISTS account_filter_localpart ON account_filter(localpart); CREATE INDEX IF NOT EXISTS syncapi_filter_localpart ON syncapi_filter(localpart);
` `
const selectFilterSQL = "" + const selectFilterSQL = "" +
"SELECT filter FROM account_filter WHERE localpart = $1 AND id = $2" "SELECT filter FROM syncapi_filter WHERE localpart = $1 AND id = $2"
const selectFilterIDByContentSQL = "" + const selectFilterIDByContentSQL = "" +
"SELECT id FROM account_filter WHERE localpart = $1 AND filter = $2" "SELECT id FROM syncapi_filter WHERE localpart = $1 AND filter = $2"
const insertFilterSQL = "" + const insertFilterSQL = "" +
"INSERT INTO account_filter (filter, id, localpart) VALUES ($1, DEFAULT, $2) RETURNING id" "INSERT INTO syncapi_filter (filter, id, localpart) VALUES ($1, DEFAULT, $2) RETURNING id"
type filterStatements struct { type filterStatements struct {
selectFilterStmt *sql.Stmt selectFilterStmt *sql.Stmt

View file

@ -301,21 +301,28 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
ctx context.Context, txn *sql.Tx, ctx context.Context, txn *sql.Tx,
roomID string, r types.Range, limit int, roomID string, r types.Range, limit int,
chronologicalOrder bool, onlySyncEvents bool, chronologicalOrder bool, onlySyncEvents bool,
) ([]types.StreamEvent, error) { ) ([]types.StreamEvent, bool, error) {
var stmt *sql.Stmt var stmt *sql.Stmt
if onlySyncEvents { if onlySyncEvents {
stmt = sqlutil.TxStmt(txn, s.selectRecentEventsForSyncStmt) stmt = sqlutil.TxStmt(txn, s.selectRecentEventsForSyncStmt)
} else { } else {
stmt = sqlutil.TxStmt(txn, s.selectRecentEventsStmt) stmt = sqlutil.TxStmt(txn, s.selectRecentEventsStmt)
} }
rows, err := stmt.QueryContext(ctx, roomID, r.Low(), r.High(), limit) rows, err := stmt.QueryContext(ctx, roomID, r.Low(), r.High(), limit+1)
if err != nil { if err != nil {
return nil, err return nil, false, err
} }
defer internal.CloseAndLogIfError(ctx, rows, "selectRecentEvents: rows.close() failed") defer internal.CloseAndLogIfError(ctx, rows, "selectRecentEvents: rows.close() failed")
events, err := rowsToStreamEvents(rows) events, err := rowsToStreamEvents(rows)
if err != nil { if err != nil {
return nil, err return nil, false, err
}
// we queried for 1 more than the limit, so if we returned one more mark limited=true
limited := false
if len(events) > limit {
limited = true
// re-slice the extra event out
events = events[:len(events)-1]
} }
if chronologicalOrder { if chronologicalOrder {
// The events need to be returned from oldest to latest, which isn't // The events need to be returned from oldest to latest, which isn't
@ -325,7 +332,8 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
return events[i].StreamPosition < events[j].StreamPosition return events[i].StreamPosition < events[j].StreamPosition
}) })
} }
return events, nil
return events, limited, nil
} }
// selectEarlyEvents returns the earliest events in the given room, starting // selectEarlyEvents returns the earliest events in the given room, starting

View file

@ -79,7 +79,7 @@ func (d *Database) GetEventsInStreamingRange(
} }
if backwardOrdering { if backwardOrdering {
// When using backward ordering, we want the most recent events first. // When using backward ordering, we want the most recent events first.
if events, err = d.OutputEvents.SelectRecentEvents( if events, _, err = d.OutputEvents.SelectRecentEvents(
ctx, nil, roomID, r, limit, false, false, ctx, nil, roomID, r, limit, false, false,
); err != nil { ); err != nil {
return return
@ -655,7 +655,8 @@ func (d *Database) getResponseWithPDUsForCompleteSync(
// TODO: When filters are added, we may need to call this multiple times to get enough events. // TODO: When filters are added, we may need to call this multiple times to get enough events.
// See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316 // See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316
var recentStreamEvents []types.StreamEvent var recentStreamEvents []types.StreamEvent
recentStreamEvents, err = d.OutputEvents.SelectRecentEvents( var limited bool
recentStreamEvents, limited, err = d.OutputEvents.SelectRecentEvents(
ctx, txn, roomID, r, numRecentEventsPerRoom, true, true, ctx, txn, roomID, r, numRecentEventsPerRoom, true, true,
) )
if err != nil { if err != nil {
@ -683,7 +684,7 @@ func (d *Database) getResponseWithPDUsForCompleteSync(
jr := types.NewJoinResponse() jr := types.NewJoinResponse()
jr.Timeline.PrevBatch = prevBatchStr jr.Timeline.PrevBatch = prevBatchStr
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = true jr.Timeline.Limited = limited
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(stateEvents, gomatrixserverlib.FormatSync) jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(stateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Join[roomID] = *jr res.Rooms.Join[roomID] = *jr
} }
@ -789,7 +790,7 @@ func (d *Database) addRoomDeltaToResponse(
// This is all "okay" assuming history_visibility == "shared" which it is by default. // This is all "okay" assuming history_visibility == "shared" which it is by default.
r.To = delta.membershipPos r.To = delta.membershipPos
} }
recentStreamEvents, err := d.OutputEvents.SelectRecentEvents( recentStreamEvents, limited, err := d.OutputEvents.SelectRecentEvents(
ctx, txn, delta.roomID, r, ctx, txn, delta.roomID, r,
numRecentEventsPerRoom, true, true, numRecentEventsPerRoom, true, true,
) )
@ -809,7 +810,7 @@ func (d *Database) addRoomDeltaToResponse(
jr.Timeline.PrevBatch = prevBatch.String() jr.Timeline.PrevBatch = prevBatch.String()
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true jr.Timeline.Limited = limited
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Join[delta.roomID] = *jr res.Rooms.Join[delta.roomID] = *jr
case gomatrixserverlib.Leave: case gomatrixserverlib.Leave:

View file

@ -26,7 +26,7 @@ import (
const filterSchema = ` const filterSchema = `
-- Stores data about filters -- Stores data about filters
CREATE TABLE IF NOT EXISTS account_filter ( CREATE TABLE IF NOT EXISTS syncapi_filter (
-- The filter -- The filter
filter TEXT NOT NULL, filter TEXT NOT NULL,
-- The ID -- The ID
@ -37,17 +37,17 @@ CREATE TABLE IF NOT EXISTS account_filter (
UNIQUE (id, localpart) UNIQUE (id, localpart)
); );
CREATE INDEX IF NOT EXISTS account_filter_localpart ON account_filter(localpart); CREATE INDEX IF NOT EXISTS syncapi_filter_localpart ON syncapi_filter(localpart);
` `
const selectFilterSQL = "" + const selectFilterSQL = "" +
"SELECT filter FROM account_filter WHERE localpart = $1 AND id = $2" "SELECT filter FROM syncapi_filter WHERE localpart = $1 AND id = $2"
const selectFilterIDByContentSQL = "" + const selectFilterIDByContentSQL = "" +
"SELECT id FROM account_filter WHERE localpart = $1 AND filter = $2" "SELECT id FROM syncapi_filter WHERE localpart = $1 AND filter = $2"
const insertFilterSQL = "" + const insertFilterSQL = "" +
"INSERT INTO account_filter (filter, localpart) VALUES ($1, $2)" "INSERT INTO syncapi_filter (filter, localpart) VALUES ($1, $2)"
type filterStatements struct { type filterStatements struct {
selectFilterStmt *sql.Stmt selectFilterStmt *sql.Stmt

View file

@ -311,7 +311,7 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
ctx context.Context, txn *sql.Tx, ctx context.Context, txn *sql.Tx,
roomID string, r types.Range, limit int, roomID string, r types.Range, limit int,
chronologicalOrder bool, onlySyncEvents bool, chronologicalOrder bool, onlySyncEvents bool,
) ([]types.StreamEvent, error) { ) ([]types.StreamEvent, bool, error) {
var stmt *sql.Stmt var stmt *sql.Stmt
if onlySyncEvents { if onlySyncEvents {
stmt = sqlutil.TxStmt(txn, s.selectRecentEventsForSyncStmt) stmt = sqlutil.TxStmt(txn, s.selectRecentEventsForSyncStmt)
@ -319,14 +319,21 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
stmt = sqlutil.TxStmt(txn, s.selectRecentEventsStmt) stmt = sqlutil.TxStmt(txn, s.selectRecentEventsStmt)
} }
rows, err := stmt.QueryContext(ctx, roomID, r.Low(), r.High(), limit) rows, err := stmt.QueryContext(ctx, roomID, r.Low(), r.High(), limit+1)
if err != nil { if err != nil {
return nil, err return nil, false, err
} }
defer internal.CloseAndLogIfError(ctx, rows, "selectRecentEvents: rows.close() failed") defer internal.CloseAndLogIfError(ctx, rows, "selectRecentEvents: rows.close() failed")
events, err := rowsToStreamEvents(rows) events, err := rowsToStreamEvents(rows)
if err != nil { if err != nil {
return nil, err return nil, false, err
}
// we queried for 1 more than the limit, so if we returned one more mark limited=true
limited := false
if len(events) > limit {
limited = true
// re-slice the extra event out
events = events[:len(events)-1]
} }
if chronologicalOrder { if chronologicalOrder {
// The events need to be returned from oldest to latest, which isn't // The events need to be returned from oldest to latest, which isn't
@ -336,7 +343,7 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
return events[i].StreamPosition < events[j].StreamPosition return events[i].StreamPosition < events[j].StreamPosition
}) })
} }
return events, nil return events, limited, nil
} }
func (s *outputRoomEventsStatements) SelectEarlyEvents( func (s *outputRoomEventsStatements) SelectEarlyEvents(

View file

@ -44,8 +44,8 @@ type Events interface {
InsertEvent(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, addState, removeState []string, transactionID *api.TransactionID, excludeFromSync bool) (streamPos types.StreamPosition, err error) InsertEvent(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, addState, removeState []string, transactionID *api.TransactionID, excludeFromSync bool) (streamPos types.StreamPosition, err error)
// SelectRecentEvents returns events between the two stream positions: exclusive of low and inclusive of high. // SelectRecentEvents returns events between the two stream positions: exclusive of low and inclusive of high.
// If onlySyncEvents has a value of true, only returns the events that aren't marked as to exclude from sync. // If onlySyncEvents has a value of true, only returns the events that aren't marked as to exclude from sync.
// Returns up to `limit` events. // Returns up to `limit` events. Returns `limited=true` if there are more events in this range but we hit the `limit`.
SelectRecentEvents(ctx context.Context, txn *sql.Tx, roomID string, r types.Range, limit int, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, error) SelectRecentEvents(ctx context.Context, txn *sql.Tx, roomID string, r types.Range, limit int, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error)
// SelectEarlyEvents returns the earliest events in the given room. // SelectEarlyEvents returns the earliest events in the given room.
SelectEarlyEvents(ctx context.Context, txn *sql.Tx, roomID string, r types.Range, limit int) ([]types.StreamEvent, error) SelectEarlyEvents(ctx context.Context, txn *sql.Tx, roomID string, r types.Range, limit int) ([]types.StreamEvent, error)
SelectEvents(ctx context.Context, txn *sql.Tx, eventIDs []string) ([]types.StreamEvent, error) SelectEvents(ctx context.Context, txn *sql.Tx, eventIDs []string) ([]types.StreamEvent, error)

View file

@ -21,8 +21,10 @@ import (
"strconv" "strconv"
"time" "time"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api" userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util" "github.com/matrix-org/util"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
@ -49,7 +51,7 @@ type syncRequest struct {
log *log.Entry log *log.Entry
} }
func newSyncRequest(req *http.Request, device userapi.Device) (*syncRequest, error) { func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Database) (*syncRequest, error) {
timeout := getTimeout(req.URL.Query().Get("timeout")) timeout := getTimeout(req.URL.Query().Get("timeout"))
fullState := req.URL.Query().Get("full_state") fullState := req.URL.Query().Get("full_state")
wantFullState := fullState != "" && fullState != "false" wantFullState := fullState != "" && fullState != "false"
@ -69,12 +71,25 @@ func newSyncRequest(req *http.Request, device userapi.Device) (*syncRequest, err
timelineLimit := defaultTimelineLimit timelineLimit := defaultTimelineLimit
// TODO: read from stored filters too // TODO: read from stored filters too
filterQuery := req.URL.Query().Get("filter") filterQuery := req.URL.Query().Get("filter")
if filterQuery != "" && filterQuery[0] == '{' { if filterQuery != "" {
// attempt to parse the timeline limit at least if filterQuery[0] == '{' {
var f filter // attempt to parse the timeline limit at least
err := json.Unmarshal([]byte(filterQuery), &f) var f filter
if err == nil && f.Room.Timeline.Limit != nil { err := json.Unmarshal([]byte(filterQuery), &f)
timelineLimit = *f.Room.Timeline.Limit if err == nil && f.Room.Timeline.Limit != nil {
timelineLimit = *f.Room.Timeline.Limit
}
} else {
// attempt to load the filter ID
localpart, _, err := gomatrixserverlib.SplitID('@', device.UserID)
if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("gomatrixserverlib.SplitID failed")
return nil, err
}
f, err := syncDB.GetFilter(req.Context(), localpart, filterQuery)
if err == nil {
timelineLimit = f.Room.Timeline.Limit
}
} }
} }
// TODO: Additional query params: set_presence, filter // TODO: Additional query params: set_presence, filter

View file

@ -49,7 +49,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
var syncData *types.Response var syncData *types.Response
// Extract values from request // Extract values from request
syncReq, err := newSyncRequest(req, *device) syncReq, err := newSyncRequest(req, *device, rp.db)
if err != nil { if err != nil {
return util.JSONResponse{ return util.JSONResponse{
Code: http.StatusBadRequest, Code: http.StatusBadRequest,