Use most recent event in response to get latest stream position in incremental sync (#2302)

* Use latest event position in response for advancing the stream position in an incremental sync

* Create some calm

* Use To in worst case

* Don't waste CPU cycles on an empty response after all

* Bug fixes

* Fix another bug
This commit is contained in:
Neil Alexander 2022-03-25 12:38:16 +00:00 committed by GitHub
parent e6d4bdeed5
commit b113217a6d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -147,7 +147,6 @@ func (p *PDUStreamProvider) IncrementalSync(
To: to, To: to,
Backwards: from > to, Backwards: from > to,
} }
newPos = to
var err error var err error
var stateDeltas []types.StateDelta var stateDeltas []types.StateDelta
@ -172,14 +171,26 @@ func (p *PDUStreamProvider) IncrementalSync(
req.Rooms[roomID] = gomatrixserverlib.Join req.Rooms[roomID] = gomatrixserverlib.Join
} }
if len(stateDeltas) == 0 {
return to
}
newPos = from
for _, delta := range stateDeltas { for _, delta := range stateDeltas {
if err = p.addRoomDeltaToResponse(ctx, req.Device, r, delta, &eventFilter, req.Response); err != nil { var pos types.StreamPosition
if pos, err = p.addRoomDeltaToResponse(ctx, req.Device, r, delta, &eventFilter, req.Response); err != nil {
req.Log.WithError(err).Error("d.addRoomDeltaToResponse failed") req.Log.WithError(err).Error("d.addRoomDeltaToResponse failed")
return newPos return to
}
switch {
case r.Backwards && pos < newPos:
fallthrough
case !r.Backwards && pos > newPos:
newPos = pos
} }
} }
return r.To return newPos
} }
func (p *PDUStreamProvider) addRoomDeltaToResponse( func (p *PDUStreamProvider) addRoomDeltaToResponse(
@ -189,7 +200,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
delta types.StateDelta, delta types.StateDelta,
eventFilter *gomatrixserverlib.RoomEventFilter, eventFilter *gomatrixserverlib.RoomEventFilter,
res *types.Response, res *types.Response,
) error { ) (types.StreamPosition, error) {
if delta.MembershipPos > 0 && delta.Membership == gomatrixserverlib.Leave { if delta.MembershipPos > 0 && delta.Membership == gomatrixserverlib.Leave {
// make sure we don't leak recent events after the leave event. // make sure we don't leak recent events after the leave event.
// TODO: History visibility makes this somewhat complex to handle correctly. For example: // TODO: History visibility makes this somewhat complex to handle correctly. For example:
@ -204,19 +215,42 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
eventFilter, true, true, eventFilter, true, true,
) )
if err != nil { if err != nil {
return err return r.From, err
} }
recentEvents := p.DB.StreamEventsToEvents(device, recentStreamEvents) recentEvents := p.DB.StreamEventsToEvents(device, recentStreamEvents)
delta.StateEvents = removeDuplicates(delta.StateEvents, recentEvents) // roll back delta.StateEvents = removeDuplicates(delta.StateEvents, recentEvents) // roll back
prevBatch, err := p.DB.GetBackwardTopologyPos(ctx, recentStreamEvents) prevBatch, err := p.DB.GetBackwardTopologyPos(ctx, recentStreamEvents)
if err != nil { if err != nil {
return err return r.From, err
} }
// XXX: should we ever get this far if we have no recent events or state in this room? // If we didn't return any events at all then don't bother doing anything else.
// in practice we do for peeks, but possibly not joins?
if len(recentEvents) == 0 && len(delta.StateEvents) == 0 { if len(recentEvents) == 0 && len(delta.StateEvents) == 0 {
return nil return r.To, nil
}
// Sort the events so that we can pick out the latest events from both sections.
recentEvents = gomatrixserverlib.HeaderedReverseTopologicalOrdering(recentEvents, gomatrixserverlib.TopologicalOrderByPrevEvents)
delta.StateEvents = gomatrixserverlib.HeaderedReverseTopologicalOrdering(delta.StateEvents, gomatrixserverlib.TopologicalOrderByAuthEvents)
// Work out what the highest stream position is for all of the events in this
// room that were returned.
latestPosition := r.To
updateLatestPosition := func(mostRecentEventID string) {
if _, pos, err := p.DB.PositionInTopology(ctx, mostRecentEventID); err == nil {
switch {
case r.Backwards && pos > latestPosition:
fallthrough
case !r.Backwards && pos < latestPosition:
latestPosition = pos
}
}
}
if len(recentEvents) > 0 {
updateLatestPosition(recentEvents[len(recentEvents)-1].EventID())
}
if len(delta.StateEvents) > 0 {
updateLatestPosition(delta.StateEvents[len(delta.StateEvents)-1].EventID())
} }
switch delta.Membership { switch delta.Membership {
@ -250,7 +284,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
res.Rooms.Leave[delta.RoomID] = *lr res.Rooms.Leave[delta.RoomID] = *lr
} }
return nil return latestPosition, nil
} }
func (p *PDUStreamProvider) getJoinResponseForCompleteSync( func (p *PDUStreamProvider) getJoinResponseForCompleteSync(