From 93d0fba3c0b25e3f1c1d69b7e0c18f998f501fa7 Mon Sep 17 00:00:00 2001 From: Till Faelligen <2353100+S7evinK@users.noreply.github.com> Date: Mon, 20 Mar 2023 10:34:56 +0100 Subject: [PATCH] Remove NATS from BaseDendrite --- appservice/appservice.go | 4 +- appservice/appservice_test.go | 8 ++-- build/dendritejs-pinecone/main.go | 14 +++--- build/gobind-yggdrasil/monolith.go | 12 +++--- clientapi/admin_test.go | 20 +++++---- clientapi/clientapi.go | 3 +- clientapi/routing/joinroom_test.go | 8 ++-- clientapi/routing/login_test.go | 6 ++- clientapi/routing/register_test.go | 16 ++++--- .../monolith/monolith.go | 13 +++--- cmd/dendrite-demo-yggdrasil/main.go | 12 +++--- cmd/dendrite/main.go | 13 +++--- federationapi/federationapi.go | 6 ++- federationapi/federationapi_keys_test.go | 5 ++- federationapi/federationapi_test.go | 8 ++-- federationapi/routing/profile_test.go | 4 +- federationapi/routing/query_test.go | 4 +- federationapi/routing/send_test.go | 4 +- roomserver/roomserver.go | 4 +- roomserver/roomserver_test.go | 19 ++++---- setup/base/base.go | 3 -- setup/jetstream/nats.go | 1 + setup/monolith.go | 9 ++-- syncapi/syncapi.go | 3 +- syncapi/syncapi_test.go | 43 +++++++++++-------- test/testrig/base.go | 4 +- userapi/userapi.go | 3 +- 27 files changed, 149 insertions(+), 100 deletions(-) diff --git a/appservice/appservice.go b/appservice/appservice.go index 5b1b93de2..3adf54377 100644 --- a/appservice/appservice.go +++ b/appservice/appservice.go @@ -21,6 +21,7 @@ import ( "sync" "time" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/sirupsen/logrus" "github.com/matrix-org/gomatrixserverlib" @@ -38,6 +39,7 @@ import ( // can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes. func NewInternalAPI( base *base.BaseDendrite, + natsInstance *jetstream.NATSInstance, userAPI userapi.AppserviceUserAPI, rsAPI roomserverAPI.RoomserverInternalAPI, ) appserviceAPI.AppServiceInternalAPI { @@ -78,7 +80,7 @@ func NewInternalAPI( // Only consume if we actually have ASes to track, else we'll just chew cycles needlessly. // We can't add ASes at runtime so this is safe to do. - js, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) + js, _ := natsInstance.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) consumer := consumers.NewOutputRoomEventConsumer( base.ProcessContext, &base.Cfg.AppServiceAPI, client, js, rsAPI, diff --git a/appservice/appservice_test.go b/appservice/appservice_test.go index ad6f1dfc9..22116307a 100644 --- a/appservice/appservice_test.go +++ b/appservice/appservice_test.go @@ -15,6 +15,7 @@ import ( "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/roomserver" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/test" "github.com/matrix-org/dendrite/userapi" @@ -126,9 +127,10 @@ func TestAppserviceInternalAPI(t *testing.T) { caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) // Create required internal APIs - rsAPI := roomserver.NewInternalAPI(base, caches) - usrAPI := userapi.NewInternalAPI(base, rsAPI, nil) - asAPI := appservice.NewInternalAPI(base, usrAPI, rsAPI) + natsInstance := jetstream.NATSInstance{} + rsAPI := roomserver.NewInternalAPI(base, &natsInstance, caches) + usrAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil) + asAPI := appservice.NewInternalAPI(base, &natsInstance, usrAPI, rsAPI) runCases(t, asAPI) }) diff --git a/build/dendritejs-pinecone/main.go b/build/dendritejs-pinecone/main.go index 96f034bdf..a9d4e22d6 100644 --- a/build/dendritejs-pinecone/main.go +++ b/build/dendritejs-pinecone/main.go @@ -35,6 +35,7 @@ import ( "github.com/matrix-org/dendrite/setup" "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/userapi" "github.com/matrix-org/gomatrixserverlib" @@ -179,22 +180,23 @@ func startup() { } base := base.NewBaseDendrite(cfg) defer base.Close() // nolint: errcheck - - rsAPI := roomserver.NewInternalAPI(base) + natsInstance := jetstream.NATSInstance{} + caches := caching.NewRistrettoCache(cfg.Global.Cache.EstimatedMaxSize, cfg.Global.Cache.MaxAge, caching.EnableMetrics) + rsAPI := roomserver.NewInternalAPI(base, &natsInstance, caches) federation := conn.CreateFederationClient(base, pSessions) serverKeyAPI := &signing.YggdrasilKeys{} keyRing := serverKeyAPI.KeyRing() - userAPI := userapi.NewInternalAPI(base, rsAPI, federation) + userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, federation) asQuery := appservice.NewInternalAPI( - base, userAPI, rsAPI, + base, &natsInstance, userAPI, rsAPI, ) rsAPI.SetAppserviceAPI(asQuery) caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.EnableMetrics) - fedSenderAPI := federationapi.NewInternalAPI(base, federation, rsAPI, caches, keyRing, true) + fedSenderAPI := federationapi.NewInternalAPI(base, &natsInstance, federation, rsAPI, caches, keyRing, true) rsAPI.SetFederationAPI(fedSenderAPI, keyRing) monolith := setup.Monolith{ @@ -210,7 +212,7 @@ func startup() { //ServerKeyAPI: serverKeyAPI, ExtPublicRoomsProvider: rooms.NewPineconeRoomProvider(pRouter, pSessions, fedSenderAPI, federation), } - monolith.AddAllPublicRoutes(base, caches) + monolith.AddAllPublicRoutes(base, &natsInstance, caches) httpRouter := mux.NewRouter().SkipClean(true).UseEncodedPath() httpRouter.PathPrefix(httputil.PublicClientPathPrefix).Handler(base.Routers.Client) diff --git a/build/gobind-yggdrasil/monolith.go b/build/gobind-yggdrasil/monolith.go index 8faad1d02..f2f8a3b74 100644 --- a/build/gobind-yggdrasil/monolith.go +++ b/build/gobind-yggdrasil/monolith.go @@ -25,6 +25,7 @@ import ( "github.com/matrix-org/dendrite/setup" "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/test" "github.com/matrix-org/dendrite/userapi" @@ -160,15 +161,16 @@ func (m *DendriteMonolith) Start() { keyRing := serverKeyAPI.KeyRing() caches := caching.NewRistrettoCache(cfg.Global.Cache.EstimatedMaxSize, cfg.Global.Cache.MaxAge, caching.DisableMetrics) - rsAPI := roomserver.NewInternalAPI(base, caches) + natsInstance := jetstream.NATSInstance{} + rsAPI := roomserver.NewInternalAPI(base, &natsInstance, caches) fsAPI := federationapi.NewInternalAPI( - base, federation, rsAPI, caches, keyRing, true, + base, &natsInstance, federation, rsAPI, caches, keyRing, true, ) - userAPI := userapi.NewInternalAPI(base, rsAPI, federation) + userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, federation) - asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI) + asAPI := appservice.NewInternalAPI(base, &natsInstance, userAPI, rsAPI) rsAPI.SetAppserviceAPI(asAPI) // The underlying roomserver implementation needs to be able to call the fedsender. @@ -189,7 +191,7 @@ func (m *DendriteMonolith) Start() { ygg, fsAPI, federation, ), } - monolith.AddAllPublicRoutes(base, caches) + monolith.AddAllPublicRoutes(base, &natsInstance, caches) httpRouter := mux.NewRouter() httpRouter.PathPrefix(httputil.PublicClientPathPrefix).Handler(base.Routers.Client) diff --git a/clientapi/admin_test.go b/clientapi/admin_test.go index 46e2d3037..e5ea056ed 100644 --- a/clientapi/admin_test.go +++ b/clientapi/admin_test.go @@ -12,6 +12,7 @@ import ( "github.com/matrix-org/dendrite/roomserver" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/syncapi" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" @@ -32,18 +33,18 @@ func TestAdminResetPassword(t *testing.T) { test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { base, baseClose := testrig.CreateBaseDendrite(t, dbType) defer baseClose() - + natsInstance := jetstream.NATSInstance{} // add a vhost base.Cfg.Global.VirtualHosts = append(base.Cfg.Global.VirtualHosts, &config.VirtualHost{ SigningIdentity: gomatrixserverlib.SigningIdentity{ServerName: "vh1"}, }) caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) - rsAPI := roomserver.NewInternalAPI(base, caches) + rsAPI := roomserver.NewInternalAPI(base, &natsInstance, caches) // Needed for changing the password/login - userAPI := userapi.NewInternalAPI(base, rsAPI, nil) + userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil) // We mostly need the userAPI for this test, so nil for other APIs/caches etc. - AddPublicRoutes(base, nil, rsAPI, nil, nil, nil, userAPI, nil, nil) + AddPublicRoutes(base, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil) // Create the users in the userapi and login accessTokens := map[*test.User]string{ @@ -151,15 +152,16 @@ func TestPurgeRoom(t *testing.T) { test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { base, baseClose := testrig.CreateBaseDendrite(t, dbType) caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) + natsInstance := jetstream.NATSInstance{} defer baseClose() fedClient := base.CreateFederationClient() - rsAPI := roomserver.NewInternalAPI(base, caches) - userAPI := userapi.NewInternalAPI(base, rsAPI, nil) + rsAPI := roomserver.NewInternalAPI(base, &natsInstance, caches) + userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil) // this starts the JetStream consumers - syncapi.AddPublicRoutes(base, userAPI, rsAPI, caches) - federationapi.NewInternalAPI(base, fedClient, rsAPI, caches, nil, true) + syncapi.AddPublicRoutes(base, &natsInstance, userAPI, rsAPI, caches) + federationapi.NewInternalAPI(base, &natsInstance, fedClient, rsAPI, caches, nil, true) rsAPI.SetFederationAPI(nil, nil) // Create the room @@ -168,7 +170,7 @@ func TestPurgeRoom(t *testing.T) { } // We mostly need the rsAPI for this test, so nil for other APIs/caches etc. - AddPublicRoutes(base, nil, rsAPI, nil, nil, nil, userAPI, nil, nil) + AddPublicRoutes(base, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil) // Create the users in the userapi and login accessTokens := map[*test.User]string{ diff --git a/clientapi/clientapi.go b/clientapi/clientapi.go index e9985d43f..899c5225d 100644 --- a/clientapi/clientapi.go +++ b/clientapi/clientapi.go @@ -32,6 +32,7 @@ import ( // AddPublicRoutes sets up and registers HTTP handlers for the ClientAPI component. func AddPublicRoutes( base *base.BaseDendrite, + natsInstance *jetstream.NATSInstance, federation *gomatrixserverlib.FederationClient, rsAPI roomserverAPI.ClientRoomserverAPI, asAPI appserviceAPI.AppServiceInternalAPI, @@ -43,7 +44,7 @@ func AddPublicRoutes( ) { cfg := &base.Cfg.ClientAPI mscCfg := &base.Cfg.MSCs - js, natsClient := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) + js, natsClient := natsInstance.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) syncProducer := &producers.SyncAPIProducer{ JetStream: js, diff --git a/clientapi/routing/joinroom_test.go b/clientapi/routing/joinroom_test.go index de8f9538d..12867b54e 100644 --- a/clientapi/routing/joinroom_test.go +++ b/clientapi/routing/joinroom_test.go @@ -8,6 +8,7 @@ import ( "time" "github.com/matrix-org/dendrite/internal/caching" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/dendrite/appservice" @@ -29,9 +30,10 @@ func TestJoinRoomByIDOrAlias(t *testing.T) { defer baseClose() caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) - rsAPI := roomserver.NewInternalAPI(base, caches) - userAPI := userapi.NewInternalAPI(base, rsAPI, nil) - asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI) + natsInstance := jetstream.NATSInstance{} + rsAPI := roomserver.NewInternalAPI(base, &natsInstance, caches) + userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil) + asAPI := appservice.NewInternalAPI(base, &natsInstance, userAPI, rsAPI) rsAPI.SetFederationAPI(nil, nil) // creates the rs.Inputer etc // Create the users in the userapi diff --git a/clientapi/routing/login_test.go b/clientapi/routing/login_test.go index fd3d8cba9..7ba3f41fc 100644 --- a/clientapi/routing/login_test.go +++ b/clientapi/routing/login_test.go @@ -12,6 +12,7 @@ import ( "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/roomserver" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" @@ -32,15 +33,16 @@ func TestLogin(t *testing.T) { base, baseClose := testrig.CreateBaseDendrite(t, dbType) defer baseClose() base.Cfg.ClientAPI.RateLimiting.Enabled = false + natsInstance := jetstream.NATSInstance{} // add a vhost base.Cfg.Global.VirtualHosts = append(base.Cfg.Global.VirtualHosts, &config.VirtualHost{ SigningIdentity: gomatrixserverlib.SigningIdentity{ServerName: "vh1"}, }) caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) - rsAPI := roomserver.NewInternalAPI(base, caches) + rsAPI := roomserver.NewInternalAPI(base, &natsInstance, caches) // Needed for /login - userAPI := userapi.NewInternalAPI(base, rsAPI, nil) + userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil) // We mostly need the userAPI for this test, so nil for other APIs/caches etc. Setup(base, &base.Cfg.ClientAPI, nil, nil, userAPI, nil, nil, nil, nil, nil, nil, &base.Cfg.MSCs, nil) diff --git a/clientapi/routing/register_test.go b/clientapi/routing/register_test.go index c06b0ae12..5c58f3148 100644 --- a/clientapi/routing/register_test.go +++ b/clientapi/routing/register_test.go @@ -33,6 +33,7 @@ import ( "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/roomserver" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/test" "github.com/matrix-org/dendrite/test/testrig" "github.com/matrix-org/dendrite/userapi" @@ -409,8 +410,9 @@ func Test_register(t *testing.T) { defer baseClose() caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) - rsAPI := roomserver.NewInternalAPI(base, caches) - userAPI := userapi.NewInternalAPI(base, rsAPI, nil) + natsInstance := jetstream.NATSInstance{} + rsAPI := roomserver.NewInternalAPI(base, &natsInstance, caches) + userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil) for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { @@ -581,8 +583,9 @@ func TestRegisterUserWithDisplayName(t *testing.T) { base.Cfg.Global.ServerName = "server" caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) - rsAPI := roomserver.NewInternalAPI(base, caches) - userAPI := userapi.NewInternalAPI(base, rsAPI, nil) + natsInstance := jetstream.NATSInstance{} + rsAPI := roomserver.NewInternalAPI(base, &natsInstance, caches) + userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil) deviceName, deviceID := "deviceName", "deviceID" expectedDisplayName := "DisplayName" response := completeRegistration( @@ -616,12 +619,13 @@ func TestRegisterAdminUsingSharedSecret(t *testing.T) { test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { base, baseClose := testrig.CreateBaseDendrite(t, dbType) defer baseClose() + natsInstance := jetstream.NATSInstance{} base.Cfg.Global.ServerName = "server" sharedSecret := "dendritetest" base.Cfg.ClientAPI.RegistrationSharedSecret = sharedSecret caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) - rsAPI := roomserver.NewInternalAPI(base, caches) - userAPI := userapi.NewInternalAPI(base, rsAPI, nil) + rsAPI := roomserver.NewInternalAPI(base, &natsInstance, caches) + userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil) expectedDisplayName := "rabbit" jsonStr := []byte(`{"admin":true,"mac":"24dca3bba410e43fe64b9b5c28306693bf3baa9f","nonce":"759f047f312b99ff428b21d581256f8592b8976e58bc1b543972dc6147e529a79657605b52d7becd160ff5137f3de11975684319187e06901955f79e5a6c5a79","password":"wonderland","username":"alice","displayname":"rabbit"}`) diff --git a/cmd/dendrite-demo-pinecone/monolith/monolith.go b/cmd/dendrite-demo-pinecone/monolith/monolith.go index 5781d6571..2d6977da3 100644 --- a/cmd/dendrite-demo-pinecone/monolith/monolith.go +++ b/cmd/dendrite-demo-pinecone/monolith/monolith.go @@ -136,21 +136,22 @@ func (p *P2PMonolith) SetupDendrite(cfg *config.Dendrite, port int, enableRelayi keyRing := serverKeyAPI.KeyRing() caches := caching.NewRistrettoCache(cfg.Global.Cache.EstimatedMaxSize, cfg.Global.Cache.MaxAge, enableMetrics) - rsAPI := roomserver.NewInternalAPI(p.BaseDendrite, caches) + natsInstance := jetstream.NATSInstance{} + rsAPI := roomserver.NewInternalAPI(p.BaseDendrite, &natsInstance, caches) fsAPI := federationapi.NewInternalAPI( - p.BaseDendrite, federation, rsAPI, caches, keyRing, true, + p.BaseDendrite, &natsInstance, federation, rsAPI, caches, keyRing, true, ) - userAPI := userapi.NewInternalAPI(p.BaseDendrite, rsAPI, federation) + userAPI := userapi.NewInternalAPI(p.BaseDendrite, &natsInstance, rsAPI, federation) - asAPI := appservice.NewInternalAPI(p.BaseDendrite, userAPI, rsAPI) + asAPI := appservice.NewInternalAPI(p.BaseDendrite, &natsInstance, userAPI, rsAPI) rsAPI.SetFederationAPI(fsAPI, keyRing) userProvider := users.NewPineconeUserProvider(p.Router, p.Sessions, userAPI, federation) roomProvider := rooms.NewPineconeRoomProvider(p.Router, p.Sessions, fsAPI, federation) - js, _ := p.BaseDendrite.NATS.Prepare(p.BaseDendrite.ProcessContext, &p.BaseDendrite.Cfg.Global.JetStream) + js, _ := natsInstance.Prepare(p.BaseDendrite.ProcessContext, &p.BaseDendrite.Cfg.Global.JetStream) producer := &producers.SyncAPIProducer{ JetStream: js, TopicReceiptEvent: p.BaseDendrite.Cfg.Global.JetStream.Prefixed(jetstream.OutputReceiptEvent), @@ -179,7 +180,7 @@ func (p *P2PMonolith) SetupDendrite(cfg *config.Dendrite, port int, enableRelayi ExtPublicRoomsProvider: roomProvider, ExtUserDirectoryProvider: userProvider, } - p.dendrite.AddAllPublicRoutes(p.BaseDendrite, caches) + p.dendrite.AddAllPublicRoutes(p.BaseDendrite, &natsInstance, caches) p.setupHttpServers(userProvider, enableWebsockets) } diff --git a/cmd/dendrite-demo-yggdrasil/main.go b/cmd/dendrite-demo-yggdrasil/main.go index 62184719a..f170eb01e 100644 --- a/cmd/dendrite-demo-yggdrasil/main.go +++ b/cmd/dendrite-demo-yggdrasil/main.go @@ -28,6 +28,7 @@ import ( "time" "github.com/matrix-org/dendrite/internal/caching" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/gomatrixserverlib" "github.com/gorilla/mux" @@ -158,14 +159,15 @@ func main() { keyRing := serverKeyAPI.KeyRing() caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.EnableMetrics) - rsAPI := roomserver.NewInternalAPI(base, caches) + natsInstance := jetstream.NATSInstance{} + rsAPI := roomserver.NewInternalAPI(base, &natsInstance, caches) - userAPI := userapi.NewInternalAPI(base, rsAPI, federation) + userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, federation) - asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI) + asAPI := appservice.NewInternalAPI(base, &natsInstance, userAPI, rsAPI) rsAPI.SetAppserviceAPI(asAPI) fsAPI := federationapi.NewInternalAPI( - base, federation, rsAPI, caches, keyRing, true, + base, &natsInstance, federation, rsAPI, caches, keyRing, true, ) rsAPI.SetFederationAPI(fsAPI, keyRing) @@ -184,7 +186,7 @@ func main() { ygg, fsAPI, federation, ), } - monolith.AddAllPublicRoutes(base, caches) + monolith.AddAllPublicRoutes(base, &natsInstance, caches) if err := mscs.Enable(base, &monolith, caches); err != nil { logrus.WithError(err).Fatalf("Failed to enable MSCs") } diff --git a/cmd/dendrite/main.go b/cmd/dendrite/main.go index 29290eb98..c6cfa8dbc 100644 --- a/cmd/dendrite/main.go +++ b/cmd/dendrite/main.go @@ -18,6 +18,7 @@ import ( "flag" "github.com/matrix-org/dendrite/internal/caching" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/sirupsen/logrus" "github.com/matrix-org/dendrite/appservice" @@ -74,17 +75,17 @@ func main() { federation := base.CreateFederationClient() caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.EnableMetrics) - - rsAPI := roomserver.NewInternalAPI(base, caches) + natsInstance := jetstream.NATSInstance{} + rsAPI := roomserver.NewInternalAPI(base, &natsInstance, caches) fsAPI := federationapi.NewInternalAPI( - base, federation, rsAPI, caches, nil, false, + base, &natsInstance, federation, rsAPI, caches, nil, false, ) keyRing := fsAPI.KeyRing() - userAPI := userapi.NewInternalAPI(base, rsAPI, federation) + userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, federation) - asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI) + asAPI := appservice.NewInternalAPI(base, &natsInstance, userAPI, rsAPI) // The underlying roomserver implementation needs to be able to call the fedsender. // This is different to rsAPI which can be the http client which doesn't need this @@ -106,7 +107,7 @@ func main() { RoomserverAPI: rsAPI, UserAPI: userAPI, } - monolith.AddAllPublicRoutes(base, caches) + monolith.AddAllPublicRoutes(base, &natsInstance, caches) if len(base.Cfg.MSCs.MSCs) > 0 { if err := mscs.Enable(base, &monolith, caches); err != nil { diff --git a/federationapi/federationapi.go b/federationapi/federationapi.go index 8a4237bae..2c0748749 100644 --- a/federationapi/federationapi.go +++ b/federationapi/federationapi.go @@ -41,6 +41,7 @@ import ( // AddPublicRoutes sets up and registers HTTP handlers on the base API muxes for the FederationAPI component. func AddPublicRoutes( base *base.BaseDendrite, + natsInstance *jetstream.NATSInstance, userAPI userapi.FederationUserAPI, federation *gomatrixserverlib.FederationClient, keyRing gomatrixserverlib.JSONVerifier, @@ -50,7 +51,7 @@ func AddPublicRoutes( ) { cfg := &base.Cfg.FederationAPI mscCfg := &base.Cfg.MSCs - js, _ := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) + js, _ := natsInstance.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) producer := &producers.SyncAPIProducer{ JetStream: js, TopicReceiptEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent), @@ -86,6 +87,7 @@ func AddPublicRoutes( // can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes. func NewInternalAPI( base *base.BaseDendrite, + natsInstance *jetstream.NATSInstance, federation api.FederationClient, rsAPI roomserverAPI.FederationRoomserverAPI, caches *caching.Caches, @@ -108,7 +110,7 @@ func NewInternalAPI( cfg.FederationMaxRetries+1, cfg.P2PFederationRetriesUntilAssumedOffline+1) - js, nats := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) + js, nats := natsInstance.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) signingInfo := base.Cfg.Global.SigningIdentities() diff --git a/federationapi/federationapi_keys_test.go b/federationapi/federationapi_keys_test.go index bb6ee8935..4ab33f639 100644 --- a/federationapi/federationapi_keys_test.go +++ b/federationapi/federationapi_keys_test.go @@ -12,6 +12,7 @@ import ( "testing" "time" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/dendrite/federationapi/api" @@ -65,7 +66,7 @@ func TestMain(m *testing.M) { // Create a new cache but don't enable prometheus! s.cache = caching.NewRistrettoCache(8*1024*1024, time.Hour, false) - + natsInstance := jetstream.NATSInstance{} // Create a temporary directory for JetStream. d, err := os.MkdirTemp("./", "jetstream*") if err != nil { @@ -110,7 +111,7 @@ func TestMain(m *testing.M) { // Finally, build the server key APIs. sbase := base.NewBaseDendrite(cfg, base.DisableMetrics) - s.api = NewInternalAPI(sbase, s.fedclient, nil, s.cache, nil, true) + s.api = NewInternalAPI(sbase, &natsInstance, s.fedclient, nil, s.cache, nil, true) } // Now that we have built our server key APIs, start the diff --git a/federationapi/federationapi_test.go b/federationapi/federationapi_test.go index 8aea96a73..f499af77c 100644 --- a/federationapi/federationapi_test.go +++ b/federationapi/federationapi_test.go @@ -165,10 +165,11 @@ func TestFederationAPIJoinThenKeyUpdate(t *testing.T) { func testFederationAPIJoinThenKeyUpdate(t *testing.T, dbType test.DBType) { base, close := testrig.CreateBaseDendrite(t, dbType) caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) + natsInstance := jetstream.NATSInstance{} base.Cfg.FederationAPI.PreferDirectFetch = true base.Cfg.FederationAPI.KeyPerspectives = nil defer close() - jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) + jsctx, _ := natsInstance.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream) serverA := gomatrixserverlib.ServerName("server.a") @@ -214,7 +215,7 @@ func testFederationAPIJoinThenKeyUpdate(t *testing.T, dbType test.DBType) { }, }, } - fsapi := federationapi.NewInternalAPI(base, fc, rsapi, caches, nil, false) + fsapi := federationapi.NewInternalAPI(base, &natsInstance, fc, rsapi, caches, nil, false) var resp api.PerformJoinResponse fsapi.PerformJoin(context.Background(), &api.PerformJoinRequest{ @@ -277,9 +278,10 @@ func TestRoomsV3URLEscapeDoNot404(t *testing.T) { cfg.Global.JetStream.InMemory = true b := base.NewBaseDendrite(cfg, base.DisableMetrics) keyRing := &test.NopJSONVerifier{} + natsInstance := jetstream.NATSInstance{} // TODO: This is pretty fragile, as if anything calls anything on these nils this test will break. // Unfortunately, it makes little sense to instantiate these dependencies when we just want to test routing. - federationapi.AddPublicRoutes(b, nil, nil, keyRing, nil, &internal.FederationInternalAPI{}, nil) + federationapi.AddPublicRoutes(b, &natsInstance, nil, nil, keyRing, nil, &internal.FederationInternalAPI{}, nil) baseURL, cancel := test.ListenAndServe(t, b.Routers.Federation, true) defer cancel() serverName := gomatrixserverlib.ServerName(strings.TrimPrefix(baseURL, "https://")) diff --git a/federationapi/routing/profile_test.go b/federationapi/routing/profile_test.go index df494a743..82f64bd35 100644 --- a/federationapi/routing/profile_test.go +++ b/federationapi/routing/profile_test.go @@ -28,6 +28,7 @@ import ( fedInternal "github.com/matrix-org/dendrite/federationapi/internal" "github.com/matrix-org/dendrite/federationapi/routing" "github.com/matrix-org/dendrite/internal/httputil" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/test" "github.com/matrix-org/dendrite/test/testrig" userAPI "github.com/matrix-org/dendrite/userapi/api" @@ -50,13 +51,14 @@ func TestHandleQueryProfile(t *testing.T) { defer close() fedMux := mux.NewRouter().SkipClean(true).PathPrefix(httputil.PublicFederationPathPrefix).Subrouter().UseEncodedPath() + natsInstance := jetstream.NATSInstance{} base.Routers.Federation = fedMux base.Cfg.FederationAPI.Matrix.SigningIdentity.ServerName = testOrigin base.Cfg.FederationAPI.Matrix.Metrics.Enabled = false fedClient := fakeFedClient{} serverKeyAPI := &signing.YggdrasilKeys{} keyRing := serverKeyAPI.KeyRing() - fedapi := fedAPI.NewInternalAPI(base, &fedClient, nil, nil, keyRing, true) + fedapi := fedAPI.NewInternalAPI(base, &natsInstance, &fedClient, nil, nil, keyRing, true) userapi := fakeUserAPI{} r, ok := fedapi.(*fedInternal.FederationInternalAPI) if !ok { diff --git a/federationapi/routing/query_test.go b/federationapi/routing/query_test.go index 69cf7047d..d3214bf6b 100644 --- a/federationapi/routing/query_test.go +++ b/federationapi/routing/query_test.go @@ -29,6 +29,7 @@ import ( fedInternal "github.com/matrix-org/dendrite/federationapi/internal" "github.com/matrix-org/dendrite/federationapi/routing" "github.com/matrix-org/dendrite/internal/httputil" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/test" "github.com/matrix-org/dendrite/test/testrig" "github.com/matrix-org/gomatrixserverlib" @@ -50,13 +51,14 @@ func TestHandleQueryDirectory(t *testing.T) { defer close() fedMux := mux.NewRouter().SkipClean(true).PathPrefix(httputil.PublicFederationPathPrefix).Subrouter().UseEncodedPath() + natsInstance := jetstream.NATSInstance{} base.Routers.Federation = fedMux base.Cfg.FederationAPI.Matrix.SigningIdentity.ServerName = testOrigin base.Cfg.FederationAPI.Matrix.Metrics.Enabled = false fedClient := fakeFedClient{} serverKeyAPI := &signing.YggdrasilKeys{} keyRing := serverKeyAPI.KeyRing() - fedapi := fedAPI.NewInternalAPI(base, &fedClient, nil, nil, keyRing, true) + fedapi := fedAPI.NewInternalAPI(base, &natsInstance, &fedClient, nil, nil, keyRing, true) userapi := fakeUserAPI{} r, ok := fedapi.(*fedInternal.FederationInternalAPI) if !ok { diff --git a/federationapi/routing/send_test.go b/federationapi/routing/send_test.go index 53e1399ab..c65196f17 100644 --- a/federationapi/routing/send_test.go +++ b/federationapi/routing/send_test.go @@ -26,6 +26,7 @@ import ( fedInternal "github.com/matrix-org/dendrite/federationapi/internal" "github.com/matrix-org/dendrite/federationapi/routing" "github.com/matrix-org/dendrite/internal/httputil" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/test" "github.com/matrix-org/dendrite/test/testrig" "github.com/matrix-org/gomatrixserverlib" @@ -48,10 +49,11 @@ func TestHandleSend(t *testing.T) { defer close() fedMux := mux.NewRouter().SkipClean(true).PathPrefix(httputil.PublicFederationPathPrefix).Subrouter().UseEncodedPath() + natsInstance := jetstream.NATSInstance{} base.Routers.Federation = fedMux base.Cfg.FederationAPI.Matrix.SigningIdentity.ServerName = testOrigin base.Cfg.FederationAPI.Matrix.Metrics.Enabled = false - fedapi := fedAPI.NewInternalAPI(base, nil, nil, nil, nil, true) + fedapi := fedAPI.NewInternalAPI(base, &natsInstance, nil, nil, nil, nil, true) serverKeyAPI := &signing.YggdrasilKeys{} keyRing := serverKeyAPI.KeyRing() r, ok := fedapi.(*fedInternal.FederationInternalAPI) diff --git a/roomserver/roomserver.go b/roomserver/roomserver.go index 1c55423ef..c487af8dd 100644 --- a/roomserver/roomserver.go +++ b/roomserver/roomserver.go @@ -16,6 +16,7 @@ package roomserver import ( "github.com/matrix-org/dendrite/internal/caching" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/sirupsen/logrus" "github.com/matrix-org/dendrite/roomserver/api" @@ -27,6 +28,7 @@ import ( // NewInternalAPI returns a concrete implementation of the internal API. func NewInternalAPI( base *base.BaseDendrite, + natsInstance *jetstream.NATSInstance, caches caching.RoomServerCaches, ) api.RoomserverInternalAPI { cfg := &base.Cfg.RoomServer @@ -36,7 +38,7 @@ func NewInternalAPI( logrus.WithError(err).Panicf("failed to connect to room server db") } - js, nc := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) + js, nc := natsInstance.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) return internal.NewRoomserverAPI( base, roomserverDB, js, nc, caches, diff --git a/roomserver/roomserver_test.go b/roomserver/roomserver_test.go index 1b0b3155d..039bbc57a 100644 --- a/roomserver/roomserver_test.go +++ b/roomserver/roomserver_test.go @@ -46,7 +46,8 @@ func TestUsers(t *testing.T) { base, close := testrig.CreateBaseDendrite(t, dbType) defer close() caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) - rsAPI := roomserver.NewInternalAPI(base, caches) + natsInstance := jetstream.NATSInstance{} + rsAPI := roomserver.NewInternalAPI(base, &natsInstance, caches) // SetFederationAPI starts the room event input consumer rsAPI.SetFederationAPI(nil, nil) @@ -55,7 +56,7 @@ func TestUsers(t *testing.T) { }) t.Run("kick users", func(t *testing.T) { - usrAPI := userapi.NewInternalAPI(base, rsAPI, nil) + usrAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil) rsAPI.SetUserAPI(usrAPI) testKickUsers(t, rsAPI, usrAPI) }) @@ -185,7 +186,8 @@ func Test_QueryLeftUsers(t *testing.T) { defer close() caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) - rsAPI := roomserver.NewInternalAPI(base, caches) + natsInstance := jetstream.NATSInstance{} + rsAPI := roomserver.NewInternalAPI(base, &natsInstance, caches) // SetFederationAPI starts the room event input consumer rsAPI.SetFederationAPI(nil, nil) // Create the room @@ -230,19 +232,20 @@ func TestPurgeRoom(t *testing.T) { test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { base, db, close := mustCreateDatabase(t, dbType) + natsInstance := jetstream.NATSInstance{} defer close() - jsCtx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) + jsCtx, _ := natsInstance.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) defer jetstream.DeleteAllStreams(jsCtx, &base.Cfg.Global.JetStream) fedClient := base.CreateFederationClient() caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) - rsAPI := roomserver.NewInternalAPI(base, caches) - userAPI := userapi.NewInternalAPI(base, rsAPI, nil) + rsAPI := roomserver.NewInternalAPI(base, &natsInstance, caches) + userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil) // this starts the JetStream consumers - syncapi.AddPublicRoutes(base, userAPI, rsAPI, caches) - federationapi.NewInternalAPI(base, fedClient, rsAPI, caches, nil, true) + syncapi.AddPublicRoutes(base, &natsInstance, userAPI, rsAPI, caches) + federationapi.NewInternalAPI(base, &natsInstance, fedClient, rsAPI, caches, nil, true) rsAPI.SetFederationAPI(nil, nil) // Create the room diff --git a/setup/base/base.go b/setup/base/base.go index 8c9b06d0e..a0361b550 100644 --- a/setup/base/base.go +++ b/setup/base/base.go @@ -35,7 +35,6 @@ import ( "github.com/getsentry/sentry-go" sentryhttp "github.com/getsentry/sentry-go/http" - "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/gomatrixserverlib" "github.com/prometheus/client_golang/prometheus/promhttp" "go.uber.org/atomic" @@ -65,7 +64,6 @@ type BaseDendrite struct { *process.ProcessContext tracerCloser io.Closer Routers httputil.Routers - NATS *jetstream.NATSInstance Cfg *config.Dendrite DNSCache *gomatrixserverlib.DNSCache ConnectionManager sqlutil.Connections @@ -177,7 +175,6 @@ func NewBaseDendrite(cfg *config.Dendrite, options ...BaseDendriteOptions) *Base Cfg: cfg, DNSCache: dnsCache, Routers: httputil.NewRouters(), - NATS: &jetstream.NATSInstance{}, ConnectionManager: cm, EnableMetrics: enableMetrics, } diff --git a/setup/jetstream/nats.go b/setup/jetstream/nats.go index 48683789b..8b144436a 100644 --- a/setup/jetstream/nats.go +++ b/setup/jetstream/nats.go @@ -73,6 +73,7 @@ func (s *NATSInstance) Prepare(process *process.ProcessContext, cfg *config.JetS } // reuse existing connections if s.nc != nil { + logrus.Infof("XXX: reusing connection") return s.js, s.nc } nc, err := natsclient.Connect("", natsclient.InProcessServer(s)) diff --git a/setup/monolith.go b/setup/monolith.go index 174eba680..8a6609a7d 100644 --- a/setup/monolith.go +++ b/setup/monolith.go @@ -28,6 +28,7 @@ import ( roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/syncapi" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" @@ -53,21 +54,21 @@ type Monolith struct { } // AddAllPublicRoutes attaches all public paths to the given router -func (m *Monolith) AddAllPublicRoutes(base *base.BaseDendrite, caches *caching.Caches) { +func (m *Monolith) AddAllPublicRoutes(base *base.BaseDendrite, natsInstance *jetstream.NATSInstance, caches *caching.Caches) { userDirectoryProvider := m.ExtUserDirectoryProvider if userDirectoryProvider == nil { userDirectoryProvider = m.UserAPI } clientapi.AddPublicRoutes( - base, m.FedClient, m.RoomserverAPI, m.AppserviceAPI, transactions.New(), + base, natsInstance, m.FedClient, m.RoomserverAPI, m.AppserviceAPI, transactions.New(), m.FederationAPI, m.UserAPI, userDirectoryProvider, m.ExtPublicRoomsProvider, ) federationapi.AddPublicRoutes( - base, m.UserAPI, m.FedClient, m.KeyRing, m.RoomserverAPI, m.FederationAPI, nil, + base, natsInstance, m.UserAPI, m.FedClient, m.KeyRing, m.RoomserverAPI, m.FederationAPI, nil, ) mediaapi.AddPublicRoutes(base, m.UserAPI, m.Client) - syncapi.AddPublicRoutes(base, m.UserAPI, m.RoomserverAPI, caches) + syncapi.AddPublicRoutes(base, natsInstance, m.UserAPI, m.RoomserverAPI, caches) if m.RelayAPI != nil { relayapi.AddPublicRoutes(base, m.KeyRing, m.RelayAPI) diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index e0cc8462e..3dc64ea38 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -40,13 +40,14 @@ import ( // component. func AddPublicRoutes( base *base.BaseDendrite, + natsInstance *jetstream.NATSInstance, userAPI userapi.SyncUserAPI, rsAPI api.SyncRoomserverAPI, caches caching.LazyLoadCache, ) { cfg := &base.Cfg.SyncAPI - js, natsClient := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) + js, natsClient := natsInstance.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) syncDB, err := storage.NewSyncServerDatasource(base.Context(), base.ConnectionManager, &cfg.Database) if err != nil { diff --git a/syncapi/syncapi_test.go b/syncapi/syncapi_test.go index 13a078659..7e92bb21c 100644 --- a/syncapi/syncapi_test.go +++ b/syncapi/syncapi_test.go @@ -116,12 +116,13 @@ func testSyncAccessTokens(t *testing.T, dbType test.DBType) { base, close := testrig.CreateBaseDendrite(t, dbType) caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) + natsInstance := jetstream.NATSInstance{} defer close() - jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) + jsctx, _ := natsInstance.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream) msgs := toNATSMsgs(t, base, room.Events()...) - AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches) + AddPublicRoutes(base, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches) testrig.MustPublishMsgs(t, jsctx, msgs...) testCases := []struct { @@ -209,8 +210,9 @@ func testSyncAPICreateRoomSyncEarly(t *testing.T, dbType test.DBType) { base, close := testrig.CreateBaseDendrite(t, dbType) defer close() + natsInstance := jetstream.NATSInstance{} - jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) + jsctx, _ := natsInstance.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream) // order is: // m.room.create @@ -221,7 +223,7 @@ func testSyncAPICreateRoomSyncEarly(t *testing.T, dbType test.DBType) { msgs := toNATSMsgs(t, base, room.Events()...) sinceTokens := make([]string, len(msgs)) caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) - AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches) + AddPublicRoutes(base, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches) for i, msg := range msgs { testrig.MustPublishMsgs(t, jsctx, msg) time.Sleep(100 * time.Millisecond) @@ -302,11 +304,12 @@ func testSyncAPIUpdatePresenceImmediately(t *testing.T, dbType test.DBType) { base.Cfg.Global.Presence.EnableOutbound = true base.Cfg.Global.Presence.EnableInbound = true defer close() + natsInstance := jetstream.NATSInstance{} - jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) + jsctx, _ := natsInstance.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream) caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) - AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{}, caches) + AddPublicRoutes(base, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{}, caches) w := httptest.NewRecorder() base.Routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{ "access_token": alice.AccessToken, @@ -416,15 +419,16 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) { base, close := testrig.CreateBaseDendrite(t, dbType) defer close() + natsInstance := jetstream.NATSInstance{} - jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) + jsctx, _ := natsInstance.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream) // Use the actual internal roomserver API caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) - rsAPI := roomserver.NewInternalAPI(base, caches) + rsAPI := roomserver.NewInternalAPI(base, &natsInstance, caches) rsAPI.SetFederationAPI(nil, nil) - AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI, caches) + AddPublicRoutes(base, &natsInstance, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI, caches) for _, tc := range testCases { testname := fmt.Sprintf("%s - %s", tc.historyVisibility, userType) @@ -716,16 +720,16 @@ func TestGetMembership(t *testing.T) { base, close := testrig.CreateBaseDendrite(t, dbType) defer close() - - jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) + natsInstance := jetstream.NATSInstance{} + jsctx, _ := natsInstance.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream) // Use an actual roomserver for this caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) - rsAPI := roomserver.NewInternalAPI(base, caches) + rsAPI := roomserver.NewInternalAPI(base, &natsInstance, caches) rsAPI.SetFederationAPI(nil, nil) - AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI, caches) + AddPublicRoutes(base, &natsInstance, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI, caches) for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { @@ -788,11 +792,12 @@ func testSendToDevice(t *testing.T, dbType test.DBType) { base, baseClose := testrig.CreateBaseDendrite(t, dbType) defer baseClose() + natsInstance := jetstream.NATSInstance{} - jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) + jsctx, _ := natsInstance.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream) caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) - AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{}, caches) + AddPublicRoutes(base, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{}, caches) producer := producers.SyncAPIProducer{ TopicSendToDeviceEvent: base.Cfg.Global.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent), @@ -1009,10 +1014,11 @@ func testContext(t *testing.T, dbType test.DBType) { // Use an actual roomserver for this caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) - rsAPI := roomserver.NewInternalAPI(base, caches) + natsInstance := jetstream.NATSInstance{} + rsAPI := roomserver.NewInternalAPI(base, &natsInstance, caches) rsAPI.SetFederationAPI(nil, nil) - AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, rsAPI, caches) + AddPublicRoutes(base, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, rsAPI, caches) room := test.NewRoom(t, user) @@ -1025,7 +1031,7 @@ func testContext(t *testing.T, dbType test.DBType) { t.Fatalf("failed to send events: %v", err) } - jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) + jsctx, _ := natsInstance.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream) syncUntil(t, base, alice.AccessToken, false, func(syncBody string) bool { @@ -1171,6 +1177,7 @@ func syncUntil(t *testing.T, skip bool, checkFunc func(syncBody string) bool, ) { + t.Helper() if checkFunc == nil { t.Fatalf("No checkFunc defined") } diff --git a/test/testrig/base.go b/test/testrig/base.go index bb8fded21..d9ccfbde8 100644 --- a/test/testrig/base.go +++ b/test/testrig/base.go @@ -21,6 +21,7 @@ import ( "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/test" "github.com/nats-io/nats.go" ) @@ -110,6 +111,7 @@ func Base(cfg *config.Dendrite) (*base.BaseDendrite, nats.JetStreamContext, *nat cfg.SyncAPI.Fulltext.InMemory = true cfg.FederationAPI.KeyPerspectives = nil base := base.NewBaseDendrite(cfg, base.DisableMetrics) - js, jc := base.NATS.Prepare(base.ProcessContext, &cfg.Global.JetStream) + natsInstance := jetstream.NATSInstance{} + js, jc := natsInstance.Prepare(base.ProcessContext, &cfg.Global.JetStream) return base, js, jc } diff --git a/userapi/userapi.go b/userapi/userapi.go index 3ada8020c..dfa26f47d 100644 --- a/userapi/userapi.go +++ b/userapi/userapi.go @@ -36,11 +36,12 @@ import ( // can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes. func NewInternalAPI( base *base.BaseDendrite, + natsInstance *jetstream.NATSInstance, rsAPI rsapi.UserRoomserverAPI, fedClient fedsenderapi.KeyserverFederationAPI, ) *internal.UserInternalAPI { cfg := &base.Cfg.UserAPI - js, _ := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) + js, _ := natsInstance.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) appServices := base.Cfg.Derived.ApplicationServices pgClient := pushgateway.NewHTTPClient(cfg.PushGatewayDisableTLSValidation)