From 1e57c01837476095f572d09833f8eee8249f37ac Mon Sep 17 00:00:00 2001 From: Till Faelligen Date: Tue, 12 Apr 2022 16:35:11 +0200 Subject: [PATCH] Make missing tests pass --- internal/caching/cache_lazy_load_members.go | 13 ++++-- syncapi/streams/stream_pdu.go | 51 ++++++++++++--------- sytest-whitelist | 6 ++- 3 files changed, 43 insertions(+), 27 deletions(-) diff --git a/internal/caching/cache_lazy_load_members.go b/internal/caching/cache_lazy_load_members.go index a4fda99ef..f0614f09f 100644 --- a/internal/caching/cache_lazy_load_members.go +++ b/internal/caching/cache_lazy_load_members.go @@ -32,13 +32,16 @@ func NewLazyLoadCache() (*LazyLoadCache, error) { return &LazyLoadCache{cache}, err } -func (c *LazyLoadCache) StoreLazyLoadedUser(reqUser, deviceID, roomID, userID string) { +func (c *LazyLoadCache) StoreLazyLoadedUser(reqUser, deviceID, roomID, userID, eventID string) { cacheKey := fmt.Sprintf("%s/%s/%s/%s", reqUser, deviceID, roomID, userID) - c.Set(cacheKey, true) + c.Set(cacheKey, eventID) } -func (c *LazyLoadCache) IsLazyLoadedUserCached(reqUser, deviceID, roomID, userID string) bool { +func (c *LazyLoadCache) IsLazyLoadedUserCached(reqUser, deviceID, roomID, userID string) (string, bool) { cacheKey := fmt.Sprintf("%s/%s/%s/%s", reqUser, deviceID, roomID, userID) - _, ok := c.Get(cacheKey) - return ok + val, ok := c.Get(cacheKey) + if !ok { + return "", ok + } + return val.(string), ok } diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index b345e4f2c..ced796f6e 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -271,7 +271,11 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( if err != nil { return r.From, err } - delta.StateEvents, err = p.lazyLoadMembers(ctx, delta.RoomID, true, stateFilter, device, recentEvents, cache, delta.StateEvents) + delta.StateEvents, err = p.lazyLoadMembers( + ctx, delta.RoomID, true, limited, stateFilter.IncludeRedundantMembers, + device, cache, + recentEvents, delta.StateEvents, + ) if err != nil { return r.From, err } @@ -422,7 +426,11 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync( if err != nil { return nil, err } - stateEvents, err = p.lazyLoadMembers(ctx, roomID, false, stateFilter, device, recentEvents, cache, stateEvents) + stateEvents, err = p.lazyLoadMembers(ctx, roomID, + false, limited, stateFilter.IncludeRedundantMembers, + device, cache, + recentEvents, stateEvents, + ) if err != nil { return nil, err } @@ -439,39 +447,40 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync( } func (p *PDUStreamProvider) lazyLoadMembers( - ctx context.Context, roomID string, isIncremental bool, - stateFilter *gomatrixserverlib.StateFilter, device *userapi.Device, - timelineEvents []*gomatrixserverlib.HeaderedEvent, cache *caching.LazyLoadCache, - stateEvents []*gomatrixserverlib.HeaderedEvent, + ctx context.Context, roomID string, + incremental, limited, includeRedundant bool, + device *userapi.Device, + cache *caching.LazyLoadCache, + timelineEvents, stateEvents []*gomatrixserverlib.HeaderedEvent, ) ([]*gomatrixserverlib.HeaderedEvent, error) { if len(timelineEvents) == 0 { return stateEvents, nil } - // Work out if we need to include membership events + // Work out which memberships to include timelineUsers := make(map[string]struct{}) - - if !isIncremental { + if !incremental { timelineUsers[device.UserID] = struct{}{} } - // add all users the client doesn't know about yet to a list + // Add all users the client doesn't know about yet to a list for _, event := range timelineEvents { - if event.Type() == gomatrixserverlib.MRoomMember && event.StateKey() != nil { - continue - } - // membership is not yet cached, add it to the list - if !cache.IsLazyLoadedUserCached(device.UserID, device.ID, roomID, event.Sender()) { + // Membership is not yet cached, add it to the list + if _, ok := cache.IsLazyLoadedUserCached(device.UserID, device.ID, roomID, event.Sender()); !ok { timelineUsers[event.Sender()] = struct{}{} } } - // remove existing membership events we don't care about, e.g. users not in the timeline.events + // Remove existing membership events we don't care about, e.g. users not in the timeline.events newStateEvents := []*gomatrixserverlib.HeaderedEvent{} for _, event := range stateEvents { if event.Type() == gomatrixserverlib.MRoomMember && event.StateKey() != nil { + // If this is a gapped incremental sync, we still want this membership + isGappedIncremental := limited && incremental // We want this users membership event, keep it in the list - if _, ok := timelineUsers[event.Sender()]; ok { + _, ok := timelineUsers[event.Sender()] + wantMembership := ok || isGappedIncremental + if wantMembership { newStateEvents = append(newStateEvents, event) - if !stateFilter.IncludeRedundantMembers { - cache.StoreLazyLoadedUser(device.UserID, device.ID, roomID, event.Sender()) + if !includeRedundant { + cache.StoreLazyLoadedUser(device.UserID, device.ID, roomID, event.Sender(), event.EventID()) } delete(timelineUsers, event.Sender()) } @@ -479,7 +488,6 @@ func (p *PDUStreamProvider) lazyLoadMembers( newStateEvents = append(newStateEvents, event) } } - wantUsers := make([]string, 0, len(timelineUsers)) for userID := range timelineUsers { wantUsers = append(wantUsers, userID) @@ -495,12 +503,13 @@ func (p *PDUStreamProvider) lazyLoadMembers( } // cache the membership events for _, membership := range memberships { - cache.StoreLazyLoadedUser(device.UserID, device.ID, roomID, membership.Sender()) + cache.StoreLazyLoadedUser(device.UserID, device.ID, roomID, membership.Sender(), membership.EventID()) } stateEvents = append(newStateEvents, memberships...) return stateEvents, nil } +// getLazyLoadCache gets/creates a lazy load cache for a given device. func (p *PDUStreamProvider) getLazyLoadCache(device *userapi.Device) (*caching.LazyLoadCache, error) { var err error cacheKey := device.UserID + device.ID diff --git a/sytest-whitelist b/sytest-whitelist index 906ac30e8..6f573e440 100644 --- a/sytest-whitelist +++ b/sytest-whitelist @@ -703,4 +703,8 @@ The only membership state included in an initial sync is for all the senders in The only membership state included in an incremental sync is for senders in the timeline Old members are included in gappy incr LL sync if they start speaking We do send redundant membership state across incremental syncs if asked -Rejecting invite over federation doesn't break incremental /sync \ No newline at end of file +Rejecting invite over federation doesn't break incremental /sync +Gapped incremental syncs include all state changes +Old leaves are present in gapped incremental syncs +Leaves are present in non-gapped incremental syncs +Members from the gap are included in gappy incr LL sync \ No newline at end of file