From da54821c9f7a072d052e4b9046d1b81e569d953a Mon Sep 17 00:00:00 2001 From: Joakim Recht Date: Tue, 23 Jan 2024 10:51:20 +0100 Subject: [PATCH] handle newly joined rooms better by using a different range/filter for those --- syncapi/streams/stream_pdu.go | 90 ++++++++++++++++++++++++++--------- 1 file changed, 67 insertions(+), 23 deletions(-) diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index 303148ba2..a1d9a4ea9 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -203,20 +203,14 @@ func (p *PDUStreamProvider) IncrementalSync( req.Log.WithError(err).Error("unable to update event filter with ignored users") } - roomIDs := make([]string, len(stateDeltas)) - for i := range stateDeltas { - roomIDs[i] = stateDeltas[i].RoomID - } - dbEvents, err := snapshot.RecentEvents( - ctx, roomIDs, r, - &eventFilter, true, true, - ) + dbEvents, err := p.getRecentEvents(ctx, stateDeltas, r, eventFilter, &stateFilter, req, snapshot) if err != nil { - if err == sql.ErrNoRows { - return r.To - } + req.Log.WithError(err).Error("unable to get recent events") return r.From } + if len(dbEvents) == 0 { + return r.To + } newPos = from for _, delta := range stateDeltas { @@ -255,6 +249,66 @@ func (p *PDUStreamProvider) IncrementalSync( return newPos } +func (p *PDUStreamProvider) getRecentEvents(ctx context.Context, stateDeltas []types.StateDelta, r types.Range, eventFilter synctypes.RoomEventFilter, stateFilter *synctypes.StateFilter, req *types.SyncRequest, snapshot storage.DatabaseTransaction) (map[string]types.RecentEvents, error) { + var roomIDs []string + var newlyJoinedRoomIDs []string + for _, delta := range stateDeltas { + if delta.NewlyJoined { + newlyJoinedRoomIDs = append(newlyJoinedRoomIDs, delta.RoomID) + } else { + roomIDs = append(roomIDs, delta.RoomID) + } + } + dbEvents := make(map[string]types.RecentEvents) + if len(roomIDs) > 0 { + events, err := snapshot.RecentEvents( + ctx, roomIDs, r, + &eventFilter, true, true, + ) + if err != nil { + if err != sql.ErrNoRows { + return nil, err + } + } + for k, v := range events { + dbEvents[k] = v + } + } + if len(newlyJoinedRoomIDs) > 0 { + // For rooms that were joined in this sync, try to fetch + // as much timeline events as allowed by the filter. + + filter := eventFilter + // If we're going backwards, grep at least X events, this is mostly to satisfy Sytest + if eventFilter.Limit < recentEventBackwardsLimit { + filter.Limit = recentEventBackwardsLimit // TODO: Figure out a better way + diff := r.From - r.To + if diff > 0 && diff < recentEventBackwardsLimit { + filter.Limit = int(diff) + } + } + + events, err := snapshot.RecentEvents( + ctx, newlyJoinedRoomIDs, types.Range{ + From: r.To, + To: 0, + Backwards: true, + }, + &filter, true, true, + ) + if err != nil { + if err != sql.ErrNoRows { + return nil, err + } + } + for k, v := range events { + dbEvents[k] = v + } + } + + return dbEvents, nil +} + // Limit the recent events to X when going backwards const recentEventBackwardsLimit = 100 @@ -271,16 +325,6 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( dbEvents map[string]types.RecentEvents, ) (types.StreamPosition, error) { var err error - originalLimit := eventFilter.Limit - // If we're going backwards, grep at least X events, this is mostly to satisfy Sytest - if r.Backwards && originalLimit < recentEventBackwardsLimit { - eventFilter.Limit = recentEventBackwardsLimit // TODO: Figure out a better way - diff := r.From - r.To - if diff > 0 && diff < recentEventBackwardsLimit { - eventFilter.Limit = int(diff) - } - } - recentStreamEvents := dbEvents[delta.RoomID].Events limited := dbEvents[delta.RoomID].Limited @@ -342,9 +386,9 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( logrus.WithError(err).Error("unable to apply history visibility filter") } - if r.Backwards && len(events) > originalLimit { + if r.Backwards && len(events) > eventFilter.Limit { // We're going backwards and the events are ordered chronologically, so take the last `limit` events - events = events[len(events)-originalLimit:] + events = events[len(events)-eventFilter.Limit:] limited = true }