diff --git a/src/github.com/matrix-org/dendrite/syncapi/routing/state.go b/src/github.com/matrix-org/dendrite/syncapi/routing/state.go index 6c825fce8..fc219b555 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/routing/state.go +++ b/src/github.com/matrix-org/dendrite/syncapi/routing/state.go @@ -44,7 +44,7 @@ func OnIncomingStateRequest(req *http.Request, db *storage.SyncServerDatabase, r // TODO(#287): Auth request and handle the case where the user has left (where // we should return the state at the poin they left) - stateEvents, err := db.GetStateEventsForRoom(req.Context(), roomID) + stateEvents, err := db.GetStateEventsForRoom(req.Context(), roomID, nil) if err != nil { return httputil.LogThenError(req, err) } diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go index 852bfd760..dfe4314d9 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/current_room_state_table.go @@ -20,6 +20,7 @@ import ( "github.com/lib/pq" "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/gomatrix" "github.com/matrix-org/gomatrixserverlib" ) @@ -32,6 +33,8 @@ CREATE TABLE IF NOT EXISTS syncapi_current_room_state ( event_id TEXT NOT NULL, -- The state event type e.g 'm.room.member' type TEXT NOT NULL, + -- The 'sender' property for the event. + sender TEXT NOT NULL, -- The state_key value for this state event e.g '' state_key TEXT NOT NULL, -- The JSON for the event. Stored as TEXT because this should be valid UTF-8. @@ -46,16 +49,17 @@ CREATE TABLE IF NOT EXISTS syncapi_current_room_state ( CONSTRAINT syncapi_room_state_unique UNIQUE (room_id, type, state_key) ); -- for event deletion -CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_id_idx ON syncapi_current_room_state(event_id); +CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_id_idx ON syncapi_current_room_state(event_id, room_id, type, sender); -- for querying membership states of users CREATE INDEX IF NOT EXISTS syncapi_membership_idx ON syncapi_current_room_state(type, state_key, membership) WHERE membership IS NOT NULL AND membership != 'leave'; + ` const upsertRoomStateSQL = "" + - "INSERT INTO syncapi_current_room_state (room_id, event_id, type, state_key, event_json, membership, added_at)" + - " VALUES ($1, $2, $3, $4, $5, $6, $7)" + + "INSERT INTO syncapi_current_room_state (room_id, event_id, type, sender, state_key, event_json, membership, added_at)" + + " VALUES ($1, $2, $3, $4, $5, $6, $7, 8)" + " ON CONFLICT ON CONSTRAINT syncapi_room_state_unique" + - " DO UPDATE SET event_id = $2, event_json = $5, membership = $6, added_at = $7" + " DO UPDATE SET event_id = $2, sender=$4, event_json = $6, membership = $7, added_at = $8" const deleteRoomStateByEventIDSQL = "" + "DELETE FROM syncapi_current_room_state WHERE event_id = $1" @@ -64,7 +68,12 @@ const selectRoomIDsWithMembershipSQL = "" + "SELECT room_id FROM syncapi_current_room_state WHERE type = 'm.room.member' AND state_key = $1 AND membership = $2" const selectCurrentStateSQL = "" + - "SELECT event_json FROM syncapi_current_room_state WHERE room_id = $1" + "SELECT event_json FROM syncapi_current_room_state WHERE room_id = $1" + + " AND ( $2::text[] IS NULL OR sender = ANY($2) )" + + " AND ( $3::text[] IS NULL OR NOT(sender = ANY($3)) )" + + " AND ( $4::text[] IS NULL OR type = ANY($4) )" + + " AND ( $5::text[] IS NULL OR NOT(type = ANY($5)) )" + + " LIMIT $6" const selectJoinedUsersSQL = "" + "SELECT room_id, state_key FROM syncapi_current_room_state WHERE type = 'm.room.member' AND membership = 'join'" @@ -165,10 +174,23 @@ func (s *currentRoomStateStatements) selectRoomIDsWithMembership( // CurrentState returns all the current state events for the given room. func (s *currentRoomStateStatements) selectCurrentState( - ctx context.Context, txn *sql.Tx, roomID string, + ctx context.Context, txn *sql.Tx, roomID string, stateFilter *gomatrix.FilterPart, ) ([]gomatrixserverlib.Event, error) { + + var filter gomatrix.FilterPart + if stateFilter == nil { + filter = gomatrix.DefaultFilterPart() + } else { + filter = *stateFilter + } + stmt := common.TxStmt(txn, s.selectCurrentStateStmt) - rows, err := stmt.QueryContext(ctx, roomID) + rows, err := stmt.QueryContext(ctx, roomID, + pq.StringArray(filter.Senders), + pq.StringArray(filter.NotSenders), + pq.StringArray(filter.Types), + pq.StringArray(filter.NotTypes), + filter.Limit) if err != nil { return nil, err } @@ -195,6 +217,7 @@ func (s *currentRoomStateStatements) upsertRoomState( event.RoomID(), event.EventID(), event.Type(), + event.Sender(), *event.StateKey(), event.JSON(), membership, diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go index 36b10e156..5622ec9aa 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go @@ -178,10 +178,11 @@ func (d *SyncServerDatabase) GetStateEvent( // Returns an empty slice if no state events could be found for this room. // Returns an error if there was an issue with the retrieval. func (d *SyncServerDatabase) GetStateEventsForRoom( - ctx context.Context, roomID string, + ctx context.Context, roomID string, stateFilter *gomatrix.FilterPart, ) (stateEvents []gomatrixserverlib.Event, err error) { + err = common.WithTransaction(d.db, func(txn *sql.Tx) error { - stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID) + stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID, stateFilter) return err }) return @@ -238,7 +239,7 @@ func (d *SyncServerDatabase) IncrementalSync( // joined rooms, but also which rooms have membership transitions for this user between the 2 stream positions. // This works out what the 'state' key should be for each room as well as which membership block // to put the room into. - deltas, err := d.getStateDeltas(ctx, &device, txn, fromPos, toPos, device.UserID) + deltas, err := d.getStateDeltas(ctx, &device, txn, fromPos, toPos, device.UserID, &filter.Room.State) if err != nil { return nil, err } @@ -297,31 +298,37 @@ func (d *SyncServerDatabase) CompleteSync( jr := types.NewJoinResponse() //Join response should contain events only if room isn't filtered - if !isRoomFiltered(roomID, filter, &filter.Room.Timeline) { + if !isRoomFiltered(roomID, filter, nil) { // Timeline events - var recentStreamEvents []streamEvent - var limited bool - recentStreamEvents, limited, err = d.events.selectRoomRecentEvents( - ctx, txn, roomID, posFrom, posTo, &filter.Room.Timeline) - if err != nil { - return nil, err - } - jr.Timeline.Limited = limited + var recentEvents []gomatrixserverlib.Event + if !isRoomFiltered(roomID, nil, &filter.Room.Timeline) { + var recentStreamEvents []streamEvent + var limited bool + recentStreamEvents, limited, err = d.events.selectRoomRecentEvents( + ctx, txn, roomID, posFrom, posTo, &filter.Room.Timeline) + if err != nil { + return nil, err + } + recentEvents = streamEventsToEvents(nil, recentStreamEvents) - recentEvents := streamEventsToEvents(nil, recentStreamEvents) - jr.Timeline.Events = gomatrixserverlib.ToClientEvents( - recentEvents, - gomatrixserverlib.FormatSync) + jr.Timeline.Limited = limited + jr.Timeline.Events = gomatrixserverlib.ToClientEvents( + recentEvents, + gomatrixserverlib.FormatSync) + } // State events - var stateEvents []gomatrixserverlib.Event - stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID) - if err != nil { - return nil, err + if !isRoomFiltered(roomID, nil, &filter.Room.State) { + var stateEvents []gomatrixserverlib.Event + stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID, &filter.Room.State) + if err != nil { + return nil, err + } + if recentEvents != nil { + stateEvents = removeDuplicates(stateEvents, recentEvents) + } + jr.State.Events = gomatrixserverlib.ToClientEvents(stateEvents, gomatrixserverlib.FormatSync) } - stateEvents = removeDuplicates(stateEvents, recentEvents) - jr.State.Events = gomatrixserverlib.ToClientEvents(stateEvents, gomatrixserverlib.FormatSync) - //TODO AccountData events //TODO Ephemeral events } @@ -564,7 +571,7 @@ func (d *SyncServerDatabase) fetchMissingStateEvents( func (d *SyncServerDatabase) getStateDeltas( ctx context.Context, device *authtypes.Device, txn *sql.Tx, - fromPos, toPos types.StreamPosition, userID string, + fromPos, toPos types.StreamPosition, userID string, stateFilter *gomatrix.FilterPart, ) ([]stateDelta, error) { // Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821 // - Get membership list changes for this user in this sync response @@ -597,7 +604,7 @@ func (d *SyncServerDatabase) getStateDeltas( if membership == "join" { // send full room state down instead of a delta var allState []gomatrixserverlib.Event - allState, err = d.roomstate.selectCurrentState(ctx, txn, roomID) + allState, err = d.roomstate.selectCurrentState(ctx, txn, roomID, stateFilter) if err != nil { return nil, err } diff --git a/vendor/src/github.com/matrix-org/gomatrix/filter.go b/vendor/src/github.com/matrix-org/gomatrix/filter.go index 820f44f0c..52294bf1c 100644 --- a/vendor/src/github.com/matrix-org/gomatrix/filter.go +++ b/vendor/src/github.com/matrix-org/gomatrix/filter.go @@ -55,22 +55,22 @@ func (filter *Filter) Validate() error { func DefaultFilter() Filter { return Filter{ - AccountData: defaultFilterPart(), + AccountData: DefaultFilterPart(), EventFields: nil, EventFormat: "client", - Presence: defaultFilterPart(), + Presence: DefaultFilterPart(), Room: FilterRoom{ - AccountData: defaultFilterPart(), - Ephemeral: defaultFilterPart(), + AccountData: DefaultFilterPart(), + Ephemeral: DefaultFilterPart(), IncludeLeave: false, //TODO check default value on synapse NotRooms: nil, Rooms: nil, - State: defaultFilterPart(), - Timeline: defaultFilterPart(), + State: DefaultFilterPart(), + Timeline: DefaultFilterPart(), }, } } -func defaultFilterPart() FilterPart { +func DefaultFilterPart() FilterPart { return FilterPart{ NotRooms: nil, Rooms: nil,