diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index 1afcbe750..bd9ec042f 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -147,7 +147,7 @@ func (p *PDUStreamProvider) IncrementalSync( To: to, Backwards: from > to, } - newPos = to + newPos = from var err error var stateDeltas []types.StateDelta @@ -173,13 +173,17 @@ func (p *PDUStreamProvider) IncrementalSync( } for _, delta := range stateDeltas { - if err = p.addRoomDeltaToResponse(ctx, req.Device, r, delta, &eventFilter, req.Response); err != nil { + var pos types.StreamPosition + if pos, err = p.addRoomDeltaToResponse(ctx, req.Device, r, delta, &eventFilter, req.Response); err != nil { req.Log.WithError(err).Error("d.addRoomDeltaToResponse failed") - return newPos + return to + } + if pos > newPos { + newPos = pos } } - return r.To + return newPos } func (p *PDUStreamProvider) addRoomDeltaToResponse( @@ -189,7 +193,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( delta types.StateDelta, eventFilter *gomatrixserverlib.RoomEventFilter, res *types.Response, -) error { +) (types.StreamPosition, error) { if delta.MembershipPos > 0 && delta.Membership == gomatrixserverlib.Leave { // make sure we don't leak recent events after the leave event. // TODO: History visibility makes this somewhat complex to handle correctly. For example: @@ -204,19 +208,42 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( eventFilter, true, true, ) if err != nil { - return err + return r.From, err } recentEvents := p.DB.StreamEventsToEvents(device, recentStreamEvents) delta.StateEvents = removeDuplicates(delta.StateEvents, recentEvents) // roll back prevBatch, err := p.DB.GetBackwardTopologyPos(ctx, recentStreamEvents) if err != nil { - return err + return r.From, err } - // XXX: should we ever get this far if we have no recent events or state in this room? - // in practice we do for peeks, but possibly not joins? + // If we didn't return any events at all then don't bother advancing the stream position. if len(recentEvents) == 0 && len(delta.StateEvents) == 0 { - return nil + return r.From, nil + } + + // Sort the events so that we can pick out the latest events from both sections. + recentEvents = gomatrixserverlib.HeaderedReverseTopologicalOrdering(recentEvents, gomatrixserverlib.TopologicalOrderByPrevEvents) + delta.StateEvents = gomatrixserverlib.HeaderedReverseTopologicalOrdering(delta.StateEvents, gomatrixserverlib.TopologicalOrderByAuthEvents) + + // Work out what the highest stream position is for all of the events in this + // room that were returned. + latestPosition := r.From + updateLatestPosition := func(mostRecentEventID string) { + if _, pos, err := p.DB.PositionInTopology(ctx, mostRecentEventID); err == nil { + switch { + case r.Backwards && pos < latestPosition: + fallthrough + case !r.Backwards && pos > latestPosition: + latestPosition = pos + } + } + } + if len(recentEvents) > 0 { + updateLatestPosition(recentEvents[len(recentEvents)-1].EventID()) + } + if len(delta.StateEvents) > 0 { + updateLatestPosition(delta.StateEvents[len(delta.StateEvents)-1].EventID()) } switch delta.Membership { @@ -250,7 +277,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( res.Rooms.Leave[delta.RoomID] = *lr } - return nil + return latestPosition, nil } func (p *PDUStreamProvider) getJoinResponseForCompleteSync(