diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index 857570009..22d801617 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -49,9 +49,6 @@ type Database interface { PeeksInRange(ctx context.Context, userID, deviceID string, r types.Range) (peeks []types.Peek, err error) RoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []eduAPI.OutputReceiptEvent, error) - // MostRecentMembership returns the most recent membership event for the user, along with the global stream position. - MostRecentMembership(ctx context.Context, roomID, userID string) (*gomatrixserverlib.HeaderedEvent, types.StreamPosition, error) - // AllJoinedUsersInRooms returns a map of room ID to a list of all joined user IDs. AllJoinedUsersInRooms(ctx context.Context) (map[string][]string, error) // AllPeekingDevicesInRooms returns a map of room ID to a list of all peeking devices. diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go index 1b3125f8d..28668de0e 100644 --- a/syncapi/storage/postgres/output_room_events_table.go +++ b/syncapi/storage/postgres/output_room_events_table.go @@ -130,10 +130,6 @@ const selectStateInRangeSQL = "" + const deleteEventsForRoomSQL = "" + "DELETE FROM syncapi_output_room_events WHERE room_id = $1" -const selectPositionInStreamSQL = "" + - "SELECT id FROM syncapi_output_room_events" + - " WHERE event_id = $1" - type outputRoomEventsStatements struct { insertEventStmt *sql.Stmt selectEventsStmt *sql.Stmt @@ -144,7 +140,6 @@ type outputRoomEventsStatements struct { selectStateInRangeStmt *sql.Stmt updateEventJSONStmt *sql.Stmt deleteEventsForRoomStmt *sql.Stmt - selectPositionInStreamStmt *sql.Stmt } func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) { @@ -180,9 +175,6 @@ func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) { if s.deleteEventsForRoomStmt, err = db.Prepare(deleteEventsForRoomSQL); err != nil { return nil, err } - if s.selectPositionInStreamStmt, err = db.Prepare(selectPositionInStreamSQL); err != nil { - return nil, err - } return s, nil } @@ -443,15 +435,6 @@ func (s *outputRoomEventsStatements) DeleteEventsForRoom( return err } -// SelectPositionInStream returns the position of a given event in the -// global stream topology. -func (s *outputRoomEventsStatements) SelectPositionInStream( - ctx context.Context, txn *sql.Tx, eventID string, -) (pos types.StreamPosition, err error) { - err = s.selectPositionInStreamStmt.QueryRowContext(ctx, eventID).Scan(&pos) - return -} - func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) { var result []types.StreamEvent for rows.Next() { diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index e876ea8c9..9df076935 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -500,20 +500,6 @@ func (d *Database) EventPositionInTopology( return types.TopologyToken{Depth: depth, PDUPosition: stream}, nil } -func (d *Database) MostRecentMembership( - ctx context.Context, roomID, userID string, -) (*gomatrixserverlib.HeaderedEvent, types.StreamPosition, error) { - event, err := d.CurrentRoomState.SelectStateEvent(ctx, roomID, gomatrixserverlib.MRoomMember, userID) - if err != nil { - return nil, 0, fmt.Errorf("d.CurrentRoomState.SelectStateEvent: %w", err) - } - pos, err := d.OutputEvents.SelectPositionInStream(ctx, nil, event.EventID()) - if err != nil { - return nil, 0, fmt.Errorf("d.OutputEvents.SelectPositionInStream: %w", err) - } - return event, pos, nil -} - func (d *Database) GetFilter( ctx context.Context, localpart string, filterID string, ) (*gomatrixserverlib.Filter, error) { diff --git a/syncapi/storage/sqlite3/output_room_events_table.go b/syncapi/storage/sqlite3/output_room_events_table.go index 51d5fb68c..019aba8b3 100644 --- a/syncapi/storage/sqlite3/output_room_events_table.go +++ b/syncapi/storage/sqlite3/output_room_events_table.go @@ -90,19 +90,14 @@ const selectStateInRangeSQL = "" + const deleteEventsForRoomSQL = "" + "DELETE FROM syncapi_output_room_events WHERE room_id = $1" -const selectPositionInStreamSQL = "" + - "SELECT id FROM syncapi_output_room_events" + - " WHERE event_id = $1" - type outputRoomEventsStatements struct { - db *sql.DB - streamIDStatements *streamIDStatements - insertEventStmt *sql.Stmt - selectEventsStmt *sql.Stmt - selectMaxEventIDStmt *sql.Stmt - updateEventJSONStmt *sql.Stmt - deleteEventsForRoomStmt *sql.Stmt - selectPositionInStreamStmt *sql.Stmt + db *sql.DB + streamIDStatements *streamIDStatements + insertEventStmt *sql.Stmt + selectEventsStmt *sql.Stmt + selectMaxEventIDStmt *sql.Stmt + updateEventJSONStmt *sql.Stmt + deleteEventsForRoomStmt *sql.Stmt } func NewSqliteEventsTable(db *sql.DB, streamID *streamIDStatements) (tables.Events, error) { @@ -129,9 +124,6 @@ func NewSqliteEventsTable(db *sql.DB, streamID *streamIDStatements) (tables.Even if s.deleteEventsForRoomStmt, err = db.Prepare(deleteEventsForRoomSQL); err != nil { return nil, err } - if s.selectPositionInStreamStmt, err = db.Prepare(selectPositionInStreamSQL); err != nil { - return nil, err - } return s, nil } @@ -432,15 +424,6 @@ func (s *outputRoomEventsStatements) DeleteEventsForRoom( return err } -// SelectPositionInStream returns the position of a given event in the -// global stream topology. -func (s *outputRoomEventsStatements) SelectPositionInStream( - ctx context.Context, txn *sql.Tx, eventID string, -) (pos types.StreamPosition, err error) { - err = s.selectPositionInStreamStmt.QueryRowContext(ctx, eventID).Scan(&pos) - return -} - func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) { var result []types.StreamEvent for rows.Next() { diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go index 7d935a718..739676770 100644 --- a/syncapi/storage/tables/interface.go +++ b/syncapi/storage/tables/interface.go @@ -63,7 +63,6 @@ type Events interface { UpdateEventJSON(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) error // DeleteEventsForRoom removes all event information for a room. This should only be done when removing the room entirely. DeleteEventsForRoom(ctx context.Context, txn *sql.Tx, roomID string) (err error) - SelectPositionInStream(ctx context.Context, txn *sql.Tx, eventID string) (pos types.StreamPosition, err error) } // Topology keeps track of the depths and stream positions for all events. diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index 713692979..39c31be1c 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -5,7 +5,6 @@ import ( "encoding/json" "fmt" - rsapi "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/syncapi/types" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" @@ -13,7 +12,6 @@ import ( type PDUStreamProvider struct { StreamProvider - rsAPI rsapi.RoomserverInternalAPI } func (p *PDUStreamProvider) Setup() { @@ -52,14 +50,11 @@ func (p *PDUStreamProvider) CompleteSync( return from } - stateFilter := req.Filter.Room.State - eventFilter := req.Filter.Room.Timeline - // Build up a /sync response. Add joined rooms. for _, roomID := range joinedRoomIDs { var jr *types.JoinResponse jr, err = p.getJoinResponseForCompleteSync( - ctx, roomID, r, &stateFilter, &eventFilter, req.Device, + ctx, roomID, r, &req.Filter.Room.State, &req.Filter.Room.Timeline, req.Device, ) if err != nil { req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed") @@ -79,7 +74,7 @@ func (p *PDUStreamProvider) CompleteSync( if !peek.Deleted { var jr *types.JoinResponse jr, err = p.getJoinResponseForCompleteSync( - ctx, peek.RoomID, r, &stateFilter, &eventFilter, req.Device, + ctx, peek.RoomID, r, &req.Filter.Room.State, &req.Filter.Room.Timeline, req.Device, ) if err != nil { req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed") @@ -101,7 +96,7 @@ func (p *PDUStreamProvider) CompleteSync( for _, roomID := range leaveRoomIDs { var lr *types.LeaveResponse lr, err = p.getLeaveResponseForCompleteSync( - ctx, roomID, r, &stateFilter, &eventFilter, req.Device, + ctx, roomID, r, &req.Filter.Room.State, &req.Filter.Room.Timeline, req.Device, ) if err != nil { req.Log.WithError(err).Error("p.getLeaveResponseForCompleteSync failed") @@ -175,11 +170,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( // TODO: This will fail on join -> leave -> sensitive msg -> join -> leave // in a single /sync request // This is all "okay" assuming history_visibility == "shared" which it is by default. - if r.Backwards { - r.From = delta.MembershipPos - } else { - r.To = delta.MembershipPos - } + r.To = delta.MembershipPos } recentStreamEvents, limited, err := p.DB.RecentEvents( ctx, delta.RoomID, r, @@ -235,7 +226,6 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( return nil } -// nolint:gocyclo func (p *PDUStreamProvider) getResponseForCompleteSync( ctx context.Context, roomID string, @@ -251,51 +241,6 @@ func (p *PDUStreamProvider) getResponseForCompleteSync( if err != nil { return } - - // Calculate the current history visibility rule. - historyVisibility := "joined" - for _, stateEvent := range stateEvents { - if stateEvent.Type() == gomatrixserverlib.MRoomHistoryVisibility { - var content struct { - HistoryVisibility string `json:"history_visibility"` - } - if err = json.Unmarshal(stateEvent.Content(), &content); err != nil { - break - } - historyVisibility = content.HistoryVisibility - } - } - - if historyVisibility != "shared" && historyVisibility != "world_readable" { - var membershipEvent *gomatrixserverlib.HeaderedEvent - var membershipPos types.StreamPosition - membershipEvent, membershipPos, err = p.DB.MostRecentMembership(ctx, roomID, device.UserID) - if err != nil { - return - } - if membershipEvent != nil { - membership, _ := membershipEvent.Membership() - switch membership { - case "leave", "kick", "ban": - if r.Backwards { - r.From = membershipPos - } else { - r.To = membershipPos - } - queryReq := &rsapi.QueryStateAfterEventsRequest{ - RoomID: roomID, - PrevEventIDs: []string{membershipEvent.EventID()}, - } - queryRes := &rsapi.QueryStateAfterEventsResponse{} - if err = p.rsAPI.QueryStateAfterEvents(ctx, queryReq, queryRes); err != nil { - return - } - stateEvents = p.filterStateEventsAccordingToFilter(queryRes.StateEvents, stateFilter) - default: - } - } - } - // TODO: When filters are added, we may need to call this multiple times to get enough events. // See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316 var recentStreamEvents []types.StreamEvent @@ -306,9 +251,7 @@ func (p *PDUStreamProvider) getResponseForCompleteSync( return } - recentStreamEvents, limited = p.filterStreamEventsAccordingToHistoryVisibility( - historyVisibility, recentStreamEvents, device, limited, - ) + recentStreamEvents, limited = p.filterStreamEventsAccordingToHistoryVisibility(recentStreamEvents, stateEvents, device, limited) for _, event := range recentStreamEvents { if event.HeaderedEvent.Event.StateKey() != nil { @@ -336,7 +279,7 @@ func (p *PDUStreamProvider) getResponseForCompleteSync( // "Can sync a room with a message with a transaction id" - which does a complete sync to check. recentEvents = p.DB.StreamEventsToEvents(device, recentStreamEvents) stateEvents = removeDuplicates(stateEvents, recentEvents) - return // nolint:nakedret + return } func (p *PDUStreamProvider) getJoinResponseForCompleteSync( @@ -385,64 +328,29 @@ func (p *PDUStreamProvider) getLeaveResponseForCompleteSync( return lr, nil } -// nolint:gocyclo -func (p *PDUStreamProvider) filterStateEventsAccordingToFilter( - stateEvents []*gomatrixserverlib.HeaderedEvent, - stateFilter *gomatrixserverlib.StateFilter, -) []*gomatrixserverlib.HeaderedEvent { - filterRooms, filterNotRooms := map[string]struct{}{}, map[string]struct{}{} - filterTypes, filterNotTypes := map[string]struct{}{}, map[string]struct{}{} - for _, r := range stateFilter.Rooms { - filterRooms[r] = struct{}{} - } - for _, r := range stateFilter.NotRooms { - filterNotRooms[r] = struct{}{} - } - for _, t := range stateFilter.Types { - filterTypes[t] = struct{}{} - } - for _, t := range stateFilter.NotTypes { - filterNotTypes[t] = struct{}{} - } - - newState := make([]*gomatrixserverlib.HeaderedEvent, 0, len(stateEvents)) - for _, event := range stateEvents { - if len(filterRooms) > 0 { - if _, ok := filterRooms[event.RoomID()]; !ok { - continue - } - } - if len(filterNotRooms) > 0 { - if _, ok := filterNotRooms[event.RoomID()]; ok { - continue - } - } - if len(filterTypes) > 0 { - if _, ok := filterTypes[event.Type()]; !ok { - continue - } - } - if len(filterNotTypes) > 0 { - if _, ok := filterNotTypes[event.Type()]; ok { - continue - } - } - newState = append(newState, event) - } - - return newState -} - // nolint:gocyclo func (p *PDUStreamProvider) filterStreamEventsAccordingToHistoryVisibility( - visibility string, recentStreamEvents []types.StreamEvent, + stateEvents []*gomatrixserverlib.HeaderedEvent, device *userapi.Device, limited bool, ) ([]types.StreamEvent, bool) { // If the history is world_readable or shared then don't filter. - if visibility == "world_readable" || visibility == "shared" { - return recentStreamEvents, limited + for _, stateEvent := range stateEvents { + if stateEvent.Type() == gomatrixserverlib.MRoomHistoryVisibility { + var content struct { + HistoryVisibility string `json:"history_visibility"` + } + if err := json.Unmarshal(stateEvent.Content(), &content); err != nil { + break + } + switch content.HistoryVisibility { + case "world_readable", "shared": + return recentStreamEvents, limited + default: + break + } + } } // TODO FIXME: We don't fully implement history visibility yet. To avoid leaking events which the diff --git a/syncapi/streams/streams.go b/syncapi/streams/streams.go index d4e65a175..ba4118df5 100644 --- a/syncapi/streams/streams.go +++ b/syncapi/streams/streams.go @@ -29,7 +29,6 @@ func NewSyncStreamProviders( streams := &Streams{ PDUStreamProvider: &PDUStreamProvider{ StreamProvider: StreamProvider{DB: d}, - rsAPI: rsAPI, }, TypingStreamProvider: &TypingStreamProvider{ StreamProvider: StreamProvider{DB: d},