Implement "full_state" query parameter for /sync (#751)

Closes #637.
This commit is contained in:
Alex Chen 2019-08-01 12:36:13 +08:00 committed by GitHub
parent 0dcf0a7d64
commit 3578d77d25
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 117 additions and 29 deletions

View file

@ -241,6 +241,7 @@ func (d *SyncServerDatasource) addPDUDeltaToResponse(
device authtypes.Device, device authtypes.Device,
fromPos, toPos int64, fromPos, toPos int64,
numRecentEventsPerRoom int, numRecentEventsPerRoom int,
wantFullState bool,
res *types.Response, res *types.Response,
) ([]string, error) { ) ([]string, error) {
txn, err := d.db.BeginTx(ctx, &txReadOnlySnapshot) txn, err := d.db.BeginTx(ctx, &txReadOnlySnapshot)
@ -254,7 +255,13 @@ func (d *SyncServerDatasource) addPDUDeltaToResponse(
// joined rooms, but also which rooms have membership transitions for this user between the 2 PDU stream positions. // joined rooms, but also which rooms have membership transitions for this user between the 2 PDU stream positions.
// This works out what the 'state' key should be for each room as well as which membership block // This works out what the 'state' key should be for each room as well as which membership block
// to put the room into. // to put the room into.
deltas, joinedRoomIDs, err := d.getStateDeltas(ctx, &device, txn, fromPos, toPos, device.UserID) var deltas []stateDelta
var joinedRoomIDs []string
if !wantFullState {
deltas, joinedRoomIDs, err = d.getStateDeltas(ctx, &device, txn, fromPos, toPos, device.UserID)
} else {
deltas, joinedRoomIDs, err = d.getStateDeltasForFullStateSync(ctx, &device, txn, fromPos, toPos, device.UserID)
}
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -336,15 +343,16 @@ func (d *SyncServerDatasource) IncrementalSync(
device authtypes.Device, device authtypes.Device,
fromPos, toPos types.SyncPosition, fromPos, toPos types.SyncPosition,
numRecentEventsPerRoom int, numRecentEventsPerRoom int,
wantFullState bool,
) (*types.Response, error) { ) (*types.Response, error) {
nextBatchPos := fromPos.WithUpdates(toPos) nextBatchPos := fromPos.WithUpdates(toPos)
res := types.NewResponse(nextBatchPos) res := types.NewResponse(nextBatchPos)
var joinedRoomIDs []string var joinedRoomIDs []string
var err error var err error
if fromPos.PDUPosition != toPos.PDUPosition { if fromPos.PDUPosition != toPos.PDUPosition || wantFullState {
joinedRoomIDs, err = d.addPDUDeltaToResponse( joinedRoomIDs, err = d.addPDUDeltaToResponse(
ctx, device, fromPos.PDUPosition, toPos.PDUPosition, numRecentEventsPerRoom, res, ctx, device, fromPos.PDUPosition, toPos.PDUPosition, numRecentEventsPerRoom, wantFullState, res,
) )
} else { } else {
joinedRoomIDs, err = d.roomstate.selectRoomIDsWithMembership( joinedRoomIDs, err = d.roomstate.selectRoomIDsWithMembership(
@ -593,21 +601,30 @@ func (d *SyncServerDatasource) addRoomDeltaToResponse(
recentEvents := streamEventsToEvents(device, recentStreamEvents) recentEvents := streamEventsToEvents(device, recentStreamEvents)
delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents) // roll back delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents) // roll back
var prevPDUPos int64
if len(recentEvents) == 0 {
if len(delta.stateEvents) == 0 {
// Don't bother appending empty room entries // Don't bother appending empty room entries
if len(recentEvents) == 0 && len(delta.stateEvents) == 0 {
return nil return nil
} }
// If full_state=true and since is already up to date, then we'll have
// state events but no recent events.
prevPDUPos = toPos - 1
} else {
prevPDUPos = recentStreamEvents[0].streamPosition - 1
}
if prevPDUPos <= 0 {
prevPDUPos = 1
}
switch delta.membership { switch delta.membership {
case membershipJoin: case membershipJoin:
jr := types.NewJoinResponse() jr := types.NewJoinResponse()
if prevPDUPos := recentStreamEvents[0].streamPosition - 1; prevPDUPos > 0 {
// Use the short form of batch token for prev_batch // Use the short form of batch token for prev_batch
jr.Timeline.PrevBatch = strconv.FormatInt(prevPDUPos, 10) jr.Timeline.PrevBatch = strconv.FormatInt(prevPDUPos, 10)
} else {
// Use the short form of batch token for prev_batch
jr.Timeline.PrevBatch = "1"
}
jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
jr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) jr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
@ -618,13 +635,8 @@ func (d *SyncServerDatasource) addRoomDeltaToResponse(
// TODO: recentEvents may contain events that this user is not allowed to see because they are // TODO: recentEvents may contain events that this user is not allowed to see because they are
// no longer in the room. // no longer in the room.
lr := types.NewLeaveResponse() lr := types.NewLeaveResponse()
if prevPDUPos := recentStreamEvents[0].streamPosition - 1; prevPDUPos > 0 {
// Use the short form of batch token for prev_batch // Use the short form of batch token for prev_batch
lr.Timeline.PrevBatch = strconv.FormatInt(prevPDUPos, 10) lr.Timeline.PrevBatch = strconv.FormatInt(prevPDUPos, 10)
} else {
// Use the short form of batch token for prev_batch
lr.Timeline.PrevBatch = "1"
}
lr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) lr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
lr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) lr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
@ -758,15 +770,11 @@ func (d *SyncServerDatasource) getStateDeltas(
if membership := getMembershipFromEvent(&ev.Event, userID); membership != "" { if membership := getMembershipFromEvent(&ev.Event, userID); membership != "" {
if membership == membershipJoin { if membership == membershipJoin {
// send full room state down instead of a delta // send full room state down instead of a delta
var allState []gomatrixserverlib.Event var s []streamEvent
allState, err = d.roomstate.selectCurrentState(ctx, txn, roomID) s, err = d.currentStateStreamEventsForRoom(ctx, txn, roomID)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
s := make([]streamEvent, len(allState))
for i := 0; i < len(s); i++ {
s[i] = streamEvent{Event: allState[i], streamPosition: 0}
}
state[roomID] = s state[roomID] = s
continue // we'll add this room in when we do joined rooms continue // we'll add this room in when we do joined rooms
} }
@ -798,6 +806,79 @@ func (d *SyncServerDatasource) getStateDeltas(
return deltas, joinedRoomIDs, nil return deltas, joinedRoomIDs, nil
} }
// getStateDeltasForFullStateSync is a variant of getStateDeltas used for /sync
// requests with full_state=true.
// Fetches full state for all joined rooms and uses selectStateInRange to get
// updates for other rooms.
func (d *SyncServerDatasource) getStateDeltasForFullStateSync(
ctx context.Context, device *authtypes.Device, txn *sql.Tx,
fromPos, toPos int64, userID string,
) ([]stateDelta, []string, error) {
joinedRoomIDs, err := d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, "join")
if err != nil {
return nil, nil, err
}
// Use a reasonable initial capacity
deltas := make([]stateDelta, 0, len(joinedRoomIDs))
// Add full states for all joined rooms
for _, joinedRoomID := range joinedRoomIDs {
s, stateErr := d.currentStateStreamEventsForRoom(ctx, txn, joinedRoomID)
if stateErr != nil {
return nil, nil, stateErr
}
deltas = append(deltas, stateDelta{
membership: "join",
stateEvents: streamEventsToEvents(device, s),
roomID: joinedRoomID,
})
}
// Get all the state events ever between these two positions
stateNeeded, eventMap, err := d.events.selectStateInRange(ctx, txn, fromPos, toPos)
if err != nil {
return nil, nil, err
}
state, err := d.fetchStateEvents(ctx, txn, stateNeeded, eventMap)
if err != nil {
return nil, nil, err
}
for roomID, stateStreamEvents := range state {
for _, ev := range stateStreamEvents {
if membership := getMembershipFromEvent(&ev.Event, userID); membership != "" {
if membership != "join" { // We've already added full state for all joined rooms above.
deltas = append(deltas, stateDelta{
membership: membership,
membershipPos: ev.streamPosition,
stateEvents: streamEventsToEvents(device, stateStreamEvents),
roomID: roomID,
})
}
break
}
}
}
return deltas, joinedRoomIDs, nil
}
func (d *SyncServerDatasource) currentStateStreamEventsForRoom(
ctx context.Context, txn *sql.Tx, roomID string,
) ([]streamEvent, error) {
allState, err := d.roomstate.selectCurrentState(ctx, txn, roomID)
if err != nil {
return nil, err
}
s := make([]streamEvent, len(allState))
for i := 0; i < len(s); i++ {
s[i] = streamEvent{Event: allState[i], streamPosition: 0}
}
return s, nil
}
// streamEventsToEvents converts streamEvent to Event. If device is non-nil and // streamEventsToEvents converts streamEvent to Event. If device is non-nil and
// matches the streamevent.transactionID device then the transaction ID gets // matches the streamevent.transactionID device then the transaction ID gets
// added to the unsigned section of the output event. // added to the unsigned section of the output event.

View file

@ -65,8 +65,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype
currPos := rp.notifier.CurrentPosition() currPos := rp.notifier.CurrentPosition()
// If this is an initial sync or timeout=0 we return immediately if shouldReturnImmediately(syncReq) {
if syncReq.since == nil || syncReq.timeout == 0 {
syncData, err = rp.currentSyncForUser(*syncReq, currPos) syncData, err = rp.currentSyncForUser(*syncReq, currPos)
if err != nil { if err != nil {
return httputil.LogThenError(req, err) return httputil.LogThenError(req, err)
@ -135,7 +134,7 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.SyncP
if req.since == nil { if req.since == nil {
res, err = rp.db.CompleteSync(req.ctx, req.device.UserID, req.limit) res, err = rp.db.CompleteSync(req.ctx, req.device.UserID, req.limit)
} else { } else {
res, err = rp.db.IncrementalSync(req.ctx, req.device, *req.since, latestPos, req.limit) res, err = rp.db.IncrementalSync(req.ctx, req.device, *req.since, latestPos, req.limit, req.wantFullState)
} }
if err != nil { if err != nil {
@ -216,3 +215,10 @@ func (rp *RequestPool) appendAccountData(
return data, nil return data, nil
} }
// shouldReturnImmediately returns whether the /sync request is an initial sync,
// or timeout=0, or full_state=true, in any of the cases the request should
// return immediately.
func shouldReturnImmediately(syncReq *syncRequest) bool {
return syncReq.since == nil || syncReq.timeout == 0 || syncReq.wantFullState
}

View file

@ -158,3 +158,4 @@ Inbound federation rejects remote attempts to join local users to rooms
Inbound federation rejects remote attempts to kick local users to rooms Inbound federation rejects remote attempts to kick local users to rooms
An event which redacts itself should be ignored An event which redacts itself should be ignored
A pair of events which redact each other should be ignored A pair of events which redact each other should be ignored
Full state sync includes joined rooms