From 9af5e73fdffcae776ad328e39e140e1b649566b3 Mon Sep 17 00:00:00 2001 From: "Crom (Thibaut CHARLES)" Date: Sat, 23 Dec 2017 15:09:29 +0100 Subject: [PATCH 01/17] Filter unmarshal & validation before storing Signed-off-by: Thibaut CHARLES cromfr@gmail.com --- .../auth/storage/accounts/filter_table.go | 22 ++++++++++++++----- .../auth/storage/accounts/storage.go | 5 +++-- .../dendrite/clientapi/routing/filter.go | 17 +++++++------- .../github.com/matrix-org/gomatrix/filter.go | 9 ++++++++ 4 files changed, 36 insertions(+), 17 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/clientapi/auth/storage/accounts/filter_table.go b/src/github.com/matrix-org/dendrite/clientapi/auth/storage/accounts/filter_table.go index 81bae4545..8b7c6af22 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/auth/storage/accounts/filter_table.go +++ b/src/github.com/matrix-org/dendrite/clientapi/auth/storage/accounts/filter_table.go @@ -17,8 +17,9 @@ package accounts import ( "context" "database/sql" + "encoding/json" - "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/gomatrix" ) const filterSchema = ` @@ -71,20 +72,29 @@ func (s *filterStatements) prepare(db *sql.DB) (err error) { func (s *filterStatements) selectFilter( ctx context.Context, localpart string, filterID string, -) (filter []byte, err error) { - err = s.selectFilterStmt.QueryRowContext(ctx, localpart, filterID).Scan(&filter) - return +) (*gomatrix.Filter, error) { + var filterData []byte + err := s.selectFilterStmt.QueryRowContext(ctx, localpart, filterID).Scan(&filterData) + if err != nil { + return nil, err + } + + var filter gomatrix.Filter + if err = json.Unmarshal(filterData, &filter); err != nil { + return nil, err + } + return &filter, err } func (s *filterStatements) insertFilter( - ctx context.Context, filter []byte, localpart string, + ctx context.Context, filter *gomatrix.Filter, localpart string, ) (filterID string, err error) { var existingFilterID string // This can result in a race condition when two clients try to insert the // same filter and localpart at the same time, however this is not a // problem as both calls will result in the same filterID - filterJSON, err := gomatrixserverlib.CanonicalJSON(filter) + filterJSON, err := json.Marshal(filter) if err != nil { return "", err } diff --git a/src/github.com/matrix-org/dendrite/clientapi/auth/storage/accounts/storage.go b/src/github.com/matrix-org/dendrite/clientapi/auth/storage/accounts/storage.go index e88942e34..652983529 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/auth/storage/accounts/storage.go +++ b/src/github.com/matrix-org/dendrite/clientapi/auth/storage/accounts/storage.go @@ -21,6 +21,7 @@ import ( "github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/gomatrix" "github.com/matrix-org/gomatrixserverlib" "golang.org/x/crypto/bcrypt" // Import the postgres database driver. @@ -329,7 +330,7 @@ func (d *Database) GetThreePIDsForLocalpart( // no such filter exists or if there was an error talking to the database. func (d *Database) GetFilter( ctx context.Context, localpart string, filterID string, -) ([]byte, error) { +) (*gomatrix.Filter, error) { return d.filter.selectFilter(ctx, localpart, filterID) } @@ -337,7 +338,7 @@ func (d *Database) GetFilter( // Returns the filterID as a string. Otherwise returns an error if something // goes wrong. func (d *Database) PutFilter( - ctx context.Context, localpart string, filter []byte, + ctx context.Context, localpart string, filter *gomatrix.Filter, ) (string, error) { return d.filter.insertFilter(ctx, filter, localpart) } diff --git a/src/github.com/matrix-org/dendrite/clientapi/routing/filter.go b/src/github.com/matrix-org/dendrite/clientapi/routing/filter.go index 4b84e293d..d14aaeb96 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/routing/filter.go +++ b/src/github.com/matrix-org/dendrite/clientapi/routing/filter.go @@ -49,7 +49,7 @@ func GetFilter( return httputil.LogThenError(req, err) } - res, err := accountDB.GetFilter(req.Context(), localpart, filterID) + filter, err := accountDB.GetFilter(req.Context(), localpart, filterID) if err != nil { //TODO better error handling. This error message is *probably* right, // but if there are obscure db errors, this will also be returned, @@ -59,15 +59,15 @@ func GetFilter( JSON: jsonerror.NotFound("No such filter"), } } - filter := gomatrix.Filter{} - err = json.Unmarshal(res, &filter) + + filterJSON, err := json.Marshal(filter) if err != nil { - httputil.LogThenError(req, err) + return httputil.LogThenError(req, err) } return util.JSONResponse{ Code: 200, - JSON: filter, + JSON: filterJSON, } } @@ -103,15 +103,14 @@ func PutFilter( return *reqErr } - filterArray, err := json.Marshal(filter) - if err != nil { + if err = filter.Validate(); err != nil { return util.JSONResponse{ Code: 400, - JSON: jsonerror.BadJSON("Filter is malformed"), + JSON: jsonerror.BadJSON("Invalid filter: " + err.Error()), } } - filterID, err := accountDB.PutFilter(req.Context(), localpart, filterArray) + filterID, err := accountDB.PutFilter(req.Context(), localpart, &filter) if err != nil { return httputil.LogThenError(req, err) } diff --git a/vendor/src/github.com/matrix-org/gomatrix/filter.go b/vendor/src/github.com/matrix-org/gomatrix/filter.go index e4e762873..3aa65fa24 100644 --- a/vendor/src/github.com/matrix-org/gomatrix/filter.go +++ b/vendor/src/github.com/matrix-org/gomatrix/filter.go @@ -14,6 +14,8 @@ package gomatrix +import "errors" + //Filter is used by clients to specify how the server should filter responses to e.g. sync requests //Specified by: https://matrix.org/docs/spec/client_server/r0.2.0.html#filtering type Filter struct { @@ -41,3 +43,10 @@ type FilterPart struct { Senders []string `json:"senders,omitempty"` Types []string `json:"types,omitempty"` } + +func (filter *Filter) Validate() error { + if filter.EventFormat != "client" && filter.EventFormat != "federation" { + return errors.New("Bad event_format value. Must be any of [\"client\", \"federation\"]") + } + return nil +} From da0658dc675a6136d2845faf24e22c268f60b612 Mon Sep 17 00:00:00 2001 From: "Crom (Thibaut CHARLES)" Date: Fri, 5 Jan 2018 14:25:36 +0100 Subject: [PATCH 02/17] Timeline filtering Signed-off-by: Thibaut CHARLES cromfr@gmail.com --- .../storage/output_room_events_table.go | 61 +++++++++----- .../dendrite/syncapi/storage/syncserver.go | 82 +++++++++++-------- .../dendrite/syncapi/sync/notifier_test.go | 3 +- .../dendrite/syncapi/sync/request.go | 31 ++++++- .../dendrite/syncapi/sync/requestpool.go | 4 +- .../github.com/matrix-org/gomatrix/filter.go | 51 +++++++++--- 6 files changed, 161 insertions(+), 71 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go index ceb2601f1..c1a53ca70 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go @@ -19,6 +19,7 @@ import ( "database/sql" "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/gomatrix" "github.com/lib/pq" "github.com/matrix-org/dendrite/common" @@ -39,10 +40,10 @@ CREATE TABLE IF NOT EXISTS syncapi_output_room_events ( id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_stream_id'), -- The event ID for the event event_id TEXT NOT NULL, - -- The 'room_id' key for the event. + -- The 'room_id' key for the event. TODO Duplicate of (event_json->>'room_id') room_id TEXT NOT NULL, - -- The JSON for the event. Stored as TEXT because this should be valid UTF-8. - event_json TEXT NOT NULL, + -- The JSON for the event. Stored as JSON + event_json JSON NOT NULL, -- A list of event IDs which represent a delta of added/removed room state. This can be NULL -- if there is no delta. add_state_ids TEXT[], @@ -51,7 +52,11 @@ CREATE TABLE IF NOT EXISTS syncapi_output_room_events ( transaction_id TEXT -- The transaction id used to send the event, if any ); -- for event selection -CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_id_idx ON syncapi_output_room_events(event_id); +CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_id_idx ON syncapi_output_room_events( + event_id, + room_id, + (event_json->>'type'), + (event_json->>'sender')); ` const insertEventSQL = "" + @@ -62,10 +67,15 @@ const insertEventSQL = "" + const selectEventsSQL = "" + "SELECT id, event_json FROM syncapi_output_room_events WHERE event_id = ANY($1)" -const selectRecentEventsSQL = "" + +const selectRoomRecentEventsSQL = "" + "SELECT id, event_json, device_id, transaction_id FROM syncapi_output_room_events" + - " WHERE room_id = $1 AND id > $2 AND id <= $3" + - " ORDER BY id ASC LIMIT $4" + " WHERE room_id=$1 " + + " AND id > $2 AND id <= $3" + + " AND ( $4::text[] IS NULL OR (event_json->>'sender') = ANY($4) )" + + " AND ( $5::text[] IS NULL OR NOT((event_json->>'sender') = ANY($5)) )" + + " AND ( $6::text[] IS NULL OR (event_json->>'type') = ANY($6) )" + + " AND ( $7::text[] IS NULL OR NOT((event_json->>'type') = ANY($7)) )" + + " ORDER BY id DESC LIMIT $8" const selectMaxEventIDSQL = "" + "SELECT MAX(id) FROM syncapi_output_room_events" @@ -78,11 +88,11 @@ const selectStateInRangeSQL = "" + " ORDER BY id ASC" type outputRoomEventsStatements struct { - insertEventStmt *sql.Stmt - selectEventsStmt *sql.Stmt - selectMaxEventIDStmt *sql.Stmt - selectRecentEventsStmt *sql.Stmt - selectStateInRangeStmt *sql.Stmt + insertEventStmt *sql.Stmt + selectEventsStmt *sql.Stmt + selectMaxEventIDStmt *sql.Stmt + selectRoomRecentEventsStmt *sql.Stmt + selectStateInRangeStmt *sql.Stmt } func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) { @@ -99,7 +109,7 @@ func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) { if s.selectMaxEventIDStmt, err = db.Prepare(selectMaxEventIDSQL); err != nil { return } - if s.selectRecentEventsStmt, err = db.Prepare(selectRecentEventsSQL); err != nil { + if s.selectRoomRecentEventsStmt, err = db.Prepare(selectRoomRecentEventsSQL); err != nil { return } if s.selectStateInRangeStmt, err = db.Prepare(selectStateInRangeSQL); err != nil { @@ -219,22 +229,29 @@ func (s *outputRoomEventsStatements) insertEvent( return } -// RecentEventsInRoom returns the most recent events in the given room, up to a maximum of 'limit'. -func (s *outputRoomEventsStatements) selectRecentEvents( +func (s *outputRoomEventsStatements) selectRoomRecentEvents( ctx context.Context, txn *sql.Tx, - roomID string, fromPos, toPos types.StreamPosition, limit int, -) ([]streamEvent, error) { - stmt := common.TxStmt(txn, s.selectRecentEventsStmt) - rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, limit) + roomID string, fromPos, toPos types.StreamPosition, timelineFilter *gomatrix.FilterPart, +) ([]streamEvent, bool, error) { + + stmt := common.TxStmt(txn, s.selectRoomRecentEventsStmt) + rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, + pq.StringArray(timelineFilter.Senders), + pq.StringArray(timelineFilter.NotSenders), + pq.StringArray(timelineFilter.Types), + pq.StringArray(timelineFilter.NotTypes), + timelineFilter.Limit, // TODO: limit abusive values? + ) if err != nil { - return nil, err + return nil, false, err } defer rows.Close() // nolint: errcheck events, err := rowsToStreamEvents(rows) if err != nil { - return nil, err + return nil, false, err } - return events, nil + + return events, len(events) == timelineFilter.Limit, nil // TODO: len(events) == timelineFilter.Limit not accurate } // Events returns the events for the given event IDs. Returns an error if any one of the event IDs given are missing diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go index 84417a348..d132cefd6 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go @@ -23,6 +23,7 @@ import ( "github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/gomatrix" // Import the postgres database driver. _ "github.com/lib/pq" "github.com/matrix-org/dendrite/common" @@ -224,7 +225,7 @@ func (d *SyncServerDatabase) IncrementalSync( ctx context.Context, device authtypes.Device, fromPos, toPos types.StreamPosition, - numRecentEventsPerRoom int, + filter *gomatrix.Filter, ) (*types.Response, error) { txn, err := d.db.BeginTx(ctx, &txReadOnlySnapshot) if err != nil { @@ -244,7 +245,7 @@ func (d *SyncServerDatabase) IncrementalSync( res := types.NewResponse(toPos) for _, delta := range deltas { - err = d.addRoomDeltaToResponse(ctx, &device, txn, fromPos, toPos, delta, numRecentEventsPerRoom, res) + err = d.addRoomDeltaToResponse(ctx, &device, txn, fromPos, toPos, delta, filter, res) if err != nil { return nil, err } @@ -261,7 +262,7 @@ func (d *SyncServerDatabase) IncrementalSync( // CompleteSync a complete /sync API response for the given user. func (d *SyncServerDatabase) CompleteSync( - ctx context.Context, userID string, numRecentEventsPerRoom int, + ctx context.Context, userID string, filter *gomatrix.Filter, ) (*types.Response, error) { // This needs to be all done in a transaction as we need to do multiple SELECTs, and we need to have // a consistent view of the database throughout. This includes extracting the sync stream position. @@ -275,7 +276,8 @@ func (d *SyncServerDatabase) CompleteSync( defer common.EndTransaction(txn, &succeeded) // Get the current stream position which we will base the sync response on. - pos, err := d.syncStreamPositionTx(ctx, txn) + posFrom := types.StreamPosition(0) + posTo, err := d.syncStreamPositionTx(ctx, txn) if err != nil { return nil, err } @@ -287,39 +289,53 @@ func (d *SyncServerDatabase) CompleteSync( } // Build up a /sync response. Add joined rooms. - res := types.NewResponse(pos) + res := types.NewResponse(posTo) for _, roomID := range roomIDs { - var stateEvents []gomatrixserverlib.Event - stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID) - if err != nil { - return nil, err - } - // TODO: When filters are added, we may need to call this multiple times to get enough events. - // See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316 - var recentStreamEvents []streamEvent - recentStreamEvents, err = d.events.selectRecentEvents( - ctx, txn, roomID, types.StreamPosition(0), pos, numRecentEventsPerRoom, - ) - if err != nil { - return nil, err - } - // We don't include a device here as we don't need to send down - // transaction IDs for complete syncs - recentEvents := streamEventsToEvents(nil, recentStreamEvents) - - stateEvents = removeDuplicates(stateEvents, recentEvents) jr := types.NewJoinResponse() - jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) - jr.Timeline.Limited = true - jr.State.Events = gomatrixserverlib.ToClientEvents(stateEvents, gomatrixserverlib.FormatSync) + + //Join response should contain events only if room isn't filtered + if !isRoomFiltered(filter, roomID) { + // Timeline events + var recentStreamEvents []streamEvent + var limited bool + recentStreamEvents, limited, err = d.events.selectRoomRecentEvents( + ctx, txn, roomID, posFrom, posTo, &filter.Room.Timeline) + if err != nil { + return nil, err + } + jr.Timeline.Limited = limited + + recentEvents := streamEventsToEvents(nil, recentStreamEvents) + jr.Timeline.Events = gomatrixserverlib.ToClientEvents( + recentEvents, + gomatrixserverlib.FormatSync) + + // State events + var stateEvents []gomatrixserverlib.Event + stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID) + if err != nil { + return nil, err + } + stateEvents = removeDuplicates(stateEvents, recentEvents) + jr.State.Events = gomatrixserverlib.ToClientEvents(stateEvents, gomatrixserverlib.FormatSync) + + //TODO AccountData events + //TODO Ephemeral events + } + + //TODO Handle jr.Timeline.prev_batch + res.Rooms.Join[roomID] = *jr } - if err = d.addInvitesToResponse(ctx, txn, userID, 0, pos, res); err != nil { + //TODO: filter Invite events + if err = d.addInvitesToResponse(ctx, txn, userID, 0, posTo, res); err != nil { return nil, err } + //TODO handle res.Room[roomID].Leave + succeeded = true return res, err } @@ -409,7 +425,7 @@ func (d *SyncServerDatabase) addRoomDeltaToResponse( txn *sql.Tx, fromPos, toPos types.StreamPosition, delta stateDelta, - numRecentEventsPerRoom int, + filter *gomatrix.Filter, res *types.Response, ) error { endPos := toPos @@ -422,8 +438,8 @@ func (d *SyncServerDatabase) addRoomDeltaToResponse( // This is all "okay" assuming history_visibility == "shared" which it is by default. endPos = delta.membershipPos } - recentStreamEvents, err := d.events.selectRecentEvents( - ctx, txn, delta.roomID, fromPos, endPos, numRecentEventsPerRoom, + recentStreamEvents, limited, err := d.events.selectRoomRecentEvents( + ctx, txn, delta.roomID, fromPos, endPos, &filter.Room.Timeline, ) if err != nil { return err @@ -440,7 +456,7 @@ func (d *SyncServerDatabase) addRoomDeltaToResponse( case "join": jr := types.NewJoinResponse() jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) - jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true + jr.Timeline.Limited = limited jr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) res.Rooms.Join[delta.roomID] = *jr case "leave": @@ -450,7 +466,7 @@ func (d *SyncServerDatabase) addRoomDeltaToResponse( // no longer in the room. lr := types.NewLeaveResponse() lr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) - lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true + lr.Timeline.Limited = limited lr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) res.Rooms.Leave[delta.roomID] = *lr } diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go index 4fa543936..fbb46d82e 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/notifier_test.go @@ -22,6 +22,7 @@ import ( "time" "github.com/matrix-org/dendrite/clientapi/auth/authtypes" + "github.com/matrix-org/gomatrix" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" @@ -286,8 +287,8 @@ func newTestSyncRequest(userID string, since types.StreamPosition) syncRequest { timeout: 1 * time.Minute, since: &since, wantFullState: false, - limit: defaultTimelineLimit, log: util.GetLogger(context.TODO()), ctx: context.TODO(), + filter: gomatrix.DefaultFilter(), } } diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/request.go b/src/github.com/matrix-org/dendrite/syncapi/sync/request.go index 3c1befddf..92bc79e3e 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/request.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/request.go @@ -16,11 +16,14 @@ package sync import ( "context" + "encoding/json" + "errors" "net/http" "strconv" "time" "github.com/matrix-org/dendrite/clientapi/auth/authtypes" + "github.com/matrix-org/gomatrix" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/util" @@ -34,11 +37,11 @@ const defaultTimelineLimit = 20 type syncRequest struct { ctx context.Context device authtypes.Device - limit int timeout time.Duration since *types.StreamPosition // nil means that no since token was supplied wantFullState bool log *log.Entry + filter gomatrix.Filter } func newSyncRequest(req *http.Request, device authtypes.Device) (*syncRequest, error) { @@ -49,15 +52,37 @@ func newSyncRequest(req *http.Request, device authtypes.Device) (*syncRequest, e if err != nil { return nil, err } - // TODO: Additional query params: set_presence, filter + + filterStr := req.URL.Query().Get("filter") + var filter gomatrix.Filter + if filterStr != "" { + if filterStr[0] == '{' { + // Inline filter + filter = gomatrix.DefaultFilter() + err = json.Unmarshal([]byte(filterStr), &filter) + if err != nil { + return nil, err + } + err = filter.Validate() + if err != nil { + return nil, err + } + } else { + // Filter ID + // TODO retrieve filter from DB + return nil, errors.New("Filter ID retrieval not implemented") + } + } + + // TODO: Additional query params: set_presence return &syncRequest{ ctx: req.Context(), device: device, timeout: timeout, since: since, wantFullState: wantFullState, - limit: defaultTimelineLimit, // TODO: read from filter log: util.GetLogger(req.Context()), + filter: filter, }, nil } diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go b/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go index 703ddd3f1..3b21fe9ab 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go @@ -122,9 +122,9 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype func (rp *RequestPool) currentSyncForUser(req syncRequest, currentPos types.StreamPosition) (res *types.Response, err error) { // TODO: handle ignored users if req.since == nil { - res, err = rp.db.CompleteSync(req.ctx, req.device.UserID, req.limit) + res, err = rp.db.CompleteSync(req.ctx, req.device.UserID, &req.filter) } else { - res, err = rp.db.IncrementalSync(req.ctx, req.device, *req.since, currentPos, req.limit) + res, err = rp.db.IncrementalSync(req.ctx, req.device, *req.since, currentPos, &req.filter) } if err != nil { diff --git a/vendor/src/github.com/matrix-org/gomatrix/filter.go b/vendor/src/github.com/matrix-org/gomatrix/filter.go index 3aa65fa24..43ff1c773 100644 --- a/vendor/src/github.com/matrix-org/gomatrix/filter.go +++ b/vendor/src/github.com/matrix-org/gomatrix/filter.go @@ -23,21 +23,23 @@ type Filter struct { EventFields []string `json:"event_fields,omitempty"` EventFormat string `json:"event_format,omitempty"` Presence FilterPart `json:"presence,omitempty"` - Room struct { - AccountData FilterPart `json:"account_data,omitempty"` - Ephemeral FilterPart `json:"ephemeral,omitempty"` - IncludeLeave bool `json:"include_leave,omitempty"` - NotRooms []string `json:"not_rooms,omitempty"` - Rooms []string `json:"rooms,omitempty"` - State FilterPart `json:"state,omitempty"` - Timeline FilterPart `json:"timeline,omitempty"` - } `json:"room,omitempty"` + Room FilterRoom `json:"room,omitempty"` +} + +type FilterRoom struct { + AccountData FilterPart `json:"account_data,omitempty"` + Ephemeral FilterPart `json:"ephemeral,omitempty"` + IncludeLeave bool `json:"include_leave,omitempty"` + NotRooms []string `json:"not_rooms,omitempty"` + Rooms []string `json:"rooms,omitempty"` + State FilterPart `json:"state,omitempty"` + Timeline FilterPart `json:"timeline,omitempty"` } type FilterPart struct { NotRooms []string `json:"not_rooms,omitempty"` Rooms []string `json:"rooms,omitempty"` - Limit *int `json:"limit,omitempty"` + Limit int `json:"limit,omitempty"` NotSenders []string `json:"not_senders,omitempty"` NotTypes []string `json:"not_types,omitempty"` Senders []string `json:"senders,omitempty"` @@ -50,3 +52,32 @@ func (filter *Filter) Validate() error { } return nil } + +func DefaultFilter() Filter { + return Filter{ + AccountData: defaultFilterPart(), + EventFields: nil, + EventFormat: "client", + Presence: defaultFilterPart(), + Room: FilterRoom{ + AccountData: defaultFilterPart(), + Ephemeral: defaultFilterPart(), + IncludeLeave: false, //TODO check default value on synapse + NotRooms: nil, + Rooms: nil, + State: defaultFilterPart(), + Timeline: defaultFilterPart(), + }, + } +} +func defaultFilterPart() FilterPart { + return FilterPart{ + NotRooms: nil, + Rooms: nil, + Limit: 100, //TODO check this on synapse + NotSenders: nil, + NotTypes: nil, + Senders: nil, + Types: nil, + } +} From a762aecc62ff38cb94dbf9ceaa1bce201c12db7a Mon Sep 17 00:00:00 2001 From: "Crom (Thibaut CHARLES)" Date: Fri, 5 Jan 2018 15:06:58 +0100 Subject: [PATCH 03/17] Fix room timeline-specific filtering & linter warns Signed-off-by: Thibaut CHARLES cromfr@gmail.com --- .../dendrite/syncapi/storage/filtering.go | 48 +++++++++++++++++++ .../dendrite/syncapi/storage/syncserver.go | 10 ++-- .../dendrite/syncapi/sync/request.go | 1 - .../github.com/matrix-org/gomatrix/filter.go | 2 +- 4 files changed, 55 insertions(+), 6 deletions(-) create mode 100644 src/github.com/matrix-org/dendrite/syncapi/storage/filtering.go diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/filtering.go b/src/github.com/matrix-org/dendrite/syncapi/storage/filtering.go new file mode 100644 index 000000000..21ab25877 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/filtering.go @@ -0,0 +1,48 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "github.com/matrix-org/gomatrix" +) + +func isRoomFiltered(roomID string, filter *gomatrix.Filter, filterPart *gomatrix.FilterPart) bool { + if filter != nil { + if filter.Room.Rooms != nil && !hasValue(roomID, filter.Room.Rooms) { + return true + } + if filter.Room.NotRooms != nil && hasValue(roomID, filter.Room.NotRooms) { + return true + } + } + if filterPart != nil { + if filterPart.Rooms != nil && !hasValue(roomID, filterPart.Rooms) { + return true + } + if filterPart.NotRooms != nil && hasValue(roomID, filterPart.NotRooms) { + return true + } + } + return false +} + +func hasValue(value string, list []string) bool { + for i := range list { + if list[i] == value { + return true + } + } + return false +} diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go index d132cefd6..36b10e156 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go @@ -245,9 +245,11 @@ func (d *SyncServerDatabase) IncrementalSync( res := types.NewResponse(toPos) for _, delta := range deltas { - err = d.addRoomDeltaToResponse(ctx, &device, txn, fromPos, toPos, delta, filter, res) - if err != nil { - return nil, err + if !isRoomFiltered(delta.roomID, filter, &filter.Room.Timeline) { + err = d.addRoomDeltaToResponse(ctx, &device, txn, fromPos, toPos, delta, filter, res) + if err != nil { + return nil, err + } } } @@ -295,7 +297,7 @@ func (d *SyncServerDatabase) CompleteSync( jr := types.NewJoinResponse() //Join response should contain events only if room isn't filtered - if !isRoomFiltered(filter, roomID) { + if !isRoomFiltered(roomID, filter, &filter.Room.Timeline) { // Timeline events var recentStreamEvents []streamEvent var limited bool diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/request.go b/src/github.com/matrix-org/dendrite/syncapi/sync/request.go index 92bc79e3e..42494b9b5 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/request.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/request.go @@ -31,7 +31,6 @@ import ( ) const defaultSyncTimeout = time.Duration(30) * time.Second -const defaultTimelineLimit = 20 // syncRequest represents a /sync request, with sensible defaults/sanity checks applied. type syncRequest struct { diff --git a/vendor/src/github.com/matrix-org/gomatrix/filter.go b/vendor/src/github.com/matrix-org/gomatrix/filter.go index 43ff1c773..820f44f0c 100644 --- a/vendor/src/github.com/matrix-org/gomatrix/filter.go +++ b/vendor/src/github.com/matrix-org/gomatrix/filter.go @@ -74,7 +74,7 @@ func defaultFilterPart() FilterPart { return FilterPart{ NotRooms: nil, Rooms: nil, - Limit: 100, //TODO check this on synapse + Limit: 20, NotSenders: nil, NotTypes: nil, Senders: nil, From 2c3e55c7d390949e3d3adaf95d241bdae1eec883 Mon Sep 17 00:00:00 2001 From: "Crom (Thibaut CHARLES)" Date: Wed, 10 Jan 2018 14:28:27 +0100 Subject: [PATCH 04/17] moved event_json->>property to separate columns Signed-off-by: Thibaut CHARLES cromfr@gmail.com --- .../storage/output_room_events_table.go | 28 +++++++++++-------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go index c1a53ca70..bc1add678 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go @@ -40,10 +40,14 @@ CREATE TABLE IF NOT EXISTS syncapi_output_room_events ( id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_stream_id'), -- The event ID for the event event_id TEXT NOT NULL, - -- The 'room_id' key for the event. TODO Duplicate of (event_json->>'room_id') + -- The 'room_id' key for the event. room_id TEXT NOT NULL, - -- The JSON for the event. Stored as JSON - event_json JSON NOT NULL, + -- The 'type' property for the event. + type TEXT NOT NULL, + -- The 'sender' property for the event. + sender TEXT NOT NULL, + -- The JSON for the event. Stored as TEXT because this should be valid UTF-8. + event_json TEXT NOT NULL, -- A list of event IDs which represent a delta of added/removed room state. This can be NULL -- if there is no delta. add_state_ids TEXT[], @@ -55,14 +59,14 @@ CREATE TABLE IF NOT EXISTS syncapi_output_room_events ( CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_id_idx ON syncapi_output_room_events( event_id, room_id, - (event_json->>'type'), - (event_json->>'sender')); + type, + sender); ` const insertEventSQL = "" + "INSERT INTO syncapi_output_room_events (" + - " room_id, event_id, event_json, add_state_ids, remove_state_ids, device_id, transaction_id" + - ") VALUES ($1, $2, $3, $4, $5, $6, $7) RETURNING id" + " room_id, event_id, type, sender, event_json, add_state_ids, remove_state_ids, device_id, transaction_id" + + ") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) RETURNING id" const selectEventsSQL = "" + "SELECT id, event_json FROM syncapi_output_room_events WHERE event_id = ANY($1)" @@ -71,10 +75,10 @@ const selectRoomRecentEventsSQL = "" + "SELECT id, event_json, device_id, transaction_id FROM syncapi_output_room_events" + " WHERE room_id=$1 " + " AND id > $2 AND id <= $3" + - " AND ( $4::text[] IS NULL OR (event_json->>'sender') = ANY($4) )" + - " AND ( $5::text[] IS NULL OR NOT((event_json->>'sender') = ANY($5)) )" + - " AND ( $6::text[] IS NULL OR (event_json->>'type') = ANY($6) )" + - " AND ( $7::text[] IS NULL OR NOT((event_json->>'type') = ANY($7)) )" + + " AND ( $4::text[] IS NULL OR sender = ANY($4) )" + + " AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" + + " AND ( $6::text[] IS NULL OR type = ANY($6) )" + + " AND ( $7::text[] IS NULL OR NOT(type = ANY($7)) )" + " ORDER BY id DESC LIMIT $8" const selectMaxEventIDSQL = "" + @@ -220,6 +224,8 @@ func (s *outputRoomEventsStatements) insertEvent( ctx, event.RoomID(), event.EventID(), + event.Type(), + event.Sender(), event.JSON(), pq.StringArray(addState), pq.StringArray(removeState), From dd65a8fe688003122099082eb974e3b3e0f38592 Mon Sep 17 00:00:00 2001 From: "Crom (Thibaut CHARLES)" Date: Wed, 10 Jan 2018 15:15:19 +0100 Subject: [PATCH 05/17] Room state filtering Signed-off-by: Thibaut CHARLES cromfr@gmail.com --- .../dendrite/syncapi/routing/state.go | 2 +- .../storage/current_room_state_table.go | 37 +++++++++--- .../dendrite/syncapi/storage/syncserver.go | 57 +++++++++++-------- .../github.com/matrix-org/gomatrix/filter.go | 14 ++--- 4 files changed, 70 insertions(+), 40 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/syncapi/routing/state.go b/src/github.com/matrix-org/dendrite/syncapi/routing/state.go index 6c825fce8..fc219b555 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/routing/state.go +++ b/src/github.com/matrix-org/dendrite/syncapi/routing/state.go @@ -44,7 +44,7 @@ func OnIncomingStateRequest(req *http.Request, db *storage.SyncServerDatabase, r // TODO(#287): Auth request and handle the case where the user has left (where // we should return the state at the poin they left) - stateEvents, err := db.GetStateEventsForRoom(req.Context(), roomID) + stateEvents, err := db.GetStateEventsForRoom(req.Context(), roomID, nil) if err != nil { return httputil.LogThenError(req, err) } diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go index 852bfd760..dfe4314d9 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go @@ -20,6 +20,7 @@ import ( "github.com/lib/pq" "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/gomatrix" "github.com/matrix-org/gomatrixserverlib" ) @@ -32,6 +33,8 @@ CREATE TABLE IF NOT EXISTS syncapi_current_room_state ( event_id TEXT NOT NULL, -- The state event type e.g 'm.room.member' type TEXT NOT NULL, + -- The 'sender' property for the event. + sender TEXT NOT NULL, -- The state_key value for this state event e.g '' state_key TEXT NOT NULL, -- The JSON for the event. Stored as TEXT because this should be valid UTF-8. @@ -46,16 +49,17 @@ CREATE TABLE IF NOT EXISTS syncapi_current_room_state ( CONSTRAINT syncapi_room_state_unique UNIQUE (room_id, type, state_key) ); -- for event deletion -CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_id_idx ON syncapi_current_room_state(event_id); +CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_id_idx ON syncapi_current_room_state(event_id, room_id, type, sender); -- for querying membership states of users CREATE INDEX IF NOT EXISTS syncapi_membership_idx ON syncapi_current_room_state(type, state_key, membership) WHERE membership IS NOT NULL AND membership != 'leave'; + ` const upsertRoomStateSQL = "" + - "INSERT INTO syncapi_current_room_state (room_id, event_id, type, state_key, event_json, membership, added_at)" + - " VALUES ($1, $2, $3, $4, $5, $6, $7)" + + "INSERT INTO syncapi_current_room_state (room_id, event_id, type, sender, state_key, event_json, membership, added_at)" + + " VALUES ($1, $2, $3, $4, $5, $6, $7, 8)" + " ON CONFLICT ON CONSTRAINT syncapi_room_state_unique" + - " DO UPDATE SET event_id = $2, event_json = $5, membership = $6, added_at = $7" + " DO UPDATE SET event_id = $2, sender=$4, event_json = $6, membership = $7, added_at = $8" const deleteRoomStateByEventIDSQL = "" + "DELETE FROM syncapi_current_room_state WHERE event_id = $1" @@ -64,7 +68,12 @@ const selectRoomIDsWithMembershipSQL = "" + "SELECT room_id FROM syncapi_current_room_state WHERE type = 'm.room.member' AND state_key = $1 AND membership = $2" const selectCurrentStateSQL = "" + - "SELECT event_json FROM syncapi_current_room_state WHERE room_id = $1" + "SELECT event_json FROM syncapi_current_room_state WHERE room_id = $1" + + " AND ( $2::text[] IS NULL OR sender = ANY($2) )" + + " AND ( $3::text[] IS NULL OR NOT(sender = ANY($3)) )" + + " AND ( $4::text[] IS NULL OR type = ANY($4) )" + + " AND ( $5::text[] IS NULL OR NOT(type = ANY($5)) )" + + " LIMIT $6" const selectJoinedUsersSQL = "" + "SELECT room_id, state_key FROM syncapi_current_room_state WHERE type = 'm.room.member' AND membership = 'join'" @@ -165,10 +174,23 @@ func (s *currentRoomStateStatements) selectRoomIDsWithMembership( // CurrentState returns all the current state events for the given room. func (s *currentRoomStateStatements) selectCurrentState( - ctx context.Context, txn *sql.Tx, roomID string, + ctx context.Context, txn *sql.Tx, roomID string, stateFilter *gomatrix.FilterPart, ) ([]gomatrixserverlib.Event, error) { + + var filter gomatrix.FilterPart + if stateFilter == nil { + filter = gomatrix.DefaultFilterPart() + } else { + filter = *stateFilter + } + stmt := common.TxStmt(txn, s.selectCurrentStateStmt) - rows, err := stmt.QueryContext(ctx, roomID) + rows, err := stmt.QueryContext(ctx, roomID, + pq.StringArray(filter.Senders), + pq.StringArray(filter.NotSenders), + pq.StringArray(filter.Types), + pq.StringArray(filter.NotTypes), + filter.Limit) if err != nil { return nil, err } @@ -195,6 +217,7 @@ func (s *currentRoomStateStatements) upsertRoomState( event.RoomID(), event.EventID(), event.Type(), + event.Sender(), *event.StateKey(), event.JSON(), membership, diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go index 36b10e156..5622ec9aa 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go @@ -178,10 +178,11 @@ func (d *SyncServerDatabase) GetStateEvent( // Returns an empty slice if no state events could be found for this room. // Returns an error if there was an issue with the retrieval. func (d *SyncServerDatabase) GetStateEventsForRoom( - ctx context.Context, roomID string, + ctx context.Context, roomID string, stateFilter *gomatrix.FilterPart, ) (stateEvents []gomatrixserverlib.Event, err error) { + err = common.WithTransaction(d.db, func(txn *sql.Tx) error { - stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID) + stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID, stateFilter) return err }) return @@ -238,7 +239,7 @@ func (d *SyncServerDatabase) IncrementalSync( // joined rooms, but also which rooms have membership transitions for this user between the 2 stream positions. // This works out what the 'state' key should be for each room as well as which membership block // to put the room into. - deltas, err := d.getStateDeltas(ctx, &device, txn, fromPos, toPos, device.UserID) + deltas, err := d.getStateDeltas(ctx, &device, txn, fromPos, toPos, device.UserID, &filter.Room.State) if err != nil { return nil, err } @@ -297,31 +298,37 @@ func (d *SyncServerDatabase) CompleteSync( jr := types.NewJoinResponse() //Join response should contain events only if room isn't filtered - if !isRoomFiltered(roomID, filter, &filter.Room.Timeline) { + if !isRoomFiltered(roomID, filter, nil) { // Timeline events - var recentStreamEvents []streamEvent - var limited bool - recentStreamEvents, limited, err = d.events.selectRoomRecentEvents( - ctx, txn, roomID, posFrom, posTo, &filter.Room.Timeline) - if err != nil { - return nil, err - } - jr.Timeline.Limited = limited + var recentEvents []gomatrixserverlib.Event + if !isRoomFiltered(roomID, nil, &filter.Room.Timeline) { + var recentStreamEvents []streamEvent + var limited bool + recentStreamEvents, limited, err = d.events.selectRoomRecentEvents( + ctx, txn, roomID, posFrom, posTo, &filter.Room.Timeline) + if err != nil { + return nil, err + } + recentEvents = streamEventsToEvents(nil, recentStreamEvents) - recentEvents := streamEventsToEvents(nil, recentStreamEvents) - jr.Timeline.Events = gomatrixserverlib.ToClientEvents( - recentEvents, - gomatrixserverlib.FormatSync) + jr.Timeline.Limited = limited + jr.Timeline.Events = gomatrixserverlib.ToClientEvents( + recentEvents, + gomatrixserverlib.FormatSync) + } // State events - var stateEvents []gomatrixserverlib.Event - stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID) - if err != nil { - return nil, err + if !isRoomFiltered(roomID, nil, &filter.Room.State) { + var stateEvents []gomatrixserverlib.Event + stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID, &filter.Room.State) + if err != nil { + return nil, err + } + if recentEvents != nil { + stateEvents = removeDuplicates(stateEvents, recentEvents) + } + jr.State.Events = gomatrixserverlib.ToClientEvents(stateEvents, gomatrixserverlib.FormatSync) } - stateEvents = removeDuplicates(stateEvents, recentEvents) - jr.State.Events = gomatrixserverlib.ToClientEvents(stateEvents, gomatrixserverlib.FormatSync) - //TODO AccountData events //TODO Ephemeral events } @@ -564,7 +571,7 @@ func (d *SyncServerDatabase) fetchMissingStateEvents( func (d *SyncServerDatabase) getStateDeltas( ctx context.Context, device *authtypes.Device, txn *sql.Tx, - fromPos, toPos types.StreamPosition, userID string, + fromPos, toPos types.StreamPosition, userID string, stateFilter *gomatrix.FilterPart, ) ([]stateDelta, error) { // Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821 // - Get membership list changes for this user in this sync response @@ -597,7 +604,7 @@ func (d *SyncServerDatabase) getStateDeltas( if membership == "join" { // send full room state down instead of a delta var allState []gomatrixserverlib.Event - allState, err = d.roomstate.selectCurrentState(ctx, txn, roomID) + allState, err = d.roomstate.selectCurrentState(ctx, txn, roomID, stateFilter) if err != nil { return nil, err } diff --git a/vendor/src/github.com/matrix-org/gomatrix/filter.go b/vendor/src/github.com/matrix-org/gomatrix/filter.go index 820f44f0c..52294bf1c 100644 --- a/vendor/src/github.com/matrix-org/gomatrix/filter.go +++ b/vendor/src/github.com/matrix-org/gomatrix/filter.go @@ -55,22 +55,22 @@ func (filter *Filter) Validate() error { func DefaultFilter() Filter { return Filter{ - AccountData: defaultFilterPart(), + AccountData: DefaultFilterPart(), EventFields: nil, EventFormat: "client", - Presence: defaultFilterPart(), + Presence: DefaultFilterPart(), Room: FilterRoom{ - AccountData: defaultFilterPart(), - Ephemeral: defaultFilterPart(), + AccountData: DefaultFilterPart(), + Ephemeral: DefaultFilterPart(), IncludeLeave: false, //TODO check default value on synapse NotRooms: nil, Rooms: nil, - State: defaultFilterPart(), - Timeline: defaultFilterPart(), + State: DefaultFilterPart(), + Timeline: DefaultFilterPart(), }, } } -func defaultFilterPart() FilterPart { +func DefaultFilterPart() FilterPart { return FilterPart{ NotRooms: nil, Rooms: nil, From 2ec5a1cec9a83f86b06435672e37df422bede7c7 Mon Sep 17 00:00:00 2001 From: "Crom (Thibaut CHARLES)" Date: Wed, 10 Jan 2018 15:28:44 +0100 Subject: [PATCH 06/17] Removed state filter limit usage Signed-off-by: Thibaut CHARLES cromfr@gmail.com --- .../dendrite/syncapi/storage/current_room_state_table.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go index dfe4314d9..2bc3fdde8 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go @@ -72,8 +72,7 @@ const selectCurrentStateSQL = "" + " AND ( $2::text[] IS NULL OR sender = ANY($2) )" + " AND ( $3::text[] IS NULL OR NOT(sender = ANY($3)) )" + " AND ( $4::text[] IS NULL OR type = ANY($4) )" + - " AND ( $5::text[] IS NULL OR NOT(type = ANY($5)) )" + - " LIMIT $6" + " AND ( $5::text[] IS NULL OR NOT(type = ANY($5)) )" const selectJoinedUsersSQL = "" + "SELECT room_id, state_key FROM syncapi_current_room_state WHERE type = 'm.room.member' AND membership = 'join'" @@ -189,8 +188,7 @@ func (s *currentRoomStateStatements) selectCurrentState( pq.StringArray(filter.Senders), pq.StringArray(filter.NotSenders), pq.StringArray(filter.Types), - pq.StringArray(filter.NotTypes), - filter.Limit) + pq.StringArray(filter.NotTypes)) if err != nil { return nil, err } From 7b80e6d51ba1b71db0986df101233ed6154ed482 Mon Sep 17 00:00:00 2001 From: "Crom (Thibaut CHARLES)" Date: Wed, 10 Jan 2018 15:34:28 +0100 Subject: [PATCH 07/17] Fix timeline.limited detection Signed-off-by: Thibaut CHARLES cromfr@gmail.com --- .../syncapi/storage/output_room_events_table.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go index bc1add678..1bc5bf3ac 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go @@ -246,7 +246,7 @@ func (s *outputRoomEventsStatements) selectRoomRecentEvents( pq.StringArray(timelineFilter.NotSenders), pq.StringArray(timelineFilter.Types), pq.StringArray(timelineFilter.NotTypes), - timelineFilter.Limit, // TODO: limit abusive values? + timelineFilter.Limit+1, // TODO: limit abusive values? This can also be done in gomatrix.Filter.Validate ) if err != nil { return nil, false, err @@ -257,7 +257,13 @@ func (s *outputRoomEventsStatements) selectRoomRecentEvents( return nil, false, err } - return events, len(events) == timelineFilter.Limit, nil // TODO: len(events) == timelineFilter.Limit not accurate + limited := false + if len(events) > timelineFilter.Limit { + limited = true + events = events[:len(events)-1] + } + + return events, limited, nil } // Events returns the events for the given event IDs. Returns an error if any one of the event IDs given are missing From 350e6cbfed0f6bac32fd4e741ec581119d04445c Mon Sep 17 00:00:00 2001 From: "Crom (Thibaut CHARLES)" Date: Fri, 12 Jan 2018 18:27:24 +0100 Subject: [PATCH 08/17] Filter retrieval from db Signed-off-by: Thibaut CHARLES cromfr@gmail.com --- .../dendrite/syncapi/sync/request.go | 20 +++++++++++++++---- .../dendrite/syncapi/sync/requestpool.go | 2 +- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/request.go b/src/github.com/matrix-org/dendrite/syncapi/sync/request.go index 42494b9b5..e175c4569 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/request.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/request.go @@ -17,13 +17,14 @@ package sync import ( "context" "encoding/json" - "errors" "net/http" "strconv" "time" "github.com/matrix-org/dendrite/clientapi/auth/authtypes" + "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/gomatrix" + "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/util" @@ -43,7 +44,7 @@ type syncRequest struct { filter gomatrix.Filter } -func newSyncRequest(req *http.Request, device authtypes.Device) (*syncRequest, error) { +func newSyncRequest(req *http.Request, device authtypes.Device, accountDB *accounts.Database) (*syncRequest, error) { timeout := getTimeout(req.URL.Query().Get("timeout")) fullState := req.URL.Query().Get("full_state") wantFullState := fullState != "" && fullState != "false" @@ -68,8 +69,19 @@ func newSyncRequest(req *http.Request, device authtypes.Device) (*syncRequest, e } } else { // Filter ID - // TODO retrieve filter from DB - return nil, errors.New("Filter ID retrieval not implemented") + filterID, err := strconv.Atoi(filterStr) + if err != nil { + return nil, err + } + localpart, _, err := gomatrixserverlib.SplitID('@', device.UserID) + if err != nil { + return nil, err + } + recvFilter, err := accountDB.GetFilter(req.Context(), localpart, strconv.Itoa(filterID)) //TODO GetFilter should receive filterID as an int + if err != nil { + return nil, err + } + filter = *recvFilter } } diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go b/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go index 3b21fe9ab..2ca1e9cf7 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go @@ -48,7 +48,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype // Extract values from request logger := util.GetLogger(req.Context()) userID := device.UserID - syncReq, err := newSyncRequest(req, *device) + syncReq, err := newSyncRequest(req, *device, rp.accountDB) if err != nil { return util.JSONResponse{ Code: 400, From 65e36484d6140682cbeb2df77d27ca3c9890e9f9 Mon Sep 17 00:00:00 2001 From: "Crom (Thibaut CHARLES)" Date: Fri, 12 Jan 2018 18:42:14 +0100 Subject: [PATCH 09/17] Fix GET filter sending a base64 encoded json Signed-off-by: Thibaut CHARLES cromfr@gmail.com --- .../matrix-org/dendrite/clientapi/routing/filter.go | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/clientapi/routing/filter.go b/src/github.com/matrix-org/dendrite/clientapi/routing/filter.go index d14aaeb96..5ca6bf972 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/routing/filter.go +++ b/src/github.com/matrix-org/dendrite/clientapi/routing/filter.go @@ -17,8 +17,6 @@ package routing import ( "net/http" - "encoding/json" - "github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/clientapi/httputil" @@ -60,14 +58,9 @@ func GetFilter( } } - filterJSON, err := json.Marshal(filter) - if err != nil { - return httputil.LogThenError(req, err) - } - return util.JSONResponse{ Code: 200, - JSON: filterJSON, + JSON: filter, } } From 4ef80a4e48e7ad2b8bc383271711e68cf30ab340 Mon Sep 17 00:00:00 2001 From: "Crom (Thibaut CHARLES)" Date: Fri, 12 Jan 2018 19:10:43 +0100 Subject: [PATCH 10/17] Handling "m.room.*" type filters Signed-off-by: Thibaut CHARLES cromfr@gmail.com --- .../syncapi/storage/current_room_state_table.go | 12 ++++++------ .../matrix-org/dendrite/syncapi/storage/filtering.go | 10 ++++++++++ .../syncapi/storage/output_room_events_table.go | 12 ++++++------ 3 files changed, 22 insertions(+), 12 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go index 2bc3fdde8..7278573ca 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go @@ -69,10 +69,10 @@ const selectRoomIDsWithMembershipSQL = "" + const selectCurrentStateSQL = "" + "SELECT event_json FROM syncapi_current_room_state WHERE room_id = $1" + - " AND ( $2::text[] IS NULL OR sender = ANY($2) )" + - " AND ( $3::text[] IS NULL OR NOT(sender = ANY($3)) )" + - " AND ( $4::text[] IS NULL OR type = ANY($4) )" + - " AND ( $5::text[] IS NULL OR NOT(type = ANY($5)) )" + " AND ( $2::text[] IS NULL OR sender = ANY($2) )" + + " AND ( $3::text[] IS NULL OR NOT(sender = ANY($3)) )" + + " AND ( $4::text[] IS NULL OR type LIKE ANY($4) )" + + " AND ( $5::text[] IS NULL OR NOT(type LIKE ANY($5)) )" const selectJoinedUsersSQL = "" + "SELECT room_id, state_key FROM syncapi_current_room_state WHERE type = 'm.room.member' AND membership = 'join'" @@ -187,8 +187,8 @@ func (s *currentRoomStateStatements) selectCurrentState( rows, err := stmt.QueryContext(ctx, roomID, pq.StringArray(filter.Senders), pq.StringArray(filter.NotSenders), - pq.StringArray(filter.Types), - pq.StringArray(filter.NotTypes)) + pq.StringArray(filterConvertWildcardToSQL(filter.Types)), + pq.StringArray(filterConvertWildcardToSQL(filter.NotTypes))) if err != nil { return nil, err } diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/filtering.go b/src/github.com/matrix-org/dendrite/syncapi/storage/filtering.go index 21ab25877..886e48c3d 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/filtering.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/filtering.go @@ -15,6 +15,8 @@ package storage import ( + "strings" + "github.com/matrix-org/gomatrix" ) @@ -46,3 +48,11 @@ func hasValue(value string, list []string) bool { } return false } + +func filterConvertWildcardToSQL(values []string) []string { + ret := make([]string, len(values)) + for i := range values { + ret[i] = strings.Replace(values[i], "*", "%", -1) + } + return ret +} diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go index 1bc5bf3ac..a45c90331 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go @@ -75,10 +75,10 @@ const selectRoomRecentEventsSQL = "" + "SELECT id, event_json, device_id, transaction_id FROM syncapi_output_room_events" + " WHERE room_id=$1 " + " AND id > $2 AND id <= $3" + - " AND ( $4::text[] IS NULL OR sender = ANY($4) )" + - " AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" + - " AND ( $6::text[] IS NULL OR type = ANY($6) )" + - " AND ( $7::text[] IS NULL OR NOT(type = ANY($7)) )" + + " AND ( $4::text[] IS NULL OR sender = ANY($4) )" + + " AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" + + " AND ( $6::text[] IS NULL OR type LIKE ANY($6) )" + + " AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" + " ORDER BY id DESC LIMIT $8" const selectMaxEventIDSQL = "" + @@ -244,8 +244,8 @@ func (s *outputRoomEventsStatements) selectRoomRecentEvents( rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, pq.StringArray(timelineFilter.Senders), pq.StringArray(timelineFilter.NotSenders), - pq.StringArray(timelineFilter.Types), - pq.StringArray(timelineFilter.NotTypes), + pq.StringArray(filterConvertWildcardToSQL(timelineFilter.Types)), + pq.StringArray(filterConvertWildcardToSQL(timelineFilter.NotTypes)), timelineFilter.Limit+1, // TODO: limit abusive values? This can also be done in gomatrix.Filter.Validate ) if err != nil { From a8daf97a138f8194ce9eb41eb616dc82522eb254 Mon Sep 17 00:00:00 2001 From: "Crom (Thibaut CHARLES)" Date: Sun, 14 Jan 2018 14:16:31 +0100 Subject: [PATCH 11/17] Invite events filtering Signed-off-by: Thibaut CHARLES cromfr@gmail.com --- .../dendrite/syncapi/storage/invites_table.go | 30 +++++++++++++++---- .../dendrite/syncapi/storage/syncserver.go | 8 ++--- 2 files changed, 29 insertions(+), 9 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/invites_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/invites_table.go index 88c98f7e3..4f80adf25 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/invites_table.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/invites_table.go @@ -4,7 +4,9 @@ import ( "context" "database/sql" + "github.com/lib/pq" "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/gomatrix" "github.com/matrix-org/gomatrixserverlib" ) @@ -13,13 +15,15 @@ CREATE TABLE IF NOT EXISTS syncapi_invite_events ( id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_stream_id'), event_id TEXT NOT NULL, room_id TEXT NOT NULL, + type TEXT NOT NULL, + sender TEXT NOT NULL, target_user_id TEXT NOT NULL, event_json TEXT NOT NULL ); -- For looking up the invites for a given user. CREATE INDEX IF NOT EXISTS syncapi_invites_target_user_id_idx - ON syncapi_invite_events (target_user_id, id); + ON syncapi_invite_events (target_user_id, id, room_id, type, sender); -- For deleting old invites CREATE INDEX IF NOT EXISTS syncapi_invites_event_id_idx @@ -28,8 +32,8 @@ CREATE INDEX IF NOT EXISTS syncapi_invites_event_id_idx const insertInviteEventSQL = "" + "INSERT INTO syncapi_invite_events (" + - " room_id, event_id, target_user_id, event_json" + - ") VALUES ($1, $2, $3, $4) RETURNING id" + " room_id, event_id, type, sender, target_user_id, event_json" + + ") VALUES ($1, $2, $3, $4, $5, $6) RETURNING id" const deleteInviteEventSQL = "" + "DELETE FROM syncapi_invite_events WHERE event_id = $1" @@ -37,6 +41,12 @@ const deleteInviteEventSQL = "" + const selectInviteEventsInRangeSQL = "" + "SELECT room_id, event_json FROM syncapi_invite_events" + " WHERE target_user_id = $1 AND id > $2 AND id <= $3" + + " AND ( $4::text[] IS NULL OR sender = ANY($4) )" + + " AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" + + " AND ( $6::text[] IS NULL OR type LIKE ANY($6) )" + + " AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" + + " AND ( $8::text[] IS NULL OR room_id = ANY($8) )" + + " AND ( $9::text[] IS NULL OR NOT(room_id = ANY($9)) )" + " ORDER BY id DESC" const selectMaxInviteIDSQL = "" + @@ -76,6 +86,8 @@ func (s *inviteEventsStatements) insertInviteEvent( ctx, inviteEvent.RoomID(), inviteEvent.EventID(), + inviteEvent.Type(), + inviteEvent.Sender(), *inviteEvent.StateKey(), inviteEvent.JSON(), ).Scan(&streamPos) @@ -92,10 +104,18 @@ func (s *inviteEventsStatements) deleteInviteEvent( // selectInviteEventsInRange returns a map of room ID to invite event for the // active invites for the target user ID in the supplied range. func (s *inviteEventsStatements) selectInviteEventsInRange( - ctx context.Context, txn *sql.Tx, targetUserID string, startPos, endPos int64, + ctx context.Context, txn *sql.Tx, targetUserID string, startPos, endPos int64, filter *gomatrix.Filter, ) (map[string]gomatrixserverlib.Event, error) { stmt := common.TxStmt(txn, s.selectInviteEventsInRangeStmt) - rows, err := stmt.QueryContext(ctx, targetUserID, startPos, endPos) + + rows, err := stmt.QueryContext(ctx, targetUserID, startPos, endPos, + pq.StringArray(filterConvertWildcardToSQL(filter.Room.State.Types)), + pq.StringArray(filterConvertWildcardToSQL(filter.Room.State.NotTypes)), + pq.StringArray(filter.Room.State.Senders), + pq.StringArray(filter.Room.State.NotSenders), + pq.StringArray(append(filter.Room.Rooms, filter.Room.State.Rooms...)), + pq.StringArray(append(filter.Room.NotRooms, filter.Room.State.NotRooms...)), + ) if err != nil { return nil, err } diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go index 5622ec9aa..bfec0be30 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go @@ -255,7 +255,7 @@ func (d *SyncServerDatabase) IncrementalSync( } // TODO: This should be done in getStateDeltas - if err = d.addInvitesToResponse(ctx, txn, device.UserID, fromPos, toPos, res); err != nil { + if err = d.addInvitesToResponse(ctx, txn, device.UserID, fromPos, toPos, filter, res); err != nil { return nil, err } @@ -338,8 +338,7 @@ func (d *SyncServerDatabase) CompleteSync( res.Rooms.Join[roomID] = *jr } - //TODO: filter Invite events - if err = d.addInvitesToResponse(ctx, txn, userID, 0, posTo, res); err != nil { + if err = d.addInvitesToResponse(ctx, txn, userID, 0, posTo, filter, res); err != nil { return nil, err } @@ -408,10 +407,11 @@ func (d *SyncServerDatabase) addInvitesToResponse( ctx context.Context, txn *sql.Tx, userID string, fromPos, toPos types.StreamPosition, + filter *gomatrix.Filter, res *types.Response, ) error { invites, err := d.invites.selectInviteEventsInRange( - ctx, txn, userID, int64(fromPos), int64(toPos), + ctx, txn, userID, int64(fromPos), int64(toPos), filter, ) if err != nil { return err From cc56e0b88e7b85114b2318856f4a5609ad75b1de Mon Sep 17 00:00:00 2001 From: "Crom (Thibaut CHARLES)" Date: Sun, 14 Jan 2018 23:28:41 +0100 Subject: [PATCH 12/17] Account data filtering Signed-off-by: Thibaut CHARLES cromfr@gmail.com --- .../matrix-org/dendrite/common/types.go | 1 + .../dendrite/syncapi/consumers/clientapi.go | 2 +- .../syncapi/storage/account_data_table.go | 27 ++++++++++++++----- .../dendrite/syncapi/storage/syncserver.go | 8 +++--- .../dendrite/syncapi/sync/requestpool.go | 2 +- 5 files changed, 27 insertions(+), 13 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/common/types.go b/src/github.com/matrix-org/dendrite/common/types.go index d8c5c5a7e..a17e87b4d 100644 --- a/src/github.com/matrix-org/dendrite/common/types.go +++ b/src/github.com/matrix-org/dendrite/common/types.go @@ -19,6 +19,7 @@ package common type AccountData struct { RoomID string `json:"room_id"` Type string `json:"type"` + Sender string `json:"sender"` } // ProfileResponse is a struct containing all known user profile data diff --git a/src/github.com/matrix-org/dendrite/syncapi/consumers/clientapi.go b/src/github.com/matrix-org/dendrite/syncapi/consumers/clientapi.go index d05a76920..b1c4d700b 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/consumers/clientapi.go +++ b/src/github.com/matrix-org/dendrite/syncapi/consumers/clientapi.go @@ -79,7 +79,7 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error }).Info("received data from client API server") syncStreamPos, err := s.db.UpsertAccountData( - context.TODO(), string(msg.Key), output.RoomID, output.Type, + context.TODO(), string(msg.Key), output.RoomID, output.Type, output.Sender, ) if err != nil { log.WithFields(log.Fields{ diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/account_data_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/account_data_table.go index d4d74d158..30ccc26b8 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/account_data_table.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/account_data_table.go @@ -18,7 +18,9 @@ import ( "context" "database/sql" + "github.com/lib/pq" "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/gomatrix" "github.com/matrix-org/dendrite/syncapi/types" ) @@ -38,16 +40,17 @@ CREATE TABLE IF NOT EXISTS syncapi_account_data_type ( room_id TEXT NOT NULL, -- Type of the data type TEXT NOT NULL, + sender TEXT NOT NULL, -- We don't want two entries of the same type for the same user CONSTRAINT syncapi_account_data_unique UNIQUE (user_id, room_id, type) ); -CREATE UNIQUE INDEX IF NOT EXISTS syncapi_account_data_id_idx ON syncapi_account_data_type(id); +CREATE UNIQUE INDEX IF NOT EXISTS syncapi_account_data_id_idx ON syncapi_account_data_type(id, type, sender); ` const insertAccountDataSQL = "" + - "INSERT INTO syncapi_account_data_type (user_id, room_id, type) VALUES ($1, $2, $3)" + + "INSERT INTO syncapi_account_data_type (user_id, room_id, type, sender) VALUES ($1, $2, $3, $4)" + " ON CONFLICT ON CONSTRAINT syncapi_account_data_unique" + " DO UPDATE SET id = EXCLUDED.id" + " RETURNING id" @@ -55,7 +58,11 @@ const insertAccountDataSQL = "" + const selectAccountDataInRangeSQL = "" + "SELECT room_id, type FROM syncapi_account_data_type" + " WHERE user_id = $1 AND id > $2 AND id <= $3" + - " ORDER BY id ASC" + " AND ( $4::text[] IS NULL OR sender = ANY($4) )" + + " AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" + + " AND ( $6::text[] IS NULL OR type LIKE ANY($6) )" + + " AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" + + " ORDER BY id ASC LIMIT $8" const selectMaxAccountDataIDSQL = "" + "SELECT MAX(id) FROM syncapi_account_data_type" @@ -85,16 +92,16 @@ func (s *accountDataStatements) prepare(db *sql.DB) (err error) { func (s *accountDataStatements) insertAccountData( ctx context.Context, - userID, roomID, dataType string, + userID, roomID, dataType, sender string, ) (pos int64, err error) { - err = s.insertAccountDataStmt.QueryRowContext(ctx, userID, roomID, dataType).Scan(&pos) + err = s.insertAccountDataStmt.QueryRowContext(ctx, userID, roomID, dataType, sender).Scan(&pos) return } func (s *accountDataStatements) selectAccountDataInRange( ctx context.Context, userID string, - oldPos, newPos types.StreamPosition, + oldPos, newPos types.StreamPosition, accountDataFilterPart *gomatrix.FilterPart, ) (data map[string][]string, err error) { data = make(map[string][]string) @@ -105,7 +112,13 @@ func (s *accountDataStatements) selectAccountDataInRange( oldPos-- } - rows, err := s.selectAccountDataInRangeStmt.QueryContext(ctx, userID, oldPos, newPos) + rows, err := s.selectAccountDataInRangeStmt.QueryContext(ctx, userID, oldPos, newPos, + pq.StringArray(filterConvertWildcardToSQL(accountDataFilterPart.Types)), + pq.StringArray(filterConvertWildcardToSQL(accountDataFilterPart.NotTypes)), + pq.StringArray(accountDataFilterPart.Senders), + pq.StringArray(accountDataFilterPart.NotSenders), + accountDataFilterPart.Limit, + ) if err != nil { return } diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go index bfec0be30..9742b0388 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go @@ -364,9 +364,9 @@ var txReadOnlySnapshot = sql.TxOptions{ // If no data is retrieved, returns an empty map // If there was an issue with the retrieval, returns an error func (d *SyncServerDatabase) GetAccountDataInRange( - ctx context.Context, userID string, oldPos, newPos types.StreamPosition, + ctx context.Context, userID string, oldPos, newPos types.StreamPosition, accountDataFilterPart *gomatrix.FilterPart, ) (map[string][]string, error) { - return d.accountData.selectAccountDataInRange(ctx, userID, oldPos, newPos) + return d.accountData.selectAccountDataInRange(ctx, userID, oldPos, newPos, accountDataFilterPart) } // UpsertAccountData keeps track of new or updated account data, by saving the type @@ -376,9 +376,9 @@ func (d *SyncServerDatabase) GetAccountDataInRange( // creates a new row, else update the existing one // Returns an error if there was an issue with the upsert func (d *SyncServerDatabase) UpsertAccountData( - ctx context.Context, userID, roomID, dataType string, + ctx context.Context, userID, roomID, dataType, sender string, ) (types.StreamPosition, error) { - pos, err := d.accountData.insertAccountData(ctx, userID, roomID, dataType) + pos, err := d.accountData.insertAccountData(ctx, userID, roomID, dataType, sender) return types.StreamPosition(pos), err } diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go b/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go index 2ca1e9cf7..4751e38dd 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/requestpool.go @@ -170,7 +170,7 @@ func (rp *RequestPool) appendAccountData( } // Sync is not initial, get all account data since the latest sync - dataTypes, err := rp.db.GetAccountDataInRange(req.ctx, userID, *req.since, currentPos) + dataTypes, err := rp.db.GetAccountDataInRange(req.ctx, userID, *req.since, currentPos, &req.filter.AccountData) if err != nil { return nil, err } From 97a5882d81d3c664665d050de8bd94c806d5a034 Mon Sep 17 00:00:00 2001 From: "Crom (Thibaut CHARLES)" Date: Mon, 15 Jan 2018 00:08:21 +0100 Subject: [PATCH 13/17] Limit room state events Signed-off-by: Thibaut CHARLES cromfr@gmail.com --- .../dendrite/syncapi/storage/current_room_state_table.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go index 7278573ca..c5554f8cf 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go @@ -72,7 +72,8 @@ const selectCurrentStateSQL = "" + " AND ( $2::text[] IS NULL OR sender = ANY($2) )" + " AND ( $3::text[] IS NULL OR NOT(sender = ANY($3)) )" + " AND ( $4::text[] IS NULL OR type LIKE ANY($4) )" + - " AND ( $5::text[] IS NULL OR NOT(type LIKE ANY($5)) )" + " AND ( $5::text[] IS NULL OR NOT(type LIKE ANY($5)) )" + + " LIMIT $6" const selectJoinedUsersSQL = "" + "SELECT room_id, state_key FROM syncapi_current_room_state WHERE type = 'm.room.member' AND membership = 'join'" @@ -188,7 +189,9 @@ func (s *currentRoomStateStatements) selectCurrentState( pq.StringArray(filter.Senders), pq.StringArray(filter.NotSenders), pq.StringArray(filterConvertWildcardToSQL(filter.Types)), - pq.StringArray(filterConvertWildcardToSQL(filter.NotTypes))) + pq.StringArray(filterConvertWildcardToSQL(filter.NotTypes)), + stateFilter.Limit, + ) if err != nil { return nil, err } From be96370e3034573acc0b49c1bf7056017627abf4 Mon Sep 17 00:00:00 2001 From: "Crom (Thibaut CHARLES)" Date: Thu, 18 Jan 2018 02:03:43 +0100 Subject: [PATCH 14/17] Filter contains_url implementation Signed-off-by: Thibaut CHARLES cromfr@gmail.com --- .../storage/current_room_state_table.go | 20 ++++++++++++++----- .../dendrite/syncapi/storage/invites_table.go | 15 +++++++++++--- .../storage/output_room_events_table.go | 19 ++++++++++++++---- .../github.com/matrix-org/gomatrix/filter.go | 15 +++++++------- 4 files changed, 50 insertions(+), 19 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go index c5554f8cf..0990f429b 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go @@ -17,6 +17,7 @@ package storage import ( "context" "database/sql" + "encoding/json" "github.com/lib/pq" "github.com/matrix-org/dendrite/common" @@ -35,6 +36,8 @@ CREATE TABLE IF NOT EXISTS syncapi_current_room_state ( type TEXT NOT NULL, -- The 'sender' property for the event. sender TEXT NOT NULL, + -- true if the event content contains a url key + contains_url BOOL NOT NULL, -- The state_key value for this state event e.g '' state_key TEXT NOT NULL, -- The JSON for the event. Stored as TEXT because this should be valid UTF-8. @@ -49,17 +52,17 @@ CREATE TABLE IF NOT EXISTS syncapi_current_room_state ( CONSTRAINT syncapi_room_state_unique UNIQUE (room_id, type, state_key) ); -- for event deletion -CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_id_idx ON syncapi_current_room_state(event_id, room_id, type, sender); +CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_id_idx ON syncapi_current_room_state(event_id, room_id, type, sender, contains_url); -- for querying membership states of users CREATE INDEX IF NOT EXISTS syncapi_membership_idx ON syncapi_current_room_state(type, state_key, membership) WHERE membership IS NOT NULL AND membership != 'leave'; ` const upsertRoomStateSQL = "" + - "INSERT INTO syncapi_current_room_state (room_id, event_id, type, sender, state_key, event_json, membership, added_at)" + - " VALUES ($1, $2, $3, $4, $5, $6, $7, 8)" + + "INSERT INTO syncapi_current_room_state (room_id, event_id, type, sender, contains_url, state_key, event_json, membership, added_at)" + + " VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)" + " ON CONFLICT ON CONSTRAINT syncapi_room_state_unique" + - " DO UPDATE SET event_id = $2, sender=$4, event_json = $6, membership = $7, added_at = $8" + " DO UPDATE SET event_id = $2, sender=$4, contains_url=$5, event_json = $7, membership = $8, added_at = $9" const deleteRoomStateByEventIDSQL = "" + "DELETE FROM syncapi_current_room_state WHERE event_id = $1" @@ -73,7 +76,8 @@ const selectCurrentStateSQL = "" + " AND ( $3::text[] IS NULL OR NOT(sender = ANY($3)) )" + " AND ( $4::text[] IS NULL OR type LIKE ANY($4) )" + " AND ( $5::text[] IS NULL OR NOT(type LIKE ANY($5)) )" + - " LIMIT $6" + " AND ( $6::bool IS NULL OR contains_url = $6 )" + + " LIMIT $7" const selectJoinedUsersSQL = "" + "SELECT room_id, state_key FROM syncapi_current_room_state WHERE type = 'm.room.member' AND membership = 'join'" @@ -190,6 +194,7 @@ func (s *currentRoomStateStatements) selectCurrentState( pq.StringArray(filter.NotSenders), pq.StringArray(filterConvertWildcardToSQL(filter.Types)), pq.StringArray(filterConvertWildcardToSQL(filter.NotTypes)), + filter.ContainsURL, stateFilter.Limit, ) if err != nil { @@ -212,6 +217,10 @@ func (s *currentRoomStateStatements) upsertRoomState( ctx context.Context, txn *sql.Tx, event gomatrixserverlib.Event, membership *string, addedAt int64, ) error { + var content map[string]interface{} + json.Unmarshal(event.Content(), content) + _, containsURL := content["url"] + stmt := common.TxStmt(txn, s.upsertRoomStateStmt) _, err := stmt.ExecContext( ctx, @@ -219,6 +228,7 @@ func (s *currentRoomStateStatements) upsertRoomState( event.EventID(), event.Type(), event.Sender(), + containsURL, *event.StateKey(), event.JSON(), membership, diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/invites_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/invites_table.go index 4f80adf25..37a361837 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/invites_table.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/invites_table.go @@ -3,6 +3,7 @@ package storage import ( "context" "database/sql" + "encoding/json" "github.com/lib/pq" "github.com/matrix-org/dendrite/common" @@ -17,13 +18,14 @@ CREATE TABLE IF NOT EXISTS syncapi_invite_events ( room_id TEXT NOT NULL, type TEXT NOT NULL, sender TEXT NOT NULL, + contains_url BOOL NOT NULL, target_user_id TEXT NOT NULL, event_json TEXT NOT NULL ); -- For looking up the invites for a given user. CREATE INDEX IF NOT EXISTS syncapi_invites_target_user_id_idx - ON syncapi_invite_events (target_user_id, id, room_id, type, sender); + ON syncapi_invite_events (target_user_id, id, room_id, type, sender, contains_url); -- For deleting old invites CREATE INDEX IF NOT EXISTS syncapi_invites_event_id_idx @@ -32,8 +34,8 @@ CREATE INDEX IF NOT EXISTS syncapi_invites_event_id_idx const insertInviteEventSQL = "" + "INSERT INTO syncapi_invite_events (" + - " room_id, event_id, type, sender, target_user_id, event_json" + - ") VALUES ($1, $2, $3, $4, $5, $6) RETURNING id" + " room_id, event_id, type, sender, contains_url, target_user_id, event_json" + + ") VALUES ($1, $2, $3, $4, $5, $6, $7) RETURNING id" const deleteInviteEventSQL = "" + "DELETE FROM syncapi_invite_events WHERE event_id = $1" @@ -47,6 +49,7 @@ const selectInviteEventsInRangeSQL = "" + " AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" + " AND ( $8::text[] IS NULL OR room_id = ANY($8) )" + " AND ( $9::text[] IS NULL OR NOT(room_id = ANY($9)) )" + + " AND ( $10::bool IS NULL OR contains_url = $10 )" + " ORDER BY id DESC" const selectMaxInviteIDSQL = "" + @@ -82,12 +85,17 @@ func (s *inviteEventsStatements) prepare(db *sql.DB) (err error) { func (s *inviteEventsStatements) insertInviteEvent( ctx context.Context, inviteEvent gomatrixserverlib.Event, ) (streamPos int64, err error) { + var content map[string]interface{} + json.Unmarshal(inviteEvent.Content(), content) + _, containsURL := content["url"] + err = s.insertInviteEventStmt.QueryRowContext( ctx, inviteEvent.RoomID(), inviteEvent.EventID(), inviteEvent.Type(), inviteEvent.Sender(), + containsURL, *inviteEvent.StateKey(), inviteEvent.JSON(), ).Scan(&streamPos) @@ -115,6 +123,7 @@ func (s *inviteEventsStatements) selectInviteEventsInRange( pq.StringArray(filter.Room.State.NotSenders), pq.StringArray(append(filter.Room.Rooms, filter.Room.State.Rooms...)), pq.StringArray(append(filter.Room.NotRooms, filter.Room.State.NotRooms...)), + filter.Room.State.ContainsURL, ) if err != nil { return nil, err diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go index a45c90331..0d4525b20 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go @@ -17,6 +17,7 @@ package storage import ( "context" "database/sql" + "encoding/json" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrix" @@ -46,6 +47,8 @@ CREATE TABLE IF NOT EXISTS syncapi_output_room_events ( type TEXT NOT NULL, -- The 'sender' property for the event. sender TEXT NOT NULL, + -- true if the event content contains a url key + contains_url BOOL NOT NULL, -- The JSON for the event. Stored as TEXT because this should be valid UTF-8. event_json TEXT NOT NULL, -- A list of event IDs which represent a delta of added/removed room state. This can be NULL @@ -60,13 +63,14 @@ CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_id_idx ON syncapi_output_room_ev event_id, room_id, type, - sender); + sender, + contains_url); ` const insertEventSQL = "" + "INSERT INTO syncapi_output_room_events (" + - " room_id, event_id, type, sender, event_json, add_state_ids, remove_state_ids, device_id, transaction_id" + - ") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) RETURNING id" + " room_id, event_id, type, sender, contains_url, event_json, add_state_ids, remove_state_ids, device_id, transaction_id" + + ") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) RETURNING id" const selectEventsSQL = "" + "SELECT id, event_json FROM syncapi_output_room_events WHERE event_id = ANY($1)" @@ -79,7 +83,8 @@ const selectRoomRecentEventsSQL = "" + " AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" + " AND ( $6::text[] IS NULL OR type LIKE ANY($6) )" + " AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" + - " ORDER BY id DESC LIMIT $8" + " AND ( $8::bool IS NULL OR contains_url = $8 )" + + " ORDER BY id DESC LIMIT $9" const selectMaxEventIDSQL = "" + "SELECT MAX(id) FROM syncapi_output_room_events" @@ -219,6 +224,10 @@ func (s *outputRoomEventsStatements) insertEvent( txnID = &transactionID.TransactionID } + var content map[string]interface{} + json.Unmarshal(event.Content(), content) + _, containsURL := content["url"] + stmt := common.TxStmt(txn, s.insertEventStmt) err = stmt.QueryRowContext( ctx, @@ -226,6 +235,7 @@ func (s *outputRoomEventsStatements) insertEvent( event.EventID(), event.Type(), event.Sender(), + containsURL, event.JSON(), pq.StringArray(addState), pq.StringArray(removeState), @@ -246,6 +256,7 @@ func (s *outputRoomEventsStatements) selectRoomRecentEvents( pq.StringArray(timelineFilter.NotSenders), pq.StringArray(filterConvertWildcardToSQL(timelineFilter.Types)), pq.StringArray(filterConvertWildcardToSQL(timelineFilter.NotTypes)), + timelineFilter.ContainsURL, timelineFilter.Limit+1, // TODO: limit abusive values? This can also be done in gomatrix.Filter.Validate ) if err != nil { diff --git a/vendor/src/github.com/matrix-org/gomatrix/filter.go b/vendor/src/github.com/matrix-org/gomatrix/filter.go index 52294bf1c..e9c0c18a8 100644 --- a/vendor/src/github.com/matrix-org/gomatrix/filter.go +++ b/vendor/src/github.com/matrix-org/gomatrix/filter.go @@ -37,13 +37,14 @@ type FilterRoom struct { } type FilterPart struct { - NotRooms []string `json:"not_rooms,omitempty"` - Rooms []string `json:"rooms,omitempty"` - Limit int `json:"limit,omitempty"` - NotSenders []string `json:"not_senders,omitempty"` - NotTypes []string `json:"not_types,omitempty"` - Senders []string `json:"senders,omitempty"` - Types []string `json:"types,omitempty"` + NotRooms []string `json:"not_rooms,omitempty"` + Rooms []string `json:"rooms,omitempty"` + Limit int `json:"limit,omitempty"` + NotSenders []string `json:"not_senders,omitempty"` + NotTypes []string `json:"not_types,omitempty"` + Senders []string `json:"senders,omitempty"` + Types []string `json:"types,omitempty"` + ContainsURL *bool `json:"contains_url,omitempty"` } func (filter *Filter) Validate() error { From 38767efff0d0ec5446302246eb2c85c1fbe1da4f Mon Sep 17 00:00:00 2001 From: "Crom (Thibaut CHARLES)" Date: Thu, 18 Jan 2018 02:14:53 +0100 Subject: [PATCH 15/17] Fix lint legitimate warnings :) Signed-off-by: Thibaut CHARLES cromfr@gmail.com --- .../dendrite/syncapi/storage/current_room_state_table.go | 6 ++++-- .../matrix-org/dendrite/syncapi/storage/invites_table.go | 6 ++++-- .../dendrite/syncapi/storage/output_room_events_table.go | 6 ++++-- 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go index 0990f429b..4dd440a66 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go @@ -217,9 +217,11 @@ func (s *currentRoomStateStatements) upsertRoomState( ctx context.Context, txn *sql.Tx, event gomatrixserverlib.Event, membership *string, addedAt int64, ) error { + var containsURL bool var content map[string]interface{} - json.Unmarshal(event.Content(), content) - _, containsURL := content["url"] + if json.Unmarshal(event.Content(), &content) != nil { + _, containsURL = content["url"] + } stmt := common.TxStmt(txn, s.upsertRoomStateStmt) _, err := stmt.ExecContext( diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/invites_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/invites_table.go index 37a361837..1a214b791 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/invites_table.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/invites_table.go @@ -85,9 +85,11 @@ func (s *inviteEventsStatements) prepare(db *sql.DB) (err error) { func (s *inviteEventsStatements) insertInviteEvent( ctx context.Context, inviteEvent gomatrixserverlib.Event, ) (streamPos int64, err error) { + var containsURL bool var content map[string]interface{} - json.Unmarshal(inviteEvent.Content(), content) - _, containsURL := content["url"] + if json.Unmarshal(inviteEvent.Content(), &content) != nil { + _, containsURL = content["url"] + } err = s.insertInviteEventStmt.QueryRowContext( ctx, diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go index 0d4525b20..a103d1d39 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go @@ -224,9 +224,11 @@ func (s *outputRoomEventsStatements) insertEvent( txnID = &transactionID.TransactionID } + var containsURL bool var content map[string]interface{} - json.Unmarshal(event.Content(), content) - _, containsURL := content["url"] + if json.Unmarshal(event.Content(), &content) != nil { + _, containsURL = content["url"] + } stmt := common.TxStmt(txn, s.insertEventStmt) err = stmt.QueryRowContext( From c3e87f19becf39ad73deee85735a56d8c55449d6 Mon Sep 17 00:00:00 2001 From: "Crom (Thibaut CHARLES)" Date: Thu, 18 Jan 2018 02:18:11 +0100 Subject: [PATCH 16/17] Reverted rename selectRecentEvents => selectRoomRecentEvents Signed-off-by: Thibaut CHARLES cromfr@gmail.com --- .../storage/output_room_events_table.go | 18 +++++++++--------- .../dendrite/syncapi/storage/syncserver.go | 4 ++-- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go index a103d1d39..3158c24ce 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go @@ -75,7 +75,7 @@ const insertEventSQL = "" + const selectEventsSQL = "" + "SELECT id, event_json FROM syncapi_output_room_events WHERE event_id = ANY($1)" -const selectRoomRecentEventsSQL = "" + +const selectRecentEventsSQL = "" + "SELECT id, event_json, device_id, transaction_id FROM syncapi_output_room_events" + " WHERE room_id=$1 " + " AND id > $2 AND id <= $3" + @@ -97,11 +97,11 @@ const selectStateInRangeSQL = "" + " ORDER BY id ASC" type outputRoomEventsStatements struct { - insertEventStmt *sql.Stmt - selectEventsStmt *sql.Stmt - selectMaxEventIDStmt *sql.Stmt - selectRoomRecentEventsStmt *sql.Stmt - selectStateInRangeStmt *sql.Stmt + insertEventStmt *sql.Stmt + selectEventsStmt *sql.Stmt + selectMaxEventIDStmt *sql.Stmt + selectRecentEventsStmt *sql.Stmt + selectStateInRangeStmt *sql.Stmt } func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) { @@ -118,7 +118,7 @@ func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) { if s.selectMaxEventIDStmt, err = db.Prepare(selectMaxEventIDSQL); err != nil { return } - if s.selectRoomRecentEventsStmt, err = db.Prepare(selectRoomRecentEventsSQL); err != nil { + if s.selectRecentEventsStmt, err = db.Prepare(selectRecentEventsSQL); err != nil { return } if s.selectStateInRangeStmt, err = db.Prepare(selectStateInRangeSQL); err != nil { @@ -247,12 +247,12 @@ func (s *outputRoomEventsStatements) insertEvent( return } -func (s *outputRoomEventsStatements) selectRoomRecentEvents( +func (s *outputRoomEventsStatements) selectRecentEvents( ctx context.Context, txn *sql.Tx, roomID string, fromPos, toPos types.StreamPosition, timelineFilter *gomatrix.FilterPart, ) ([]streamEvent, bool, error) { - stmt := common.TxStmt(txn, s.selectRoomRecentEventsStmt) + stmt := common.TxStmt(txn, s.selectRecentEventsStmt) rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, pq.StringArray(timelineFilter.Senders), pq.StringArray(timelineFilter.NotSenders), diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go index 9742b0388..53e199077 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go @@ -304,7 +304,7 @@ func (d *SyncServerDatabase) CompleteSync( if !isRoomFiltered(roomID, nil, &filter.Room.Timeline) { var recentStreamEvents []streamEvent var limited bool - recentStreamEvents, limited, err = d.events.selectRoomRecentEvents( + recentStreamEvents, limited, err = d.events.selectRecentEvents( ctx, txn, roomID, posFrom, posTo, &filter.Room.Timeline) if err != nil { return nil, err @@ -447,7 +447,7 @@ func (d *SyncServerDatabase) addRoomDeltaToResponse( // This is all "okay" assuming history_visibility == "shared" which it is by default. endPos = delta.membershipPos } - recentStreamEvents, limited, err := d.events.selectRoomRecentEvents( + recentStreamEvents, limited, err := d.events.selectRecentEvents( ctx, txn, delta.roomID, fromPos, endPos, &filter.Room.Timeline, ) if err != nil { From abee5d1305afa23a892b67ef017be87a060a813f Mon Sep 17 00:00:00 2001 From: "Crom (Thibaut CHARLES)" Date: Mon, 29 Jan 2018 17:08:12 +0100 Subject: [PATCH 17/17] Removed checked TODO Signed-off-by: Thibaut CHARLES cromfr@gmail.com --- vendor/src/github.com/matrix-org/gomatrix/filter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vendor/src/github.com/matrix-org/gomatrix/filter.go b/vendor/src/github.com/matrix-org/gomatrix/filter.go index e9c0c18a8..57ad2ba4f 100644 --- a/vendor/src/github.com/matrix-org/gomatrix/filter.go +++ b/vendor/src/github.com/matrix-org/gomatrix/filter.go @@ -63,7 +63,7 @@ func DefaultFilter() Filter { Room: FilterRoom{ AccountData: DefaultFilterPart(), Ephemeral: DefaultFilterPart(), - IncludeLeave: false, //TODO check default value on synapse + IncludeLeave: false, NotRooms: nil, Rooms: nil, State: DefaultFilterPart(),