From 98d3f88bfbfaa23aa4ca63abf9c980b39425cd24 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Thu, 3 Nov 2022 16:56:21 +0000 Subject: [PATCH] Move `prev_batch` calculation (#2856) This might help #2847. --- syncapi/storage/interface.go | 2 +- syncapi/storage/shared/storage_sync.go | 2 +- syncapi/streams/stream_pdu.go | 53 +++++++++++++------------- 3 files changed, 29 insertions(+), 28 deletions(-) diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index af4fce44e..97c2ced49 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -47,7 +47,7 @@ type DatabaseTransaction interface { MembershipCount(ctx context.Context, roomID, membership string, pos types.StreamPosition) (int, error) GetRoomHeroes(ctx context.Context, roomID, userID string, memberships []string) ([]string, error) RecentEvents(ctx context.Context, roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error) - GetBackwardTopologyPos(ctx context.Context, events []types.StreamEvent) (types.TopologyToken, error) + GetBackwardTopologyPos(ctx context.Context, events []*gomatrixserverlib.HeaderedEvent) (types.TopologyToken, error) PositionInTopology(ctx context.Context, eventID string) (pos types.StreamPosition, spos types.StreamPosition, err error) InviteEventsInRange(ctx context.Context, targetUserID string, r types.Range) (map[string]*gomatrixserverlib.HeaderedEvent, map[string]*gomatrixserverlib.HeaderedEvent, types.StreamPosition, error) PeeksInRange(ctx context.Context, userID, deviceID string, r types.Range) (peeks []types.Peek, err error) diff --git a/syncapi/storage/shared/storage_sync.go b/syncapi/storage/shared/storage_sync.go index 1f66ccc0e..c3763521c 100644 --- a/syncapi/storage/shared/storage_sync.go +++ b/syncapi/storage/shared/storage_sync.go @@ -259,7 +259,7 @@ func (d *DatabaseTransaction) StreamToTopologicalPosition( // oldest event in the room's topology. func (d *DatabaseTransaction) GetBackwardTopologyPos( ctx context.Context, - events []types.StreamEvent, + events []*gomatrixserverlib.HeaderedEvent, ) (types.TopologyToken, error) { zeroToken := types.TopologyToken{} if len(events) == 0 { diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index 5b3560810..b21be6c5e 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -255,10 +255,6 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( snapshot.StreamEventsToEvents(device, recentStreamEvents), gomatrixserverlib.TopologicalOrderByPrevEvents, ) - prevBatch, err := snapshot.GetBackwardTopologyPos(ctx, recentStreamEvents) - if err != nil { - return r.From, fmt.Errorf("p.DB.GetBackwardTopologyPos: %w", err) - } // If we didn't return any events at all then don't bother doing anything else. if len(recentEvents) == 0 && len(delta.StateEvents) == 0 { @@ -312,6 +308,11 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( limited = true } + prevBatch, err := snapshot.GetBackwardTopologyPos(ctx, events) + if err != nil { + return r.From, fmt.Errorf("p.DB.GetBackwardTopologyPos: %w", err) + } + // Now that we've filtered the timeline, work out which state events are still // left. Anything that appears in the filtered timeline will be removed from the // "state" section and kept in "timeline". @@ -489,28 +490,6 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync( return } - // Retrieve the backward topology position, i.e. the position of the - // oldest event in the room's topology. - var prevBatch *types.TopologyToken - if len(recentStreamEvents) > 0 { - var backwardTopologyPos, backwardStreamPos types.StreamPosition - event := recentStreamEvents[0] - // If this is the beginning of the room, we can't go back further. We're going to return - // the TopologyToken from the last event instead. (Synapse returns the /sync next_Batch) - if event.Type() == gomatrixserverlib.MRoomCreate && event.StateKeyEquals("") { - event = recentStreamEvents[len(recentStreamEvents)-1] - } - backwardTopologyPos, backwardStreamPos, err = snapshot.PositionInTopology(ctx, event.EventID()) - if err != nil { - return - } - prevBatch = &types.TopologyToken{ - Depth: backwardTopologyPos, - PDUPosition: backwardStreamPos, - } - prevBatch.Decrement() - } - p.addRoomSummary(ctx, snapshot, jr, roomID, device.UserID, r.From) // We don't include a device here as we don't need to send down @@ -545,6 +524,28 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync( } } + // Retrieve the backward topology position, i.e. the position of the + // oldest event in the room's topology. + var prevBatch *types.TopologyToken + if len(events) > 0 { + var backwardTopologyPos, backwardStreamPos types.StreamPosition + event := events[0] + // If this is the beginning of the room, we can't go back further. We're going to return + // the TopologyToken from the last event instead. (Synapse returns the /sync next_Batch) + if event.Type() == gomatrixserverlib.MRoomCreate && event.StateKeyEquals("") { + event = events[len(events)-1] + } + backwardTopologyPos, backwardStreamPos, err = snapshot.PositionInTopology(ctx, event.EventID()) + if err != nil { + return + } + prevBatch = &types.TopologyToken{ + Depth: backwardTopologyPos, + PDUPosition: backwardStreamPos, + } + prevBatch.Decrement() + } + jr.Timeline.PrevBatch = prevBatch jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(events, gomatrixserverlib.FormatSync) // If we are limited by the filter AND the history visibility filter