Remove caches from BaseDendrite

This commit is contained in:
Till Faelligen 2023-03-15 19:04:01 +01:00
parent e493f4e800
commit 0a67a32ce4
No known key found for this signature in database
GPG key ID: ACCDC9606D472758
26 changed files with 128 additions and 78 deletions

View file

@ -12,6 +12,7 @@ import (
"github.com/matrix-org/dendrite/appservice" "github.com/matrix-org/dendrite/appservice"
"github.com/matrix-org/dendrite/appservice/api" "github.com/matrix-org/dendrite/appservice/api"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/roomserver" "github.com/matrix-org/dendrite/roomserver"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/test" "github.com/matrix-org/dendrite/test"
@ -123,8 +124,9 @@ func TestAppserviceInternalAPI(t *testing.T) {
}, },
} }
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
// Create required internal APIs // Create required internal APIs
rsAPI := roomserver.NewInternalAPI(base) rsAPI := roomserver.NewInternalAPI(base, caches)
usrAPI := userapi.NewInternalAPI(base, rsAPI, nil) usrAPI := userapi.NewInternalAPI(base, rsAPI, nil)
asAPI := appservice.NewInternalAPI(base, usrAPI, rsAPI) asAPI := appservice.NewInternalAPI(base, usrAPI, rsAPI)

View file

@ -29,6 +29,7 @@ import (
"github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/rooms" "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/rooms"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing" "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing"
"github.com/matrix-org/dendrite/federationapi" "github.com/matrix-org/dendrite/federationapi"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/roomserver" "github.com/matrix-org/dendrite/roomserver"
"github.com/matrix-org/dendrite/setup" "github.com/matrix-org/dendrite/setup"
@ -192,7 +193,8 @@ func startup() {
base, userAPI, rsAPI, base, userAPI, rsAPI,
) )
rsAPI.SetAppserviceAPI(asQuery) rsAPI.SetAppserviceAPI(asQuery)
fedSenderAPI := federationapi.NewInternalAPI(base, federation, rsAPI, base.Caches, keyRing, true) caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.EnableMetrics)
fedSenderAPI := federationapi.NewInternalAPI(base, federation, rsAPI, caches, keyRing, true)
rsAPI.SetFederationAPI(fedSenderAPI, keyRing) rsAPI.SetFederationAPI(fedSenderAPI, keyRing)
monolith := setup.Monolith{ monolith := setup.Monolith{
@ -208,7 +210,7 @@ func startup() {
//ServerKeyAPI: serverKeyAPI, //ServerKeyAPI: serverKeyAPI,
ExtPublicRoomsProvider: rooms.NewPineconeRoomProvider(pRouter, pSessions, fedSenderAPI, federation), ExtPublicRoomsProvider: rooms.NewPineconeRoomProvider(pRouter, pSessions, fedSenderAPI, federation),
} }
monolith.AddAllPublicRoutes(base) monolith.AddAllPublicRoutes(base, caches)
httpRouter := mux.NewRouter().SkipClean(true).UseEncodedPath() httpRouter := mux.NewRouter().SkipClean(true).UseEncodedPath()
httpRouter.PathPrefix(httputil.PublicClientPathPrefix).Handler(base.PublicClientAPIMux) httpRouter.PathPrefix(httputil.PublicClientPathPrefix).Handler(base.PublicClientAPIMux)

View file

@ -19,6 +19,7 @@ import (
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/yggrooms" "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/yggrooms"
"github.com/matrix-org/dendrite/federationapi" "github.com/matrix-org/dendrite/federationapi"
"github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/roomserver" "github.com/matrix-org/dendrite/roomserver"
"github.com/matrix-org/dendrite/setup" "github.com/matrix-org/dendrite/setup"
@ -158,10 +159,11 @@ func (m *DendriteMonolith) Start() {
serverKeyAPI := &signing.YggdrasilKeys{} serverKeyAPI := &signing.YggdrasilKeys{}
keyRing := serverKeyAPI.KeyRing() keyRing := serverKeyAPI.KeyRing()
rsAPI := roomserver.NewInternalAPI(base) caches := caching.NewRistrettoCache(cfg.Global.Cache.EstimatedMaxSize, cfg.Global.Cache.MaxAge, caching.DisableMetrics)
rsAPI := roomserver.NewInternalAPI(base, caches)
fsAPI := federationapi.NewInternalAPI( fsAPI := federationapi.NewInternalAPI(
base, federation, rsAPI, base.Caches, keyRing, true, base, federation, rsAPI, caches, keyRing, true,
) )
userAPI := userapi.NewInternalAPI(base, rsAPI, federation) userAPI := userapi.NewInternalAPI(base, rsAPI, federation)
@ -187,7 +189,7 @@ func (m *DendriteMonolith) Start() {
ygg, fsAPI, federation, ygg, fsAPI, federation,
), ),
} }
monolith.AddAllPublicRoutes(base) monolith.AddAllPublicRoutes(base, caches)
httpRouter := mux.NewRouter() httpRouter := mux.NewRouter()
httpRouter.PathPrefix(httputil.PublicClientPathPrefix).Handler(base.PublicClientAPIMux) httpRouter.PathPrefix(httputil.PublicClientPathPrefix).Handler(base.PublicClientAPIMux)

View file

@ -8,6 +8,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/federationapi" "github.com/matrix-org/dendrite/federationapi"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/roomserver" "github.com/matrix-org/dendrite/roomserver"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
@ -37,7 +38,8 @@ func TestAdminResetPassword(t *testing.T) {
SigningIdentity: gomatrixserverlib.SigningIdentity{ServerName: "vh1"}, SigningIdentity: gomatrixserverlib.SigningIdentity{ServerName: "vh1"},
}) })
rsAPI := roomserver.NewInternalAPI(base) caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
rsAPI := roomserver.NewInternalAPI(base, caches)
// Needed for changing the password/login // Needed for changing the password/login
userAPI := userapi.NewInternalAPI(base, rsAPI, nil) userAPI := userapi.NewInternalAPI(base, rsAPI, nil)
// We mostly need the userAPI for this test, so nil for other APIs/caches etc. // We mostly need the userAPI for this test, so nil for other APIs/caches etc.
@ -148,15 +150,16 @@ func TestPurgeRoom(t *testing.T) {
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
base, baseClose := testrig.CreateBaseDendrite(t, dbType) base, baseClose := testrig.CreateBaseDendrite(t, dbType)
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
defer baseClose() defer baseClose()
fedClient := base.CreateFederationClient() fedClient := base.CreateFederationClient()
rsAPI := roomserver.NewInternalAPI(base) rsAPI := roomserver.NewInternalAPI(base, caches)
userAPI := userapi.NewInternalAPI(base, rsAPI, nil) userAPI := userapi.NewInternalAPI(base, rsAPI, nil)
// this starts the JetStream consumers // this starts the JetStream consumers
syncapi.AddPublicRoutes(base, userAPI, rsAPI) syncapi.AddPublicRoutes(base, userAPI, rsAPI, caches)
federationapi.NewInternalAPI(base, fedClient, rsAPI, base.Caches, nil, true) federationapi.NewInternalAPI(base, fedClient, rsAPI, caches, nil, true)
rsAPI.SetFederationAPI(nil, nil) rsAPI.SetFederationAPI(nil, nil)
// Create the room // Create the room

View file

@ -7,6 +7,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/dendrite/appservice" "github.com/matrix-org/dendrite/appservice"
@ -27,7 +28,8 @@ func TestJoinRoomByIDOrAlias(t *testing.T) {
base, baseClose := testrig.CreateBaseDendrite(t, dbType) base, baseClose := testrig.CreateBaseDendrite(t, dbType)
defer baseClose() defer baseClose()
rsAPI := roomserver.NewInternalAPI(base) caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
rsAPI := roomserver.NewInternalAPI(base, caches)
userAPI := userapi.NewInternalAPI(base, rsAPI, nil) userAPI := userapi.NewInternalAPI(base, rsAPI, nil)
asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI) asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI)
rsAPI.SetFederationAPI(nil, nil) // creates the rs.Inputer etc rsAPI.SetFederationAPI(nil, nil) // creates the rs.Inputer etc

View file

@ -9,6 +9,7 @@ import (
"testing" "testing"
"github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/roomserver" "github.com/matrix-org/dendrite/roomserver"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
@ -36,7 +37,8 @@ func TestLogin(t *testing.T) {
SigningIdentity: gomatrixserverlib.SigningIdentity{ServerName: "vh1"}, SigningIdentity: gomatrixserverlib.SigningIdentity{ServerName: "vh1"},
}) })
rsAPI := roomserver.NewInternalAPI(base) caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
rsAPI := roomserver.NewInternalAPI(base, caches)
// Needed for /login // Needed for /login
userAPI := userapi.NewInternalAPI(base, rsAPI, nil) userAPI := userapi.NewInternalAPI(base, rsAPI, nil)

View file

@ -30,6 +30,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/roomserver" "github.com/matrix-org/dendrite/roomserver"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/test" "github.com/matrix-org/dendrite/test"
@ -407,7 +408,8 @@ func Test_register(t *testing.T) {
base, baseClose := testrig.CreateBaseDendrite(t, dbType) base, baseClose := testrig.CreateBaseDendrite(t, dbType)
defer baseClose() defer baseClose()
rsAPI := roomserver.NewInternalAPI(base) caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
rsAPI := roomserver.NewInternalAPI(base, caches)
userAPI := userapi.NewInternalAPI(base, rsAPI, nil) userAPI := userapi.NewInternalAPI(base, rsAPI, nil)
for _, tc := range testCases { for _, tc := range testCases {
@ -578,7 +580,8 @@ func TestRegisterUserWithDisplayName(t *testing.T) {
defer baseClose() defer baseClose()
base.Cfg.Global.ServerName = "server" base.Cfg.Global.ServerName = "server"
rsAPI := roomserver.NewInternalAPI(base) caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
rsAPI := roomserver.NewInternalAPI(base, caches)
userAPI := userapi.NewInternalAPI(base, rsAPI, nil) userAPI := userapi.NewInternalAPI(base, rsAPI, nil)
deviceName, deviceID := "deviceName", "deviceID" deviceName, deviceID := "deviceName", "deviceID"
expectedDisplayName := "DisplayName" expectedDisplayName := "DisplayName"
@ -616,8 +619,8 @@ func TestRegisterAdminUsingSharedSecret(t *testing.T) {
base.Cfg.Global.ServerName = "server" base.Cfg.Global.ServerName = "server"
sharedSecret := "dendritetest" sharedSecret := "dendritetest"
base.Cfg.ClientAPI.RegistrationSharedSecret = sharedSecret base.Cfg.ClientAPI.RegistrationSharedSecret = sharedSecret
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
rsAPI := roomserver.NewInternalAPI(base) rsAPI := roomserver.NewInternalAPI(base, caches)
userAPI := userapi.NewInternalAPI(base, rsAPI, nil) userAPI := userapi.NewInternalAPI(base, rsAPI, nil)
expectedDisplayName := "rabbit" expectedDisplayName := "rabbit"

View file

@ -37,6 +37,7 @@ import (
"github.com/matrix-org/dendrite/federationapi" "github.com/matrix-org/dendrite/federationapi"
federationAPI "github.com/matrix-org/dendrite/federationapi/api" federationAPI "github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/federationapi/producers" "github.com/matrix-org/dendrite/federationapi/producers"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/relayapi" "github.com/matrix-org/dendrite/relayapi"
relayAPI "github.com/matrix-org/dendrite/relayapi/api" relayAPI "github.com/matrix-org/dendrite/relayapi/api"
@ -134,17 +135,17 @@ func (p *P2PMonolith) SetupDendrite(cfg *config.Dendrite, port int, enableRelayi
serverKeyAPI := &signing.YggdrasilKeys{} serverKeyAPI := &signing.YggdrasilKeys{}
keyRing := serverKeyAPI.KeyRing() keyRing := serverKeyAPI.KeyRing()
rsComponent := roomserver.NewInternalAPI(p.BaseDendrite) caches := caching.NewRistrettoCache(cfg.Global.Cache.EstimatedMaxSize, cfg.Global.Cache.MaxAge, enableMetrics)
rsAPI := rsComponent rsAPI := roomserver.NewInternalAPI(p.BaseDendrite, caches)
fsAPI := federationapi.NewInternalAPI( fsAPI := federationapi.NewInternalAPI(
p.BaseDendrite, federation, rsAPI, p.BaseDendrite.Caches, keyRing, true, p.BaseDendrite, federation, rsAPI, caches, keyRing, true,
) )
userAPI := userapi.NewInternalAPI(p.BaseDendrite, rsAPI, federation) userAPI := userapi.NewInternalAPI(p.BaseDendrite, rsAPI, federation)
asAPI := appservice.NewInternalAPI(p.BaseDendrite, userAPI, rsAPI) asAPI := appservice.NewInternalAPI(p.BaseDendrite, userAPI, rsAPI)
rsComponent.SetFederationAPI(fsAPI, keyRing) rsAPI.SetFederationAPI(fsAPI, keyRing)
userProvider := users.NewPineconeUserProvider(p.Router, p.Sessions, userAPI, federation) userProvider := users.NewPineconeUserProvider(p.Router, p.Sessions, userAPI, federation)
roomProvider := rooms.NewPineconeRoomProvider(p.Router, p.Sessions, fsAPI, federation) roomProvider := rooms.NewPineconeRoomProvider(p.Router, p.Sessions, fsAPI, federation)
@ -161,7 +162,7 @@ func (p *P2PMonolith) SetupDendrite(cfg *config.Dendrite, port int, enableRelayi
Config: &p.BaseDendrite.Cfg.FederationAPI, Config: &p.BaseDendrite.Cfg.FederationAPI,
UserAPI: userAPI, UserAPI: userAPI,
} }
relayAPI := relayapi.NewRelayInternalAPI(p.BaseDendrite, federation, rsAPI, keyRing, producer, enableRelaying) relayAPI := relayapi.NewRelayInternalAPI(p.BaseDendrite, federation, rsAPI, keyRing, producer, enableRelaying, caches)
logrus.Infof("Relaying enabled: %v", relayAPI.RelayingEnabled()) logrus.Infof("Relaying enabled: %v", relayAPI.RelayingEnabled())
p.dendrite = setup.Monolith{ p.dendrite = setup.Monolith{
@ -178,7 +179,7 @@ func (p *P2PMonolith) SetupDendrite(cfg *config.Dendrite, port int, enableRelayi
ExtPublicRoomsProvider: roomProvider, ExtPublicRoomsProvider: roomProvider,
ExtUserDirectoryProvider: userProvider, ExtUserDirectoryProvider: userProvider,
} }
p.dendrite.AddAllPublicRoutes(p.BaseDendrite) p.dendrite.AddAllPublicRoutes(p.BaseDendrite, caches)
p.setupHttpServers(userProvider, enableWebsockets) p.setupHttpServers(userProvider, enableWebsockets)
} }

View file

@ -27,6 +27,7 @@ import (
"path/filepath" "path/filepath"
"time" "time"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/gorilla/mux" "github.com/gorilla/mux"
@ -156,16 +157,15 @@ func main() {
serverKeyAPI := &signing.YggdrasilKeys{} serverKeyAPI := &signing.YggdrasilKeys{}
keyRing := serverKeyAPI.KeyRing() keyRing := serverKeyAPI.KeyRing()
rsAPI := roomserver.NewInternalAPI( caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.EnableMetrics)
base, rsAPI := roomserver.NewInternalAPI(base, caches)
)
userAPI := userapi.NewInternalAPI(base, rsAPI, federation) userAPI := userapi.NewInternalAPI(base, rsAPI, federation)
asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI) asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI)
rsAPI.SetAppserviceAPI(asAPI) rsAPI.SetAppserviceAPI(asAPI)
fsAPI := federationapi.NewInternalAPI( fsAPI := federationapi.NewInternalAPI(
base, federation, rsAPI, base.Caches, keyRing, true, base, federation, rsAPI, caches, keyRing, true,
) )
rsAPI.SetFederationAPI(fsAPI, keyRing) rsAPI.SetFederationAPI(fsAPI, keyRing)
@ -184,8 +184,8 @@ func main() {
ygg, fsAPI, federation, ygg, fsAPI, federation,
), ),
} }
monolith.AddAllPublicRoutes(base) monolith.AddAllPublicRoutes(base, caches)
if err := mscs.Enable(base, &monolith); err != nil { if err := mscs.Enable(base, &monolith, caches); err != nil {
logrus.WithError(err).Fatalf("Failed to enable MSCs") logrus.WithError(err).Fatalf("Failed to enable MSCs")
} }

View file

@ -17,6 +17,7 @@ package main
import ( import (
"flag" "flag"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/appservice" "github.com/matrix-org/dendrite/appservice"
@ -72,10 +73,11 @@ func main() {
federation := base.CreateFederationClient() federation := base.CreateFederationClient()
rsAPI := roomserver.NewInternalAPI(base) caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.EnableMetrics)
rsAPI := roomserver.NewInternalAPI(base, caches)
fsAPI := federationapi.NewInternalAPI( fsAPI := federationapi.NewInternalAPI(
base, federation, rsAPI, base.Caches, nil, false, base, federation, rsAPI, caches, nil, false,
) )
keyRing := fsAPI.KeyRing() keyRing := fsAPI.KeyRing()
@ -104,10 +106,10 @@ func main() {
RoomserverAPI: rsAPI, RoomserverAPI: rsAPI,
UserAPI: userAPI, UserAPI: userAPI,
} }
monolith.AddAllPublicRoutes(base) monolith.AddAllPublicRoutes(base, caches)
if len(base.Cfg.MSCs.MSCs) > 0 { if len(base.Cfg.MSCs.MSCs) > 0 {
if err := mscs.Enable(base, &monolith); err != nil { if err := mscs.Enable(base, &monolith, caches); err != nil {
logrus.WithError(err).Fatalf("Failed to enable MSCs") logrus.WithError(err).Fatalf("Failed to enable MSCs")
} }
} }

View file

@ -94,7 +94,7 @@ func NewInternalAPI(
) api.FederationInternalAPI { ) api.FederationInternalAPI {
cfg := &base.Cfg.FederationAPI cfg := &base.Cfg.FederationAPI
federationDB, err := storage.NewDatabase(base, &cfg.Database, base.Caches, base.Cfg.Global.IsLocalServerName) federationDB, err := storage.NewDatabase(base, &cfg.Database, caches, base.Cfg.Global.IsLocalServerName)
if err != nil { if err != nil {
logrus.WithError(err).Panic("failed to connect to federation sender db") logrus.WithError(err).Panic("failed to connect to federation sender db")
} }

View file

@ -10,6 +10,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/gomatrix" "github.com/matrix-org/gomatrix"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
@ -163,6 +164,7 @@ func TestFederationAPIJoinThenKeyUpdate(t *testing.T) {
func testFederationAPIJoinThenKeyUpdate(t *testing.T, dbType test.DBType) { func testFederationAPIJoinThenKeyUpdate(t *testing.T, dbType test.DBType) {
base, close := testrig.CreateBaseDendrite(t, dbType) base, close := testrig.CreateBaseDendrite(t, dbType)
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
base.Cfg.FederationAPI.PreferDirectFetch = true base.Cfg.FederationAPI.PreferDirectFetch = true
base.Cfg.FederationAPI.KeyPerspectives = nil base.Cfg.FederationAPI.KeyPerspectives = nil
defer close() defer close()
@ -212,7 +214,7 @@ func testFederationAPIJoinThenKeyUpdate(t *testing.T, dbType test.DBType) {
}, },
}, },
} }
fsapi := federationapi.NewInternalAPI(base, fc, rsapi, base.Caches, nil, false) fsapi := federationapi.NewInternalAPI(base, fc, rsapi, caches, nil, false)
var resp api.PerformJoinResponse var resp api.PerformJoinResponse
fsapi.PerformJoin(context.Background(), &api.PerformJoinRequest{ fsapi.PerformJoin(context.Background(), &api.PerformJoinRequest{

View file

@ -21,6 +21,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/matrix-org/dendrite/internal/caching"
"go.uber.org/atomic" "go.uber.org/atomic"
"gotest.tools/v3/poll" "gotest.tools/v3/poll"
@ -41,10 +42,11 @@ func mustCreateFederationDatabase(t *testing.T, dbType test.DBType, realDatabase
if realDatabase { if realDatabase {
// Real Database/s // Real Database/s
b, baseClose := testrig.CreateBaseDendrite(t, dbType) b, baseClose := testrig.CreateBaseDendrite(t, dbType)
caches := caching.NewRistrettoCache(b.Cfg.Global.Cache.EstimatedMaxSize, b.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
connStr, dbClose := test.PrepareDBConnectionString(t, dbType) connStr, dbClose := test.PrepareDBConnectionString(t, dbType)
db, err := storage.NewDatabase(b, &config.DatabaseOptions{ db, err := storage.NewDatabase(b, &config.DatabaseOptions{
ConnectionString: config.DataSource(connStr), ConnectionString: config.DataSource(connStr),
}, b.Caches, b.Cfg.Global.IsLocalServerName) }, caches, b.Cfg.Global.IsLocalServerName)
if err != nil { if err != nil {
t.Fatalf("NewDatabase returned %s", err) t.Fatalf("NewDatabase returned %s", err)
} }

View file

@ -7,6 +7,7 @@ import (
"time" "time"
"github.com/matrix-org/dendrite/federationapi/storage" "github.com/matrix-org/dendrite/federationapi/storage"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/test" "github.com/matrix-org/dendrite/test"
"github.com/matrix-org/dendrite/test/testrig" "github.com/matrix-org/dendrite/test/testrig"
@ -17,10 +18,11 @@ import (
func mustCreateFederationDatabase(t *testing.T, dbType test.DBType) (storage.Database, func()) { func mustCreateFederationDatabase(t *testing.T, dbType test.DBType) (storage.Database, func()) {
b, baseClose := testrig.CreateBaseDendrite(t, dbType) b, baseClose := testrig.CreateBaseDendrite(t, dbType)
caches := caching.NewRistrettoCache(b.Cfg.Global.Cache.EstimatedMaxSize, b.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
connStr, dbClose := test.PrepareDBConnectionString(t, dbType) connStr, dbClose := test.PrepareDBConnectionString(t, dbType)
db, err := storage.NewDatabase(b, &config.DatabaseOptions{ db, err := storage.NewDatabase(b, &config.DatabaseOptions{
ConnectionString: config.DataSource(connStr), ConnectionString: config.DataSource(connStr),
}, b.Caches, func(server gomatrixserverlib.ServerName) bool { return server == "localhost" }) }, caches, func(server gomatrixserverlib.ServerName) bool { return server == "localhost" })
if err != nil { if err != nil {
t.Fatalf("NewDatabase returned %s", err) t.Fatalf("NewDatabase returned %s", err)
} }

View file

@ -46,6 +46,11 @@ const (
eventStateKeyNIDCache eventStateKeyNIDCache
) )
const (
DisableMetrics = false
EnableMetrics = true
)
func NewRistrettoCache(maxCost config.DataUnit, maxAge time.Duration, enablePrometheus bool) *Caches { func NewRistrettoCache(maxCost config.DataUnit, maxAge time.Duration, enablePrometheus bool) *Caches {
cache, err := ristretto.NewCache(&ristretto.Config{ cache, err := ristretto.NewCache(&ristretto.Config{
NumCounters: int64((maxCost / 1024) * 10), // 10 counters per 1KB data, affects bloom filter size NumCounters: int64((maxCost / 1024) * 10), // 10 counters per 1KB data, affects bloom filter size

View file

@ -16,6 +16,7 @@ package relayapi
import ( import (
"github.com/matrix-org/dendrite/federationapi/producers" "github.com/matrix-org/dendrite/federationapi/producers"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/relayapi/api" "github.com/matrix-org/dendrite/relayapi/api"
"github.com/matrix-org/dendrite/relayapi/internal" "github.com/matrix-org/dendrite/relayapi/internal"
"github.com/matrix-org/dendrite/relayapi/routing" "github.com/matrix-org/dendrite/relayapi/routing"
@ -55,10 +56,10 @@ func NewRelayInternalAPI(
keyRing *gomatrixserverlib.KeyRing, keyRing *gomatrixserverlib.KeyRing,
producer *producers.SyncAPIProducer, producer *producers.SyncAPIProducer,
relayingEnabled bool, relayingEnabled bool,
caches *caching.Caches,
) api.RelayInternalAPI { ) api.RelayInternalAPI {
cfg := &base.Cfg.RelayAPI cfg := &base.Cfg.RelayAPI
relayDB, err := storage.NewDatabase(base, &cfg.Database, caches, base.Cfg.Global.IsLocalServerName)
relayDB, err := storage.NewDatabase(base, &cfg.Database, base.Caches, base.Cfg.Global.IsLocalServerName)
if err != nil { if err != nil {
logrus.WithError(err).Panic("failed to connect to relay db") logrus.WithError(err).Panic("failed to connect to relay db")
} }

View file

@ -24,6 +24,7 @@ import (
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing" "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/relayapi" "github.com/matrix-org/dendrite/relayapi"
"github.com/matrix-org/dendrite/test" "github.com/matrix-org/dendrite/test"
"github.com/matrix-org/dendrite/test/testrig" "github.com/matrix-org/dendrite/test/testrig"
@ -34,9 +35,10 @@ import (
func TestCreateNewRelayInternalAPI(t *testing.T) { func TestCreateNewRelayInternalAPI(t *testing.T) {
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
base, close := testrig.CreateBaseDendrite(t, dbType) base, close := testrig.CreateBaseDendrite(t, dbType)
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
defer close() defer close()
relayAPI := relayapi.NewRelayInternalAPI(base, nil, nil, nil, nil, true) relayAPI := relayapi.NewRelayInternalAPI(base, nil, nil, nil, nil, true, caches)
assert.NotNil(t, relayAPI) assert.NotNil(t, relayAPI)
}) })
} }
@ -52,7 +54,7 @@ func TestCreateRelayInternalInvalidDatabasePanics(t *testing.T) {
defer close() defer close()
assert.Panics(t, func() { assert.Panics(t, func() {
relayapi.NewRelayInternalAPI(base, nil, nil, nil, nil, true) relayapi.NewRelayInternalAPI(base, nil, nil, nil, nil, true, nil)
}) })
}) })
} }
@ -107,8 +109,9 @@ func TestCreateRelayPublicRoutes(t *testing.T) {
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
base, close := testrig.CreateBaseDendrite(t, dbType) base, close := testrig.CreateBaseDendrite(t, dbType)
defer close() defer close()
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
relayAPI := relayapi.NewRelayInternalAPI(base, nil, nil, nil, nil, true) relayAPI := relayapi.NewRelayInternalAPI(base, nil, nil, nil, nil, true, caches)
assert.NotNil(t, relayAPI) assert.NotNil(t, relayAPI)
serverKeyAPI := &signing.YggdrasilKeys{} serverKeyAPI := &signing.YggdrasilKeys{}
@ -156,8 +159,9 @@ func TestDisableRelayPublicRoutes(t *testing.T) {
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
base, close := testrig.CreateBaseDendrite(t, dbType) base, close := testrig.CreateBaseDendrite(t, dbType)
defer close() defer close()
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
relayAPI := relayapi.NewRelayInternalAPI(base, nil, nil, nil, nil, false) relayAPI := relayapi.NewRelayInternalAPI(base, nil, nil, nil, nil, false, caches)
assert.NotNil(t, relayAPI) assert.NotNil(t, relayAPI)
serverKeyAPI := &signing.YggdrasilKeys{} serverKeyAPI := &signing.YggdrasilKeys{}

View file

@ -60,7 +60,7 @@ type RoomserverInternalAPI struct {
func NewRoomserverAPI( func NewRoomserverAPI(
base *base.BaseDendrite, roomserverDB storage.Database, base *base.BaseDendrite, roomserverDB storage.Database,
js nats.JetStreamContext, nc *nats.Conn, js nats.JetStreamContext, nc *nats.Conn, caches caching.RoomServerCaches,
) *RoomserverInternalAPI { ) *RoomserverInternalAPI {
var perspectiveServerNames []gomatrixserverlib.ServerName var perspectiveServerNames []gomatrixserverlib.ServerName
for _, kp := range base.Cfg.FederationAPI.KeyPerspectives { for _, kp := range base.Cfg.FederationAPI.KeyPerspectives {
@ -78,7 +78,7 @@ func NewRoomserverAPI(
DB: roomserverDB, DB: roomserverDB,
Base: base, Base: base,
Cfg: &base.Cfg.RoomServer, Cfg: &base.Cfg.RoomServer,
Cache: base.Caches, Cache: caches,
ServerName: base.Cfg.Global.ServerName, ServerName: base.Cfg.Global.ServerName,
PerspectiveServerNames: perspectiveServerNames, PerspectiveServerNames: perspectiveServerNames,
InputRoomEventTopic: base.Cfg.Global.JetStream.Prefixed(jetstream.InputRoomEvent), InputRoomEventTopic: base.Cfg.Global.JetStream.Prefixed(jetstream.InputRoomEvent),
@ -89,7 +89,7 @@ func NewRoomserverAPI(
ServerACLs: serverACLs, ServerACLs: serverACLs,
Queryer: &query.Queryer{ Queryer: &query.Queryer{
DB: roomserverDB, DB: roomserverDB,
Cache: base.Caches, Cache: caches,
IsLocalServerName: base.Cfg.Global.IsLocalServerName, IsLocalServerName: base.Cfg.Global.IsLocalServerName,
ServerACLs: serverACLs, ServerACLs: serverACLs,
}, },

View file

@ -4,6 +4,7 @@ import (
"context" "context"
"testing" "testing"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/roomserver/types"
@ -18,7 +19,8 @@ import (
func mustCreateDatabase(t *testing.T, dbType test.DBType) (*base.BaseDendrite, storage.Database, func()) { func mustCreateDatabase(t *testing.T, dbType test.DBType) (*base.BaseDendrite, storage.Database, func()) {
base, close := testrig.CreateBaseDendrite(t, dbType) base, close := testrig.CreateBaseDendrite(t, dbType)
db, err := storage.Open(base, &base.Cfg.RoomServer.Database, base.Caches) caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
db, err := storage.Open(base, &base.Cfg.RoomServer.Database, caches)
if err != nil { if err != nil {
t.Fatalf("failed to create Database: %v", err) t.Fatalf("failed to create Database: %v", err)
} }

View file

@ -15,6 +15,7 @@
package roomserver package roomserver
import ( import (
"github.com/matrix-org/dendrite/internal/caching"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
@ -26,10 +27,11 @@ import (
// NewInternalAPI returns a concrete implementation of the internal API. // NewInternalAPI returns a concrete implementation of the internal API.
func NewInternalAPI( func NewInternalAPI(
base *base.BaseDendrite, base *base.BaseDendrite,
caches *caching.Caches,
) api.RoomserverInternalAPI { ) api.RoomserverInternalAPI {
cfg := &base.Cfg.RoomServer cfg := &base.Cfg.RoomServer
roomserverDB, err := storage.Open(base, &cfg.Database, base.Caches) roomserverDB, err := storage.Open(base, &cfg.Database, caches)
if err != nil { if err != nil {
logrus.WithError(err).Panicf("failed to connect to room server db") logrus.WithError(err).Panicf("failed to connect to room server db")
} }
@ -37,6 +39,6 @@ func NewInternalAPI(
js, nc := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) js, nc := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
return internal.NewRoomserverAPI( return internal.NewRoomserverAPI(
base, roomserverDB, js, nc, base, roomserverDB, js, nc, caches,
) )
} }

View file

@ -7,6 +7,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/matrix-org/dendrite/roomserver/state" "github.com/matrix-org/dendrite/roomserver/state"
@ -32,7 +33,8 @@ import (
func mustCreateDatabase(t *testing.T, dbType test.DBType) (*base.BaseDendrite, storage.Database, func()) { func mustCreateDatabase(t *testing.T, dbType test.DBType) (*base.BaseDendrite, storage.Database, func()) {
t.Helper() t.Helper()
base, close := testrig.CreateBaseDendrite(t, dbType) base, close := testrig.CreateBaseDendrite(t, dbType)
db, err := storage.Open(base, &base.Cfg.RoomServer.Database, base.Caches) caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
db, err := storage.Open(base, &base.Cfg.RoomServer.Database, caches)
if err != nil { if err != nil {
t.Fatalf("failed to create Database: %v", err) t.Fatalf("failed to create Database: %v", err)
} }
@ -43,7 +45,8 @@ func TestUsers(t *testing.T) {
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
base, close := testrig.CreateBaseDendrite(t, dbType) base, close := testrig.CreateBaseDendrite(t, dbType)
defer close() defer close()
rsAPI := roomserver.NewInternalAPI(base) caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
rsAPI := roomserver.NewInternalAPI(base, caches)
// SetFederationAPI starts the room event input consumer // SetFederationAPI starts the room event input consumer
rsAPI.SetFederationAPI(nil, nil) rsAPI.SetFederationAPI(nil, nil)
@ -181,7 +184,8 @@ func Test_QueryLeftUsers(t *testing.T) {
base, _, close := mustCreateDatabase(t, dbType) base, _, close := mustCreateDatabase(t, dbType)
defer close() defer close()
rsAPI := roomserver.NewInternalAPI(base) caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
rsAPI := roomserver.NewInternalAPI(base, caches)
// SetFederationAPI starts the room event input consumer // SetFederationAPI starts the room event input consumer
rsAPI.SetFederationAPI(nil, nil) rsAPI.SetFederationAPI(nil, nil)
// Create the room // Create the room
@ -232,12 +236,13 @@ func TestPurgeRoom(t *testing.T) {
defer jetstream.DeleteAllStreams(jsCtx, &base.Cfg.Global.JetStream) defer jetstream.DeleteAllStreams(jsCtx, &base.Cfg.Global.JetStream)
fedClient := base.CreateFederationClient() fedClient := base.CreateFederationClient()
rsAPI := roomserver.NewInternalAPI(base) caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
rsAPI := roomserver.NewInternalAPI(base, caches)
userAPI := userapi.NewInternalAPI(base, rsAPI, nil) userAPI := userapi.NewInternalAPI(base, rsAPI, nil)
// this starts the JetStream consumers // this starts the JetStream consumers
syncapi.AddPublicRoutes(base, userAPI, rsAPI) syncapi.AddPublicRoutes(base, userAPI, rsAPI, caches)
federationapi.NewInternalAPI(base, fedClient, rsAPI, base.Caches, nil, true) federationapi.NewInternalAPI(base, fedClient, rsAPI, caches, nil, true)
rsAPI.SetFederationAPI(nil, nil) rsAPI.SetFederationAPI(nil, nil)
// Create the room // Create the room

View file

@ -36,12 +36,12 @@ import (
"github.com/getsentry/sentry-go" "github.com/getsentry/sentry-go"
sentryhttp "github.com/getsentry/sentry-go/http" sentryhttp "github.com/getsentry/sentry-go/http"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/atomic" "go.uber.org/atomic"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/internal/pushgateway" "github.com/matrix-org/dendrite/internal/pushgateway"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
@ -52,7 +52,6 @@ import (
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/setup/process"
) )
@ -77,7 +76,6 @@ type BaseDendrite struct {
SynapseAdminMux *mux.Router SynapseAdminMux *mux.Router
NATS *jetstream.NATSInstance NATS *jetstream.NATSInstance
Cfg *config.Dendrite Cfg *config.Dendrite
Caches *caching.Caches
DNSCache *gomatrixserverlib.DNSCache DNSCache *gomatrixserverlib.DNSCache
Database *sql.DB Database *sql.DB
DatabaseWriter sqlutil.Writer DatabaseWriter sqlutil.Writer
@ -188,7 +186,6 @@ func NewBaseDendrite(cfg *config.Dendrite, options ...BaseDendriteOptions) *Base
ProcessContext: process.NewProcessContext(), ProcessContext: process.NewProcessContext(),
tracerCloser: closer, tracerCloser: closer,
Cfg: cfg, Cfg: cfg,
Caches: caching.NewRistrettoCache(cfg.Global.Cache.EstimatedMaxSize, cfg.Global.Cache.MaxAge, enableMetrics),
DNSCache: dnsCache, DNSCache: dnsCache,
PublicClientAPIMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.PublicClientPathPrefix).Subrouter().UseEncodedPath(), PublicClientAPIMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.PublicClientPathPrefix).Subrouter().UseEncodedPath(),
PublicFederationAPIMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.PublicFederationPathPrefix).Subrouter().UseEncodedPath(), PublicFederationAPIMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.PublicFederationPathPrefix).Subrouter().UseEncodedPath(),

View file

@ -20,6 +20,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/api" "github.com/matrix-org/dendrite/clientapi/api"
"github.com/matrix-org/dendrite/federationapi" "github.com/matrix-org/dendrite/federationapi"
federationAPI "github.com/matrix-org/dendrite/federationapi/api" federationAPI "github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/transactions" "github.com/matrix-org/dendrite/internal/transactions"
"github.com/matrix-org/dendrite/mediaapi" "github.com/matrix-org/dendrite/mediaapi"
"github.com/matrix-org/dendrite/relayapi" "github.com/matrix-org/dendrite/relayapi"
@ -52,7 +53,7 @@ type Monolith struct {
} }
// AddAllPublicRoutes attaches all public paths to the given router // AddAllPublicRoutes attaches all public paths to the given router
func (m *Monolith) AddAllPublicRoutes(base *base.BaseDendrite) { func (m *Monolith) AddAllPublicRoutes(base *base.BaseDendrite, caches *caching.Caches) {
userDirectoryProvider := m.ExtUserDirectoryProvider userDirectoryProvider := m.ExtUserDirectoryProvider
if userDirectoryProvider == nil { if userDirectoryProvider == nil {
userDirectoryProvider = m.UserAPI userDirectoryProvider = m.UserAPI
@ -66,7 +67,7 @@ func (m *Monolith) AddAllPublicRoutes(base *base.BaseDendrite) {
base, m.UserAPI, m.FedClient, m.KeyRing, m.RoomserverAPI, m.FederationAPI, nil, base, m.UserAPI, m.FedClient, m.KeyRing, m.RoomserverAPI, m.FederationAPI, nil,
) )
mediaapi.AddPublicRoutes(base, m.UserAPI, m.Client) mediaapi.AddPublicRoutes(base, m.UserAPI, m.Client)
syncapi.AddPublicRoutes(base, m.UserAPI, m.RoomserverAPI) syncapi.AddPublicRoutes(base, m.UserAPI, m.RoomserverAPI, caches)
if m.RelayAPI != nil { if m.RelayAPI != nil {
relayapi.AddPublicRoutes(base, m.KeyRing, m.RelayAPI) relayapi.AddPublicRoutes(base, m.KeyRing, m.RelayAPI)

View file

@ -19,6 +19,7 @@ import (
"context" "context"
"fmt" "fmt"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/setup" "github.com/matrix-org/dendrite/setup"
"github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/mscs/msc2836" "github.com/matrix-org/dendrite/setup/mscs/msc2836"
@ -27,22 +28,22 @@ import (
) )
// Enable MSCs - returns an error on unknown MSCs // Enable MSCs - returns an error on unknown MSCs
func Enable(base *base.BaseDendrite, monolith *setup.Monolith) error { func Enable(base *base.BaseDendrite, monolith *setup.Monolith, caches *caching.Caches) error {
for _, msc := range base.Cfg.MSCs.MSCs { for _, msc := range base.Cfg.MSCs.MSCs {
util.GetLogger(context.Background()).WithField("msc", msc).Info("Enabling MSC") util.GetLogger(context.Background()).WithField("msc", msc).Info("Enabling MSC")
if err := EnableMSC(base, monolith, msc); err != nil { if err := EnableMSC(base, monolith, msc, caches); err != nil {
return err return err
} }
} }
return nil return nil
} }
func EnableMSC(base *base.BaseDendrite, monolith *setup.Monolith, msc string) error { func EnableMSC(base *base.BaseDendrite, monolith *setup.Monolith, msc string, caches *caching.Caches) error {
switch msc { switch msc {
case "msc2836": case "msc2836":
return msc2836.Enable(base, monolith.RoomserverAPI, monolith.FederationAPI, monolith.UserAPI, monolith.KeyRing) return msc2836.Enable(base, monolith.RoomserverAPI, monolith.FederationAPI, monolith.UserAPI, monolith.KeyRing)
case "msc2946": case "msc2946":
return msc2946.Enable(base, monolith.RoomserverAPI, monolith.UserAPI, monolith.FederationAPI, monolith.KeyRing, base.Caches) return msc2946.Enable(base, monolith.RoomserverAPI, monolith.UserAPI, monolith.FederationAPI, monolith.KeyRing, caches)
case "msc2444": // enabled inside federationapi case "msc2444": // enabled inside federationapi
case "msc2753": // enabled inside clientapi case "msc2753": // enabled inside clientapi
default: default:

View file

@ -42,6 +42,7 @@ func AddPublicRoutes(
base *base.BaseDendrite, base *base.BaseDendrite,
userAPI userapi.SyncUserAPI, userAPI userapi.SyncUserAPI,
rsAPI api.SyncRoomserverAPI, rsAPI api.SyncRoomserverAPI,
caches *caching.Caches,
) { ) {
cfg := &base.Cfg.SyncAPI cfg := &base.Cfg.SyncAPI
@ -54,7 +55,7 @@ func AddPublicRoutes(
eduCache := caching.NewTypingCache() eduCache := caching.NewTypingCache()
notifier := notifier.NewNotifier() notifier := notifier.NewNotifier()
streams := streams.NewSyncStreamProviders(syncDB, userAPI, rsAPI, eduCache, base.Caches, notifier) streams := streams.NewSyncStreamProviders(syncDB, userAPI, rsAPI, eduCache, caches, notifier)
notifier.SetCurrentPosition(streams.Latest(context.Background())) notifier.SetCurrentPosition(streams.Latest(context.Background()))
if err = notifier.Load(context.Background(), syncDB); err != nil { if err = notifier.Load(context.Background(), syncDB); err != nil {
logrus.WithError(err).Panicf("failed to load notifier ") logrus.WithError(err).Panicf("failed to load notifier ")
@ -140,6 +141,6 @@ func AddPublicRoutes(
routing.Setup( routing.Setup(
base.PublicClientAPIMux, requestPool, syncDB, userAPI, base.PublicClientAPIMux, requestPool, syncDB, userAPI,
rsAPI, cfg, base.Caches, fts, rsAPI, cfg, caches, fts,
) )
} }

View file

@ -10,6 +10,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
"github.com/tidwall/gjson" "github.com/tidwall/gjson"
@ -114,12 +115,13 @@ func testSyncAccessTokens(t *testing.T, dbType test.DBType) {
} }
base, close := testrig.CreateBaseDendrite(t, dbType) base, close := testrig.CreateBaseDendrite(t, dbType)
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
defer close() defer close()
jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream) defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
msgs := toNATSMsgs(t, base, room.Events()...) msgs := toNATSMsgs(t, base, room.Events()...)
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}) AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches)
testrig.MustPublishMsgs(t, jsctx, msgs...) testrig.MustPublishMsgs(t, jsctx, msgs...)
testCases := []struct { testCases := []struct {
@ -218,7 +220,8 @@ func testSyncAPICreateRoomSyncEarly(t *testing.T, dbType test.DBType) {
// m.room.history_visibility // m.room.history_visibility
msgs := toNATSMsgs(t, base, room.Events()...) msgs := toNATSMsgs(t, base, room.Events()...)
sinceTokens := make([]string, len(msgs)) sinceTokens := make([]string, len(msgs))
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}) caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches)
for i, msg := range msgs { for i, msg := range msgs {
testrig.MustPublishMsgs(t, jsctx, msg) testrig.MustPublishMsgs(t, jsctx, msg)
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
@ -302,7 +305,8 @@ func testSyncAPIUpdatePresenceImmediately(t *testing.T, dbType test.DBType) {
jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream) defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{}) caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{}, caches)
w := httptest.NewRecorder() w := httptest.NewRecorder()
base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{ base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
"access_token": alice.AccessToken, "access_token": alice.AccessToken,
@ -417,10 +421,10 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) {
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream) defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
// Use the actual internal roomserver API // Use the actual internal roomserver API
rsAPI := roomserver.NewInternalAPI(base) caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
rsAPI := roomserver.NewInternalAPI(base, caches)
rsAPI.SetFederationAPI(nil, nil) rsAPI.SetFederationAPI(nil, nil)
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI, caches)
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI)
for _, tc := range testCases { for _, tc := range testCases {
testname := fmt.Sprintf("%s - %s", tc.historyVisibility, userType) testname := fmt.Sprintf("%s - %s", tc.historyVisibility, userType)
@ -717,10 +721,11 @@ func TestGetMembership(t *testing.T) {
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream) defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
// Use an actual roomserver for this // Use an actual roomserver for this
rsAPI := roomserver.NewInternalAPI(base) caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
rsAPI := roomserver.NewInternalAPI(base, caches)
rsAPI.SetFederationAPI(nil, nil) rsAPI.SetFederationAPI(nil, nil)
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI) AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI, caches)
for _, tc := range testCases { for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
@ -786,8 +791,8 @@ func testSendToDevice(t *testing.T, dbType test.DBType) {
jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream) defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{}) AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{}, caches)
producer := producers.SyncAPIProducer{ producer := producers.SyncAPIProducer{
TopicSendToDeviceEvent: base.Cfg.Global.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent), TopicSendToDeviceEvent: base.Cfg.Global.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
@ -1003,10 +1008,11 @@ func testContext(t *testing.T, dbType test.DBType) {
defer baseClose() defer baseClose()
// Use an actual roomserver for this // Use an actual roomserver for this
rsAPI := roomserver.NewInternalAPI(base) caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
rsAPI := roomserver.NewInternalAPI(base, caches)
rsAPI.SetFederationAPI(nil, nil) rsAPI.SetFederationAPI(nil, nil)
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, rsAPI) AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, rsAPI, caches)
room := test.NewRoom(t, user) room := test.NewRoom(t, user)