Partially implement lazy loading on /sync

This commit is contained in:
Till Faelligen 2022-04-12 13:37:29 +02:00
parent afee6a76c0
commit 032aeb8b98
6 changed files with 164 additions and 65 deletions

View file

@ -0,0 +1,49 @@
package caching
import (
"fmt"
"time"
"github.com/matrix-org/gomatrixserverlib"
)
const (
LazyLoadCacheName = "lazy_load_members"
LazyLoadCacheMaxEntries = 128
LazyLoadCacheMutable = true
LazyLoadCacheMaxAge = time.Minute * 30
)
type LazyLoadCache struct {
*InMemoryLRUCachePartition
}
// NewLazyLoadCache creates a new InMemoryLRUCachePartition.
func NewLazyLoadCache() (*LazyLoadCache, error) {
cache, err := NewInMemoryLRUCachePartition(
LazyLoadCacheName,
LazyLoadCacheMutable,
LazyLoadCacheMaxEntries,
LazyLoadCacheMaxAge,
false,
)
if err != nil {
return nil, err
}
go cacheCleaner(cache)
return &LazyLoadCache{cache}, err
}
func (c *LazyLoadCache) StoreLazyLoadedMembers(reqUser, deviceID, roomID, userID string, event *gomatrixserverlib.HeaderedEvent) {
cacheKey := fmt.Sprintf("%s/%s/%s/%s", reqUser, deviceID, roomID, userID)
c.Set(cacheKey, event)
}
func (c *LazyLoadCache) GetLazyLoadedMembers(reqUser, deviceID, roomID, userID string) (*gomatrixserverlib.HeaderedEvent, bool) {
cacheKey := fmt.Sprintf("%s/%s/%s/%s", reqUser, deviceID, roomID, userID)
val, ok := c.Get(cacheKey)
if !ok {
return nil, ok
}
return val.(*gomatrixserverlib.HeaderedEvent), ok
}

View file

@ -6,6 +6,7 @@ import (
"sync"
"time"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
@ -26,7 +27,8 @@ type PDUStreamProvider struct {
tasks chan func()
workers atomic.Int32
userAPI userapi.UserInternalAPI
// userID+deviceID -> lazy loading cache
lazyLoadCache map[string]*caching.LazyLoadCache
}
func (p *PDUStreamProvider) worker() {
@ -188,7 +190,7 @@ func (p *PDUStreamProvider) IncrementalSync(
newPos = from
for _, delta := range stateDeltas {
var pos types.StreamPosition
if pos, err = p.addRoomDeltaToResponse(ctx, req.Device, r, delta, &eventFilter, req.Response); err != nil {
if pos, err = p.addRoomDeltaToResponse(ctx, req.Device, r, delta, &eventFilter, &stateFilter, req.Response); err != nil {
req.Log.WithError(err).Error("d.addRoomDeltaToResponse failed")
return to
}
@ -209,6 +211,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
r types.Range,
delta types.StateDelta,
eventFilter *gomatrixserverlib.RoomEventFilter,
stateFilter *gomatrixserverlib.StateFilter,
res *types.Response,
) (types.StreamPosition, error) {
if delta.MembershipPos > 0 && delta.Membership == gomatrixserverlib.Leave {
@ -263,6 +266,17 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
updateLatestPosition(delta.StateEvents[len(delta.StateEvents)-1].EventID())
}
if stateFilter.LazyLoadMembers {
cache, err := p.getLazyLoadCache(device)
if err != nil {
return r.From, err
}
delta.StateEvents, err = p.lazyLoadMembers(ctx, delta.RoomID, true, stateFilter, device, recentEvents, cache, delta.StateEvents)
if err != nil {
return r.From, err
}
}
hasMembershipChange := false
for _, recentEvent := range recentStreamEvents {
if recentEvent.Type() == gomatrixserverlib.MRoomMember && recentEvent.StateKey() != nil {
@ -402,6 +416,18 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
// "Can sync a room with a message with a transaction id" - which does a complete sync to check.
recentEvents := p.DB.StreamEventsToEvents(device, recentStreamEvents)
stateEvents = removeDuplicates(stateEvents, recentEvents)
if stateFilter.LazyLoadMembers {
cache, err := p.getLazyLoadCache(device)
if err != nil {
return nil, err
}
stateEvents, err = p.lazyLoadMembers(ctx, roomID, false, stateFilter, device, recentEvents, cache, stateEvents)
if err != nil {
return nil, err
}
}
jr = types.NewJoinResponse()
jr.Summary.JoinedMemberCount = &joinedCount
jr.Summary.InvitedMemberCount = &invitedCount
@ -412,6 +438,83 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
return jr, nil
}
func (p *PDUStreamProvider) lazyLoadMembers(
ctx context.Context, roomID string, isIncremental bool,
stateFilter *gomatrixserverlib.StateFilter, device *userapi.Device,
timelineEvents []*gomatrixserverlib.HeaderedEvent, cache *caching.LazyLoadCache,
stateEvents []*gomatrixserverlib.HeaderedEvent,
) ([]*gomatrixserverlib.HeaderedEvent, error) {
if len(timelineEvents) == 0 {
return stateEvents, nil
}
// Work out if we need to include membership events
timelineUsers := make(map[string]struct{})
if !isIncremental {
timelineUsers[device.UserID] = struct{}{}
}
// add all users the client doesn't know about yet to a list
for _, event := range timelineEvents {
if event.Type() == gomatrixserverlib.MRoomMember && event.StateKey() != nil {
continue
}
// membership is not yet cached, add it to the list
if _, ok := cache.GetLazyLoadedMembers(device.UserID, device.ID, roomID, event.Sender()); !ok {
timelineUsers[event.Sender()] = struct{}{}
}
}
// remove existing membership events we don't care about, e.g. users not in the timeline.events
newStateEvents := []*gomatrixserverlib.HeaderedEvent{}
for _, event := range stateEvents {
if event.Type() == gomatrixserverlib.MRoomMember && event.StateKey() != nil {
// We want this users membership event, keep it in the list
if _, ok := timelineUsers[event.Sender()]; ok {
newStateEvents = append(newStateEvents, event)
if !stateFilter.IncludeRedundantMembers {
cache.StoreLazyLoadedMembers(device.UserID, device.ID, roomID, event.Sender(), event)
}
delete(timelineUsers, event.Sender())
}
} else {
newStateEvents = append(newStateEvents, event)
}
}
wantUsers := make([]string, 0, len(timelineUsers))
for userID := range timelineUsers {
wantUsers = append(wantUsers, userID)
}
// Query missing membership events
memberships, err := p.DB.GetStateEventsForRoom(ctx, roomID, &gomatrixserverlib.StateFilter{
Limit: 100,
Senders: &wantUsers,
Types: &[]string{gomatrixserverlib.MRoomMember},
})
if err != nil {
return stateEvents, err
}
// cache the membership events
for _, membership := range memberships {
cache.StoreLazyLoadedMembers(device.UserID, device.ID, roomID, membership.Sender(), membership)
}
stateEvents = append(newStateEvents, memberships...)
return stateEvents, nil
}
func (p *PDUStreamProvider) getLazyLoadCache(device *userapi.Device) (*caching.LazyLoadCache, error) {
var err error
cacheKey := device.UserID + device.ID
cache, ok := p.lazyLoadCache[cacheKey]
if !ok {
cache, err = caching.NewLazyLoadCache()
if err != nil {
return nil, err
}
p.lazyLoadCache[cacheKey] = cache
}
return cache, nil
}
// addIgnoredUsersToFilter adds ignored users to the eventfilter and
// the syncreq itself for further use in streams.
func (p *PDUStreamProvider) addIgnoredUsersToFilter(ctx context.Context, req *types.SyncRequest, eventFilter *gomatrixserverlib.RoomEventFilter) error {

View file

@ -27,12 +27,12 @@ type Streams struct {
func NewSyncStreamProviders(
d storage.Database, userAPI userapi.UserInternalAPI,
rsAPI rsapi.RoomserverInternalAPI, keyAPI keyapi.KeyInternalAPI,
eduCache *caching.EDUCache, notifier *notifier.Notifier,
eduCache *caching.EDUCache, lazyLoadCache map[string]*caching.LazyLoadCache, notifier *notifier.Notifier,
) *Streams {
streams := &Streams{
PDUStreamProvider: &PDUStreamProvider{
StreamProvider: StreamProvider{DB: d},
userAPI: userAPI,
lazyLoadCache: lazyLoadCache,
},
TypingStreamProvider: &TypingStreamProvider{
StreamProvider: StreamProvider{DB: d},

View file

@ -337,71 +337,12 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
}
}
if syncReq.Filter.Room.State.LazyLoadMembers {
for roomID, jr := range syncReq.Response.Rooms.Join {
jr.State.Events = rp.applyLazyLoadMembers(req.Context(), syncReq.Since.IsEmpty(), roomID, syncReq.Device.UserID, jr.Timeline.Events, jr.State.Events)
syncReq.Response.Rooms.Join[roomID] = jr
}
}
return util.JSONResponse{
Code: http.StatusOK,
JSON: syncReq.Response,
}
}
func (rp *RequestPool) applyLazyLoadMembers(
ctx context.Context, isInitial bool, roomID, userID string, timelineEvents, stateEvents []gomatrixserverlib.ClientEvent,
) []gomatrixserverlib.ClientEvent {
if len(stateEvents) == 0 || len(timelineEvents) == 0 {
return stateEvents
}
logrus.Debugf("before stateEvents: %d", len(stateEvents))
// First, get a list of users we need in the response
requiredUsers := make(map[string]bool)
if isInitial {
requiredUsers[userID] = true
}
for _, ev := range timelineEvents {
requiredUsers[ev.Sender] = true
}
// Filter out users who didn't send an event
newState := []gomatrixserverlib.ClientEvent{}
membershipEvents := []gomatrixserverlib.ClientEvent{}
for _, event := range stateEvents {
if event.Type != gomatrixserverlib.MRoomMember {
newState = append(newState, event)
} else {
// did the user send an event?
if requiredUsers[event.Sender] {
membershipEvents = append(membershipEvents, event)
delete(requiredUsers, event.Sender)
}
}
}
// Get all remaining users in the list
membershipToUser := make(map[string]*gomatrixserverlib.HeaderedEvent)
for userID := range requiredUsers {
membership, err := rp.db.GetStateEvent(ctx, roomID, gomatrixserverlib.MRoomMember, userID)
if err != nil {
util.GetLogger(ctx).WithError(err).Error("failed to get membership event for user")
continue
}
if membership != nil {
membershipToUser[userID] = membership
}
}
// Convert HeaderedEvent to ClientEvent
for _, evt := range membershipToUser {
newState = append(newState, gomatrixserverlib.HeaderedToClientEvent(evt, gomatrixserverlib.FormatSync))
}
result := append(newState, membershipEvents...)
logrus.Debugf("after stateEvents: %d", len(result))
return result
}
func (rp *RequestPool) OnIncomingKeyChangeRequest(req *http.Request, device *userapi.Device) util.JSONResponse {
from := req.URL.Query().Get("from")
to := req.URL.Query().Get("to")

View file

@ -58,7 +58,8 @@ func AddPublicRoutes(
eduCache := caching.NewTypingCache()
notifier := notifier.NewNotifier()
streams := streams.NewSyncStreamProviders(syncDB, userAPI, rsAPI, keyAPI, eduCache, notifier)
lazyLoadCache := make(map[string]*caching.LazyLoadCache)
streams := streams.NewSyncStreamProviders(syncDB, userAPI, rsAPI, keyAPI, eduCache, lazyLoadCache, notifier)
notifier.SetCurrentPosition(streams.Latest(context.Background()))
if err = notifier.Load(context.Background(), syncDB); err != nil {
logrus.WithError(err).Panicf("failed to load notifier ")

View file

@ -699,3 +699,8 @@ Ignore invite in full sync
Ignore invite in incremental sync
A filtered timeline reaches its limit
A change to displayname should not result in a full state sync
The only membership state included in an initial sync is for all the senders in the timeline
The only membership state included in an incremental sync is for senders in the timeline
Old members are included in gappy incr LL sync if they start speaking
We do send redundant membership state across incremental syncs if asked
Rejecting invite over federation doesn't break incremental /sync