Tweaks for incremental syncs

This commit is contained in:
Till Faelligen 2022-08-09 09:59:05 +02:00
parent 4f694e6b45
commit 7198b916b8
No known key found for this signature in database
GPG key ID: 3DF82D8AB9211D4E

View file

@ -284,12 +284,6 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
}
}
}
if len(recentEvents) > 0 {
updateLatestPosition(recentEvents[len(recentEvents)-1].EventID())
}
if len(delta.StateEvents) > 0 {
updateLatestPosition(delta.StateEvents[len(delta.StateEvents)-1].EventID())
}
if stateFilter.LazyLoadMembers {
delta.StateEvents, err = p.lazyLoadMembers(
@ -309,13 +303,21 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
}
}
// Applies the history visibility rules
events, err := applyHistoryVisibilityFilter(ctx, p.DB, p.rsAPI, delta.RoomID, device.UserID, eventFilter.Limit, recentEvents)
if err != nil {
logrus.WithError(err).Error("unable to apply history visibility filter")
}
if len(events) > 0 {
updateLatestPosition(events[len(events)-1].EventID())
}
if len(delta.StateEvents) > 0 {
updateLatestPosition(delta.StateEvents[len(delta.StateEvents)-1].EventID())
}
switch delta.Membership {
case gomatrixserverlib.Join:
// We need to make sure we always include the latest states events, if they are in the timeline
events, err := applyHistoryVisibilityFilter(ctx, p.DB, p.rsAPI, delta.RoomID, device.UserID, eventFilter.Limit, recentEvents)
if err != nil {
logrus.WithError(err).Error("unable to apply history visibility filter")
}
jr := types.NewJoinResponse()
if hasMembershipChange {
p.addRoomSummary(ctx, jr, delta.RoomID, device.UserID, latestPosition)
@ -331,6 +333,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
case gomatrixserverlib.Peek:
jr := types.NewJoinResponse()
jr.Timeline.PrevBatch = &prevBatch
// TODO: Apply history visibility on peeked rooms
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = limited
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync)
@ -340,12 +343,12 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
fallthrough // transitions to leave are the same as ban
case gomatrixserverlib.Ban:
// TODO: recentEvents may contain events that this user is not allowed to see because they are
// no longer in the room.
lr := types.NewLeaveResponse()
lr.Timeline.PrevBatch = &prevBatch
lr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
lr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(events, gomatrixserverlib.FormatSync)
// If we are limited by the filter AND the history visibility filter
// didn't "remove" events, return that the response is limited.
lr.Timeline.Limited = limited && len(events) == len(recentEvents)
lr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Leave[delta.RoomID] = *lr
}
@ -385,7 +388,7 @@ func applyHistoryVisibilityFilter(
"duration": time.Since(startTime),
"room_id": roomID,
}).Debug("applied history visibility (sync)")
return events, err
return events, nil
}
func (p *PDUStreamProvider) addRoomSummary(ctx context.Context, jr *types.JoinResponse, roomID, userID string, latestPosition types.StreamPosition) {
@ -521,7 +524,9 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
jr.Timeline.PrevBatch = prevBatch
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(events, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = limited
// If we are limited by the filter AND the history visibility filter
// didn't "remove" events, return that the response is limited.
jr.Timeline.Limited = limited && len(events) == len(recentEvents)
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(stateEvents, gomatrixserverlib.FormatSync)
return jr, nil
}