mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-16 19:33:09 -06:00
Fix too high complexity for getStateDeltas
Signed-off-by: Alex Chen <minecnly@gmail.com>
This commit is contained in:
parent
5d9d4a4628
commit
0b1e7abfc9
|
|
@ -250,7 +250,11 @@ func (d *SyncServerDatasource) addPDUDeltaToResponse(
|
||||||
// This works out what the 'state' key should be for each room as well as which membership block
|
// This works out what the 'state' key should be for each room as well as which membership block
|
||||||
// to put the room into.
|
// to put the room into.
|
||||||
var deltas []stateDelta
|
var deltas []stateDelta
|
||||||
deltas, err = d.getStateDeltas(ctx, &device, txn, fromPos, toPos, device.UserID, wantFullState)
|
if !wantFullState {
|
||||||
|
deltas, err = d.getStateDeltas(ctx, &device, txn, fromPos, toPos, device.UserID)
|
||||||
|
} else {
|
||||||
|
deltas, err = d.getStateDeltasForFullStateSync(ctx, &device, txn, fromPos, toPos, device.UserID)
|
||||||
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
@ -725,7 +729,7 @@ func (d *SyncServerDatasource) fetchMissingStateEvents(
|
||||||
|
|
||||||
func (d *SyncServerDatasource) getStateDeltas(
|
func (d *SyncServerDatasource) getStateDeltas(
|
||||||
ctx context.Context, device *authtypes.Device, txn *sql.Tx,
|
ctx context.Context, device *authtypes.Device, txn *sql.Tx,
|
||||||
fromPos, toPos int64, userID string, wantFullState bool,
|
fromPos, toPos int64, userID string,
|
||||||
) ([]stateDelta, error) {
|
) ([]stateDelta, error) {
|
||||||
// Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821
|
// Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821
|
||||||
// - Get membership list changes for this user in this sync response
|
// - Get membership list changes for this user in this sync response
|
||||||
|
|
@ -756,13 +760,6 @@ func (d *SyncServerDatasource) getStateDeltas(
|
||||||
// the timeline.
|
// the timeline.
|
||||||
if membership := getMembershipFromEvent(&ev.Event, userID); membership != "" {
|
if membership := getMembershipFromEvent(&ev.Event, userID); membership != "" {
|
||||||
if membership == "join" {
|
if membership == "join" {
|
||||||
if wantFullState {
|
|
||||||
// If full_state=true, we need to return full state for
|
|
||||||
// ALL joined rooms. So we skip handling individual
|
|
||||||
// rooms here and do this later.
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// send full room state down instead of a delta
|
// send full room state down instead of a delta
|
||||||
var s []streamEvent
|
var s []streamEvent
|
||||||
s, err = d.currentStateStreamEventsForRoom(ctx, txn, roomID)
|
s, err = d.currentStateStreamEventsForRoom(ctx, txn, roomID)
|
||||||
|
|
@ -790,21 +787,9 @@ func (d *SyncServerDatasource) getStateDeltas(
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
for _, joinedRoomID := range joinedRoomIDs {
|
for _, joinedRoomID := range joinedRoomIDs {
|
||||||
var toAdd []streamEvent
|
|
||||||
|
|
||||||
if wantFullState {
|
|
||||||
s, err := d.currentStateStreamEventsForRoom(ctx, txn, joinedRoomID)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
toAdd = s
|
|
||||||
} else {
|
|
||||||
toAdd = state[joinedRoomID]
|
|
||||||
}
|
|
||||||
|
|
||||||
deltas = append(deltas, stateDelta{
|
deltas = append(deltas, stateDelta{
|
||||||
membership: "join",
|
membership: "join",
|
||||||
stateEvents: streamEventsToEvents(device, toAdd),
|
stateEvents: streamEventsToEvents(device, state[joinedRoomID]),
|
||||||
roomID: joinedRoomID,
|
roomID: joinedRoomID,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
@ -812,6 +797,65 @@ func (d *SyncServerDatasource) getStateDeltas(
|
||||||
return deltas, nil
|
return deltas, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// getStateDeltasForFullStateSync is a variant of getStateDeltas used for /sync
|
||||||
|
// requests with full_state=true.
|
||||||
|
// Fetches full state for all joined rooms and uses selectStateInRange to get
|
||||||
|
// updates for other rooms.
|
||||||
|
func (d *SyncServerDatasource) getStateDeltasForFullStateSync(
|
||||||
|
ctx context.Context, device *authtypes.Device, txn *sql.Tx,
|
||||||
|
fromPos, toPos int64, userID string,
|
||||||
|
) ([]stateDelta, error) {
|
||||||
|
joinedRoomIDs, err := d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, "join")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Use a reasonable initial capacity
|
||||||
|
deltas := make([]stateDelta, 0, len(joinedRoomIDs))
|
||||||
|
|
||||||
|
// Add full states for all joined rooms
|
||||||
|
for _, joinedRoomID := range joinedRoomIDs {
|
||||||
|
s, stateErr := d.currentStateStreamEventsForRoom(ctx, txn, joinedRoomID)
|
||||||
|
if stateErr != nil {
|
||||||
|
return nil, stateErr
|
||||||
|
}
|
||||||
|
deltas = append(deltas, stateDelta{
|
||||||
|
membership: "join",
|
||||||
|
stateEvents: streamEventsToEvents(device, s),
|
||||||
|
roomID: joinedRoomID,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// get all the state events ever between these two positions
|
||||||
|
stateNeeded, eventMap, err := d.events.selectStateInRange(ctx, txn, fromPos, toPos)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
state, err := d.fetchStateEvents(ctx, txn, stateNeeded, eventMap)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
for roomID, stateStreamEvents := range state {
|
||||||
|
for _, ev := range stateStreamEvents {
|
||||||
|
if membership := getMembershipFromEvent(&ev.Event, userID); membership != "" {
|
||||||
|
if membership != "join" { // We've already added full state for all joined rooms above.
|
||||||
|
deltas = append(deltas, stateDelta{
|
||||||
|
membership: membership,
|
||||||
|
membershipPos: ev.streamPosition,
|
||||||
|
stateEvents: streamEventsToEvents(device, stateStreamEvents),
|
||||||
|
roomID: roomID,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return deltas, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (d *SyncServerDatasource) currentStateStreamEventsForRoom(
|
func (d *SyncServerDatasource) currentStateStreamEventsForRoom(
|
||||||
ctx context.Context, txn *sql.Tx, roomID string,
|
ctx context.Context, txn *sql.Tx, roomID string,
|
||||||
) ([]streamEvent, error) {
|
) ([]streamEvent, error) {
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue