mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-01-16 18:43:10 -06:00
Change lazy loading a bit
This commit is contained in:
parent
6cf33f546a
commit
ccc89edf49
|
|
@ -161,7 +161,6 @@ func Context(
|
||||||
}).Debug("applied history visibility (context eventsBefore/eventsAfter)")
|
}).Debug("applied history visibility (context eventsBefore/eventsAfter)")
|
||||||
|
|
||||||
// TODO: Get the actual state at the last event returned by SelectContextAfterEvent
|
// TODO: Get the actual state at the last event returned by SelectContextAfterEvent
|
||||||
stateFilter.LazyLoadMembers = false
|
|
||||||
state, err := snapshot.CurrentState(ctx, roomID, &stateFilter, nil)
|
state, err := snapshot.CurrentState(ctx, roomID, &stateFilter, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithError(err).Error("unable to fetch current room state")
|
logrus.WithError(err).Error("unable to fetch current room state")
|
||||||
|
|
@ -170,7 +169,18 @@ func Context(
|
||||||
|
|
||||||
eventsBeforeClient := gomatrixserverlib.HeaderedToClientEvents(eventsBeforeFiltered, gomatrixserverlib.FormatAll)
|
eventsBeforeClient := gomatrixserverlib.HeaderedToClientEvents(eventsBeforeFiltered, gomatrixserverlib.FormatAll)
|
||||||
eventsAfterClient := gomatrixserverlib.HeaderedToClientEvents(eventsAfterFiltered, gomatrixserverlib.FormatAll)
|
eventsAfterClient := gomatrixserverlib.HeaderedToClientEvents(eventsAfterFiltered, gomatrixserverlib.FormatAll)
|
||||||
newState := applyLazyLoadMembers(device, filter, eventsAfterClient, eventsBeforeClient, state, lazyLoadCache)
|
|
||||||
|
newState := state
|
||||||
|
if filter.LazyLoadMembers {
|
||||||
|
allEvents := append(eventsBeforeFiltered, eventsAfterFiltered...)
|
||||||
|
allEvents = append(allEvents, &requestedEvent)
|
||||||
|
evs := gomatrixserverlib.HeaderedToClientEvents(allEvents, gomatrixserverlib.FormatAll)
|
||||||
|
newState, err = applyLazyLoadMembers(ctx, device, snapshot, roomID, evs, lazyLoadCache)
|
||||||
|
if err != nil {
|
||||||
|
logrus.WithError(err).Error("unable to load membership events")
|
||||||
|
return jsonerror.InternalServerError()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
ev := gomatrixserverlib.HeaderedToClientEvent(&requestedEvent, gomatrixserverlib.FormatAll)
|
ev := gomatrixserverlib.HeaderedToClientEvent(&requestedEvent, gomatrixserverlib.FormatAll)
|
||||||
response := ContextRespsonse{
|
response := ContextRespsonse{
|
||||||
|
|
@ -247,39 +257,43 @@ func getStartEnd(ctx context.Context, snapshot storage.DatabaseTransaction, star
|
||||||
}
|
}
|
||||||
|
|
||||||
func applyLazyLoadMembers(
|
func applyLazyLoadMembers(
|
||||||
|
ctx context.Context,
|
||||||
device *userapi.Device,
|
device *userapi.Device,
|
||||||
filter *gomatrixserverlib.RoomEventFilter,
|
snapshot storage.DatabaseTransaction,
|
||||||
eventsAfter, eventsBefore []gomatrixserverlib.ClientEvent,
|
roomID string,
|
||||||
state []*gomatrixserverlib.HeaderedEvent,
|
events []gomatrixserverlib.ClientEvent,
|
||||||
lazyLoadCache caching.LazyLoadCache,
|
lazyLoadCache caching.LazyLoadCache,
|
||||||
) []*gomatrixserverlib.HeaderedEvent {
|
) ([]*gomatrixserverlib.HeaderedEvent, error) {
|
||||||
if filter == nil || !filter.LazyLoadMembers {
|
eventSenders := make(map[string]struct{})
|
||||||
return state
|
|
||||||
}
|
|
||||||
allEvents := append(eventsBefore, eventsAfter...)
|
|
||||||
x := make(map[string]struct{})
|
|
||||||
// get members who actually send an event
|
// get members who actually send an event
|
||||||
for _, e := range allEvents {
|
for _, e := range events {
|
||||||
// Don't add membership events the client should already know about
|
// Don't add membership events the client should already know about
|
||||||
if _, cached := lazyLoadCache.IsLazyLoadedUserCached(device, e.RoomID, e.Sender); cached {
|
if _, cached := lazyLoadCache.IsLazyLoadedUserCached(device, e.RoomID, e.Sender); cached {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
x[e.Sender] = struct{}{}
|
eventSenders[e.Sender] = struct{}{}
|
||||||
}
|
}
|
||||||
|
|
||||||
newState := []*gomatrixserverlib.HeaderedEvent{}
|
wantUsers := make([]string, 0, len(eventSenders))
|
||||||
membershipEvents := []*gomatrixserverlib.HeaderedEvent{}
|
for userID := range eventSenders {
|
||||||
for _, event := range state {
|
wantUsers = append(wantUsers, userID)
|
||||||
if event.Type() == gomatrixserverlib.MRoomMember {
|
|
||||||
// did the user send an event?
|
|
||||||
if _, ok := x[event.Sender()]; ok {
|
|
||||||
membershipEvents = append(membershipEvents, event)
|
|
||||||
lazyLoadCache.StoreLazyLoadedUser(device, event.RoomID(), event.Sender(), event.EventID())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Query missing membership events
|
||||||
|
filter := gomatrixserverlib.DefaultStateFilter()
|
||||||
|
filter.Senders = &wantUsers
|
||||||
|
filter.Types = &[]string{gomatrixserverlib.MRoomMember}
|
||||||
|
memberships, err := snapshot.GetStateEventsForRoom(ctx, roomID, &filter)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// cache the membership events
|
||||||
|
for _, membership := range memberships {
|
||||||
|
lazyLoadCache.StoreLazyLoadedUser(device, roomID, *membership.StateKey(), membership.EventID())
|
||||||
}
|
}
|
||||||
// Add the membershipEvents to the end of the list, to make Sytest happy
|
|
||||||
return append(newState, membershipEvents...)
|
return memberships, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func parseRoomEventFilter(req *http.Request) (*gomatrixserverlib.RoomEventFilter, error) {
|
func parseRoomEventFilter(req *http.Request) (*gomatrixserverlib.RoomEventFilter, error) {
|
||||||
|
|
|
||||||
|
|
@ -246,7 +246,14 @@ func OnIncomingMessagesRequest(
|
||||||
Start: start.String(),
|
Start: start.String(),
|
||||||
End: end.String(),
|
End: end.String(),
|
||||||
}
|
}
|
||||||
res.applyLazyLoadMembers(req.Context(), snapshot, roomID, device, filter.LazyLoadMembers, lazyLoadCache)
|
if filter.LazyLoadMembers {
|
||||||
|
membershipEvents, err := applyLazyLoadMembers(req.Context(), device, snapshot, roomID, clientEvents, lazyLoadCache)
|
||||||
|
if err != nil {
|
||||||
|
util.GetLogger(req.Context()).WithError(err).Error("failed to apply lazy loading")
|
||||||
|
return jsonerror.InternalServerError()
|
||||||
|
}
|
||||||
|
res.State = append(res.State, gomatrixserverlib.HeaderedToClientEvents(membershipEvents, gomatrixserverlib.FormatAll)...)
|
||||||
|
}
|
||||||
|
|
||||||
// If we didn't return any events, set the end to an empty string, so it will be omitted
|
// If we didn't return any events, set the end to an empty string, so it will be omitted
|
||||||
// in the response JSON.
|
// in the response JSON.
|
||||||
|
|
@ -265,40 +272,6 @@ func OnIncomingMessagesRequest(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// applyLazyLoadMembers loads membership events for users returned in Chunk, if the filter has
|
|
||||||
// LazyLoadMembers enabled.
|
|
||||||
func (m *messagesResp) applyLazyLoadMembers(
|
|
||||||
ctx context.Context,
|
|
||||||
db storage.DatabaseTransaction,
|
|
||||||
roomID string,
|
|
||||||
device *userapi.Device,
|
|
||||||
lazyLoad bool,
|
|
||||||
lazyLoadCache caching.LazyLoadCache,
|
|
||||||
) {
|
|
||||||
if !lazyLoad {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
membershipToUser := make(map[string]*gomatrixserverlib.HeaderedEvent)
|
|
||||||
for _, evt := range m.Chunk {
|
|
||||||
// Don't add membership events the client should already know about
|
|
||||||
if _, cached := lazyLoadCache.IsLazyLoadedUserCached(device, roomID, evt.Sender); cached {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
membership, err := db.GetStateEvent(ctx, roomID, gomatrixserverlib.MRoomMember, evt.Sender)
|
|
||||||
if err != nil {
|
|
||||||
util.GetLogger(ctx).WithError(err).Error("failed to get membership event for user")
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if membership != nil {
|
|
||||||
membershipToUser[evt.Sender] = membership
|
|
||||||
lazyLoadCache.StoreLazyLoadedUser(device, roomID, evt.Sender, membership.EventID())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for _, evt := range membershipToUser {
|
|
||||||
m.State = append(m.State, gomatrixserverlib.HeaderedToClientEvent(evt, gomatrixserverlib.FormatAll))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func getMembershipForUser(ctx context.Context, roomID, userID string, rsAPI api.SyncRoomserverAPI) (resp api.QueryMembershipForUserResponse, err error) {
|
func getMembershipForUser(ctx context.Context, roomID, userID string, rsAPI api.SyncRoomserverAPI) (resp api.QueryMembershipForUserResponse, err error) {
|
||||||
req := api.QueryMembershipForUserRequest{
|
req := api.QueryMembershipForUserRequest{
|
||||||
RoomID: roomID,
|
RoomID: roomID,
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue