diff --git a/internal/caching/cache_lazy_load_members.go b/internal/caching/cache_lazy_load_members.go new file mode 100644 index 000000000..198983767 --- /dev/null +++ b/internal/caching/cache_lazy_load_members.go @@ -0,0 +1,49 @@ +package caching + +import ( + "fmt" + "time" + + "github.com/matrix-org/gomatrixserverlib" +) + +const ( + LazyLoadCacheName = "lazy_load_members" + LazyLoadCacheMaxEntries = 128 + LazyLoadCacheMutable = true + LazyLoadCacheMaxAge = time.Minute * 30 +) + +type LazyLoadCache struct { + *InMemoryLRUCachePartition +} + +// NewLazyLoadCache creates a new InMemoryLRUCachePartition. +func NewLazyLoadCache() (*LazyLoadCache, error) { + cache, err := NewInMemoryLRUCachePartition( + LazyLoadCacheName, + LazyLoadCacheMutable, + LazyLoadCacheMaxEntries, + LazyLoadCacheMaxAge, + false, + ) + if err != nil { + return nil, err + } + go cacheCleaner(cache) + return &LazyLoadCache{cache}, err +} + +func (c *LazyLoadCache) StoreLazyLoadedMembers(reqUser, deviceID, roomID, userID string, event *gomatrixserverlib.HeaderedEvent) { + cacheKey := fmt.Sprintf("%s/%s/%s/%s", reqUser, deviceID, roomID, userID) + c.Set(cacheKey, event) +} + +func (c *LazyLoadCache) GetLazyLoadedMembers(reqUser, deviceID, roomID, userID string) (*gomatrixserverlib.HeaderedEvent, bool) { + cacheKey := fmt.Sprintf("%s/%s/%s/%s", reqUser, deviceID, roomID, userID) + val, ok := c.Get(cacheKey) + if !ok { + return nil, ok + } + return val.(*gomatrixserverlib.HeaderedEvent), ok +} diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index bcaf6ca31..02e19bd2c 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -6,6 +6,7 @@ import ( "sync" "time" + "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/syncapi/types" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" @@ -26,7 +27,8 @@ type PDUStreamProvider struct { tasks chan func() workers atomic.Int32 - userAPI userapi.UserInternalAPI + // userID+deviceID -> lazy loading cache + lazyLoadCache map[string]*caching.LazyLoadCache } func (p *PDUStreamProvider) worker() { @@ -188,7 +190,7 @@ func (p *PDUStreamProvider) IncrementalSync( newPos = from for _, delta := range stateDeltas { var pos types.StreamPosition - if pos, err = p.addRoomDeltaToResponse(ctx, req.Device, r, delta, &eventFilter, req.Response); err != nil { + if pos, err = p.addRoomDeltaToResponse(ctx, req.Device, r, delta, &eventFilter, &stateFilter, req.Response); err != nil { req.Log.WithError(err).Error("d.addRoomDeltaToResponse failed") return to } @@ -209,6 +211,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( r types.Range, delta types.StateDelta, eventFilter *gomatrixserverlib.RoomEventFilter, + stateFilter *gomatrixserverlib.StateFilter, res *types.Response, ) (types.StreamPosition, error) { if delta.MembershipPos > 0 && delta.Membership == gomatrixserverlib.Leave { @@ -263,6 +266,17 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( updateLatestPosition(delta.StateEvents[len(delta.StateEvents)-1].EventID()) } + if stateFilter.LazyLoadMembers { + cache, err := p.getLazyLoadCache(device) + if err != nil { + return r.From, err + } + delta.StateEvents, err = p.lazyLoadMembers(ctx, delta.RoomID, true, stateFilter, device, recentEvents, cache, delta.StateEvents) + if err != nil { + return r.From, err + } + } + hasMembershipChange := false for _, recentEvent := range recentStreamEvents { if recentEvent.Type() == gomatrixserverlib.MRoomMember && recentEvent.StateKey() != nil { @@ -402,6 +416,18 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync( // "Can sync a room with a message with a transaction id" - which does a complete sync to check. recentEvents := p.DB.StreamEventsToEvents(device, recentStreamEvents) stateEvents = removeDuplicates(stateEvents, recentEvents) + + if stateFilter.LazyLoadMembers { + cache, err := p.getLazyLoadCache(device) + if err != nil { + return nil, err + } + stateEvents, err = p.lazyLoadMembers(ctx, roomID, false, stateFilter, device, recentEvents, cache, stateEvents) + if err != nil { + return nil, err + } + } + jr = types.NewJoinResponse() jr.Summary.JoinedMemberCount = &joinedCount jr.Summary.InvitedMemberCount = &invitedCount @@ -412,6 +438,83 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync( return jr, nil } +func (p *PDUStreamProvider) lazyLoadMembers( + ctx context.Context, roomID string, isIncremental bool, + stateFilter *gomatrixserverlib.StateFilter, device *userapi.Device, + timelineEvents []*gomatrixserverlib.HeaderedEvent, cache *caching.LazyLoadCache, + stateEvents []*gomatrixserverlib.HeaderedEvent, +) ([]*gomatrixserverlib.HeaderedEvent, error) { + if len(timelineEvents) == 0 { + return stateEvents, nil + } + // Work out if we need to include membership events + timelineUsers := make(map[string]struct{}) + + if !isIncremental { + timelineUsers[device.UserID] = struct{}{} + } + // add all users the client doesn't know about yet to a list + for _, event := range timelineEvents { + if event.Type() == gomatrixserverlib.MRoomMember && event.StateKey() != nil { + continue + } + // membership is not yet cached, add it to the list + if _, ok := cache.GetLazyLoadedMembers(device.UserID, device.ID, roomID, event.Sender()); !ok { + timelineUsers[event.Sender()] = struct{}{} + } + } + // remove existing membership events we don't care about, e.g. users not in the timeline.events + newStateEvents := []*gomatrixserverlib.HeaderedEvent{} + for _, event := range stateEvents { + if event.Type() == gomatrixserverlib.MRoomMember && event.StateKey() != nil { + // We want this users membership event, keep it in the list + if _, ok := timelineUsers[event.Sender()]; ok { + newStateEvents = append(newStateEvents, event) + if !stateFilter.IncludeRedundantMembers { + cache.StoreLazyLoadedMembers(device.UserID, device.ID, roomID, event.Sender(), event) + } + delete(timelineUsers, event.Sender()) + } + } else { + newStateEvents = append(newStateEvents, event) + } + } + + wantUsers := make([]string, 0, len(timelineUsers)) + for userID := range timelineUsers { + wantUsers = append(wantUsers, userID) + } + // Query missing membership events + memberships, err := p.DB.GetStateEventsForRoom(ctx, roomID, &gomatrixserverlib.StateFilter{ + Limit: 100, + Senders: &wantUsers, + Types: &[]string{gomatrixserverlib.MRoomMember}, + }) + if err != nil { + return stateEvents, err + } + // cache the membership events + for _, membership := range memberships { + cache.StoreLazyLoadedMembers(device.UserID, device.ID, roomID, membership.Sender(), membership) + } + stateEvents = append(newStateEvents, memberships...) + return stateEvents, nil +} + +func (p *PDUStreamProvider) getLazyLoadCache(device *userapi.Device) (*caching.LazyLoadCache, error) { + var err error + cacheKey := device.UserID + device.ID + cache, ok := p.lazyLoadCache[cacheKey] + if !ok { + cache, err = caching.NewLazyLoadCache() + if err != nil { + return nil, err + } + p.lazyLoadCache[cacheKey] = cache + } + return cache, nil +} + // addIgnoredUsersToFilter adds ignored users to the eventfilter and // the syncreq itself for further use in streams. func (p *PDUStreamProvider) addIgnoredUsersToFilter(ctx context.Context, req *types.SyncRequest, eventFilter *gomatrixserverlib.RoomEventFilter) error { diff --git a/syncapi/streams/streams.go b/syncapi/streams/streams.go index c7d06a296..99560966b 100644 --- a/syncapi/streams/streams.go +++ b/syncapi/streams/streams.go @@ -27,12 +27,12 @@ type Streams struct { func NewSyncStreamProviders( d storage.Database, userAPI userapi.UserInternalAPI, rsAPI rsapi.RoomserverInternalAPI, keyAPI keyapi.KeyInternalAPI, - eduCache *caching.EDUCache, notifier *notifier.Notifier, + eduCache *caching.EDUCache, lazyLoadCache map[string]*caching.LazyLoadCache, notifier *notifier.Notifier, ) *Streams { streams := &Streams{ PDUStreamProvider: &PDUStreamProvider{ StreamProvider: StreamProvider{DB: d}, - userAPI: userAPI, + lazyLoadCache: lazyLoadCache, }, TypingStreamProvider: &TypingStreamProvider{ StreamProvider: StreamProvider{DB: d}, diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index 73c7ce9c3..703340997 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -337,71 +337,12 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. } } - if syncReq.Filter.Room.State.LazyLoadMembers { - for roomID, jr := range syncReq.Response.Rooms.Join { - jr.State.Events = rp.applyLazyLoadMembers(req.Context(), syncReq.Since.IsEmpty(), roomID, syncReq.Device.UserID, jr.Timeline.Events, jr.State.Events) - syncReq.Response.Rooms.Join[roomID] = jr - } - } - return util.JSONResponse{ Code: http.StatusOK, JSON: syncReq.Response, } } -func (rp *RequestPool) applyLazyLoadMembers( - ctx context.Context, isInitial bool, roomID, userID string, timelineEvents, stateEvents []gomatrixserverlib.ClientEvent, -) []gomatrixserverlib.ClientEvent { - if len(stateEvents) == 0 || len(timelineEvents) == 0 { - return stateEvents - } - logrus.Debugf("before stateEvents: %d", len(stateEvents)) - - // First, get a list of users we need in the response - requiredUsers := make(map[string]bool) - if isInitial { - requiredUsers[userID] = true - } - for _, ev := range timelineEvents { - requiredUsers[ev.Sender] = true - } - // Filter out users who didn't send an event - newState := []gomatrixserverlib.ClientEvent{} - membershipEvents := []gomatrixserverlib.ClientEvent{} - for _, event := range stateEvents { - if event.Type != gomatrixserverlib.MRoomMember { - newState = append(newState, event) - } else { - // did the user send an event? - if requiredUsers[event.Sender] { - membershipEvents = append(membershipEvents, event) - delete(requiredUsers, event.Sender) - } - } - } - // Get all remaining users in the list - membershipToUser := make(map[string]*gomatrixserverlib.HeaderedEvent) - for userID := range requiredUsers { - membership, err := rp.db.GetStateEvent(ctx, roomID, gomatrixserverlib.MRoomMember, userID) - if err != nil { - util.GetLogger(ctx).WithError(err).Error("failed to get membership event for user") - continue - } - if membership != nil { - membershipToUser[userID] = membership - } - } - // Convert HeaderedEvent to ClientEvent - for _, evt := range membershipToUser { - newState = append(newState, gomatrixserverlib.HeaderedToClientEvent(evt, gomatrixserverlib.FormatSync)) - } - - result := append(newState, membershipEvents...) - logrus.Debugf("after stateEvents: %d", len(result)) - return result -} - func (rp *RequestPool) OnIncomingKeyChangeRequest(req *http.Request, device *userapi.Device) util.JSONResponse { from := req.URL.Query().Get("from") to := req.URL.Query().Get("to") diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index 384121a8a..4ecf9076b 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -58,7 +58,8 @@ func AddPublicRoutes( eduCache := caching.NewTypingCache() notifier := notifier.NewNotifier() - streams := streams.NewSyncStreamProviders(syncDB, userAPI, rsAPI, keyAPI, eduCache, notifier) + lazyLoadCache := make(map[string]*caching.LazyLoadCache) + streams := streams.NewSyncStreamProviders(syncDB, userAPI, rsAPI, keyAPI, eduCache, lazyLoadCache, notifier) notifier.SetCurrentPosition(streams.Latest(context.Background())) if err = notifier.Load(context.Background(), syncDB); err != nil { logrus.WithError(err).Panicf("failed to load notifier ") diff --git a/sytest-whitelist b/sytest-whitelist index a7aea05ed..906ac30e8 100644 --- a/sytest-whitelist +++ b/sytest-whitelist @@ -698,4 +698,9 @@ Ignore user in existing room Ignore invite in full sync Ignore invite in incremental sync A filtered timeline reaches its limit -A change to displayname should not result in a full state sync \ No newline at end of file +A change to displayname should not result in a full state sync +The only membership state included in an initial sync is for all the senders in the timeline +The only membership state included in an incremental sync is for senders in the timeline +Old members are included in gappy incr LL sync if they start speaking +We do send redundant membership state across incremental syncs if asked +Rejecting invite over federation doesn't break incremental /sync \ No newline at end of file