Remove BaseDendrite from userAPI

This commit is contained in:
Till Faelligen 2023-03-20 14:28:32 +01:00
parent 72d48335cc
commit 1d2429cf58
No known key found for this signature in database
GPG key ID: ACCDC9606D472758
12 changed files with 45 additions and 42 deletions

View file

@ -129,7 +129,7 @@ func TestAppserviceInternalAPI(t *testing.T) {
// Create required internal APIs // Create required internal APIs
natsInstance := jetstream.NATSInstance{} natsInstance := jetstream.NATSInstance{}
rsAPI := roomserver.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, caches, base.EnableMetrics) rsAPI := roomserver.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, caches, base.EnableMetrics)
usrAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil) usrAPI := userapi.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, rsAPI, nil)
asAPI := appservice.NewInternalAPI(base.ProcessContext, base.Cfg, &natsInstance, usrAPI, rsAPI) asAPI := appservice.NewInternalAPI(base.ProcessContext, base.Cfg, &natsInstance, usrAPI, rsAPI)
runCases(t, asAPI) runCases(t, asAPI)

View file

@ -189,7 +189,7 @@ func startup() {
serverKeyAPI := &signing.YggdrasilKeys{} serverKeyAPI := &signing.YggdrasilKeys{}
keyRing := serverKeyAPI.KeyRing() keyRing := serverKeyAPI.KeyRing()
userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, federation) userAPI := userapi.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, rsAPI, federation)
asQuery := appservice.NewInternalAPI( asQuery := appservice.NewInternalAPI(
base.ProcessContext, base.Cfg, &natsInstance, userAPI, rsAPI, base.ProcessContext, base.Cfg, &natsInstance, userAPI, rsAPI,

View file

@ -168,7 +168,7 @@ func (m *DendriteMonolith) Start() {
base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, federation, rsAPI, caches, keyRing, true, base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, federation, rsAPI, caches, keyRing, true,
) )
userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, federation) userAPI := userapi.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, rsAPI, federation)
asAPI := appservice.NewInternalAPI(base.ProcessContext, base.Cfg, &natsInstance, userAPI, rsAPI) asAPI := appservice.NewInternalAPI(base.ProcessContext, base.Cfg, &natsInstance, userAPI, rsAPI)
rsAPI.SetAppserviceAPI(asAPI) rsAPI.SetAppserviceAPI(asAPI)

View file

@ -42,7 +42,7 @@ func TestAdminResetPassword(t *testing.T) {
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
rsAPI := roomserver.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, caches, base.EnableMetrics) rsAPI := roomserver.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, caches, base.EnableMetrics)
// Needed for changing the password/login // Needed for changing the password/login
userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil) userAPI := userapi.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, rsAPI, nil)
// We mostly need the userAPI for this test, so nil for other APIs/caches etc. // We mostly need the userAPI for this test, so nil for other APIs/caches etc.
AddPublicRoutes(base.ProcessContext, base.Routers, base.Cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, base.EnableMetrics) AddPublicRoutes(base.ProcessContext, base.Routers, base.Cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, base.EnableMetrics)
@ -157,7 +157,7 @@ func TestPurgeRoom(t *testing.T) {
fedClient := base.CreateFederationClient() fedClient := base.CreateFederationClient()
rsAPI := roomserver.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, caches, base.EnableMetrics) rsAPI := roomserver.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, caches, base.EnableMetrics)
userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil) userAPI := userapi.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, rsAPI, nil)
// this starts the JetStream consumers // this starts the JetStream consumers
syncapi.AddPublicRoutes(base.ProcessContext, base.Routers, base.Cfg, base.ConnectionManager, &natsInstance, userAPI, rsAPI, caches, base.EnableMetrics) syncapi.AddPublicRoutes(base.ProcessContext, base.Routers, base.Cfg, base.ConnectionManager, &natsInstance, userAPI, rsAPI, caches, base.EnableMetrics)

View file

@ -32,7 +32,7 @@ func TestJoinRoomByIDOrAlias(t *testing.T) {
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
natsInstance := jetstream.NATSInstance{} natsInstance := jetstream.NATSInstance{}
rsAPI := roomserver.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, caches, base.EnableMetrics) rsAPI := roomserver.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, caches, base.EnableMetrics)
userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil) userAPI := userapi.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, rsAPI, nil)
asAPI := appservice.NewInternalAPI(base.ProcessContext, base.Cfg, &natsInstance, userAPI, rsAPI) asAPI := appservice.NewInternalAPI(base.ProcessContext, base.Cfg, &natsInstance, userAPI, rsAPI)
rsAPI.SetFederationAPI(nil, nil) // creates the rs.Inputer etc rsAPI.SetFederationAPI(nil, nil) // creates the rs.Inputer etc

View file

@ -42,7 +42,7 @@ func TestLogin(t *testing.T) {
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
rsAPI := roomserver.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, caches, base.EnableMetrics) rsAPI := roomserver.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, caches, base.EnableMetrics)
// Needed for /login // Needed for /login
userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil) userAPI := userapi.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, rsAPI, nil)
// We mostly need the userAPI for this test, so nil for other APIs/caches etc. // We mostly need the userAPI for this test, so nil for other APIs/caches etc.
Setup(base.Routers, base.Cfg, nil, nil, userAPI, nil, nil, nil, nil, nil, nil, nil, base.EnableMetrics) Setup(base.Routers, base.Cfg, nil, nil, userAPI, nil, nil, nil, nil, nil, nil, nil, base.EnableMetrics)

View file

@ -412,7 +412,7 @@ func Test_register(t *testing.T) {
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
natsInstance := jetstream.NATSInstance{} natsInstance := jetstream.NATSInstance{}
rsAPI := roomserver.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, caches, base.EnableMetrics) rsAPI := roomserver.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, caches, base.EnableMetrics)
userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil) userAPI := userapi.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, rsAPI, nil)
for _, tc := range testCases { for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
@ -585,7 +585,7 @@ func TestRegisterUserWithDisplayName(t *testing.T) {
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
natsInstance := jetstream.NATSInstance{} natsInstance := jetstream.NATSInstance{}
rsAPI := roomserver.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, caches, base.EnableMetrics) rsAPI := roomserver.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, caches, base.EnableMetrics)
userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil) userAPI := userapi.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, rsAPI, nil)
deviceName, deviceID := "deviceName", "deviceID" deviceName, deviceID := "deviceName", "deviceID"
expectedDisplayName := "DisplayName" expectedDisplayName := "DisplayName"
response := completeRegistration( response := completeRegistration(
@ -625,7 +625,7 @@ func TestRegisterAdminUsingSharedSecret(t *testing.T) {
base.Cfg.ClientAPI.RegistrationSharedSecret = sharedSecret base.Cfg.ClientAPI.RegistrationSharedSecret = sharedSecret
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
rsAPI := roomserver.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, caches, base.EnableMetrics) rsAPI := roomserver.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, caches, base.EnableMetrics)
userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil) userAPI := userapi.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, rsAPI, nil)
expectedDisplayName := "rabbit" expectedDisplayName := "rabbit"
jsonStr := []byte(`{"admin":true,"mac":"24dca3bba410e43fe64b9b5c28306693bf3baa9f","nonce":"759f047f312b99ff428b21d581256f8592b8976e58bc1b543972dc6147e529a79657605b52d7becd160ff5137f3de11975684319187e06901955f79e5a6c5a79","password":"wonderland","username":"alice","displayname":"rabbit"}`) jsonStr := []byte(`{"admin":true,"mac":"24dca3bba410e43fe64b9b5c28306693bf3baa9f","nonce":"759f047f312b99ff428b21d581256f8592b8976e58bc1b543972dc6147e529a79657605b52d7becd160ff5137f3de11975684319187e06901955f79e5a6c5a79","password":"wonderland","username":"alice","displayname":"rabbit"}`)

View file

@ -142,7 +142,7 @@ func (p *P2PMonolith) SetupDendrite(cfg *config.Dendrite, port int, enableRelayi
p.BaseDendrite.ProcessContext, p.BaseDendrite.Cfg, p.BaseDendrite.ConnectionManager, &natsInstance, federation, rsAPI, caches, keyRing, true, p.BaseDendrite.ProcessContext, p.BaseDendrite.Cfg, p.BaseDendrite.ConnectionManager, &natsInstance, federation, rsAPI, caches, keyRing, true,
) )
userAPI := userapi.NewInternalAPI(p.BaseDendrite, &natsInstance, rsAPI, federation) userAPI := userapi.NewInternalAPI(p.BaseDendrite.ProcessContext, p.BaseDendrite.Cfg, p.BaseDendrite.ConnectionManager, &natsInstance, rsAPI, federation)
asAPI := appservice.NewInternalAPI(p.BaseDendrite.ProcessContext, p.BaseDendrite.Cfg, &natsInstance, userAPI, rsAPI) asAPI := appservice.NewInternalAPI(p.BaseDendrite.ProcessContext, p.BaseDendrite.Cfg, &natsInstance, userAPI, rsAPI)

View file

@ -162,7 +162,7 @@ func main() {
natsInstance := jetstream.NATSInstance{} natsInstance := jetstream.NATSInstance{}
rsAPI := roomserver.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, caches, base.EnableMetrics) rsAPI := roomserver.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, caches, base.EnableMetrics)
userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, federation) userAPI := userapi.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, rsAPI, federation)
asAPI := appservice.NewInternalAPI(base.ProcessContext, base.Cfg, &natsInstance, userAPI, rsAPI) asAPI := appservice.NewInternalAPI(base.ProcessContext, base.Cfg, &natsInstance, userAPI, rsAPI)
rsAPI.SetAppserviceAPI(asAPI) rsAPI.SetAppserviceAPI(asAPI)

View file

@ -83,7 +83,7 @@ func main() {
keyRing := fsAPI.KeyRing() keyRing := fsAPI.KeyRing()
userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, federation) userAPI := userapi.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, rsAPI, federation)
asAPI := appservice.NewInternalAPI(base.ProcessContext, base.Cfg, &natsInstance, userAPI, rsAPI) asAPI := appservice.NewInternalAPI(base.ProcessContext, base.Cfg, &natsInstance, userAPI, rsAPI)

View file

@ -56,7 +56,7 @@ func TestUsers(t *testing.T) {
}) })
t.Run("kick users", func(t *testing.T) { t.Run("kick users", func(t *testing.T) {
usrAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil) usrAPI := userapi.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, rsAPI, nil)
rsAPI.SetUserAPI(usrAPI) rsAPI.SetUserAPI(usrAPI)
testKickUsers(t, rsAPI, usrAPI) testKickUsers(t, rsAPI, usrAPI)
}) })
@ -241,7 +241,7 @@ func TestPurgeRoom(t *testing.T) {
fedClient := base.CreateFederationClient() fedClient := base.CreateFederationClient()
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
rsAPI := roomserver.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, caches, base.EnableMetrics) rsAPI := roomserver.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, caches, base.EnableMetrics)
userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil) userAPI := userapi.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, rsAPI, nil)
// this starts the JetStream consumers // this starts the JetStream consumers
syncapi.AddPublicRoutes(base.ProcessContext, base.Routers, base.Cfg, base.ConnectionManager, &natsInstance, userAPI, rsAPI, caches, base.EnableMetrics) syncapi.AddPublicRoutes(base.ProcessContext, base.Routers, base.Cfg, base.ConnectionManager, &natsInstance, userAPI, rsAPI, caches, base.EnableMetrics)

View file

@ -19,10 +19,12 @@ import (
fedsenderapi "github.com/matrix-org/dendrite/federationapi/api" fedsenderapi "github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/internal/pushgateway" "github.com/matrix-org/dendrite/internal/pushgateway"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/process"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
rsapi "github.com/matrix-org/dendrite/roomserver/api" rsapi "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/userapi/consumers" "github.com/matrix-org/dendrite/userapi/consumers"
@ -35,32 +37,33 @@ import (
// NewInternalAPI returns a concrete implementation of the internal API. Callers // NewInternalAPI returns a concrete implementation of the internal API. Callers
// can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes. // can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes.
func NewInternalAPI( func NewInternalAPI(
base *base.BaseDendrite, processContext *process.ProcessContext,
dendriteCfg *config.Dendrite,
cm sqlutil.Connections,
natsInstance *jetstream.NATSInstance, natsInstance *jetstream.NATSInstance,
rsAPI rsapi.UserRoomserverAPI, rsAPI rsapi.UserRoomserverAPI,
fedClient fedsenderapi.KeyserverFederationAPI, fedClient fedsenderapi.KeyserverFederationAPI,
) *internal.UserInternalAPI { ) *internal.UserInternalAPI {
cfg := &base.Cfg.UserAPI js, _ := natsInstance.Prepare(processContext, &dendriteCfg.Global.JetStream)
js, _ := natsInstance.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) appServices := dendriteCfg.Derived.ApplicationServices
appServices := base.Cfg.Derived.ApplicationServices
pgClient := pushgateway.NewHTTPClient(cfg.PushGatewayDisableTLSValidation) pgClient := pushgateway.NewHTTPClient(dendriteCfg.UserAPI.PushGatewayDisableTLSValidation)
db, err := storage.NewUserDatabase( db, err := storage.NewUserDatabase(
base.ProcessContext.Context(), processContext.Context(),
base.ConnectionManager, cm,
&cfg.AccountDatabase, &dendriteCfg.UserAPI.AccountDatabase,
cfg.Matrix.ServerName, dendriteCfg.Global.ServerName,
cfg.BCryptCost, dendriteCfg.UserAPI.BCryptCost,
cfg.OpenIDTokenLifetimeMS, dendriteCfg.UserAPI.OpenIDTokenLifetimeMS,
api.DefaultLoginTokenLifetime, api.DefaultLoginTokenLifetime,
cfg.Matrix.ServerNotices.LocalPart, dendriteCfg.UserAPI.Matrix.ServerNotices.LocalPart,
) )
if err != nil { if err != nil {
logrus.WithError(err).Panicf("failed to connect to accounts db") logrus.WithError(err).Panicf("failed to connect to accounts db")
} }
keyDB, err := storage.NewKeyDatabase(base.ConnectionManager, &base.Cfg.KeyServer.Database) keyDB, err := storage.NewKeyDatabase(cm, &dendriteCfg.KeyServer.Database)
if err != nil { if err != nil {
logrus.WithError(err).Panicf("failed to connect to key db") logrus.WithError(err).Panicf("failed to connect to key db")
} }
@ -71,11 +74,11 @@ func NewInternalAPI(
// it's handled by clientapi, and hence uses its topic. When user // it's handled by clientapi, and hence uses its topic. When user
// API handles it for all account data, we can remove it from // API handles it for all account data, we can remove it from
// here. // here.
cfg.Matrix.JetStream.Prefixed(jetstream.OutputClientData), dendriteCfg.Global.JetStream.Prefixed(jetstream.OutputClientData),
cfg.Matrix.JetStream.Prefixed(jetstream.OutputNotificationData), dendriteCfg.Global.JetStream.Prefixed(jetstream.OutputNotificationData),
) )
keyChangeProducer := &producers.KeyChange{ keyChangeProducer := &producers.KeyChange{
Topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputKeyChangeEvent), Topic: dendriteCfg.Global.JetStream.Prefixed(jetstream.OutputKeyChangeEvent),
JetStream: js, JetStream: js,
DB: keyDB, DB: keyDB,
} }
@ -85,15 +88,15 @@ func NewInternalAPI(
KeyDatabase: keyDB, KeyDatabase: keyDB,
SyncProducer: syncProducer, SyncProducer: syncProducer,
KeyChangeProducer: keyChangeProducer, KeyChangeProducer: keyChangeProducer,
Config: cfg, Config: &dendriteCfg.UserAPI,
AppServices: appServices, AppServices: appServices,
RSAPI: rsAPI, RSAPI: rsAPI,
DisableTLSValidation: cfg.PushGatewayDisableTLSValidation, DisableTLSValidation: dendriteCfg.UserAPI.PushGatewayDisableTLSValidation,
PgClient: pgClient, PgClient: pgClient,
FedClient: fedClient, FedClient: fedClient,
} }
updater := internal.NewDeviceListUpdater(base.ProcessContext, keyDB, userAPI, keyChangeProducer, fedClient, 8, rsAPI, cfg.Matrix.ServerName) // 8 workers TODO: configurable updater := internal.NewDeviceListUpdater(processContext, keyDB, userAPI, keyChangeProducer, fedClient, 8, rsAPI, dendriteCfg.Global.ServerName) // 8 workers TODO: configurable
userAPI.Updater = updater userAPI.Updater = updater
// Remove users which we don't share a room with anymore // Remove users which we don't share a room with anymore
if err := updater.CleanUp(); err != nil { if err := updater.CleanUp(); err != nil {
@ -107,28 +110,28 @@ func NewInternalAPI(
}() }()
dlConsumer := consumers.NewDeviceListUpdateConsumer( dlConsumer := consumers.NewDeviceListUpdateConsumer(
base.ProcessContext, cfg, js, updater, processContext, &dendriteCfg.UserAPI, js, updater,
) )
if err := dlConsumer.Start(); err != nil { if err := dlConsumer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start device list consumer") logrus.WithError(err).Panic("failed to start device list consumer")
} }
sigConsumer := consumers.NewSigningKeyUpdateConsumer( sigConsumer := consumers.NewSigningKeyUpdateConsumer(
base.ProcessContext, cfg, js, userAPI, processContext, &dendriteCfg.UserAPI, js, userAPI,
) )
if err := sigConsumer.Start(); err != nil { if err := sigConsumer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start signing key consumer") logrus.WithError(err).Panic("failed to start signing key consumer")
} }
receiptConsumer := consumers.NewOutputReceiptEventConsumer( receiptConsumer := consumers.NewOutputReceiptEventConsumer(
base.ProcessContext, cfg, js, db, syncProducer, pgClient, processContext, &dendriteCfg.UserAPI, js, db, syncProducer, pgClient,
) )
if err := receiptConsumer.Start(); err != nil { if err := receiptConsumer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start user API receipt consumer") logrus.WithError(err).Panic("failed to start user API receipt consumer")
} }
eventConsumer := consumers.NewOutputRoomEventConsumer( eventConsumer := consumers.NewOutputRoomEventConsumer(
base.ProcessContext, cfg, js, db, pgClient, rsAPI, syncProducer, processContext, &dendriteCfg.UserAPI, js, db, pgClient, rsAPI, syncProducer,
) )
if err := eventConsumer.Start(); err != nil { if err := eventConsumer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start user API streamed event consumer") logrus.WithError(err).Panic("failed to start user API streamed event consumer")
@ -137,15 +140,15 @@ func NewInternalAPI(
var cleanOldNotifs func() var cleanOldNotifs func()
cleanOldNotifs = func() { cleanOldNotifs = func() {
logrus.Infof("Cleaning old notifications") logrus.Infof("Cleaning old notifications")
if err := db.DeleteOldNotifications(base.Context()); err != nil { if err := db.DeleteOldNotifications(processContext.Context()); err != nil {
logrus.WithError(err).Error("Failed to clean old notifications") logrus.WithError(err).Error("Failed to clean old notifications")
} }
time.AfterFunc(time.Hour, cleanOldNotifs) time.AfterFunc(time.Hour, cleanOldNotifs)
} }
time.AfterFunc(time.Minute, cleanOldNotifs) time.AfterFunc(time.Minute, cleanOldNotifs)
if base.Cfg.Global.ReportStats.Enabled { if dendriteCfg.Global.ReportStats.Enabled {
go util.StartPhoneHomeCollector(time.Now(), base.Cfg, db) go util.StartPhoneHomeCollector(time.Now(), dendriteCfg, db)
} }
return userAPI return userAPI