From 57e3622b85fd4d80d9826404135f09e91ed47973 Mon Sep 17 00:00:00 2001 From: Till <2353100+S7evinK@users.noreply.github.com> Date: Tue, 19 Apr 2022 10:46:45 +0200 Subject: [PATCH] Implement lazy loading on `/sync` (#2346) * Initial work on lazyloading * Partially implement lazy loading on /sync * Rename methods * Make missing tests pass * Preallocate slice, even if it will end up with fewer values * Let the cache handle the user mapping * Linter * Cap cache growth --- internal/caching/cache_lazy_load_members.go | 86 +++++++++++++++++ syncapi/streams/stream_pdu.go | 100 +++++++++++++++++++- syncapi/streams/streams.go | 4 +- syncapi/syncapi.go | 6 +- sytest-whitelist | 11 ++- 5 files changed, 200 insertions(+), 7 deletions(-) create mode 100644 internal/caching/cache_lazy_load_members.go diff --git a/internal/caching/cache_lazy_load_members.go b/internal/caching/cache_lazy_load_members.go new file mode 100644 index 000000000..71a317624 --- /dev/null +++ b/internal/caching/cache_lazy_load_members.go @@ -0,0 +1,86 @@ +package caching + +import ( + "fmt" + "time" + + userapi "github.com/matrix-org/dendrite/userapi/api" +) + +const ( + LazyLoadCacheName = "lazy_load_members" + LazyLoadCacheMaxEntries = 128 + LazyLoadCacheMaxUserEntries = 128 + LazyLoadCacheMutable = true + LazyLoadCacheMaxAge = time.Minute * 30 +) + +type LazyLoadCache struct { + // InMemoryLRUCachePartition containing other InMemoryLRUCachePartitions + // with the actual cached members + userCaches *InMemoryLRUCachePartition +} + +// NewLazyLoadCache creates a new LazyLoadCache. +func NewLazyLoadCache() (*LazyLoadCache, error) { + cache, err := NewInMemoryLRUCachePartition( + LazyLoadCacheName, + LazyLoadCacheMutable, + LazyLoadCacheMaxEntries, + LazyLoadCacheMaxAge, + true, + ) + if err != nil { + return nil, err + } + go cacheCleaner(cache) + return &LazyLoadCache{ + userCaches: cache, + }, nil +} + +func (c *LazyLoadCache) lazyLoadCacheForUser(device *userapi.Device) (*InMemoryLRUCachePartition, error) { + cacheName := fmt.Sprintf("%s/%s", device.UserID, device.ID) + userCache, ok := c.userCaches.Get(cacheName) + if ok && userCache != nil { + if cache, ok := userCache.(*InMemoryLRUCachePartition); ok { + return cache, nil + } + } + cache, err := NewInMemoryLRUCachePartition( + LazyLoadCacheName, + LazyLoadCacheMutable, + LazyLoadCacheMaxUserEntries, + LazyLoadCacheMaxAge, + false, + ) + if err != nil { + return nil, err + } + c.userCaches.Set(cacheName, cache) + go cacheCleaner(cache) + return cache, nil +} + +func (c *LazyLoadCache) StoreLazyLoadedUser(device *userapi.Device, roomID, userID, eventID string) { + cache, err := c.lazyLoadCacheForUser(device) + if err != nil { + return + } + cacheKey := fmt.Sprintf("%s/%s/%s/%s", device.UserID, device.ID, roomID, userID) + cache.Set(cacheKey, eventID) +} + +func (c *LazyLoadCache) IsLazyLoadedUserCached(device *userapi.Device, roomID, userID string) (string, bool) { + cache, err := c.lazyLoadCacheForUser(device) + if err != nil { + return "", false + } + + cacheKey := fmt.Sprintf("%s/%s/%s/%s", device.UserID, device.ID, roomID, userID) + val, ok := cache.Get(cacheKey) + if !ok { + return "", ok + } + return val.(string), ok +} diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index bcaf6ca31..ddc2f55c2 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -6,6 +6,7 @@ import ( "sync" "time" + "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/syncapi/types" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" @@ -26,7 +27,8 @@ type PDUStreamProvider struct { tasks chan func() workers atomic.Int32 - userAPI userapi.UserInternalAPI + // userID+deviceID -> lazy loading cache + lazyLoadCache *caching.LazyLoadCache } func (p *PDUStreamProvider) worker() { @@ -188,7 +190,7 @@ func (p *PDUStreamProvider) IncrementalSync( newPos = from for _, delta := range stateDeltas { var pos types.StreamPosition - if pos, err = p.addRoomDeltaToResponse(ctx, req.Device, r, delta, &eventFilter, req.Response); err != nil { + if pos, err = p.addRoomDeltaToResponse(ctx, req.Device, r, delta, &eventFilter, &stateFilter, req.Response); err != nil { req.Log.WithError(err).Error("d.addRoomDeltaToResponse failed") return to } @@ -209,6 +211,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( r types.Range, delta types.StateDelta, eventFilter *gomatrixserverlib.RoomEventFilter, + stateFilter *gomatrixserverlib.StateFilter, res *types.Response, ) (types.StreamPosition, error) { if delta.MembershipPos > 0 && delta.Membership == gomatrixserverlib.Leave { @@ -247,7 +250,8 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( // room that were returned. latestPosition := r.To updateLatestPosition := func(mostRecentEventID string) { - if _, pos, err := p.DB.PositionInTopology(ctx, mostRecentEventID); err == nil { + var pos types.StreamPosition + if _, pos, err = p.DB.PositionInTopology(ctx, mostRecentEventID); err == nil { switch { case r.Backwards && pos > latestPosition: fallthrough @@ -263,6 +267,19 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( updateLatestPosition(delta.StateEvents[len(delta.StateEvents)-1].EventID()) } + if stateFilter.LazyLoadMembers { + if err != nil { + return r.From, err + } + delta.StateEvents, err = p.lazyLoadMembers( + ctx, delta.RoomID, true, limited, stateFilter.IncludeRedundantMembers, + device, recentEvents, delta.StateEvents, + ) + if err != nil { + return r.From, err + } + } + hasMembershipChange := false for _, recentEvent := range recentStreamEvents { if recentEvent.Type() == gomatrixserverlib.MRoomMember && recentEvent.StateKey() != nil { @@ -402,6 +419,20 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync( // "Can sync a room with a message with a transaction id" - which does a complete sync to check. recentEvents := p.DB.StreamEventsToEvents(device, recentStreamEvents) stateEvents = removeDuplicates(stateEvents, recentEvents) + + if stateFilter.LazyLoadMembers { + if err != nil { + return nil, err + } + stateEvents, err = p.lazyLoadMembers(ctx, roomID, + false, limited, stateFilter.IncludeRedundantMembers, + device, recentEvents, stateEvents, + ) + if err != nil { + return nil, err + } + } + jr = types.NewJoinResponse() jr.Summary.JoinedMemberCount = &joinedCount jr.Summary.InvitedMemberCount = &invitedCount @@ -412,6 +443,69 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync( return jr, nil } +func (p *PDUStreamProvider) lazyLoadMembers( + ctx context.Context, roomID string, + incremental, limited, includeRedundant bool, + device *userapi.Device, + timelineEvents, stateEvents []*gomatrixserverlib.HeaderedEvent, +) ([]*gomatrixserverlib.HeaderedEvent, error) { + if len(timelineEvents) == 0 { + return stateEvents, nil + } + // Work out which memberships to include + timelineUsers := make(map[string]struct{}) + if !incremental { + timelineUsers[device.UserID] = struct{}{} + } + // Add all users the client doesn't know about yet to a list + for _, event := range timelineEvents { + // Membership is not yet cached, add it to the list + if _, ok := p.lazyLoadCache.IsLazyLoadedUserCached(device, roomID, event.Sender()); !ok { + timelineUsers[event.Sender()] = struct{}{} + } + } + // Preallocate with the same amount, even if it will end up with fewer values + newStateEvents := make([]*gomatrixserverlib.HeaderedEvent, 0, len(stateEvents)) + // Remove existing membership events we don't care about, e.g. users not in the timeline.events + for _, event := range stateEvents { + if event.Type() == gomatrixserverlib.MRoomMember && event.StateKey() != nil { + // If this is a gapped incremental sync, we still want this membership + isGappedIncremental := limited && incremental + // We want this users membership event, keep it in the list + _, ok := timelineUsers[event.Sender()] + wantMembership := ok || isGappedIncremental + if wantMembership { + newStateEvents = append(newStateEvents, event) + if !includeRedundant { + p.lazyLoadCache.StoreLazyLoadedUser(device, roomID, event.Sender(), event.EventID()) + } + delete(timelineUsers, event.Sender()) + } + } else { + newStateEvents = append(newStateEvents, event) + } + } + wantUsers := make([]string, 0, len(timelineUsers)) + for userID := range timelineUsers { + wantUsers = append(wantUsers, userID) + } + // Query missing membership events + memberships, err := p.DB.GetStateEventsForRoom(ctx, roomID, &gomatrixserverlib.StateFilter{ + Limit: 100, + Senders: &wantUsers, + Types: &[]string{gomatrixserverlib.MRoomMember}, + }) + if err != nil { + return stateEvents, err + } + // cache the membership events + for _, membership := range memberships { + p.lazyLoadCache.StoreLazyLoadedUser(device, roomID, membership.Sender(), membership.EventID()) + } + stateEvents = append(newStateEvents, memberships...) + return stateEvents, nil +} + // addIgnoredUsersToFilter adds ignored users to the eventfilter and // the syncreq itself for further use in streams. func (p *PDUStreamProvider) addIgnoredUsersToFilter(ctx context.Context, req *types.SyncRequest, eventFilter *gomatrixserverlib.RoomEventFilter) error { diff --git a/syncapi/streams/streams.go b/syncapi/streams/streams.go index c7d06a296..d3195b78f 100644 --- a/syncapi/streams/streams.go +++ b/syncapi/streams/streams.go @@ -27,12 +27,12 @@ type Streams struct { func NewSyncStreamProviders( d storage.Database, userAPI userapi.UserInternalAPI, rsAPI rsapi.RoomserverInternalAPI, keyAPI keyapi.KeyInternalAPI, - eduCache *caching.EDUCache, notifier *notifier.Notifier, + eduCache *caching.EDUCache, lazyLoadCache *caching.LazyLoadCache, notifier *notifier.Notifier, ) *Streams { streams := &Streams{ PDUStreamProvider: &PDUStreamProvider{ StreamProvider: StreamProvider{DB: d}, - userAPI: userAPI, + lazyLoadCache: lazyLoadCache, }, TypingStreamProvider: &TypingStreamProvider{ StreamProvider: StreamProvider{DB: d}, diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index 384121a8a..2f9165d91 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -57,8 +57,12 @@ func AddPublicRoutes( } eduCache := caching.NewTypingCache() + lazyLoadCache, err := caching.NewLazyLoadCache() + if err != nil { + logrus.WithError(err).Panicf("failed to create lazy loading cache") + } notifier := notifier.NewNotifier() - streams := streams.NewSyncStreamProviders(syncDB, userAPI, rsAPI, keyAPI, eduCache, notifier) + streams := streams.NewSyncStreamProviders(syncDB, userAPI, rsAPI, keyAPI, eduCache, lazyLoadCache, notifier) notifier.SetCurrentPosition(streams.Latest(context.Background())) if err = notifier.Load(context.Background(), syncDB); err != nil { logrus.WithError(err).Panicf("failed to load notifier ") diff --git a/sytest-whitelist b/sytest-whitelist index f63b96f52..c8dedd59c 100644 --- a/sytest-whitelist +++ b/sytest-whitelist @@ -699,4 +699,13 @@ Ignore invite in full sync Ignore invite in incremental sync A filtered timeline reaches its limit A change to displayname should not result in a full state sync -Can fetch images in room \ No newline at end of file +Can fetch images in room +The only membership state included in an initial sync is for all the senders in the timeline +The only membership state included in an incremental sync is for senders in the timeline +Old members are included in gappy incr LL sync if they start speaking +We do send redundant membership state across incremental syncs if asked +Rejecting invite over federation doesn't break incremental /sync +Gapped incremental syncs include all state changes +Old leaves are present in gapped incremental syncs +Leaves are present in non-gapped incremental syncs +Members from the gap are included in gappy incr LL sync \ No newline at end of file