mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-15 10:53:09 -06:00
Implement "full_state" query parameter for /sync
Closes #637. Signed-off-by: Alex Chen <minecnly@gmail.com>
This commit is contained in:
parent
e56d6e41fd
commit
ee14a27949
|
|
@ -235,6 +235,7 @@ func (d *SyncServerDatasource) addPDUDeltaToResponse(
|
|||
device authtypes.Device,
|
||||
fromPos, toPos int64,
|
||||
numRecentEventsPerRoom int,
|
||||
wantFullState bool,
|
||||
res *types.Response,
|
||||
) ([]string, error) {
|
||||
txn, err := d.db.BeginTx(ctx, &txReadOnlySnapshot)
|
||||
|
|
@ -248,7 +249,8 @@ func (d *SyncServerDatasource) addPDUDeltaToResponse(
|
|||
// joined rooms, but also which rooms have membership transitions for this user between the 2 PDU stream positions.
|
||||
// This works out what the 'state' key should be for each room as well as which membership block
|
||||
// to put the room into.
|
||||
deltas, err := d.getStateDeltas(ctx, &device, txn, fromPos, toPos, device.UserID)
|
||||
var deltas []stateDelta
|
||||
deltas, err = d.getStateDeltas(ctx, &device, txn, fromPos, toPos, device.UserID, wantFullState)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -332,15 +334,16 @@ func (d *SyncServerDatasource) IncrementalSync(
|
|||
device authtypes.Device,
|
||||
fromPos, toPos types.SyncPosition,
|
||||
numRecentEventsPerRoom int,
|
||||
wantFullState bool,
|
||||
) (*types.Response, error) {
|
||||
nextBatchPos := fromPos.WithUpdates(toPos)
|
||||
res := types.NewResponse(nextBatchPos)
|
||||
|
||||
var joinedRoomIDs []string
|
||||
var err error
|
||||
if fromPos.PDUPosition != toPos.PDUPosition {
|
||||
if fromPos.PDUPosition != toPos.PDUPosition || wantFullState {
|
||||
joinedRoomIDs, err = d.addPDUDeltaToResponse(
|
||||
ctx, device, fromPos.PDUPosition, toPos.PDUPosition, numRecentEventsPerRoom, res,
|
||||
ctx, device, fromPos.PDUPosition, toPos.PDUPosition, numRecentEventsPerRoom, wantFullState, res,
|
||||
)
|
||||
} else {
|
||||
joinedRoomIDs, err = d.roomstate.selectRoomIDsWithMembership(
|
||||
|
|
@ -589,21 +592,30 @@ func (d *SyncServerDatasource) addRoomDeltaToResponse(
|
|||
recentEvents := streamEventsToEvents(device, recentStreamEvents)
|
||||
delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents) // roll back
|
||||
|
||||
// Don't bother appending empty room entries
|
||||
if len(recentEvents) == 0 && len(delta.stateEvents) == 0 {
|
||||
return nil
|
||||
var prevPDUPos int64
|
||||
|
||||
if len(recentEvents) == 0 {
|
||||
if len(delta.stateEvents) == 0 {
|
||||
// Don't bother appending empty room entries
|
||||
return nil
|
||||
}
|
||||
|
||||
// If full_state=true and since= is already up to date, then we'll have
|
||||
// state events but no recent events.
|
||||
prevPDUPos = toPos - 1
|
||||
} else {
|
||||
prevPDUPos = recentStreamEvents[0].streamPosition - 1
|
||||
}
|
||||
|
||||
if prevPDUPos <= 0 {
|
||||
prevPDUPos = 1
|
||||
}
|
||||
|
||||
switch delta.membership {
|
||||
case "join":
|
||||
jr := types.NewJoinResponse()
|
||||
if prevPDUPos := recentStreamEvents[0].streamPosition - 1; prevPDUPos > 0 {
|
||||
// Use the short form of batch token for prev_batch
|
||||
jr.Timeline.PrevBatch = strconv.FormatInt(prevPDUPos, 10)
|
||||
} else {
|
||||
// Use the short form of batch token for prev_batch
|
||||
jr.Timeline.PrevBatch = "1"
|
||||
}
|
||||
// Use the short form of batch token for prev_batch
|
||||
jr.Timeline.PrevBatch = strconv.FormatInt(prevPDUPos, 10)
|
||||
jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
|
||||
jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
|
||||
jr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
|
||||
|
|
@ -614,13 +626,8 @@ func (d *SyncServerDatasource) addRoomDeltaToResponse(
|
|||
// TODO: recentEvents may contain events that this user is not allowed to see because they are
|
||||
// no longer in the room.
|
||||
lr := types.NewLeaveResponse()
|
||||
if prevPDUPos := recentStreamEvents[0].streamPosition - 1; prevPDUPos > 0 {
|
||||
// Use the short form of batch token for prev_batch
|
||||
lr.Timeline.PrevBatch = strconv.FormatInt(prevPDUPos, 10)
|
||||
} else {
|
||||
// Use the short form of batch token for prev_batch
|
||||
lr.Timeline.PrevBatch = "1"
|
||||
}
|
||||
// Use the short form of batch token for prev_batch
|
||||
lr.Timeline.PrevBatch = strconv.FormatInt(prevPDUPos, 10)
|
||||
lr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
|
||||
lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
|
||||
lr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
|
||||
|
|
@ -718,7 +725,7 @@ func (d *SyncServerDatasource) fetchMissingStateEvents(
|
|||
|
||||
func (d *SyncServerDatasource) getStateDeltas(
|
||||
ctx context.Context, device *authtypes.Device, txn *sql.Tx,
|
||||
fromPos, toPos int64, userID string,
|
||||
fromPos, toPos int64, userID string, wantFullState bool,
|
||||
) ([]stateDelta, error) {
|
||||
// Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821
|
||||
// - Get membership list changes for this user in this sync response
|
||||
|
|
@ -749,16 +756,19 @@ func (d *SyncServerDatasource) getStateDeltas(
|
|||
// the timeline.
|
||||
if membership := getMembershipFromEvent(&ev.Event, userID); membership != "" {
|
||||
if membership == "join" {
|
||||
if wantFullState {
|
||||
// If full_state=true, we need to return full state for
|
||||
// ALL joined rooms. So we skip handling individual
|
||||
// rooms here and do this later.
|
||||
continue
|
||||
}
|
||||
|
||||
// send full room state down instead of a delta
|
||||
var allState []gomatrixserverlib.Event
|
||||
allState, err = d.roomstate.selectCurrentState(ctx, txn, roomID)
|
||||
var s []streamEvent
|
||||
s, err = d.currentStateStreamEventsForRoom(ctx, txn, roomID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s := make([]streamEvent, len(allState))
|
||||
for i := 0; i < len(s); i++ {
|
||||
s[i] = streamEvent{Event: allState[i], streamPosition: 0}
|
||||
}
|
||||
state[roomID] = s
|
||||
continue // we'll add this room in when we do joined rooms
|
||||
}
|
||||
|
|
@ -780,9 +790,21 @@ func (d *SyncServerDatasource) getStateDeltas(
|
|||
return nil, err
|
||||
}
|
||||
for _, joinedRoomID := range joinedRoomIDs {
|
||||
var toAdd []streamEvent
|
||||
|
||||
if wantFullState {
|
||||
s, err := d.currentStateStreamEventsForRoom(ctx, txn, joinedRoomID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
toAdd = s
|
||||
} else {
|
||||
toAdd = state[joinedRoomID]
|
||||
}
|
||||
|
||||
deltas = append(deltas, stateDelta{
|
||||
membership: "join",
|
||||
stateEvents: streamEventsToEvents(device, state[joinedRoomID]),
|
||||
stateEvents: streamEventsToEvents(device, toAdd),
|
||||
roomID: joinedRoomID,
|
||||
})
|
||||
}
|
||||
|
|
@ -790,6 +812,20 @@ func (d *SyncServerDatasource) getStateDeltas(
|
|||
return deltas, nil
|
||||
}
|
||||
|
||||
func (d *SyncServerDatasource) currentStateStreamEventsForRoom(
|
||||
ctx context.Context, txn *sql.Tx, roomID string,
|
||||
) ([]streamEvent, error) {
|
||||
allState, err := d.roomstate.selectCurrentState(ctx, txn, roomID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s := make([]streamEvent, len(allState))
|
||||
for i := 0; i < len(s); i++ {
|
||||
s[i] = streamEvent{Event: allState[i], streamPosition: 0}
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
||||
// streamEventsToEvents converts streamEvent to Event. If device is non-nil and
|
||||
// matches the streamevent.transactionID device then the transaction ID gets
|
||||
// added to the unsigned section of the output event.
|
||||
|
|
|
|||
|
|
@ -65,8 +65,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype
|
|||
|
||||
currPos := rp.notifier.CurrentPosition()
|
||||
|
||||
// If this is an initial sync or timeout=0 we return immediately
|
||||
if syncReq.since == nil || syncReq.timeout == 0 {
|
||||
if shouldReturnImmediately(syncReq) {
|
||||
syncData, err = rp.currentSyncForUser(*syncReq, currPos)
|
||||
if err != nil {
|
||||
return httputil.LogThenError(req, err)
|
||||
|
|
@ -135,7 +134,7 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.SyncP
|
|||
if req.since == nil {
|
||||
res, err = rp.db.CompleteSync(req.ctx, req.device.UserID, req.limit)
|
||||
} else {
|
||||
res, err = rp.db.IncrementalSync(req.ctx, req.device, *req.since, latestPos, req.limit)
|
||||
res, err = rp.db.IncrementalSync(req.ctx, req.device, *req.since, latestPos, req.limit, req.wantFullState)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
|
|
@ -216,3 +215,10 @@ func (rp *RequestPool) appendAccountData(
|
|||
|
||||
return data, nil
|
||||
}
|
||||
|
||||
// shouldReturnImmediately returns whether the /sync request is an initial sync,
|
||||
// or timeout=0, or full_state=true, in any of the cases the request should
|
||||
// return immediately.
|
||||
func shouldReturnImmediately(syncReq *syncRequest) bool {
|
||||
return syncReq.since == nil || syncReq.timeout == 0 || syncReq.wantFullState
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue