From da0658dc675a6136d2845faf24e22c268f60b612 Mon Sep 17 00:00:00 2001 From: "Crom (Thibaut CHARLES)" Date: Fri, 5 Jan 2018 14:25:36 +0100 Subject: [PATCH] Timeline filtering Signed-off-by: Thibaut CHARLES cromfr@gmail.com --- .../storage/output_room_events_table.go | 61 +++++++++----- .../dendrite/syncapi/storage/syncserver.go | 82 +++++++++++-------- .../dendrite/syncapi/sync/notifier_test.go | 3 +- .../dendrite/syncapi/sync/request.go | 31 ++++++- .../dendrite/syncapi/sync/requestpool.go | 4 +- .../github.com/matrix-org/gomatrix/filter.go | 51 +++++++++--- 6 files changed, 161 insertions(+), 71 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go index ceb2601f1..c1a53ca70 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go @@ -19,6 +19,7 @@ import ( "database/sql" "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/gomatrix" "github.com/lib/pq" "github.com/matrix-org/dendrite/common" @@ -39,10 +40,10 @@ CREATE TABLE IF NOT EXISTS syncapi_output_room_events ( id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_stream_id'), -- The event ID for the event event_id TEXT NOT NULL, - -- The 'room_id' key for the event. + -- The 'room_id' key for the event. TODO Duplicate of (event_json->>'room_id') room_id TEXT NOT NULL, - -- The JSON for the event. Stored as TEXT because this should be valid UTF-8. - event_json TEXT NOT NULL, + -- The JSON for the event. Stored as JSON + event_json JSON NOT NULL, -- A list of event IDs which represent a delta of added/removed room state. This can be NULL -- if there is no delta. add_state_ids TEXT[], @@ -51,7 +52,11 @@ CREATE TABLE IF NOT EXISTS syncapi_output_room_events ( transaction_id TEXT -- The transaction id used to send the event, if any ); -- for event selection -CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_id_idx ON syncapi_output_room_events(event_id); +CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_id_idx ON syncapi_output_room_events( + event_id, + room_id, + (event_json->>'type'), + (event_json->>'sender')); ` const insertEventSQL = "" + @@ -62,10 +67,15 @@ const insertEventSQL = "" + const selectEventsSQL = "" + "SELECT id, event_json FROM syncapi_output_room_events WHERE event_id = ANY($1)" -const selectRecentEventsSQL = "" + +const selectRoomRecentEventsSQL = "" + "SELECT id, event_json, device_id, transaction_id FROM syncapi_output_room_events" + - " WHERE room_id = $1 AND id > $2 AND id <= $3" + - " ORDER BY id ASC LIMIT $4" + " WHERE room_id=$1 " + + " AND id > $2 AND id <= $3" + + " AND ( $4::text[] IS NULL OR (event_json->>'sender') = ANY($4) )" + + " AND ( $5::text[] IS NULL OR NOT((event_json->>'sender') = ANY($5)) )" + + " AND ( $6::text[] IS NULL OR (event_json->>'type') = ANY($6) )" + + " AND ( $7::text[] IS NULL OR NOT((event_json->>'type') = ANY($7)) )" + + " ORDER BY id DESC LIMIT $8" const selectMaxEventIDSQL = "" + "SELECT MAX(id) FROM syncapi_output_room_events" @@ -78,11 +88,11 @@ const selectStateInRangeSQL = "" + " ORDER BY id ASC" type outputRoomEventsStatements struct { - insertEventStmt *sql.Stmt - selectEventsStmt *sql.Stmt - selectMaxEventIDStmt *sql.Stmt - selectRecentEventsStmt *sql.Stmt - selectStateInRangeStmt *sql.Stmt + insertEventStmt *sql.Stmt + selectEventsStmt *sql.Stmt + selectMaxEventIDStmt *sql.Stmt + selectRoomRecentEventsStmt *sql.Stmt + selectStateInRangeStmt *sql.Stmt } func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) { @@ -99,7 +109,7 @@ func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) { if s.selectMaxEventIDStmt, err = db.Prepare(selectMaxEventIDSQL); err != nil { return } - if s.selectRecentEventsStmt, err = db.Prepare(selectRecentEventsSQL); err != nil { + if s.selectRoomRecentEventsStmt, err = db.Prepare(selectRoomRecentEventsSQL); err != nil { return } if s.selectStateInRangeStmt, err = db.Prepare(selectStateInRangeSQL); err != nil { @@ -219,22 +229,29 @@ func (s *outputRoomEventsStatements) insertEvent( return } -// RecentEventsInRoom returns the most recent events in the given room, up to a maximum of 'limit'. -func (s *outputRoomEventsStatements) selectRecentEvents( +func (s *outputRoomEventsStatements) selectRoomRecentEvents( ctx context.Context, txn *sql.Tx, - roomID string, fromPos, toPos types.StreamPosition, limit int, -) ([]streamEvent, error) { - stmt := common.TxStmt(txn, s.selectRecentEventsStmt) - rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, limit) + roomID string, fromPos, toPos types.StreamPosition, timelineFilter *gomatrix.FilterPart, +) ([]streamEvent, bool, error) { + + stmt := common.TxStmt(txn, s.selectRoomRecentEventsStmt) + rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, + pq.StringArray(timelineFilter.Senders), + pq.StringArray(timelineFilter.NotSenders), + pq.StringArray(timelineFilter.Types), + pq.StringArray(timelineFilter.NotTypes), + timelineFilter.Limit, // TODO: limit abusive values? + ) if err != nil { - return nil, err + return nil, false, err } defer rows.Close() // nolint: errcheck events, err := rowsToStreamEvents(rows) if err != nil { - return nil, err + return nil, false, err } - return events, nil + + return events, len(events) == timelineFilter.Limit, nil // TODO: len(events) == timelineFilter.Limit not accurate } // Events returns the events for the given event IDs. Returns an error if any one of the event IDs given are missing diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go index 84417a348..d132cefd6 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go @@ -23,6 +23,7 @@ import ( "github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/gomatrix" // Import the postgres database driver. _ "github.com/lib/pq" "github.com/matrix-org/dendrite/common" @@ -224,7 +225,7 @@ func (d *SyncServerDatabase) IncrementalSync( ctx context.Context, device authtypes.Device, fromPos, toPos types.StreamPosition, - numRecentEventsPerRoom int, + filter *gomatrix.Filter, ) (*types.Response, error) { txn, err := d.db.BeginTx(ctx, &txReadOnlySnapshot) if err != nil { @@ -244,7 +245,7 @@ func (d *SyncServerDatabase) IncrementalSync( res := types.NewResponse(toPos) for _, delta := range deltas { - err = d.addRoomDeltaToResponse(ctx, &device, txn, fromPos, toPos, delta, numRecentEventsPerRoom, res) + err = d.addRoomDeltaToResponse(ctx, &device, txn, fromPos, toPos, delta, filter, res) if err != nil { return nil, err } @@ -261,7 +262,7 @@ func (d *SyncServerDatabase) IncrementalSync( // CompleteSync a complete /sync API response for the given user. func (d *SyncServerDatabase) CompleteSync( - ctx context.Context, userID string, numRecentEventsPerRoom int, + ctx context.Context, userID string, filter *gomatrix.Filter, ) (*types.Response, error) { // This needs to be all done in a transaction as we need to do multiple SELECTs, and we need to have // a consistent view of the database throughout. This includes extracting the sync stream position. @@ -275,7 +276,8 @@ func (d *SyncServerDatabase) CompleteSync( defer common.EndTransaction(txn, &succeeded) // Get the current stream position which we will base the sync response on. - pos, err := d.syncStreamPositionTx(ctx, txn) + posFrom := types.StreamPosition(0) + posTo, err := d.syncStreamPositionTx(ctx, txn) if err != nil { return nil, err } @@ -287,39 +289,53 @@ func (d *SyncServerDatabase) CompleteSync( } // Build up a /sync response. Add joined rooms. - res := types.NewResponse(pos) + res := types.NewResponse(posTo) for _, roomID := range roomIDs { - var stateEvents []gomatrixserverlib.Event - stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID) - if err != nil { - return nil, err - } - // TODO: When filters are added, we may need to call this multiple times to get enough events. - // See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316 - var recentStreamEvents []streamEvent - recentStreamEvents, err = d.events.selectRecentEvents( - ctx, txn, roomID, types.StreamPosition(0), pos, numRecentEventsPerRoom, - ) - if err != nil { - return nil, err - } - // We don't include a device here as we don't need to send down - // transaction IDs for complete syncs - recentEvents := streamEventsToEvents(nil, recentStreamEvents) - - stateEvents = removeDuplicates(stateEvents, recentEvents) jr := types.NewJoinResponse() - jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) - jr.Timeline.Limited = true - jr.State.Events = gomatrixserverlib.ToClientEvents(stateEvents, gomatrixserverlib.FormatSync) + + //Join response should contain events only if room isn't filtered + if !isRoomFiltered(filter, roomID) { + // Timeline events + var recentStreamEvents []streamEvent + var limited bool + recentStreamEvents, limited, err = d.events.selectRoomRecentEvents( + ctx, txn, roomID, posFrom, posTo, &filter.Room.Timeline) + if err != nil { + return nil, err + } + jr.Timeline.Limited = limited + + recentEvents := streamEventsToEvents(nil, recentStreamEvents) + jr.Timeline.Events = gomatrixserverlib.ToClientEvents( + recentEvents, + gomatrixserverlib.FormatSync) + + // State events + var stateEvents []gomatrixserverlib.Event + stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID) + if err != nil { + return nil, err + } + stateEvents = removeDuplicates(stateEvents, recentEvents) + jr.State.Events = gomatrixserverlib.ToClientEvents(stateEvents, gomatrixserverlib.FormatSync) + + //TODO AccountData events + //TODO Ephemeral events + } + + //TODO Handle jr.Timeline.prev_batch + res.Rooms.Join[roomID] = *jr } - if err = d.addInvitesToResponse(ctx, txn, userID, 0, pos, res); err != nil { + //TODO: filter Invite events + if err = d.addInvitesToResponse(ctx, txn, userID, 0, posTo, res); err != nil { return nil, err } + //TODO handle res.Room[roomID].Leave + succeeded = true return res, err } @@ -409,7 +425,7 @@ func (d *SyncServerDatabase) addRoomDeltaToResponse( txn *sql.Tx, fromPos, toPos types.StreamPosition, delta stateDelta, - numRecentEventsPerRoom int, + filter *gomatrix.Filter, res *types.Response, ) error { endPos := toPos @@ -422,8 +438,8 @@ func (d *SyncServerDatabase) addRoomDeltaToResponse( // This is all "okay" assuming history_visibility == "shared" which it is by default. endPos = delta.membershipPos } - recentStreamEvents, err := d.events.selectRecentEvents( - ctx, txn, delta.roomID, fromPos, endPos, numRecentEventsPerRoom, + recentStreamEvents, limited, err := d.events.selectRoomRecentEvents( + ctx, txn, delta.roomID, fromPos, endPos, &filter.Room.Timeline, ) if err != nil { return err @@ -440,7 +456,7 @@ func (d *SyncServerDatabase) addRoomDeltaToResponse( case "join": jr := types.NewJoinResponse() jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) - jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true + jr.Timeline.Limited = limited jr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) res.Rooms.Join[delta.roomID] = *jr case "leave": @@ -450,7 +466,7 @@ func (d *SyncServerDatabase) addRoomDeltaToResponse( // no longer in the room. lr := types.NewLeaveResponse() lr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) - lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true + lr.Timeline.Limited = limited lr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) res.Rooms.Leave[delta.roomID] = *lr } diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go index 4fa543936..fbb46d82e 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go @@ -22,6 +22,7 @@ import ( "time" "github.com/matrix-org/dendrite/clientapi/auth/authtypes" + "github.com/matrix-org/gomatrix" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" @@ -286,8 +287,8 @@ func newTestSyncRequest(userID string, since types.StreamPosition) syncRequest { timeout: 1 * time.Minute, since: &since, wantFullState: false, - limit: defaultTimelineLimit, log: util.GetLogger(context.TODO()), ctx: context.TODO(), + filter: gomatrix.DefaultFilter(), } } diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/request.go b/src/github.com/matrix-org/dendrite/syncapi/sync/request.go index 3c1befddf..92bc79e3e 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/request.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/request.go @@ -16,11 +16,14 @@ package sync import ( "context" + "encoding/json" + "errors" "net/http" "strconv" "time" "github.com/matrix-org/dendrite/clientapi/auth/authtypes" + "github.com/matrix-org/gomatrix" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/util" @@ -34,11 +37,11 @@ const defaultTimelineLimit = 20 type syncRequest struct { ctx context.Context device authtypes.Device - limit int timeout time.Duration since *types.StreamPosition // nil means that no since token was supplied wantFullState bool log *log.Entry + filter gomatrix.Filter } func newSyncRequest(req *http.Request, device authtypes.Device) (*syncRequest, error) { @@ -49,15 +52,37 @@ func newSyncRequest(req *http.Request, device authtypes.Device) (*syncRequest, e if err != nil { return nil, err } - // TODO: Additional query params: set_presence, filter + + filterStr := req.URL.Query().Get("filter") + var filter gomatrix.Filter + if filterStr != "" { + if filterStr[0] == '{' { + // Inline filter + filter = gomatrix.DefaultFilter() + err = json.Unmarshal([]byte(filterStr), &filter) + if err != nil { + return nil, err + } + err = filter.Validate() + if err != nil { + return nil, err + } + } else { + // Filter ID + // TODO retrieve filter from DB + return nil, errors.New("Filter ID retrieval not implemented") + } + } + + // TODO: Additional query params: set_presence return &syncRequest{ ctx: req.Context(), device: device, timeout: timeout, since: since, wantFullState: wantFullState, - limit: defaultTimelineLimit, // TODO: read from filter log: util.GetLogger(req.Context()), + filter: filter, }, nil } diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go b/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go index 703ddd3f1..3b21fe9ab 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go @@ -122,9 +122,9 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype func (rp *RequestPool) currentSyncForUser(req syncRequest, currentPos types.StreamPosition) (res *types.Response, err error) { // TODO: handle ignored users if req.since == nil { - res, err = rp.db.CompleteSync(req.ctx, req.device.UserID, req.limit) + res, err = rp.db.CompleteSync(req.ctx, req.device.UserID, &req.filter) } else { - res, err = rp.db.IncrementalSync(req.ctx, req.device, *req.since, currentPos, req.limit) + res, err = rp.db.IncrementalSync(req.ctx, req.device, *req.since, currentPos, &req.filter) } if err != nil { diff --git a/vendor/src/github.com/matrix-org/gomatrix/filter.go b/vendor/src/github.com/matrix-org/gomatrix/filter.go index 3aa65fa24..43ff1c773 100644 --- a/vendor/src/github.com/matrix-org/gomatrix/filter.go +++ b/vendor/src/github.com/matrix-org/gomatrix/filter.go @@ -23,21 +23,23 @@ type Filter struct { EventFields []string `json:"event_fields,omitempty"` EventFormat string `json:"event_format,omitempty"` Presence FilterPart `json:"presence,omitempty"` - Room struct { - AccountData FilterPart `json:"account_data,omitempty"` - Ephemeral FilterPart `json:"ephemeral,omitempty"` - IncludeLeave bool `json:"include_leave,omitempty"` - NotRooms []string `json:"not_rooms,omitempty"` - Rooms []string `json:"rooms,omitempty"` - State FilterPart `json:"state,omitempty"` - Timeline FilterPart `json:"timeline,omitempty"` - } `json:"room,omitempty"` + Room FilterRoom `json:"room,omitempty"` +} + +type FilterRoom struct { + AccountData FilterPart `json:"account_data,omitempty"` + Ephemeral FilterPart `json:"ephemeral,omitempty"` + IncludeLeave bool `json:"include_leave,omitempty"` + NotRooms []string `json:"not_rooms,omitempty"` + Rooms []string `json:"rooms,omitempty"` + State FilterPart `json:"state,omitempty"` + Timeline FilterPart `json:"timeline,omitempty"` } type FilterPart struct { NotRooms []string `json:"not_rooms,omitempty"` Rooms []string `json:"rooms,omitempty"` - Limit *int `json:"limit,omitempty"` + Limit int `json:"limit,omitempty"` NotSenders []string `json:"not_senders,omitempty"` NotTypes []string `json:"not_types,omitempty"` Senders []string `json:"senders,omitempty"` @@ -50,3 +52,32 @@ func (filter *Filter) Validate() error { } return nil } + +func DefaultFilter() Filter { + return Filter{ + AccountData: defaultFilterPart(), + EventFields: nil, + EventFormat: "client", + Presence: defaultFilterPart(), + Room: FilterRoom{ + AccountData: defaultFilterPart(), + Ephemeral: defaultFilterPart(), + IncludeLeave: false, //TODO check default value on synapse + NotRooms: nil, + Rooms: nil, + State: defaultFilterPart(), + Timeline: defaultFilterPart(), + }, + } +} +func defaultFilterPart() FilterPart { + return FilterPart{ + NotRooms: nil, + Rooms: nil, + Limit: 100, //TODO check this on synapse + NotSenders: nil, + NotTypes: nil, + Senders: nil, + Types: nil, + } +}