From b113217a6d4023aa2b0c468eb2be38dd493ad821 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Fri, 25 Mar 2022 12:38:16 +0000 Subject: [PATCH] Use most recent event in response to get latest stream position in incremental sync (#2302) * Use latest event position in response for advancing the stream position in an incremental sync * Create some calm * Use To in worst case * Don't waste CPU cycles on an empty response after all * Bug fixes * Fix another bug --- syncapi/streams/stream_pdu.go | 56 ++++++++++++++++++++++++++++------- 1 file changed, 45 insertions(+), 11 deletions(-) diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index 1afcbe750..ccdac0864 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -147,7 +147,6 @@ func (p *PDUStreamProvider) IncrementalSync( To: to, Backwards: from > to, } - newPos = to var err error var stateDeltas []types.StateDelta @@ -172,14 +171,26 @@ func (p *PDUStreamProvider) IncrementalSync( req.Rooms[roomID] = gomatrixserverlib.Join } + if len(stateDeltas) == 0 { + return to + } + + newPos = from for _, delta := range stateDeltas { - if err = p.addRoomDeltaToResponse(ctx, req.Device, r, delta, &eventFilter, req.Response); err != nil { + var pos types.StreamPosition + if pos, err = p.addRoomDeltaToResponse(ctx, req.Device, r, delta, &eventFilter, req.Response); err != nil { req.Log.WithError(err).Error("d.addRoomDeltaToResponse failed") - return newPos + return to + } + switch { + case r.Backwards && pos < newPos: + fallthrough + case !r.Backwards && pos > newPos: + newPos = pos } } - return r.To + return newPos } func (p *PDUStreamProvider) addRoomDeltaToResponse( @@ -189,7 +200,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( delta types.StateDelta, eventFilter *gomatrixserverlib.RoomEventFilter, res *types.Response, -) error { +) (types.StreamPosition, error) { if delta.MembershipPos > 0 && delta.Membership == gomatrixserverlib.Leave { // make sure we don't leak recent events after the leave event. // TODO: History visibility makes this somewhat complex to handle correctly. For example: @@ -204,19 +215,42 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( eventFilter, true, true, ) if err != nil { - return err + return r.From, err } recentEvents := p.DB.StreamEventsToEvents(device, recentStreamEvents) delta.StateEvents = removeDuplicates(delta.StateEvents, recentEvents) // roll back prevBatch, err := p.DB.GetBackwardTopologyPos(ctx, recentStreamEvents) if err != nil { - return err + return r.From, err } - // XXX: should we ever get this far if we have no recent events or state in this room? - // in practice we do for peeks, but possibly not joins? + // If we didn't return any events at all then don't bother doing anything else. if len(recentEvents) == 0 && len(delta.StateEvents) == 0 { - return nil + return r.To, nil + } + + // Sort the events so that we can pick out the latest events from both sections. + recentEvents = gomatrixserverlib.HeaderedReverseTopologicalOrdering(recentEvents, gomatrixserverlib.TopologicalOrderByPrevEvents) + delta.StateEvents = gomatrixserverlib.HeaderedReverseTopologicalOrdering(delta.StateEvents, gomatrixserverlib.TopologicalOrderByAuthEvents) + + // Work out what the highest stream position is for all of the events in this + // room that were returned. + latestPosition := r.To + updateLatestPosition := func(mostRecentEventID string) { + if _, pos, err := p.DB.PositionInTopology(ctx, mostRecentEventID); err == nil { + switch { + case r.Backwards && pos > latestPosition: + fallthrough + case !r.Backwards && pos < latestPosition: + latestPosition = pos + } + } + } + if len(recentEvents) > 0 { + updateLatestPosition(recentEvents[len(recentEvents)-1].EventID()) + } + if len(delta.StateEvents) > 0 { + updateLatestPosition(delta.StateEvents[len(delta.StateEvents)-1].EventID()) } switch delta.Membership { @@ -250,7 +284,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( res.Rooms.Leave[delta.RoomID] = *lr } - return nil + return latestPosition, nil } func (p *PDUStreamProvider) getJoinResponseForCompleteSync(