Move LL cache (#2429)

This commit is contained in:
Till 2022-05-06 15:33:34 +02:00 committed by GitHub
parent 507f63d0fc
commit 6493c0c0f2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 34 additions and 41 deletions

View file

@ -15,33 +15,14 @@ const (
LazyLoadCacheMaxAge = time.Minute * 30 LazyLoadCacheMaxAge = time.Minute * 30
) )
type LazyLoadCache struct { type LazyLoadCache interface {
// InMemoryLRUCachePartition containing other InMemoryLRUCachePartitions StoreLazyLoadedUser(device *userapi.Device, roomID, userID, eventID string)
// with the actual cached members IsLazyLoadedUserCached(device *userapi.Device, roomID, userID string) (string, bool)
userCaches *InMemoryLRUCachePartition
} }
// NewLazyLoadCache creates a new LazyLoadCache. func (c Caches) lazyLoadCacheForUser(device *userapi.Device) (*InMemoryLRUCachePartition, error) {
func NewLazyLoadCache() (*LazyLoadCache, error) {
cache, err := NewInMemoryLRUCachePartition(
LazyLoadCacheName,
LazyLoadCacheMutable,
LazyLoadCacheMaxEntries,
LazyLoadCacheMaxAge,
true,
)
if err != nil {
return nil, err
}
go cacheCleaner(cache)
return &LazyLoadCache{
userCaches: cache,
}, nil
}
func (c *LazyLoadCache) lazyLoadCacheForUser(device *userapi.Device) (*InMemoryLRUCachePartition, error) {
cacheName := fmt.Sprintf("%s/%s", device.UserID, device.ID) cacheName := fmt.Sprintf("%s/%s", device.UserID, device.ID)
userCache, ok := c.userCaches.Get(cacheName) userCache, ok := c.LazyLoading.Get(cacheName)
if ok && userCache != nil { if ok && userCache != nil {
if cache, ok := userCache.(*InMemoryLRUCachePartition); ok { if cache, ok := userCache.(*InMemoryLRUCachePartition); ok {
return cache, nil return cache, nil
@ -57,12 +38,12 @@ func (c *LazyLoadCache) lazyLoadCacheForUser(device *userapi.Device) (*InMemoryL
if err != nil { if err != nil {
return nil, err return nil, err
} }
c.userCaches.Set(cacheName, cache) c.LazyLoading.Set(cacheName, cache)
go cacheCleaner(cache) go cacheCleaner(cache)
return cache, nil return cache, nil
} }
func (c *LazyLoadCache) StoreLazyLoadedUser(device *userapi.Device, roomID, userID, eventID string) { func (c Caches) StoreLazyLoadedUser(device *userapi.Device, roomID, userID, eventID string) {
cache, err := c.lazyLoadCacheForUser(device) cache, err := c.lazyLoadCacheForUser(device)
if err != nil { if err != nil {
return return
@ -71,7 +52,7 @@ func (c *LazyLoadCache) StoreLazyLoadedUser(device *userapi.Device, roomID, user
cache.Set(cacheKey, eventID) cache.Set(cacheKey, eventID)
} }
func (c *LazyLoadCache) IsLazyLoadedUserCached(device *userapi.Device, roomID, userID string) (string, bool) { func (c Caches) IsLazyLoadedUserCached(device *userapi.Device, roomID, userID string) (string, bool) {
cache, err := c.lazyLoadCacheForUser(device) cache, err := c.lazyLoadCacheForUser(device)
if err != nil { if err != nil {
return "", false return "", false

View file

@ -1,6 +1,8 @@
package caching package caching
import "time" import (
"time"
)
// Caches contains a set of references to caches. They may be // Caches contains a set of references to caches. They may be
// different implementations as long as they satisfy the Cache // different implementations as long as they satisfy the Cache
@ -13,6 +15,7 @@ type Caches struct {
RoomInfos Cache // RoomInfoCache RoomInfos Cache // RoomInfoCache
FederationEvents Cache // FederationEventsCache FederationEvents Cache // FederationEventsCache
SpaceSummaryRooms Cache // SpaceSummaryRoomsCache SpaceSummaryRooms Cache // SpaceSummaryRoomsCache
LazyLoading Cache // LazyLoadCache
} }
// Cache is the interface that an implementation must satisfy. // Cache is the interface that an implementation must satisfy.

View file

@ -70,9 +70,21 @@ func NewInMemoryLRUCache(enablePrometheus bool) (*Caches, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
lazyLoadCache, err := NewInMemoryLRUCachePartition(
LazyLoadCacheName,
LazyLoadCacheMutable,
LazyLoadCacheMaxEntries,
LazyLoadCacheMaxAge,
enablePrometheus,
)
if err != nil {
return nil, err
}
go cacheCleaner( go cacheCleaner(
roomVersions, serverKeys, roomServerRoomIDs, roomVersions, serverKeys, roomServerRoomIDs,
roomInfos, federationEvents, spaceRooms, roomInfos, federationEvents, spaceRooms, lazyLoadCache,
) )
return &Caches{ return &Caches{
RoomVersions: roomVersions, RoomVersions: roomVersions,
@ -81,6 +93,7 @@ func NewInMemoryLRUCache(enablePrometheus bool) (*Caches, error) {
RoomInfos: roomInfos, RoomInfos: roomInfos,
FederationEvents: federationEvents, FederationEvents: federationEvents,
SpaceSummaryRooms: spaceRooms, SpaceSummaryRooms: spaceRooms,
LazyLoading: lazyLoadCache,
}, nil }, nil
} }

View file

@ -45,7 +45,7 @@ func Context(
rsAPI roomserver.SyncRoomserverAPI, rsAPI roomserver.SyncRoomserverAPI,
syncDB storage.Database, syncDB storage.Database,
roomID, eventID string, roomID, eventID string,
lazyLoadCache *caching.LazyLoadCache, lazyLoadCache caching.LazyLoadCache,
) util.JSONResponse { ) util.JSONResponse {
filter, err := parseRoomEventFilter(req) filter, err := parseRoomEventFilter(req)
if err != nil { if err != nil {
@ -155,7 +155,7 @@ func applyLazyLoadMembers(
filter *gomatrixserverlib.RoomEventFilter, filter *gomatrixserverlib.RoomEventFilter,
eventsAfter, eventsBefore []gomatrixserverlib.ClientEvent, eventsAfter, eventsBefore []gomatrixserverlib.ClientEvent,
state []*gomatrixserverlib.HeaderedEvent, state []*gomatrixserverlib.HeaderedEvent,
lazyLoadCache *caching.LazyLoadCache, lazyLoadCache caching.LazyLoadCache,
) []*gomatrixserverlib.HeaderedEvent { ) []*gomatrixserverlib.HeaderedEvent {
if filter == nil || !filter.LazyLoadMembers { if filter == nil || !filter.LazyLoadMembers {
return state return state

View file

@ -63,7 +63,7 @@ func OnIncomingMessagesRequest(
rsAPI api.SyncRoomserverAPI, rsAPI api.SyncRoomserverAPI,
cfg *config.SyncAPI, cfg *config.SyncAPI,
srp *sync.RequestPool, srp *sync.RequestPool,
lazyLoadCache *caching.LazyLoadCache, lazyLoadCache caching.LazyLoadCache,
) util.JSONResponse { ) util.JSONResponse {
var err error var err error

View file

@ -39,7 +39,7 @@ func Setup(
userAPI userapi.SyncUserAPI, userAPI userapi.SyncUserAPI,
rsAPI api.SyncRoomserverAPI, rsAPI api.SyncRoomserverAPI,
cfg *config.SyncAPI, cfg *config.SyncAPI,
lazyLoadCache *caching.LazyLoadCache, lazyLoadCache caching.LazyLoadCache,
) { ) {
v3mux := csMux.PathPrefix("/{apiversion:(?:r0|v3)}/").Subrouter() v3mux := csMux.PathPrefix("/{apiversion:(?:r0|v3)}/").Subrouter()

View file

@ -32,7 +32,7 @@ type PDUStreamProvider struct {
tasks chan func() tasks chan func()
workers atomic.Int32 workers atomic.Int32
// userID+deviceID -> lazy loading cache // userID+deviceID -> lazy loading cache
lazyLoadCache *caching.LazyLoadCache lazyLoadCache caching.LazyLoadCache
rsAPI roomserverAPI.SyncRoomserverAPI rsAPI roomserverAPI.SyncRoomserverAPI
} }

View file

@ -27,7 +27,7 @@ type Streams struct {
func NewSyncStreamProviders( func NewSyncStreamProviders(
d storage.Database, userAPI userapi.SyncUserAPI, d storage.Database, userAPI userapi.SyncUserAPI,
rsAPI rsapi.SyncRoomserverAPI, keyAPI keyapi.SyncKeyAPI, rsAPI rsapi.SyncRoomserverAPI, keyAPI keyapi.SyncKeyAPI,
eduCache *caching.EDUCache, lazyLoadCache *caching.LazyLoadCache, notifier *notifier.Notifier, eduCache *caching.EDUCache, lazyLoadCache caching.LazyLoadCache, notifier *notifier.Notifier,
) *Streams { ) *Streams {
streams := &Streams{ streams := &Streams{
PDUStreamProvider: &PDUStreamProvider{ PDUStreamProvider: &PDUStreamProvider{

View file

@ -53,12 +53,8 @@ func AddPublicRoutes(
} }
eduCache := caching.NewTypingCache() eduCache := caching.NewTypingCache()
lazyLoadCache, err := caching.NewLazyLoadCache()
if err != nil {
logrus.WithError(err).Panicf("failed to create lazy loading cache")
}
notifier := notifier.NewNotifier() notifier := notifier.NewNotifier()
streams := streams.NewSyncStreamProviders(syncDB, userAPI, rsAPI, keyAPI, eduCache, lazyLoadCache, notifier) streams := streams.NewSyncStreamProviders(syncDB, userAPI, rsAPI, keyAPI, eduCache, base.Caches, notifier)
notifier.SetCurrentPosition(streams.Latest(context.Background())) notifier.SetCurrentPosition(streams.Latest(context.Background()))
if err = notifier.Load(context.Background(), syncDB); err != nil { if err = notifier.Load(context.Background(), syncDB); err != nil {
logrus.WithError(err).Panicf("failed to load notifier ") logrus.WithError(err).Panicf("failed to load notifier ")
@ -146,6 +142,6 @@ func AddPublicRoutes(
routing.Setup( routing.Setup(
base.PublicClientAPIMux, requestPool, syncDB, userAPI, base.PublicClientAPIMux, requestPool, syncDB, userAPI,
rsAPI, cfg, lazyLoadCache, rsAPI, cfg, base.Caches,
) )
} }