diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index 22d801617..9cff4cad1 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -35,7 +35,7 @@ type Database interface { MaxStreamPositionForAccountData(ctx context.Context) (types.StreamPosition, error) MaxStreamPositionForSendToDeviceMessages(ctx context.Context) (types.StreamPosition, error) - CurrentState(ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter) ([]*gomatrixserverlib.HeaderedEvent, error) + CurrentState(ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter, excludeEventIDs []string) ([]*gomatrixserverlib.HeaderedEvent, error) GetStateDeltasForFullStateSync(ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter) ([]types.StateDelta, []string, error) GetStateDeltas(ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter) ([]types.StateDelta, []string, error) RoomIDsWithMembership(ctx context.Context, userID string, membership string) ([]string, error) diff --git a/syncapi/storage/postgres/current_room_state_table.go b/syncapi/storage/postgres/current_room_state_table.go index 77e1e363f..ce4509ff4 100644 --- a/syncapi/storage/postgres/current_room_state_table.go +++ b/syncapi/storage/postgres/current_room_state_table.go @@ -84,7 +84,8 @@ const selectCurrentStateSQL = "" + " AND ( $4::text[] IS NULL OR type LIKE ANY($4) )" + " AND ( $5::text[] IS NULL OR NOT(type LIKE ANY($5)) )" + " AND ( $6::bool IS NULL OR contains_url = $6 )" + - " LIMIT $7" + " AND NOT (event_id = ANY($7))" + + " LIMIT $8" const selectJoinedUsersSQL = "" + "SELECT room_id, state_key FROM syncapi_current_room_state WHERE type = 'm.room.member' AND membership = 'join'" @@ -197,6 +198,7 @@ func (s *currentRoomStateStatements) SelectRoomIDsWithMembership( func (s *currentRoomStateStatements) SelectCurrentState( ctx context.Context, txn *sql.Tx, roomID string, stateFilter *gomatrixserverlib.StateFilter, + excludeEventIDs []string, ) ([]*gomatrixserverlib.HeaderedEvent, error) { stmt := sqlutil.TxStmt(txn, s.selectCurrentStateStmt) rows, err := stmt.QueryContext(ctx, roomID, @@ -205,6 +207,7 @@ func (s *currentRoomStateStatements) SelectCurrentState( pq.StringArray(filterConvertTypeWildcardToSQL(stateFilter.Types)), pq.StringArray(filterConvertTypeWildcardToSQL(stateFilter.NotTypes)), stateFilter.ContainsURL, + pq.StringArray(excludeEventIDs), stateFilter.Limit, ) if err != nil { diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index 239f68129..ee2c176ca 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -103,8 +103,8 @@ func (d *Database) MaxStreamPositionForAccountData(ctx context.Context) (types.S return types.StreamPosition(id), nil } -func (d *Database) CurrentState(ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter) ([]*gomatrixserverlib.HeaderedEvent, error) { - return d.CurrentRoomState.SelectCurrentState(ctx, nil, roomID, stateFilterPart) +func (d *Database) CurrentState(ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter, excludeEventIDs []string) ([]*gomatrixserverlib.HeaderedEvent, error) { + return d.CurrentRoomState.SelectCurrentState(ctx, nil, roomID, stateFilterPart, excludeEventIDs) } func (d *Database) RoomIDsWithMembership(ctx context.Context, userID string, membership string) ([]string, error) { @@ -195,7 +195,7 @@ func (d *Database) GetStateEvent( func (d *Database) GetStateEventsForRoom( ctx context.Context, roomID string, stateFilter *gomatrixserverlib.StateFilter, ) (stateEvents []*gomatrixserverlib.HeaderedEvent, err error) { - stateEvents, err = d.CurrentRoomState.SelectCurrentState(ctx, nil, roomID, stateFilter) + stateEvents, err = d.CurrentRoomState.SelectCurrentState(ctx, nil, roomID, stateFilter, nil) return } @@ -870,7 +870,7 @@ func (d *Database) currentStateStreamEventsForRoom( ctx context.Context, txn *sql.Tx, roomID string, stateFilter *gomatrixserverlib.StateFilter, ) ([]types.StreamEvent, error) { - allState, err := d.CurrentRoomState.SelectCurrentState(ctx, txn, roomID, stateFilter) + allState, err := d.CurrentRoomState.SelectCurrentState(ctx, txn, roomID, stateFilter, nil) if err != nil { return nil, err } diff --git a/syncapi/storage/sqlite3/current_room_state_table.go b/syncapi/storage/sqlite3/current_room_state_table.go index 55ed27a41..4a334bd8a 100644 --- a/syncapi/storage/sqlite3/current_room_state_table.go +++ b/syncapi/storage/sqlite3/current_room_state_table.go @@ -178,6 +178,7 @@ func (s *currentRoomStateStatements) SelectRoomIDsWithMembership( func (s *currentRoomStateStatements) SelectCurrentState( ctx context.Context, txn *sql.Tx, roomID string, stateFilter *gomatrixserverlib.StateFilter, + _ []string, ) ([]*gomatrixserverlib.HeaderedEvent, error) { stmt, params, err := prepareWithFilters( s.db, txn, selectCurrentStateSQL, diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go index 997486dd4..028872716 100644 --- a/syncapi/storage/tables/interface.go +++ b/syncapi/storage/tables/interface.go @@ -91,7 +91,7 @@ type CurrentRoomState interface { DeleteRoomStateByEventID(ctx context.Context, txn *sql.Tx, eventID string) error DeleteRoomStateForRoom(ctx context.Context, txn *sql.Tx, roomID string) error // SelectCurrentState returns all the current state events for the given room. - SelectCurrentState(ctx context.Context, txn *sql.Tx, roomID string, stateFilter *gomatrixserverlib.StateFilter) ([]*gomatrixserverlib.HeaderedEvent, error) + SelectCurrentState(ctx context.Context, txn *sql.Tx, roomID string, stateFilter *gomatrixserverlib.StateFilter, excludeEventIDs []string) ([]*gomatrixserverlib.HeaderedEvent, error) // SelectRoomIDsWithMembership returns the list of room IDs which have the given user in the given membership state. SelectRoomIDsWithMembership(ctx context.Context, txn *sql.Tx, userID string, membership string) ([]string, error) // SelectJoinedUsers returns a map of room ID to a list of joined user IDs. diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index d6d7ff444..c8ea9d43b 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -213,18 +213,22 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync( eventFilter *gomatrixserverlib.RoomEventFilter, device *userapi.Device, ) (jr *types.JoinResponse, err error) { - var stateEvents []*gomatrixserverlib.HeaderedEvent - stateEvents, err = p.DB.CurrentState(ctx, roomID, stateFilter) + // TODO: When filters are added, we may need to call this multiple times to get enough events. + // See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316 + recentStreamEvents, limited, err := p.DB.RecentEvents( + ctx, roomID, r, eventFilter, true, true, + ) if err != nil { return } - // TODO: When filters are added, we may need to call this multiple times to get enough events. - // See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316 - var recentStreamEvents []types.StreamEvent - var limited bool - recentStreamEvents, limited, err = p.DB.RecentEvents( - ctx, roomID, r, eventFilter, true, true, - ) + + // Get the event IDs of the stream events we fetched. There's no point in us + recentStreamEventIDs := make([]string, 0, len(recentStreamEvents)) + for _, eventID := range recentStreamEvents { + recentStreamEventIDs = append(recentStreamEventIDs, eventID.EventID()) + } + + stateEvents, err := p.DB.CurrentState(ctx, roomID, stateFilter, recentStreamEventIDs) if err != nil { return }