diff --git a/syncapi/storage/syncserver.go b/syncapi/storage/syncserver.go index 418cf5b78..3f3101670 100644 --- a/syncapi/storage/syncserver.go +++ b/syncapi/storage/syncserver.go @@ -250,7 +250,11 @@ func (d *SyncServerDatasource) addPDUDeltaToResponse( // This works out what the 'state' key should be for each room as well as which membership block // to put the room into. var deltas []stateDelta - deltas, err = d.getStateDeltas(ctx, &device, txn, fromPos, toPos, device.UserID, wantFullState) + if !wantFullState { + deltas, err = d.getStateDeltas(ctx, &device, txn, fromPos, toPos, device.UserID) + } else { + deltas, err = d.getStateDeltasForFullStateSync(ctx, &device, txn, fromPos, toPos, device.UserID) + } if err != nil { return nil, err } @@ -725,7 +729,7 @@ func (d *SyncServerDatasource) fetchMissingStateEvents( func (d *SyncServerDatasource) getStateDeltas( ctx context.Context, device *authtypes.Device, txn *sql.Tx, - fromPos, toPos int64, userID string, wantFullState bool, + fromPos, toPos int64, userID string, ) ([]stateDelta, error) { // Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821 // - Get membership list changes for this user in this sync response @@ -756,13 +760,6 @@ func (d *SyncServerDatasource) getStateDeltas( // the timeline. if membership := getMembershipFromEvent(&ev.Event, userID); membership != "" { if membership == "join" { - if wantFullState { - // If full_state=true, we need to return full state for - // ALL joined rooms. So we skip handling individual - // rooms here and do this later. - continue - } - // send full room state down instead of a delta var s []streamEvent s, err = d.currentStateStreamEventsForRoom(ctx, txn, roomID) @@ -790,21 +787,9 @@ func (d *SyncServerDatasource) getStateDeltas( return nil, err } for _, joinedRoomID := range joinedRoomIDs { - var toAdd []streamEvent - - if wantFullState { - s, err := d.currentStateStreamEventsForRoom(ctx, txn, joinedRoomID) - if err != nil { - return nil, err - } - toAdd = s - } else { - toAdd = state[joinedRoomID] - } - deltas = append(deltas, stateDelta{ membership: "join", - stateEvents: streamEventsToEvents(device, toAdd), + stateEvents: streamEventsToEvents(device, state[joinedRoomID]), roomID: joinedRoomID, }) } @@ -812,6 +797,65 @@ func (d *SyncServerDatasource) getStateDeltas( return deltas, nil } +// getStateDeltasForFullStateSync is a variant of getStateDeltas used for /sync +// requests with full_state=true. +// Fetches full state for all joined rooms and uses selectStateInRange to get +// updates for other rooms. +func (d *SyncServerDatasource) getStateDeltasForFullStateSync( + ctx context.Context, device *authtypes.Device, txn *sql.Tx, + fromPos, toPos int64, userID string, +) ([]stateDelta, error) { + joinedRoomIDs, err := d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, "join") + if err != nil { + return nil, err + } + + // Use a reasonable initial capacity + deltas := make([]stateDelta, 0, len(joinedRoomIDs)) + + // Add full states for all joined rooms + for _, joinedRoomID := range joinedRoomIDs { + s, stateErr := d.currentStateStreamEventsForRoom(ctx, txn, joinedRoomID) + if stateErr != nil { + return nil, stateErr + } + deltas = append(deltas, stateDelta{ + membership: "join", + stateEvents: streamEventsToEvents(device, s), + roomID: joinedRoomID, + }) + } + + // get all the state events ever between these two positions + stateNeeded, eventMap, err := d.events.selectStateInRange(ctx, txn, fromPos, toPos) + if err != nil { + return nil, err + } + state, err := d.fetchStateEvents(ctx, txn, stateNeeded, eventMap) + if err != nil { + return nil, err + } + + for roomID, stateStreamEvents := range state { + for _, ev := range stateStreamEvents { + if membership := getMembershipFromEvent(&ev.Event, userID); membership != "" { + if membership != "join" { // We've already added full state for all joined rooms above. + deltas = append(deltas, stateDelta{ + membership: membership, + membershipPos: ev.streamPosition, + stateEvents: streamEventsToEvents(device, stateStreamEvents), + roomID: roomID, + }) + } + + break + } + } + } + + return deltas, nil +} + func (d *SyncServerDatasource) currentStateStreamEventsForRoom( ctx context.Context, txn *sql.Tx, roomID string, ) ([]streamEvent, error) {