Remove NATS from BaseDendrite

This commit is contained in:
Till Faelligen 2023-03-20 10:34:56 +01:00
parent 0459d2b9e5
commit 93d0fba3c0
No known key found for this signature in database
GPG key ID: ACCDC9606D472758
27 changed files with 149 additions and 100 deletions

View file

@ -21,6 +21,7 @@ import (
"sync"
"time"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/sirupsen/logrus"
"github.com/matrix-org/gomatrixserverlib"
@ -38,6 +39,7 @@ import (
// can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes.
func NewInternalAPI(
base *base.BaseDendrite,
natsInstance *jetstream.NATSInstance,
userAPI userapi.AppserviceUserAPI,
rsAPI roomserverAPI.RoomserverInternalAPI,
) appserviceAPI.AppServiceInternalAPI {
@ -78,7 +80,7 @@ func NewInternalAPI(
// Only consume if we actually have ASes to track, else we'll just chew cycles needlessly.
// We can't add ASes at runtime so this is safe to do.
js, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
js, _ := natsInstance.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
consumer := consumers.NewOutputRoomEventConsumer(
base.ProcessContext, &base.Cfg.AppServiceAPI,
client, js, rsAPI,

View file

@ -15,6 +15,7 @@ import (
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/roomserver"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/test"
"github.com/matrix-org/dendrite/userapi"
@ -126,9 +127,10 @@ func TestAppserviceInternalAPI(t *testing.T) {
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
// Create required internal APIs
rsAPI := roomserver.NewInternalAPI(base, caches)
usrAPI := userapi.NewInternalAPI(base, rsAPI, nil)
asAPI := appservice.NewInternalAPI(base, usrAPI, rsAPI)
natsInstance := jetstream.NATSInstance{}
rsAPI := roomserver.NewInternalAPI(base, &natsInstance, caches)
usrAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil)
asAPI := appservice.NewInternalAPI(base, &natsInstance, usrAPI, rsAPI)
runCases(t, asAPI)
})

View file

@ -35,6 +35,7 @@ import (
"github.com/matrix-org/dendrite/setup"
"github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/userapi"
"github.com/matrix-org/gomatrixserverlib"
@ -179,22 +180,23 @@ func startup() {
}
base := base.NewBaseDendrite(cfg)
defer base.Close() // nolint: errcheck
rsAPI := roomserver.NewInternalAPI(base)
natsInstance := jetstream.NATSInstance{}
caches := caching.NewRistrettoCache(cfg.Global.Cache.EstimatedMaxSize, cfg.Global.Cache.MaxAge, caching.EnableMetrics)
rsAPI := roomserver.NewInternalAPI(base, &natsInstance, caches)
federation := conn.CreateFederationClient(base, pSessions)
serverKeyAPI := &signing.YggdrasilKeys{}
keyRing := serverKeyAPI.KeyRing()
userAPI := userapi.NewInternalAPI(base, rsAPI, federation)
userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, federation)
asQuery := appservice.NewInternalAPI(
base, userAPI, rsAPI,
base, &natsInstance, userAPI, rsAPI,
)
rsAPI.SetAppserviceAPI(asQuery)
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.EnableMetrics)
fedSenderAPI := federationapi.NewInternalAPI(base, federation, rsAPI, caches, keyRing, true)
fedSenderAPI := federationapi.NewInternalAPI(base, &natsInstance, federation, rsAPI, caches, keyRing, true)
rsAPI.SetFederationAPI(fedSenderAPI, keyRing)
monolith := setup.Monolith{
@ -210,7 +212,7 @@ func startup() {
//ServerKeyAPI: serverKeyAPI,
ExtPublicRoomsProvider: rooms.NewPineconeRoomProvider(pRouter, pSessions, fedSenderAPI, federation),
}
monolith.AddAllPublicRoutes(base, caches)
monolith.AddAllPublicRoutes(base, &natsInstance, caches)
httpRouter := mux.NewRouter().SkipClean(true).UseEncodedPath()
httpRouter.PathPrefix(httputil.PublicClientPathPrefix).Handler(base.Routers.Client)

View file

@ -25,6 +25,7 @@ import (
"github.com/matrix-org/dendrite/setup"
"github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/test"
"github.com/matrix-org/dendrite/userapi"
@ -160,15 +161,16 @@ func (m *DendriteMonolith) Start() {
keyRing := serverKeyAPI.KeyRing()
caches := caching.NewRistrettoCache(cfg.Global.Cache.EstimatedMaxSize, cfg.Global.Cache.MaxAge, caching.DisableMetrics)
rsAPI := roomserver.NewInternalAPI(base, caches)
natsInstance := jetstream.NATSInstance{}
rsAPI := roomserver.NewInternalAPI(base, &natsInstance, caches)
fsAPI := federationapi.NewInternalAPI(
base, federation, rsAPI, caches, keyRing, true,
base, &natsInstance, federation, rsAPI, caches, keyRing, true,
)
userAPI := userapi.NewInternalAPI(base, rsAPI, federation)
userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, federation)
asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI)
asAPI := appservice.NewInternalAPI(base, &natsInstance, userAPI, rsAPI)
rsAPI.SetAppserviceAPI(asAPI)
// The underlying roomserver implementation needs to be able to call the fedsender.
@ -189,7 +191,7 @@ func (m *DendriteMonolith) Start() {
ygg, fsAPI, federation,
),
}
monolith.AddAllPublicRoutes(base, caches)
monolith.AddAllPublicRoutes(base, &natsInstance, caches)
httpRouter := mux.NewRouter()
httpRouter.PathPrefix(httputil.PublicClientPathPrefix).Handler(base.Routers.Client)

View file

@ -12,6 +12,7 @@ import (
"github.com/matrix-org/dendrite/roomserver"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/syncapi"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
@ -32,18 +33,18 @@ func TestAdminResetPassword(t *testing.T) {
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
base, baseClose := testrig.CreateBaseDendrite(t, dbType)
defer baseClose()
natsInstance := jetstream.NATSInstance{}
// add a vhost
base.Cfg.Global.VirtualHosts = append(base.Cfg.Global.VirtualHosts, &config.VirtualHost{
SigningIdentity: gomatrixserverlib.SigningIdentity{ServerName: "vh1"},
})
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
rsAPI := roomserver.NewInternalAPI(base, caches)
rsAPI := roomserver.NewInternalAPI(base, &natsInstance, caches)
// Needed for changing the password/login
userAPI := userapi.NewInternalAPI(base, rsAPI, nil)
userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil)
// We mostly need the userAPI for this test, so nil for other APIs/caches etc.
AddPublicRoutes(base, nil, rsAPI, nil, nil, nil, userAPI, nil, nil)
AddPublicRoutes(base, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil)
// Create the users in the userapi and login
accessTokens := map[*test.User]string{
@ -151,15 +152,16 @@ func TestPurgeRoom(t *testing.T) {
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
base, baseClose := testrig.CreateBaseDendrite(t, dbType)
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
natsInstance := jetstream.NATSInstance{}
defer baseClose()
fedClient := base.CreateFederationClient()
rsAPI := roomserver.NewInternalAPI(base, caches)
userAPI := userapi.NewInternalAPI(base, rsAPI, nil)
rsAPI := roomserver.NewInternalAPI(base, &natsInstance, caches)
userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil)
// this starts the JetStream consumers
syncapi.AddPublicRoutes(base, userAPI, rsAPI, caches)
federationapi.NewInternalAPI(base, fedClient, rsAPI, caches, nil, true)
syncapi.AddPublicRoutes(base, &natsInstance, userAPI, rsAPI, caches)
federationapi.NewInternalAPI(base, &natsInstance, fedClient, rsAPI, caches, nil, true)
rsAPI.SetFederationAPI(nil, nil)
// Create the room
@ -168,7 +170,7 @@ func TestPurgeRoom(t *testing.T) {
}
// We mostly need the rsAPI for this test, so nil for other APIs/caches etc.
AddPublicRoutes(base, nil, rsAPI, nil, nil, nil, userAPI, nil, nil)
AddPublicRoutes(base, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil)
// Create the users in the userapi and login
accessTokens := map[*test.User]string{

View file

@ -32,6 +32,7 @@ import (
// AddPublicRoutes sets up and registers HTTP handlers for the ClientAPI component.
func AddPublicRoutes(
base *base.BaseDendrite,
natsInstance *jetstream.NATSInstance,
federation *gomatrixserverlib.FederationClient,
rsAPI roomserverAPI.ClientRoomserverAPI,
asAPI appserviceAPI.AppServiceInternalAPI,
@ -43,7 +44,7 @@ func AddPublicRoutes(
) {
cfg := &base.Cfg.ClientAPI
mscCfg := &base.Cfg.MSCs
js, natsClient := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
js, natsClient := natsInstance.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
syncProducer := &producers.SyncAPIProducer{
JetStream: js,

View file

@ -8,6 +8,7 @@ import (
"time"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/dendrite/appservice"
@ -29,9 +30,10 @@ func TestJoinRoomByIDOrAlias(t *testing.T) {
defer baseClose()
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
rsAPI := roomserver.NewInternalAPI(base, caches)
userAPI := userapi.NewInternalAPI(base, rsAPI, nil)
asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI)
natsInstance := jetstream.NATSInstance{}
rsAPI := roomserver.NewInternalAPI(base, &natsInstance, caches)
userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil)
asAPI := appservice.NewInternalAPI(base, &natsInstance, userAPI, rsAPI)
rsAPI.SetFederationAPI(nil, nil) // creates the rs.Inputer etc
// Create the users in the userapi

View file

@ -12,6 +12,7 @@ import (
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/roomserver"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
@ -32,15 +33,16 @@ func TestLogin(t *testing.T) {
base, baseClose := testrig.CreateBaseDendrite(t, dbType)
defer baseClose()
base.Cfg.ClientAPI.RateLimiting.Enabled = false
natsInstance := jetstream.NATSInstance{}
// add a vhost
base.Cfg.Global.VirtualHosts = append(base.Cfg.Global.VirtualHosts, &config.VirtualHost{
SigningIdentity: gomatrixserverlib.SigningIdentity{ServerName: "vh1"},
})
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
rsAPI := roomserver.NewInternalAPI(base, caches)
rsAPI := roomserver.NewInternalAPI(base, &natsInstance, caches)
// Needed for /login
userAPI := userapi.NewInternalAPI(base, rsAPI, nil)
userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil)
// We mostly need the userAPI for this test, so nil for other APIs/caches etc.
Setup(base, &base.Cfg.ClientAPI, nil, nil, userAPI, nil, nil, nil, nil, nil, nil, &base.Cfg.MSCs, nil)

View file

@ -33,6 +33,7 @@ import (
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/roomserver"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/test"
"github.com/matrix-org/dendrite/test/testrig"
"github.com/matrix-org/dendrite/userapi"
@ -409,8 +410,9 @@ func Test_register(t *testing.T) {
defer baseClose()
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
rsAPI := roomserver.NewInternalAPI(base, caches)
userAPI := userapi.NewInternalAPI(base, rsAPI, nil)
natsInstance := jetstream.NATSInstance{}
rsAPI := roomserver.NewInternalAPI(base, &natsInstance, caches)
userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil)
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
@ -581,8 +583,9 @@ func TestRegisterUserWithDisplayName(t *testing.T) {
base.Cfg.Global.ServerName = "server"
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
rsAPI := roomserver.NewInternalAPI(base, caches)
userAPI := userapi.NewInternalAPI(base, rsAPI, nil)
natsInstance := jetstream.NATSInstance{}
rsAPI := roomserver.NewInternalAPI(base, &natsInstance, caches)
userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil)
deviceName, deviceID := "deviceName", "deviceID"
expectedDisplayName := "DisplayName"
response := completeRegistration(
@ -616,12 +619,13 @@ func TestRegisterAdminUsingSharedSecret(t *testing.T) {
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
base, baseClose := testrig.CreateBaseDendrite(t, dbType)
defer baseClose()
natsInstance := jetstream.NATSInstance{}
base.Cfg.Global.ServerName = "server"
sharedSecret := "dendritetest"
base.Cfg.ClientAPI.RegistrationSharedSecret = sharedSecret
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
rsAPI := roomserver.NewInternalAPI(base, caches)
userAPI := userapi.NewInternalAPI(base, rsAPI, nil)
rsAPI := roomserver.NewInternalAPI(base, &natsInstance, caches)
userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil)
expectedDisplayName := "rabbit"
jsonStr := []byte(`{"admin":true,"mac":"24dca3bba410e43fe64b9b5c28306693bf3baa9f","nonce":"759f047f312b99ff428b21d581256f8592b8976e58bc1b543972dc6147e529a79657605b52d7becd160ff5137f3de11975684319187e06901955f79e5a6c5a79","password":"wonderland","username":"alice","displayname":"rabbit"}`)

View file

@ -136,21 +136,22 @@ func (p *P2PMonolith) SetupDendrite(cfg *config.Dendrite, port int, enableRelayi
keyRing := serverKeyAPI.KeyRing()
caches := caching.NewRistrettoCache(cfg.Global.Cache.EstimatedMaxSize, cfg.Global.Cache.MaxAge, enableMetrics)
rsAPI := roomserver.NewInternalAPI(p.BaseDendrite, caches)
natsInstance := jetstream.NATSInstance{}
rsAPI := roomserver.NewInternalAPI(p.BaseDendrite, &natsInstance, caches)
fsAPI := federationapi.NewInternalAPI(
p.BaseDendrite, federation, rsAPI, caches, keyRing, true,
p.BaseDendrite, &natsInstance, federation, rsAPI, caches, keyRing, true,
)
userAPI := userapi.NewInternalAPI(p.BaseDendrite, rsAPI, federation)
userAPI := userapi.NewInternalAPI(p.BaseDendrite, &natsInstance, rsAPI, federation)
asAPI := appservice.NewInternalAPI(p.BaseDendrite, userAPI, rsAPI)
asAPI := appservice.NewInternalAPI(p.BaseDendrite, &natsInstance, userAPI, rsAPI)
rsAPI.SetFederationAPI(fsAPI, keyRing)
userProvider := users.NewPineconeUserProvider(p.Router, p.Sessions, userAPI, federation)
roomProvider := rooms.NewPineconeRoomProvider(p.Router, p.Sessions, fsAPI, federation)
js, _ := p.BaseDendrite.NATS.Prepare(p.BaseDendrite.ProcessContext, &p.BaseDendrite.Cfg.Global.JetStream)
js, _ := natsInstance.Prepare(p.BaseDendrite.ProcessContext, &p.BaseDendrite.Cfg.Global.JetStream)
producer := &producers.SyncAPIProducer{
JetStream: js,
TopicReceiptEvent: p.BaseDendrite.Cfg.Global.JetStream.Prefixed(jetstream.OutputReceiptEvent),
@ -179,7 +180,7 @@ func (p *P2PMonolith) SetupDendrite(cfg *config.Dendrite, port int, enableRelayi
ExtPublicRoomsProvider: roomProvider,
ExtUserDirectoryProvider: userProvider,
}
p.dendrite.AddAllPublicRoutes(p.BaseDendrite, caches)
p.dendrite.AddAllPublicRoutes(p.BaseDendrite, &natsInstance, caches)
p.setupHttpServers(userProvider, enableWebsockets)
}

View file

@ -28,6 +28,7 @@ import (
"time"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/gomatrixserverlib"
"github.com/gorilla/mux"
@ -158,14 +159,15 @@ func main() {
keyRing := serverKeyAPI.KeyRing()
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.EnableMetrics)
rsAPI := roomserver.NewInternalAPI(base, caches)
natsInstance := jetstream.NATSInstance{}
rsAPI := roomserver.NewInternalAPI(base, &natsInstance, caches)
userAPI := userapi.NewInternalAPI(base, rsAPI, federation)
userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, federation)
asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI)
asAPI := appservice.NewInternalAPI(base, &natsInstance, userAPI, rsAPI)
rsAPI.SetAppserviceAPI(asAPI)
fsAPI := federationapi.NewInternalAPI(
base, federation, rsAPI, caches, keyRing, true,
base, &natsInstance, federation, rsAPI, caches, keyRing, true,
)
rsAPI.SetFederationAPI(fsAPI, keyRing)
@ -184,7 +186,7 @@ func main() {
ygg, fsAPI, federation,
),
}
monolith.AddAllPublicRoutes(base, caches)
monolith.AddAllPublicRoutes(base, &natsInstance, caches)
if err := mscs.Enable(base, &monolith, caches); err != nil {
logrus.WithError(err).Fatalf("Failed to enable MSCs")
}

View file

@ -18,6 +18,7 @@ import (
"flag"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/appservice"
@ -74,17 +75,17 @@ func main() {
federation := base.CreateFederationClient()
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.EnableMetrics)
rsAPI := roomserver.NewInternalAPI(base, caches)
natsInstance := jetstream.NATSInstance{}
rsAPI := roomserver.NewInternalAPI(base, &natsInstance, caches)
fsAPI := federationapi.NewInternalAPI(
base, federation, rsAPI, caches, nil, false,
base, &natsInstance, federation, rsAPI, caches, nil, false,
)
keyRing := fsAPI.KeyRing()
userAPI := userapi.NewInternalAPI(base, rsAPI, federation)
userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, federation)
asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI)
asAPI := appservice.NewInternalAPI(base, &natsInstance, userAPI, rsAPI)
// The underlying roomserver implementation needs to be able to call the fedsender.
// This is different to rsAPI which can be the http client which doesn't need this
@ -106,7 +107,7 @@ func main() {
RoomserverAPI: rsAPI,
UserAPI: userAPI,
}
monolith.AddAllPublicRoutes(base, caches)
monolith.AddAllPublicRoutes(base, &natsInstance, caches)
if len(base.Cfg.MSCs.MSCs) > 0 {
if err := mscs.Enable(base, &monolith, caches); err != nil {

View file

@ -41,6 +41,7 @@ import (
// AddPublicRoutes sets up and registers HTTP handlers on the base API muxes for the FederationAPI component.
func AddPublicRoutes(
base *base.BaseDendrite,
natsInstance *jetstream.NATSInstance,
userAPI userapi.FederationUserAPI,
federation *gomatrixserverlib.FederationClient,
keyRing gomatrixserverlib.JSONVerifier,
@ -50,7 +51,7 @@ func AddPublicRoutes(
) {
cfg := &base.Cfg.FederationAPI
mscCfg := &base.Cfg.MSCs
js, _ := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
js, _ := natsInstance.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
producer := &producers.SyncAPIProducer{
JetStream: js,
TopicReceiptEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent),
@ -86,6 +87,7 @@ func AddPublicRoutes(
// can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes.
func NewInternalAPI(
base *base.BaseDendrite,
natsInstance *jetstream.NATSInstance,
federation api.FederationClient,
rsAPI roomserverAPI.FederationRoomserverAPI,
caches *caching.Caches,
@ -108,7 +110,7 @@ func NewInternalAPI(
cfg.FederationMaxRetries+1,
cfg.P2PFederationRetriesUntilAssumedOffline+1)
js, nats := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
js, nats := natsInstance.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
signingInfo := base.Cfg.Global.SigningIdentities()

View file

@ -12,6 +12,7 @@ import (
"testing"
"time"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/dendrite/federationapi/api"
@ -65,7 +66,7 @@ func TestMain(m *testing.M) {
// Create a new cache but don't enable prometheus!
s.cache = caching.NewRistrettoCache(8*1024*1024, time.Hour, false)
natsInstance := jetstream.NATSInstance{}
// Create a temporary directory for JetStream.
d, err := os.MkdirTemp("./", "jetstream*")
if err != nil {
@ -110,7 +111,7 @@ func TestMain(m *testing.M) {
// Finally, build the server key APIs.
sbase := base.NewBaseDendrite(cfg, base.DisableMetrics)
s.api = NewInternalAPI(sbase, s.fedclient, nil, s.cache, nil, true)
s.api = NewInternalAPI(sbase, &natsInstance, s.fedclient, nil, s.cache, nil, true)
}
// Now that we have built our server key APIs, start the

View file

@ -165,10 +165,11 @@ func TestFederationAPIJoinThenKeyUpdate(t *testing.T) {
func testFederationAPIJoinThenKeyUpdate(t *testing.T, dbType test.DBType) {
base, close := testrig.CreateBaseDendrite(t, dbType)
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
natsInstance := jetstream.NATSInstance{}
base.Cfg.FederationAPI.PreferDirectFetch = true
base.Cfg.FederationAPI.KeyPerspectives = nil
defer close()
jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
jsctx, _ := natsInstance.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
serverA := gomatrixserverlib.ServerName("server.a")
@ -214,7 +215,7 @@ func testFederationAPIJoinThenKeyUpdate(t *testing.T, dbType test.DBType) {
},
},
}
fsapi := federationapi.NewInternalAPI(base, fc, rsapi, caches, nil, false)
fsapi := federationapi.NewInternalAPI(base, &natsInstance, fc, rsapi, caches, nil, false)
var resp api.PerformJoinResponse
fsapi.PerformJoin(context.Background(), &api.PerformJoinRequest{
@ -277,9 +278,10 @@ func TestRoomsV3URLEscapeDoNot404(t *testing.T) {
cfg.Global.JetStream.InMemory = true
b := base.NewBaseDendrite(cfg, base.DisableMetrics)
keyRing := &test.NopJSONVerifier{}
natsInstance := jetstream.NATSInstance{}
// TODO: This is pretty fragile, as if anything calls anything on these nils this test will break.
// Unfortunately, it makes little sense to instantiate these dependencies when we just want to test routing.
federationapi.AddPublicRoutes(b, nil, nil, keyRing, nil, &internal.FederationInternalAPI{}, nil)
federationapi.AddPublicRoutes(b, &natsInstance, nil, nil, keyRing, nil, &internal.FederationInternalAPI{}, nil)
baseURL, cancel := test.ListenAndServe(t, b.Routers.Federation, true)
defer cancel()
serverName := gomatrixserverlib.ServerName(strings.TrimPrefix(baseURL, "https://"))

View file

@ -28,6 +28,7 @@ import (
fedInternal "github.com/matrix-org/dendrite/federationapi/internal"
"github.com/matrix-org/dendrite/federationapi/routing"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/test"
"github.com/matrix-org/dendrite/test/testrig"
userAPI "github.com/matrix-org/dendrite/userapi/api"
@ -50,13 +51,14 @@ func TestHandleQueryProfile(t *testing.T) {
defer close()
fedMux := mux.NewRouter().SkipClean(true).PathPrefix(httputil.PublicFederationPathPrefix).Subrouter().UseEncodedPath()
natsInstance := jetstream.NATSInstance{}
base.Routers.Federation = fedMux
base.Cfg.FederationAPI.Matrix.SigningIdentity.ServerName = testOrigin
base.Cfg.FederationAPI.Matrix.Metrics.Enabled = false
fedClient := fakeFedClient{}
serverKeyAPI := &signing.YggdrasilKeys{}
keyRing := serverKeyAPI.KeyRing()
fedapi := fedAPI.NewInternalAPI(base, &fedClient, nil, nil, keyRing, true)
fedapi := fedAPI.NewInternalAPI(base, &natsInstance, &fedClient, nil, nil, keyRing, true)
userapi := fakeUserAPI{}
r, ok := fedapi.(*fedInternal.FederationInternalAPI)
if !ok {

View file

@ -29,6 +29,7 @@ import (
fedInternal "github.com/matrix-org/dendrite/federationapi/internal"
"github.com/matrix-org/dendrite/federationapi/routing"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/test"
"github.com/matrix-org/dendrite/test/testrig"
"github.com/matrix-org/gomatrixserverlib"
@ -50,13 +51,14 @@ func TestHandleQueryDirectory(t *testing.T) {
defer close()
fedMux := mux.NewRouter().SkipClean(true).PathPrefix(httputil.PublicFederationPathPrefix).Subrouter().UseEncodedPath()
natsInstance := jetstream.NATSInstance{}
base.Routers.Federation = fedMux
base.Cfg.FederationAPI.Matrix.SigningIdentity.ServerName = testOrigin
base.Cfg.FederationAPI.Matrix.Metrics.Enabled = false
fedClient := fakeFedClient{}
serverKeyAPI := &signing.YggdrasilKeys{}
keyRing := serverKeyAPI.KeyRing()
fedapi := fedAPI.NewInternalAPI(base, &fedClient, nil, nil, keyRing, true)
fedapi := fedAPI.NewInternalAPI(base, &natsInstance, &fedClient, nil, nil, keyRing, true)
userapi := fakeUserAPI{}
r, ok := fedapi.(*fedInternal.FederationInternalAPI)
if !ok {

View file

@ -26,6 +26,7 @@ import (
fedInternal "github.com/matrix-org/dendrite/federationapi/internal"
"github.com/matrix-org/dendrite/federationapi/routing"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/test"
"github.com/matrix-org/dendrite/test/testrig"
"github.com/matrix-org/gomatrixserverlib"
@ -48,10 +49,11 @@ func TestHandleSend(t *testing.T) {
defer close()
fedMux := mux.NewRouter().SkipClean(true).PathPrefix(httputil.PublicFederationPathPrefix).Subrouter().UseEncodedPath()
natsInstance := jetstream.NATSInstance{}
base.Routers.Federation = fedMux
base.Cfg.FederationAPI.Matrix.SigningIdentity.ServerName = testOrigin
base.Cfg.FederationAPI.Matrix.Metrics.Enabled = false
fedapi := fedAPI.NewInternalAPI(base, nil, nil, nil, nil, true)
fedapi := fedAPI.NewInternalAPI(base, &natsInstance, nil, nil, nil, nil, true)
serverKeyAPI := &signing.YggdrasilKeys{}
keyRing := serverKeyAPI.KeyRing()
r, ok := fedapi.(*fedInternal.FederationInternalAPI)

View file

@ -16,6 +16,7 @@ package roomserver
import (
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/roomserver/api"
@ -27,6 +28,7 @@ import (
// NewInternalAPI returns a concrete implementation of the internal API.
func NewInternalAPI(
base *base.BaseDendrite,
natsInstance *jetstream.NATSInstance,
caches caching.RoomServerCaches,
) api.RoomserverInternalAPI {
cfg := &base.Cfg.RoomServer
@ -36,7 +38,7 @@ func NewInternalAPI(
logrus.WithError(err).Panicf("failed to connect to room server db")
}
js, nc := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
js, nc := natsInstance.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
return internal.NewRoomserverAPI(
base, roomserverDB, js, nc, caches,

View file

@ -46,7 +46,8 @@ func TestUsers(t *testing.T) {
base, close := testrig.CreateBaseDendrite(t, dbType)
defer close()
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
rsAPI := roomserver.NewInternalAPI(base, caches)
natsInstance := jetstream.NATSInstance{}
rsAPI := roomserver.NewInternalAPI(base, &natsInstance, caches)
// SetFederationAPI starts the room event input consumer
rsAPI.SetFederationAPI(nil, nil)
@ -55,7 +56,7 @@ func TestUsers(t *testing.T) {
})
t.Run("kick users", func(t *testing.T) {
usrAPI := userapi.NewInternalAPI(base, rsAPI, nil)
usrAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil)
rsAPI.SetUserAPI(usrAPI)
testKickUsers(t, rsAPI, usrAPI)
})
@ -185,7 +186,8 @@ func Test_QueryLeftUsers(t *testing.T) {
defer close()
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
rsAPI := roomserver.NewInternalAPI(base, caches)
natsInstance := jetstream.NATSInstance{}
rsAPI := roomserver.NewInternalAPI(base, &natsInstance, caches)
// SetFederationAPI starts the room event input consumer
rsAPI.SetFederationAPI(nil, nil)
// Create the room
@ -230,19 +232,20 @@ func TestPurgeRoom(t *testing.T) {
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
base, db, close := mustCreateDatabase(t, dbType)
natsInstance := jetstream.NATSInstance{}
defer close()
jsCtx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
jsCtx, _ := natsInstance.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
defer jetstream.DeleteAllStreams(jsCtx, &base.Cfg.Global.JetStream)
fedClient := base.CreateFederationClient()
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
rsAPI := roomserver.NewInternalAPI(base, caches)
userAPI := userapi.NewInternalAPI(base, rsAPI, nil)
rsAPI := roomserver.NewInternalAPI(base, &natsInstance, caches)
userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil)
// this starts the JetStream consumers
syncapi.AddPublicRoutes(base, userAPI, rsAPI, caches)
federationapi.NewInternalAPI(base, fedClient, rsAPI, caches, nil, true)
syncapi.AddPublicRoutes(base, &natsInstance, userAPI, rsAPI, caches)
federationapi.NewInternalAPI(base, &natsInstance, fedClient, rsAPI, caches, nil, true)
rsAPI.SetFederationAPI(nil, nil)
// Create the room

View file

@ -35,7 +35,6 @@ import (
"github.com/getsentry/sentry-go"
sentryhttp "github.com/getsentry/sentry-go/http"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/gomatrixserverlib"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/atomic"
@ -65,7 +64,6 @@ type BaseDendrite struct {
*process.ProcessContext
tracerCloser io.Closer
Routers httputil.Routers
NATS *jetstream.NATSInstance
Cfg *config.Dendrite
DNSCache *gomatrixserverlib.DNSCache
ConnectionManager sqlutil.Connections
@ -177,7 +175,6 @@ func NewBaseDendrite(cfg *config.Dendrite, options ...BaseDendriteOptions) *Base
Cfg: cfg,
DNSCache: dnsCache,
Routers: httputil.NewRouters(),
NATS: &jetstream.NATSInstance{},
ConnectionManager: cm,
EnableMetrics: enableMetrics,
}

View file

@ -73,6 +73,7 @@ func (s *NATSInstance) Prepare(process *process.ProcessContext, cfg *config.JetS
}
// reuse existing connections
if s.nc != nil {
logrus.Infof("XXX: reusing connection")
return s.js, s.nc
}
nc, err := natsclient.Connect("", natsclient.InProcessServer(s))

View file

@ -28,6 +28,7 @@ import (
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/syncapi"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
@ -53,21 +54,21 @@ type Monolith struct {
}
// AddAllPublicRoutes attaches all public paths to the given router
func (m *Monolith) AddAllPublicRoutes(base *base.BaseDendrite, caches *caching.Caches) {
func (m *Monolith) AddAllPublicRoutes(base *base.BaseDendrite, natsInstance *jetstream.NATSInstance, caches *caching.Caches) {
userDirectoryProvider := m.ExtUserDirectoryProvider
if userDirectoryProvider == nil {
userDirectoryProvider = m.UserAPI
}
clientapi.AddPublicRoutes(
base, m.FedClient, m.RoomserverAPI, m.AppserviceAPI, transactions.New(),
base, natsInstance, m.FedClient, m.RoomserverAPI, m.AppserviceAPI, transactions.New(),
m.FederationAPI, m.UserAPI, userDirectoryProvider,
m.ExtPublicRoomsProvider,
)
federationapi.AddPublicRoutes(
base, m.UserAPI, m.FedClient, m.KeyRing, m.RoomserverAPI, m.FederationAPI, nil,
base, natsInstance, m.UserAPI, m.FedClient, m.KeyRing, m.RoomserverAPI, m.FederationAPI, nil,
)
mediaapi.AddPublicRoutes(base, m.UserAPI, m.Client)
syncapi.AddPublicRoutes(base, m.UserAPI, m.RoomserverAPI, caches)
syncapi.AddPublicRoutes(base, natsInstance, m.UserAPI, m.RoomserverAPI, caches)
if m.RelayAPI != nil {
relayapi.AddPublicRoutes(base, m.KeyRing, m.RelayAPI)

View file

@ -40,13 +40,14 @@ import (
// component.
func AddPublicRoutes(
base *base.BaseDendrite,
natsInstance *jetstream.NATSInstance,
userAPI userapi.SyncUserAPI,
rsAPI api.SyncRoomserverAPI,
caches caching.LazyLoadCache,
) {
cfg := &base.Cfg.SyncAPI
js, natsClient := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
js, natsClient := natsInstance.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
syncDB, err := storage.NewSyncServerDatasource(base.Context(), base.ConnectionManager, &cfg.Database)
if err != nil {

View file

@ -116,12 +116,13 @@ func testSyncAccessTokens(t *testing.T, dbType test.DBType) {
base, close := testrig.CreateBaseDendrite(t, dbType)
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
natsInstance := jetstream.NATSInstance{}
defer close()
jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
jsctx, _ := natsInstance.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
msgs := toNATSMsgs(t, base, room.Events()...)
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches)
AddPublicRoutes(base, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches)
testrig.MustPublishMsgs(t, jsctx, msgs...)
testCases := []struct {
@ -209,8 +210,9 @@ func testSyncAPICreateRoomSyncEarly(t *testing.T, dbType test.DBType) {
base, close := testrig.CreateBaseDendrite(t, dbType)
defer close()
natsInstance := jetstream.NATSInstance{}
jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
jsctx, _ := natsInstance.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
// order is:
// m.room.create
@ -221,7 +223,7 @@ func testSyncAPICreateRoomSyncEarly(t *testing.T, dbType test.DBType) {
msgs := toNATSMsgs(t, base, room.Events()...)
sinceTokens := make([]string, len(msgs))
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches)
AddPublicRoutes(base, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches)
for i, msg := range msgs {
testrig.MustPublishMsgs(t, jsctx, msg)
time.Sleep(100 * time.Millisecond)
@ -302,11 +304,12 @@ func testSyncAPIUpdatePresenceImmediately(t *testing.T, dbType test.DBType) {
base.Cfg.Global.Presence.EnableOutbound = true
base.Cfg.Global.Presence.EnableInbound = true
defer close()
natsInstance := jetstream.NATSInstance{}
jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
jsctx, _ := natsInstance.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{}, caches)
AddPublicRoutes(base, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{}, caches)
w := httptest.NewRecorder()
base.Routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
"access_token": alice.AccessToken,
@ -416,15 +419,16 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) {
base, close := testrig.CreateBaseDendrite(t, dbType)
defer close()
natsInstance := jetstream.NATSInstance{}
jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
jsctx, _ := natsInstance.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
// Use the actual internal roomserver API
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
rsAPI := roomserver.NewInternalAPI(base, caches)
rsAPI := roomserver.NewInternalAPI(base, &natsInstance, caches)
rsAPI.SetFederationAPI(nil, nil)
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI, caches)
AddPublicRoutes(base, &natsInstance, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI, caches)
for _, tc := range testCases {
testname := fmt.Sprintf("%s - %s", tc.historyVisibility, userType)
@ -716,16 +720,16 @@ func TestGetMembership(t *testing.T) {
base, close := testrig.CreateBaseDendrite(t, dbType)
defer close()
jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
natsInstance := jetstream.NATSInstance{}
jsctx, _ := natsInstance.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
// Use an actual roomserver for this
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
rsAPI := roomserver.NewInternalAPI(base, caches)
rsAPI := roomserver.NewInternalAPI(base, &natsInstance, caches)
rsAPI.SetFederationAPI(nil, nil)
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI, caches)
AddPublicRoutes(base, &natsInstance, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI, caches)
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
@ -788,11 +792,12 @@ func testSendToDevice(t *testing.T, dbType test.DBType) {
base, baseClose := testrig.CreateBaseDendrite(t, dbType)
defer baseClose()
natsInstance := jetstream.NATSInstance{}
jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
jsctx, _ := natsInstance.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{}, caches)
AddPublicRoutes(base, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{}, caches)
producer := producers.SyncAPIProducer{
TopicSendToDeviceEvent: base.Cfg.Global.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
@ -1009,10 +1014,11 @@ func testContext(t *testing.T, dbType test.DBType) {
// Use an actual roomserver for this
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
rsAPI := roomserver.NewInternalAPI(base, caches)
natsInstance := jetstream.NATSInstance{}
rsAPI := roomserver.NewInternalAPI(base, &natsInstance, caches)
rsAPI.SetFederationAPI(nil, nil)
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, rsAPI, caches)
AddPublicRoutes(base, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, rsAPI, caches)
room := test.NewRoom(t, user)
@ -1025,7 +1031,7 @@ func testContext(t *testing.T, dbType test.DBType) {
t.Fatalf("failed to send events: %v", err)
}
jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
jsctx, _ := natsInstance.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
syncUntil(t, base, alice.AccessToken, false, func(syncBody string) bool {
@ -1171,6 +1177,7 @@ func syncUntil(t *testing.T,
skip bool,
checkFunc func(syncBody string) bool,
) {
t.Helper()
if checkFunc == nil {
t.Fatalf("No checkFunc defined")
}

View file

@ -21,6 +21,7 @@ import (
"github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/test"
"github.com/nats-io/nats.go"
)
@ -110,6 +111,7 @@ func Base(cfg *config.Dendrite) (*base.BaseDendrite, nats.JetStreamContext, *nat
cfg.SyncAPI.Fulltext.InMemory = true
cfg.FederationAPI.KeyPerspectives = nil
base := base.NewBaseDendrite(cfg, base.DisableMetrics)
js, jc := base.NATS.Prepare(base.ProcessContext, &cfg.Global.JetStream)
natsInstance := jetstream.NATSInstance{}
js, jc := natsInstance.Prepare(base.ProcessContext, &cfg.Global.JetStream)
return base, js, jc
}

View file

@ -36,11 +36,12 @@ import (
// can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes.
func NewInternalAPI(
base *base.BaseDendrite,
natsInstance *jetstream.NATSInstance,
rsAPI rsapi.UserRoomserverAPI,
fedClient fedsenderapi.KeyserverFederationAPI,
) *internal.UserInternalAPI {
cfg := &base.Cfg.UserAPI
js, _ := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
js, _ := natsInstance.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
appServices := base.Cfg.Derived.ApplicationServices
pgClient := pushgateway.NewHTTPClient(cfg.PushGatewayDisableTLSValidation)