handle newly joined rooms better by using a different range/filter for those

This commit is contained in:
Joakim Recht 2024-01-23 10:51:20 +01:00
parent 646b4c1bf5
commit da54821c9f

View file

@ -203,20 +203,14 @@ func (p *PDUStreamProvider) IncrementalSync(
req.Log.WithError(err).Error("unable to update event filter with ignored users")
}
roomIDs := make([]string, len(stateDeltas))
for i := range stateDeltas {
roomIDs[i] = stateDeltas[i].RoomID
}
dbEvents, err := snapshot.RecentEvents(
ctx, roomIDs, r,
&eventFilter, true, true,
)
dbEvents, err := p.getRecentEvents(ctx, stateDeltas, r, eventFilter, &stateFilter, req, snapshot)
if err != nil {
if err == sql.ErrNoRows {
return r.To
}
req.Log.WithError(err).Error("unable to get recent events")
return r.From
}
if len(dbEvents) == 0 {
return r.To
}
newPos = from
for _, delta := range stateDeltas {
@ -255,6 +249,66 @@ func (p *PDUStreamProvider) IncrementalSync(
return newPos
}
func (p *PDUStreamProvider) getRecentEvents(ctx context.Context, stateDeltas []types.StateDelta, r types.Range, eventFilter synctypes.RoomEventFilter, stateFilter *synctypes.StateFilter, req *types.SyncRequest, snapshot storage.DatabaseTransaction) (map[string]types.RecentEvents, error) {
var roomIDs []string
var newlyJoinedRoomIDs []string
for _, delta := range stateDeltas {
if delta.NewlyJoined {
newlyJoinedRoomIDs = append(newlyJoinedRoomIDs, delta.RoomID)
} else {
roomIDs = append(roomIDs, delta.RoomID)
}
}
dbEvents := make(map[string]types.RecentEvents)
if len(roomIDs) > 0 {
events, err := snapshot.RecentEvents(
ctx, roomIDs, r,
&eventFilter, true, true,
)
if err != nil {
if err != sql.ErrNoRows {
return nil, err
}
}
for k, v := range events {
dbEvents[k] = v
}
}
if len(newlyJoinedRoomIDs) > 0 {
// For rooms that were joined in this sync, try to fetch
// as much timeline events as allowed by the filter.
filter := eventFilter
// If we're going backwards, grep at least X events, this is mostly to satisfy Sytest
if eventFilter.Limit < recentEventBackwardsLimit {
filter.Limit = recentEventBackwardsLimit // TODO: Figure out a better way
diff := r.From - r.To
if diff > 0 && diff < recentEventBackwardsLimit {
filter.Limit = int(diff)
}
}
events, err := snapshot.RecentEvents(
ctx, newlyJoinedRoomIDs, types.Range{
From: r.To,
To: 0,
Backwards: true,
},
&filter, true, true,
)
if err != nil {
if err != sql.ErrNoRows {
return nil, err
}
}
for k, v := range events {
dbEvents[k] = v
}
}
return dbEvents, nil
}
// Limit the recent events to X when going backwards
const recentEventBackwardsLimit = 100
@ -271,16 +325,6 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
dbEvents map[string]types.RecentEvents,
) (types.StreamPosition, error) {
var err error
originalLimit := eventFilter.Limit
// If we're going backwards, grep at least X events, this is mostly to satisfy Sytest
if r.Backwards && originalLimit < recentEventBackwardsLimit {
eventFilter.Limit = recentEventBackwardsLimit // TODO: Figure out a better way
diff := r.From - r.To
if diff > 0 && diff < recentEventBackwardsLimit {
eventFilter.Limit = int(diff)
}
}
recentStreamEvents := dbEvents[delta.RoomID].Events
limited := dbEvents[delta.RoomID].Limited
@ -342,9 +386,9 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
logrus.WithError(err).Error("unable to apply history visibility filter")
}
if r.Backwards && len(events) > originalLimit {
if r.Backwards && len(events) > eventFilter.Limit {
// We're going backwards and the events are ordered chronologically, so take the last `limit` events
events = events[len(events)-originalLimit:]
events = events[len(events)-eventFilter.Limit:]
limited = true
}