mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-17 03:43:11 -06:00
Rename to be specifically immutable, panic if we try to mutate a cache entry
This commit is contained in:
parent
a101b3a713
commit
243081831a
|
|
@ -54,12 +54,12 @@ type BaseDendrite struct {
|
||||||
tracerCloser io.Closer
|
tracerCloser io.Closer
|
||||||
|
|
||||||
// APIMux should be used to register new public matrix api endpoints
|
// APIMux should be used to register new public matrix api endpoints
|
||||||
APIMux *mux.Router
|
APIMux *mux.Router
|
||||||
httpClient *http.Client
|
httpClient *http.Client
|
||||||
Cfg *config.Dendrite
|
Cfg *config.Dendrite
|
||||||
Cache caching.Cache
|
ImmutableCache caching.ImmutableCache
|
||||||
KafkaConsumer sarama.Consumer
|
KafkaConsumer sarama.Consumer
|
||||||
KafkaProducer sarama.SyncProducer
|
KafkaProducer sarama.SyncProducer
|
||||||
}
|
}
|
||||||
|
|
||||||
const HTTPServerTimeout = time.Minute * 5
|
const HTTPServerTimeout = time.Minute * 5
|
||||||
|
|
@ -85,20 +85,20 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string) *BaseDendrite {
|
||||||
kafkaConsumer, kafkaProducer = setupKafka(cfg)
|
kafkaConsumer, kafkaProducer = setupKafka(cfg)
|
||||||
}
|
}
|
||||||
|
|
||||||
cache, err := caching.NewInMemoryLRUCache()
|
cache, err := caching.NewImmutableInMemoryLRUCache()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithError(err).Warnf("Failed to create cache")
|
logrus.WithError(err).Warnf("Failed to create cache")
|
||||||
}
|
}
|
||||||
|
|
||||||
return &BaseDendrite{
|
return &BaseDendrite{
|
||||||
componentName: componentName,
|
componentName: componentName,
|
||||||
tracerCloser: closer,
|
tracerCloser: closer,
|
||||||
Cfg: cfg,
|
Cfg: cfg,
|
||||||
Cache: cache,
|
ImmutableCache: cache,
|
||||||
APIMux: mux.NewRouter().UseEncodedPath(),
|
APIMux: mux.NewRouter().UseEncodedPath(),
|
||||||
httpClient: &http.Client{Timeout: HTTPClientTimeout},
|
httpClient: &http.Client{Timeout: HTTPClientTimeout},
|
||||||
KafkaConsumer: kafkaConsumer,
|
KafkaConsumer: kafkaConsumer,
|
||||||
KafkaProducer: kafkaProducer,
|
KafkaProducer: kafkaProducer,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -132,7 +132,7 @@ func (b *BaseDendrite) CreateHTTPRoomserverAPIs() (
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithError(err).Panic("NewRoomserverInputAPIHTTP failed", b.httpClient)
|
logrus.WithError(err).Panic("NewRoomserverInputAPIHTTP failed", b.httpClient)
|
||||||
}
|
}
|
||||||
query, err := roomserverAPI.NewRoomserverQueryAPIHTTP(b.Cfg.RoomServerURL(), b.httpClient, b.Cache)
|
query, err := roomserverAPI.NewRoomserverQueryAPIHTTP(b.Cfg.RoomServerURL(), b.httpClient, b.ImmutableCache)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithError(err).Panic("NewRoomserverQueryAPIHTTP failed", b.httpClient)
|
logrus.WithError(err).Panic("NewRoomserverQueryAPIHTTP failed", b.httpClient)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -3,10 +3,11 @@ package caching
|
||||||
import "github.com/matrix-org/gomatrixserverlib"
|
import "github.com/matrix-org/gomatrixserverlib"
|
||||||
|
|
||||||
const (
|
const (
|
||||||
MaxRoomVersionCacheEntries = 128
|
RoomVersionCachingEnabled = true
|
||||||
|
RoomVersionMaxCacheEntries = 128
|
||||||
)
|
)
|
||||||
|
|
||||||
type Cache interface {
|
type ImmutableCache interface {
|
||||||
GetRoomVersion(roomId string) (gomatrixserverlib.RoomVersion, bool)
|
GetRoomVersion(roomId string) (gomatrixserverlib.RoomVersion, bool)
|
||||||
StoreRoomVersion(roomId string, roomVersion gomatrixserverlib.RoomVersion)
|
StoreRoomVersion(roomId string, roomVersion gomatrixserverlib.RoomVersion)
|
||||||
}
|
}
|
||||||
47
common/caching/immutableinmemorylru.go
Normal file
47
common/caching/immutableinmemorylru.go
Normal file
|
|
@ -0,0 +1,47 @@
|
||||||
|
package caching
|
||||||
|
|
||||||
|
import (
|
||||||
|
lru "github.com/hashicorp/golang-lru"
|
||||||
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
)
|
||||||
|
|
||||||
|
type ImmutableInMemoryLRUCache struct {
|
||||||
|
roomVersions *lru.Cache
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewImmutableInMemoryLRUCache() (*ImmutableInMemoryLRUCache, error) {
|
||||||
|
roomVersionCache, rvErr := lru.New(RoomVersionMaxCacheEntries)
|
||||||
|
if rvErr != nil {
|
||||||
|
return nil, rvErr
|
||||||
|
}
|
||||||
|
return &ImmutableInMemoryLRUCache{
|
||||||
|
roomVersions: roomVersionCache,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func checkForInvalidMutation(cache *lru.Cache, key string, value interface{}) {
|
||||||
|
if peek, ok := cache.Peek(key); ok && peek != value {
|
||||||
|
panic("invalid use of immutable cache tries to mutate existing value")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ImmutableInMemoryLRUCache) GetRoomVersion(roomID string) (gomatrixserverlib.RoomVersion, bool) {
|
||||||
|
if c == nil || !RoomVersionCachingEnabled {
|
||||||
|
return "", false
|
||||||
|
}
|
||||||
|
val, found := c.roomVersions.Get(roomID)
|
||||||
|
if found && val != nil {
|
||||||
|
if roomVersion, ok := val.(gomatrixserverlib.RoomVersion); ok {
|
||||||
|
return roomVersion, true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return "", false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ImmutableInMemoryLRUCache) StoreRoomVersion(roomID string, roomVersion gomatrixserverlib.RoomVersion) {
|
||||||
|
if c == nil || !RoomVersionCachingEnabled {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
checkForInvalidMutation(c.roomVersions, roomID, roomVersion)
|
||||||
|
c.roomVersions.Add(roomID, roomVersion)
|
||||||
|
}
|
||||||
|
|
@ -1,40 +0,0 @@
|
||||||
package caching
|
|
||||||
|
|
||||||
import (
|
|
||||||
lru "github.com/hashicorp/golang-lru"
|
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
|
||||||
)
|
|
||||||
|
|
||||||
type InMemoryLRUCache struct {
|
|
||||||
roomVersions *lru.Cache
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewInMemoryLRUCache() (*InMemoryLRUCache, error) {
|
|
||||||
roomVersionCache, rvErr := lru.New(MaxRoomVersionCacheEntries)
|
|
||||||
if rvErr != nil {
|
|
||||||
return nil, rvErr
|
|
||||||
}
|
|
||||||
return &InMemoryLRUCache{
|
|
||||||
roomVersions: roomVersionCache,
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *InMemoryLRUCache) GetRoomVersion(roomID string) (gomatrixserverlib.RoomVersion, bool) {
|
|
||||||
if c == nil {
|
|
||||||
return "", false
|
|
||||||
}
|
|
||||||
val, found := c.roomVersions.Get(roomID)
|
|
||||||
if found && val != nil {
|
|
||||||
if roomVersion, ok := val.(gomatrixserverlib.RoomVersion); ok {
|
|
||||||
return roomVersion, true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return "", false
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *InMemoryLRUCache) StoreRoomVersion(roomID string, roomVersion gomatrixserverlib.RoomVersion) {
|
|
||||||
if c == nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
c.roomVersions.Add(roomID, roomVersion)
|
|
||||||
}
|
|
||||||
|
|
@ -412,7 +412,7 @@ const RoomserverQueryRoomVersionForRoomPath = "/api/roomserver/queryRoomVersionF
|
||||||
|
|
||||||
// NewRoomserverQueryAPIHTTP creates a RoomserverQueryAPI implemented by talking to a HTTP POST API.
|
// NewRoomserverQueryAPIHTTP creates a RoomserverQueryAPI implemented by talking to a HTTP POST API.
|
||||||
// If httpClient is nil an error is returned
|
// If httpClient is nil an error is returned
|
||||||
func NewRoomserverQueryAPIHTTP(roomserverURL string, httpClient *http.Client, cache caching.Cache) (RoomserverQueryAPI, error) {
|
func NewRoomserverQueryAPIHTTP(roomserverURL string, httpClient *http.Client, cache caching.ImmutableCache) (RoomserverQueryAPI, error) {
|
||||||
if httpClient == nil {
|
if httpClient == nil {
|
||||||
return nil, errors.New("NewRoomserverQueryAPIHTTP: httpClient is <nil>")
|
return nil, errors.New("NewRoomserverQueryAPIHTTP: httpClient is <nil>")
|
||||||
}
|
}
|
||||||
|
|
@ -420,9 +420,9 @@ func NewRoomserverQueryAPIHTTP(roomserverURL string, httpClient *http.Client, ca
|
||||||
}
|
}
|
||||||
|
|
||||||
type httpRoomserverQueryAPI struct {
|
type httpRoomserverQueryAPI struct {
|
||||||
roomserverURL string
|
roomserverURL string
|
||||||
httpClient *http.Client
|
httpClient *http.Client
|
||||||
cache caching.Cache
|
immutableCache caching.ImmutableCache
|
||||||
}
|
}
|
||||||
|
|
||||||
// QueryLatestEventsAndState implements RoomserverQueryAPI
|
// QueryLatestEventsAndState implements RoomserverQueryAPI
|
||||||
|
|
@ -587,7 +587,7 @@ func (h *httpRoomserverQueryAPI) QueryRoomVersionForRoom(
|
||||||
request *QueryRoomVersionForRoomRequest,
|
request *QueryRoomVersionForRoomRequest,
|
||||||
response *QueryRoomVersionForRoomResponse,
|
response *QueryRoomVersionForRoomResponse,
|
||||||
) error {
|
) error {
|
||||||
if roomVersion, ok := h.cache.GetRoomVersion(request.RoomID); ok {
|
if roomVersion, ok := h.immutableCache.GetRoomVersion(request.RoomID); ok {
|
||||||
response.RoomVersion = roomVersion
|
response.RoomVersion = roomVersion
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
@ -598,7 +598,7 @@ func (h *httpRoomserverQueryAPI) QueryRoomVersionForRoom(
|
||||||
apiURL := h.roomserverURL + RoomserverQueryRoomVersionForRoomPath
|
apiURL := h.roomserverURL + RoomserverQueryRoomVersionForRoomPath
|
||||||
err := commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
|
err := commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
h.cache.StoreRoomVersion(request.RoomID, response.RoomVersion)
|
h.immutableCache.StoreRoomVersion(request.RoomID, response.RoomVersion)
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -98,8 +98,8 @@ type RoomserverQueryAPIDatabase interface {
|
||||||
|
|
||||||
// RoomserverQueryAPI is an implementation of api.RoomserverQueryAPI
|
// RoomserverQueryAPI is an implementation of api.RoomserverQueryAPI
|
||||||
type RoomserverQueryAPI struct {
|
type RoomserverQueryAPI struct {
|
||||||
DB RoomserverQueryAPIDatabase
|
DB RoomserverQueryAPIDatabase
|
||||||
Cache caching.Cache
|
ImmutableCache caching.ImmutableCache
|
||||||
}
|
}
|
||||||
|
|
||||||
// QueryLatestEventsAndState implements api.RoomserverQueryAPI
|
// QueryLatestEventsAndState implements api.RoomserverQueryAPI
|
||||||
|
|
@ -898,7 +898,7 @@ func (r *RoomserverQueryAPI) QueryRoomVersionForRoom(
|
||||||
request *api.QueryRoomVersionForRoomRequest,
|
request *api.QueryRoomVersionForRoomRequest,
|
||||||
response *api.QueryRoomVersionForRoomResponse,
|
response *api.QueryRoomVersionForRoomResponse,
|
||||||
) error {
|
) error {
|
||||||
if roomVersion, ok := r.Cache.GetRoomVersion(request.RoomID); ok {
|
if roomVersion, ok := r.ImmutableCache.GetRoomVersion(request.RoomID); ok {
|
||||||
response.RoomVersion = roomVersion
|
response.RoomVersion = roomVersion
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
@ -908,7 +908,7 @@ func (r *RoomserverQueryAPI) QueryRoomVersionForRoom(
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
response.RoomVersion = roomVersion
|
response.RoomVersion = roomVersion
|
||||||
r.Cache.StoreRoomVersion(request.RoomID, response.RoomVersion)
|
r.ImmutableCache.StoreRoomVersion(request.RoomID, response.RoomVersion)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -49,8 +49,8 @@ func SetupRoomServerComponent(
|
||||||
inputAPI.SetupHTTP(http.DefaultServeMux)
|
inputAPI.SetupHTTP(http.DefaultServeMux)
|
||||||
|
|
||||||
queryAPI := query.RoomserverQueryAPI{
|
queryAPI := query.RoomserverQueryAPI{
|
||||||
DB: roomserverDB,
|
DB: roomserverDB,
|
||||||
Cache: base.Cache,
|
ImmutableCache: base.ImmutableCache,
|
||||||
}
|
}
|
||||||
|
|
||||||
queryAPI.SetupHTTP(http.DefaultServeMux)
|
queryAPI.SetupHTTP(http.DefaultServeMux)
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue