From ccc89edf49a63a538185f2b9314c305e3efae52c Mon Sep 17 00:00:00 2001 From: Till Faelligen <2353100+S7evinK@users.noreply.github.com> Date: Thu, 2 Feb 2023 08:50:05 +0100 Subject: [PATCH] Change lazy loading a bit --- syncapi/routing/context.go | 64 ++++++++++++++++++++++--------------- syncapi/routing/messages.go | 43 +++++-------------------- 2 files changed, 47 insertions(+), 60 deletions(-) diff --git a/syncapi/routing/context.go b/syncapi/routing/context.go index 15f4a4483..76f003671 100644 --- a/syncapi/routing/context.go +++ b/syncapi/routing/context.go @@ -161,7 +161,6 @@ func Context( }).Debug("applied history visibility (context eventsBefore/eventsAfter)") // TODO: Get the actual state at the last event returned by SelectContextAfterEvent - stateFilter.LazyLoadMembers = false state, err := snapshot.CurrentState(ctx, roomID, &stateFilter, nil) if err != nil { logrus.WithError(err).Error("unable to fetch current room state") @@ -170,7 +169,18 @@ func Context( eventsBeforeClient := gomatrixserverlib.HeaderedToClientEvents(eventsBeforeFiltered, gomatrixserverlib.FormatAll) eventsAfterClient := gomatrixserverlib.HeaderedToClientEvents(eventsAfterFiltered, gomatrixserverlib.FormatAll) - newState := applyLazyLoadMembers(device, filter, eventsAfterClient, eventsBeforeClient, state, lazyLoadCache) + + newState := state + if filter.LazyLoadMembers { + allEvents := append(eventsBeforeFiltered, eventsAfterFiltered...) + allEvents = append(allEvents, &requestedEvent) + evs := gomatrixserverlib.HeaderedToClientEvents(allEvents, gomatrixserverlib.FormatAll) + newState, err = applyLazyLoadMembers(ctx, device, snapshot, roomID, evs, lazyLoadCache) + if err != nil { + logrus.WithError(err).Error("unable to load membership events") + return jsonerror.InternalServerError() + } + } ev := gomatrixserverlib.HeaderedToClientEvent(&requestedEvent, gomatrixserverlib.FormatAll) response := ContextRespsonse{ @@ -247,39 +257,43 @@ func getStartEnd(ctx context.Context, snapshot storage.DatabaseTransaction, star } func applyLazyLoadMembers( + ctx context.Context, device *userapi.Device, - filter *gomatrixserverlib.RoomEventFilter, - eventsAfter, eventsBefore []gomatrixserverlib.ClientEvent, - state []*gomatrixserverlib.HeaderedEvent, + snapshot storage.DatabaseTransaction, + roomID string, + events []gomatrixserverlib.ClientEvent, lazyLoadCache caching.LazyLoadCache, -) []*gomatrixserverlib.HeaderedEvent { - if filter == nil || !filter.LazyLoadMembers { - return state - } - allEvents := append(eventsBefore, eventsAfter...) - x := make(map[string]struct{}) +) ([]*gomatrixserverlib.HeaderedEvent, error) { + eventSenders := make(map[string]struct{}) // get members who actually send an event - for _, e := range allEvents { + for _, e := range events { // Don't add membership events the client should already know about if _, cached := lazyLoadCache.IsLazyLoadedUserCached(device, e.RoomID, e.Sender); cached { continue } - x[e.Sender] = struct{}{} + eventSenders[e.Sender] = struct{}{} } - newState := []*gomatrixserverlib.HeaderedEvent{} - membershipEvents := []*gomatrixserverlib.HeaderedEvent{} - for _, event := range state { - if event.Type() == gomatrixserverlib.MRoomMember { - // did the user send an event? - if _, ok := x[event.Sender()]; ok { - membershipEvents = append(membershipEvents, event) - lazyLoadCache.StoreLazyLoadedUser(device, event.RoomID(), event.Sender(), event.EventID()) - } - } + wantUsers := make([]string, 0, len(eventSenders)) + for userID := range eventSenders { + wantUsers = append(wantUsers, userID) } - // Add the membershipEvents to the end of the list, to make Sytest happy - return append(newState, membershipEvents...) + + // Query missing membership events + filter := gomatrixserverlib.DefaultStateFilter() + filter.Senders = &wantUsers + filter.Types = &[]string{gomatrixserverlib.MRoomMember} + memberships, err := snapshot.GetStateEventsForRoom(ctx, roomID, &filter) + if err != nil { + return nil, err + } + + // cache the membership events + for _, membership := range memberships { + lazyLoadCache.StoreLazyLoadedUser(device, roomID, *membership.StateKey(), membership.EventID()) + } + + return memberships, nil } func parseRoomEventFilter(req *http.Request) (*gomatrixserverlib.RoomEventFilter, error) { diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go index 4a01ec357..96a6e93b3 100644 --- a/syncapi/routing/messages.go +++ b/syncapi/routing/messages.go @@ -246,7 +246,14 @@ func OnIncomingMessagesRequest( Start: start.String(), End: end.String(), } - res.applyLazyLoadMembers(req.Context(), snapshot, roomID, device, filter.LazyLoadMembers, lazyLoadCache) + if filter.LazyLoadMembers { + membershipEvents, err := applyLazyLoadMembers(req.Context(), device, snapshot, roomID, clientEvents, lazyLoadCache) + if err != nil { + util.GetLogger(req.Context()).WithError(err).Error("failed to apply lazy loading") + return jsonerror.InternalServerError() + } + res.State = append(res.State, gomatrixserverlib.HeaderedToClientEvents(membershipEvents, gomatrixserverlib.FormatAll)...) + } // If we didn't return any events, set the end to an empty string, so it will be omitted // in the response JSON. @@ -265,40 +272,6 @@ func OnIncomingMessagesRequest( } } -// applyLazyLoadMembers loads membership events for users returned in Chunk, if the filter has -// LazyLoadMembers enabled. -func (m *messagesResp) applyLazyLoadMembers( - ctx context.Context, - db storage.DatabaseTransaction, - roomID string, - device *userapi.Device, - lazyLoad bool, - lazyLoadCache caching.LazyLoadCache, -) { - if !lazyLoad { - return - } - membershipToUser := make(map[string]*gomatrixserverlib.HeaderedEvent) - for _, evt := range m.Chunk { - // Don't add membership events the client should already know about - if _, cached := lazyLoadCache.IsLazyLoadedUserCached(device, roomID, evt.Sender); cached { - continue - } - membership, err := db.GetStateEvent(ctx, roomID, gomatrixserverlib.MRoomMember, evt.Sender) - if err != nil { - util.GetLogger(ctx).WithError(err).Error("failed to get membership event for user") - continue - } - if membership != nil { - membershipToUser[evt.Sender] = membership - lazyLoadCache.StoreLazyLoadedUser(device, roomID, evt.Sender, membership.EventID()) - } - } - for _, evt := range membershipToUser { - m.State = append(m.State, gomatrixserverlib.HeaderedToClientEvent(evt, gomatrixserverlib.FormatAll)) - } -} - func getMembershipForUser(ctx context.Context, roomID, userID string, rsAPI api.SyncRoomserverAPI) (resp api.QueryMembershipForUserResponse, err error) { req := api.QueryMembershipForUserRequest{ RoomID: roomID,