diff --git a/appservice/appservice_test.go b/appservice/appservice_test.go index df117aa6d..eef68492c 100644 --- a/appservice/appservice_test.go +++ b/appservice/appservice_test.go @@ -129,7 +129,7 @@ func TestAppserviceInternalAPI(t *testing.T) { // Create required internal APIs natsInstance := jetstream.NATSInstance{} rsAPI := roomserver.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, caches, base.EnableMetrics) - usrAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil) + usrAPI := userapi.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, rsAPI, nil) asAPI := appservice.NewInternalAPI(base.ProcessContext, base.Cfg, &natsInstance, usrAPI, rsAPI) runCases(t, asAPI) diff --git a/build/dendritejs-pinecone/main.go b/build/dendritejs-pinecone/main.go index 49de37388..0c52c3e88 100644 --- a/build/dendritejs-pinecone/main.go +++ b/build/dendritejs-pinecone/main.go @@ -189,7 +189,7 @@ func startup() { serverKeyAPI := &signing.YggdrasilKeys{} keyRing := serverKeyAPI.KeyRing() - userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, federation) + userAPI := userapi.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, rsAPI, federation) asQuery := appservice.NewInternalAPI( base.ProcessContext, base.Cfg, &natsInstance, userAPI, rsAPI, diff --git a/build/gobind-yggdrasil/monolith.go b/build/gobind-yggdrasil/monolith.go index 364ef071b..54e99e07b 100644 --- a/build/gobind-yggdrasil/monolith.go +++ b/build/gobind-yggdrasil/monolith.go @@ -168,7 +168,7 @@ func (m *DendriteMonolith) Start() { base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, federation, rsAPI, caches, keyRing, true, ) - userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, federation) + userAPI := userapi.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, rsAPI, federation) asAPI := appservice.NewInternalAPI(base.ProcessContext, base.Cfg, &natsInstance, userAPI, rsAPI) rsAPI.SetAppserviceAPI(asAPI) diff --git a/clientapi/admin_test.go b/clientapi/admin_test.go index 07407f2ac..fe20a445a 100644 --- a/clientapi/admin_test.go +++ b/clientapi/admin_test.go @@ -42,7 +42,7 @@ func TestAdminResetPassword(t *testing.T) { caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) rsAPI := roomserver.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, caches, base.EnableMetrics) // Needed for changing the password/login - userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil) + userAPI := userapi.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, rsAPI, nil) // We mostly need the userAPI for this test, so nil for other APIs/caches etc. AddPublicRoutes(base.ProcessContext, base.Routers, base.Cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, base.EnableMetrics) @@ -157,7 +157,7 @@ func TestPurgeRoom(t *testing.T) { fedClient := base.CreateFederationClient() rsAPI := roomserver.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, caches, base.EnableMetrics) - userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil) + userAPI := userapi.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, rsAPI, nil) // this starts the JetStream consumers syncapi.AddPublicRoutes(base.ProcessContext, base.Routers, base.Cfg, base.ConnectionManager, &natsInstance, userAPI, rsAPI, caches, base.EnableMetrics) diff --git a/clientapi/routing/joinroom_test.go b/clientapi/routing/joinroom_test.go index 721f395e5..4cac0cf78 100644 --- a/clientapi/routing/joinroom_test.go +++ b/clientapi/routing/joinroom_test.go @@ -32,7 +32,7 @@ func TestJoinRoomByIDOrAlias(t *testing.T) { caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) natsInstance := jetstream.NATSInstance{} rsAPI := roomserver.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, caches, base.EnableMetrics) - userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil) + userAPI := userapi.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, rsAPI, nil) asAPI := appservice.NewInternalAPI(base.ProcessContext, base.Cfg, &natsInstance, userAPI, rsAPI) rsAPI.SetFederationAPI(nil, nil) // creates the rs.Inputer etc diff --git a/clientapi/routing/login_test.go b/clientapi/routing/login_test.go index 3a2f53f29..44be87cb0 100644 --- a/clientapi/routing/login_test.go +++ b/clientapi/routing/login_test.go @@ -42,7 +42,7 @@ func TestLogin(t *testing.T) { caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) rsAPI := roomserver.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, caches, base.EnableMetrics) // Needed for /login - userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil) + userAPI := userapi.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, rsAPI, nil) // We mostly need the userAPI for this test, so nil for other APIs/caches etc. Setup(base.Routers, base.Cfg, nil, nil, userAPI, nil, nil, nil, nil, nil, nil, nil, base.EnableMetrics) diff --git a/clientapi/routing/register_test.go b/clientapi/routing/register_test.go index 1f4564dac..a3d055a7e 100644 --- a/clientapi/routing/register_test.go +++ b/clientapi/routing/register_test.go @@ -412,7 +412,7 @@ func Test_register(t *testing.T) { caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) natsInstance := jetstream.NATSInstance{} rsAPI := roomserver.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, caches, base.EnableMetrics) - userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil) + userAPI := userapi.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, rsAPI, nil) for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { @@ -585,7 +585,7 @@ func TestRegisterUserWithDisplayName(t *testing.T) { caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) natsInstance := jetstream.NATSInstance{} rsAPI := roomserver.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, caches, base.EnableMetrics) - userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil) + userAPI := userapi.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, rsAPI, nil) deviceName, deviceID := "deviceName", "deviceID" expectedDisplayName := "DisplayName" response := completeRegistration( @@ -625,7 +625,7 @@ func TestRegisterAdminUsingSharedSecret(t *testing.T) { base.Cfg.ClientAPI.RegistrationSharedSecret = sharedSecret caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) rsAPI := roomserver.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, caches, base.EnableMetrics) - userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil) + userAPI := userapi.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, rsAPI, nil) expectedDisplayName := "rabbit" jsonStr := []byte(`{"admin":true,"mac":"24dca3bba410e43fe64b9b5c28306693bf3baa9f","nonce":"759f047f312b99ff428b21d581256f8592b8976e58bc1b543972dc6147e529a79657605b52d7becd160ff5137f3de11975684319187e06901955f79e5a6c5a79","password":"wonderland","username":"alice","displayname":"rabbit"}`) diff --git a/cmd/dendrite-demo-pinecone/monolith/monolith.go b/cmd/dendrite-demo-pinecone/monolith/monolith.go index f759f147c..f315834a2 100644 --- a/cmd/dendrite-demo-pinecone/monolith/monolith.go +++ b/cmd/dendrite-demo-pinecone/monolith/monolith.go @@ -142,7 +142,7 @@ func (p *P2PMonolith) SetupDendrite(cfg *config.Dendrite, port int, enableRelayi p.BaseDendrite.ProcessContext, p.BaseDendrite.Cfg, p.BaseDendrite.ConnectionManager, &natsInstance, federation, rsAPI, caches, keyRing, true, ) - userAPI := userapi.NewInternalAPI(p.BaseDendrite, &natsInstance, rsAPI, federation) + userAPI := userapi.NewInternalAPI(p.BaseDendrite.ProcessContext, p.BaseDendrite.Cfg, p.BaseDendrite.ConnectionManager, &natsInstance, rsAPI, federation) asAPI := appservice.NewInternalAPI(p.BaseDendrite.ProcessContext, p.BaseDendrite.Cfg, &natsInstance, userAPI, rsAPI) diff --git a/cmd/dendrite-demo-yggdrasil/main.go b/cmd/dendrite-demo-yggdrasil/main.go index 50d8c7ece..29247dec0 100644 --- a/cmd/dendrite-demo-yggdrasil/main.go +++ b/cmd/dendrite-demo-yggdrasil/main.go @@ -162,7 +162,7 @@ func main() { natsInstance := jetstream.NATSInstance{} rsAPI := roomserver.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, caches, base.EnableMetrics) - userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, federation) + userAPI := userapi.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, rsAPI, federation) asAPI := appservice.NewInternalAPI(base.ProcessContext, base.Cfg, &natsInstance, userAPI, rsAPI) rsAPI.SetAppserviceAPI(asAPI) diff --git a/cmd/dendrite/main.go b/cmd/dendrite/main.go index 2a2c7fde5..35689e172 100644 --- a/cmd/dendrite/main.go +++ b/cmd/dendrite/main.go @@ -83,7 +83,7 @@ func main() { keyRing := fsAPI.KeyRing() - userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, federation) + userAPI := userapi.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, rsAPI, federation) asAPI := appservice.NewInternalAPI(base.ProcessContext, base.Cfg, &natsInstance, userAPI, rsAPI) diff --git a/roomserver/roomserver_test.go b/roomserver/roomserver_test.go index 03782cf62..6a4044559 100644 --- a/roomserver/roomserver_test.go +++ b/roomserver/roomserver_test.go @@ -56,7 +56,7 @@ func TestUsers(t *testing.T) { }) t.Run("kick users", func(t *testing.T) { - usrAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil) + usrAPI := userapi.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, rsAPI, nil) rsAPI.SetUserAPI(usrAPI) testKickUsers(t, rsAPI, usrAPI) }) @@ -241,7 +241,7 @@ func TestPurgeRoom(t *testing.T) { fedClient := base.CreateFederationClient() caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) rsAPI := roomserver.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, caches, base.EnableMetrics) - userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil) + userAPI := userapi.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, rsAPI, nil) // this starts the JetStream consumers syncapi.AddPublicRoutes(base.ProcessContext, base.Routers, base.Cfg, base.ConnectionManager, &natsInstance, userAPI, rsAPI, caches, base.EnableMetrics) diff --git a/userapi/userapi.go b/userapi/userapi.go index dfa26f47d..6dcbc121f 100644 --- a/userapi/userapi.go +++ b/userapi/userapi.go @@ -19,10 +19,12 @@ import ( fedsenderapi "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/internal/pushgateway" + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/process" "github.com/sirupsen/logrus" rsapi "github.com/matrix-org/dendrite/roomserver/api" - "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/dendrite/userapi/consumers" @@ -35,32 +37,33 @@ import ( // NewInternalAPI returns a concrete implementation of the internal API. Callers // can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes. func NewInternalAPI( - base *base.BaseDendrite, + processContext *process.ProcessContext, + dendriteCfg *config.Dendrite, + cm sqlutil.Connections, natsInstance *jetstream.NATSInstance, rsAPI rsapi.UserRoomserverAPI, fedClient fedsenderapi.KeyserverFederationAPI, ) *internal.UserInternalAPI { - cfg := &base.Cfg.UserAPI - js, _ := natsInstance.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) - appServices := base.Cfg.Derived.ApplicationServices + js, _ := natsInstance.Prepare(processContext, &dendriteCfg.Global.JetStream) + appServices := dendriteCfg.Derived.ApplicationServices - pgClient := pushgateway.NewHTTPClient(cfg.PushGatewayDisableTLSValidation) + pgClient := pushgateway.NewHTTPClient(dendriteCfg.UserAPI.PushGatewayDisableTLSValidation) db, err := storage.NewUserDatabase( - base.ProcessContext.Context(), - base.ConnectionManager, - &cfg.AccountDatabase, - cfg.Matrix.ServerName, - cfg.BCryptCost, - cfg.OpenIDTokenLifetimeMS, + processContext.Context(), + cm, + &dendriteCfg.UserAPI.AccountDatabase, + dendriteCfg.Global.ServerName, + dendriteCfg.UserAPI.BCryptCost, + dendriteCfg.UserAPI.OpenIDTokenLifetimeMS, api.DefaultLoginTokenLifetime, - cfg.Matrix.ServerNotices.LocalPart, + dendriteCfg.UserAPI.Matrix.ServerNotices.LocalPart, ) if err != nil { logrus.WithError(err).Panicf("failed to connect to accounts db") } - keyDB, err := storage.NewKeyDatabase(base.ConnectionManager, &base.Cfg.KeyServer.Database) + keyDB, err := storage.NewKeyDatabase(cm, &dendriteCfg.KeyServer.Database) if err != nil { logrus.WithError(err).Panicf("failed to connect to key db") } @@ -71,11 +74,11 @@ func NewInternalAPI( // it's handled by clientapi, and hence uses its topic. When user // API handles it for all account data, we can remove it from // here. - cfg.Matrix.JetStream.Prefixed(jetstream.OutputClientData), - cfg.Matrix.JetStream.Prefixed(jetstream.OutputNotificationData), + dendriteCfg.Global.JetStream.Prefixed(jetstream.OutputClientData), + dendriteCfg.Global.JetStream.Prefixed(jetstream.OutputNotificationData), ) keyChangeProducer := &producers.KeyChange{ - Topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputKeyChangeEvent), + Topic: dendriteCfg.Global.JetStream.Prefixed(jetstream.OutputKeyChangeEvent), JetStream: js, DB: keyDB, } @@ -85,15 +88,15 @@ func NewInternalAPI( KeyDatabase: keyDB, SyncProducer: syncProducer, KeyChangeProducer: keyChangeProducer, - Config: cfg, + Config: &dendriteCfg.UserAPI, AppServices: appServices, RSAPI: rsAPI, - DisableTLSValidation: cfg.PushGatewayDisableTLSValidation, + DisableTLSValidation: dendriteCfg.UserAPI.PushGatewayDisableTLSValidation, PgClient: pgClient, FedClient: fedClient, } - updater := internal.NewDeviceListUpdater(base.ProcessContext, keyDB, userAPI, keyChangeProducer, fedClient, 8, rsAPI, cfg.Matrix.ServerName) // 8 workers TODO: configurable + updater := internal.NewDeviceListUpdater(processContext, keyDB, userAPI, keyChangeProducer, fedClient, 8, rsAPI, dendriteCfg.Global.ServerName) // 8 workers TODO: configurable userAPI.Updater = updater // Remove users which we don't share a room with anymore if err := updater.CleanUp(); err != nil { @@ -107,28 +110,28 @@ func NewInternalAPI( }() dlConsumer := consumers.NewDeviceListUpdateConsumer( - base.ProcessContext, cfg, js, updater, + processContext, &dendriteCfg.UserAPI, js, updater, ) if err := dlConsumer.Start(); err != nil { logrus.WithError(err).Panic("failed to start device list consumer") } sigConsumer := consumers.NewSigningKeyUpdateConsumer( - base.ProcessContext, cfg, js, userAPI, + processContext, &dendriteCfg.UserAPI, js, userAPI, ) if err := sigConsumer.Start(); err != nil { logrus.WithError(err).Panic("failed to start signing key consumer") } receiptConsumer := consumers.NewOutputReceiptEventConsumer( - base.ProcessContext, cfg, js, db, syncProducer, pgClient, + processContext, &dendriteCfg.UserAPI, js, db, syncProducer, pgClient, ) if err := receiptConsumer.Start(); err != nil { logrus.WithError(err).Panic("failed to start user API receipt consumer") } eventConsumer := consumers.NewOutputRoomEventConsumer( - base.ProcessContext, cfg, js, db, pgClient, rsAPI, syncProducer, + processContext, &dendriteCfg.UserAPI, js, db, pgClient, rsAPI, syncProducer, ) if err := eventConsumer.Start(); err != nil { logrus.WithError(err).Panic("failed to start user API streamed event consumer") @@ -137,15 +140,15 @@ func NewInternalAPI( var cleanOldNotifs func() cleanOldNotifs = func() { logrus.Infof("Cleaning old notifications") - if err := db.DeleteOldNotifications(base.Context()); err != nil { + if err := db.DeleteOldNotifications(processContext.Context()); err != nil { logrus.WithError(err).Error("Failed to clean old notifications") } time.AfterFunc(time.Hour, cleanOldNotifs) } time.AfterFunc(time.Minute, cleanOldNotifs) - if base.Cfg.Global.ReportStats.Enabled { - go util.StartPhoneHomeCollector(time.Now(), base.Cfg, db) + if dendriteCfg.Global.ReportStats.Enabled { + go util.StartPhoneHomeCollector(time.Now(), dendriteCfg, db) } return userAPI