diff --git a/syncapi/storage/syncserver.go b/syncapi/storage/syncserver.go index b4d7ccbd2..418cf5b78 100644 --- a/syncapi/storage/syncserver.go +++ b/syncapi/storage/syncserver.go @@ -235,6 +235,7 @@ func (d *SyncServerDatasource) addPDUDeltaToResponse( device authtypes.Device, fromPos, toPos int64, numRecentEventsPerRoom int, + wantFullState bool, res *types.Response, ) ([]string, error) { txn, err := d.db.BeginTx(ctx, &txReadOnlySnapshot) @@ -248,7 +249,8 @@ func (d *SyncServerDatasource) addPDUDeltaToResponse( // joined rooms, but also which rooms have membership transitions for this user between the 2 PDU stream positions. // This works out what the 'state' key should be for each room as well as which membership block // to put the room into. - deltas, err := d.getStateDeltas(ctx, &device, txn, fromPos, toPos, device.UserID) + var deltas []stateDelta + deltas, err = d.getStateDeltas(ctx, &device, txn, fromPos, toPos, device.UserID, wantFullState) if err != nil { return nil, err } @@ -332,15 +334,16 @@ func (d *SyncServerDatasource) IncrementalSync( device authtypes.Device, fromPos, toPos types.SyncPosition, numRecentEventsPerRoom int, + wantFullState bool, ) (*types.Response, error) { nextBatchPos := fromPos.WithUpdates(toPos) res := types.NewResponse(nextBatchPos) var joinedRoomIDs []string var err error - if fromPos.PDUPosition != toPos.PDUPosition { + if fromPos.PDUPosition != toPos.PDUPosition || wantFullState { joinedRoomIDs, err = d.addPDUDeltaToResponse( - ctx, device, fromPos.PDUPosition, toPos.PDUPosition, numRecentEventsPerRoom, res, + ctx, device, fromPos.PDUPosition, toPos.PDUPosition, numRecentEventsPerRoom, wantFullState, res, ) } else { joinedRoomIDs, err = d.roomstate.selectRoomIDsWithMembership( @@ -589,21 +592,30 @@ func (d *SyncServerDatasource) addRoomDeltaToResponse( recentEvents := streamEventsToEvents(device, recentStreamEvents) delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents) // roll back - // Don't bother appending empty room entries - if len(recentEvents) == 0 && len(delta.stateEvents) == 0 { - return nil + var prevPDUPos int64 + + if len(recentEvents) == 0 { + if len(delta.stateEvents) == 0 { + // Don't bother appending empty room entries + return nil + } + + // If full_state=true and since= is already up to date, then we'll have + // state events but no recent events. + prevPDUPos = toPos - 1 + } else { + prevPDUPos = recentStreamEvents[0].streamPosition - 1 + } + + if prevPDUPos <= 0 { + prevPDUPos = 1 } switch delta.membership { case "join": jr := types.NewJoinResponse() - if prevPDUPos := recentStreamEvents[0].streamPosition - 1; prevPDUPos > 0 { - // Use the short form of batch token for prev_batch - jr.Timeline.PrevBatch = strconv.FormatInt(prevPDUPos, 10) - } else { - // Use the short form of batch token for prev_batch - jr.Timeline.PrevBatch = "1" - } + // Use the short form of batch token for prev_batch + jr.Timeline.PrevBatch = strconv.FormatInt(prevPDUPos, 10) jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true jr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) @@ -614,13 +626,8 @@ func (d *SyncServerDatasource) addRoomDeltaToResponse( // TODO: recentEvents may contain events that this user is not allowed to see because they are // no longer in the room. lr := types.NewLeaveResponse() - if prevPDUPos := recentStreamEvents[0].streamPosition - 1; prevPDUPos > 0 { - // Use the short form of batch token for prev_batch - lr.Timeline.PrevBatch = strconv.FormatInt(prevPDUPos, 10) - } else { - // Use the short form of batch token for prev_batch - lr.Timeline.PrevBatch = "1" - } + // Use the short form of batch token for prev_batch + lr.Timeline.PrevBatch = strconv.FormatInt(prevPDUPos, 10) lr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true lr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) @@ -718,7 +725,7 @@ func (d *SyncServerDatasource) fetchMissingStateEvents( func (d *SyncServerDatasource) getStateDeltas( ctx context.Context, device *authtypes.Device, txn *sql.Tx, - fromPos, toPos int64, userID string, + fromPos, toPos int64, userID string, wantFullState bool, ) ([]stateDelta, error) { // Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821 // - Get membership list changes for this user in this sync response @@ -749,16 +756,19 @@ func (d *SyncServerDatasource) getStateDeltas( // the timeline. if membership := getMembershipFromEvent(&ev.Event, userID); membership != "" { if membership == "join" { + if wantFullState { + // If full_state=true, we need to return full state for + // ALL joined rooms. So we skip handling individual + // rooms here and do this later. + continue + } + // send full room state down instead of a delta - var allState []gomatrixserverlib.Event - allState, err = d.roomstate.selectCurrentState(ctx, txn, roomID) + var s []streamEvent + s, err = d.currentStateStreamEventsForRoom(ctx, txn, roomID) if err != nil { return nil, err } - s := make([]streamEvent, len(allState)) - for i := 0; i < len(s); i++ { - s[i] = streamEvent{Event: allState[i], streamPosition: 0} - } state[roomID] = s continue // we'll add this room in when we do joined rooms } @@ -780,9 +790,21 @@ func (d *SyncServerDatasource) getStateDeltas( return nil, err } for _, joinedRoomID := range joinedRoomIDs { + var toAdd []streamEvent + + if wantFullState { + s, err := d.currentStateStreamEventsForRoom(ctx, txn, joinedRoomID) + if err != nil { + return nil, err + } + toAdd = s + } else { + toAdd = state[joinedRoomID] + } + deltas = append(deltas, stateDelta{ membership: "join", - stateEvents: streamEventsToEvents(device, state[joinedRoomID]), + stateEvents: streamEventsToEvents(device, toAdd), roomID: joinedRoomID, }) } @@ -790,6 +812,20 @@ func (d *SyncServerDatasource) getStateDeltas( return deltas, nil } +func (d *SyncServerDatasource) currentStateStreamEventsForRoom( + ctx context.Context, txn *sql.Tx, roomID string, +) ([]streamEvent, error) { + allState, err := d.roomstate.selectCurrentState(ctx, txn, roomID) + if err != nil { + return nil, err + } + s := make([]streamEvent, len(allState)) + for i := 0; i < len(s); i++ { + s[i] = streamEvent{Event: allState[i], streamPosition: 0} + } + return s, nil +} + // streamEventsToEvents converts streamEvent to Event. If device is non-nil and // matches the streamevent.transactionID device then the transaction ID gets // added to the unsigned section of the output event. diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index a6ec6bd92..d773a4606 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -65,8 +65,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype currPos := rp.notifier.CurrentPosition() - // If this is an initial sync or timeout=0 we return immediately - if syncReq.since == nil || syncReq.timeout == 0 { + if shouldReturnImmediately(syncReq) { syncData, err = rp.currentSyncForUser(*syncReq, currPos) if err != nil { return httputil.LogThenError(req, err) @@ -135,7 +134,7 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.SyncP if req.since == nil { res, err = rp.db.CompleteSync(req.ctx, req.device.UserID, req.limit) } else { - res, err = rp.db.IncrementalSync(req.ctx, req.device, *req.since, latestPos, req.limit) + res, err = rp.db.IncrementalSync(req.ctx, req.device, *req.since, latestPos, req.limit, req.wantFullState) } if err != nil { @@ -216,3 +215,10 @@ func (rp *RequestPool) appendAccountData( return data, nil } + +// shouldReturnImmediately returns whether the /sync request is an initial sync, +// or timeout=0, or full_state=true, in any of the cases the request should +// return immediately. +func shouldReturnImmediately(syncReq *syncRequest) bool { + return syncReq.since == nil || syncReq.timeout == 0 || syncReq.wantFullState +}