diff --git a/build/dendritejs-pinecone/main.go b/build/dendritejs-pinecone/main.go index c1cbc98d7..a717e62e8 100644 --- a/build/dendritejs-pinecone/main.go +++ b/build/dendritejs-pinecone/main.go @@ -196,7 +196,7 @@ func startup() { ) rsAPI.SetAppserviceAPI(asQuery) caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.EnableMetrics) - fedSenderAPI := federationapi.NewInternalAPI(base, &natsInstance, federation, rsAPI, caches, keyRing, true) + fedSenderAPI := federationapi.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, federation, rsAPI, caches, keyRing, true) rsAPI.SetFederationAPI(fedSenderAPI, keyRing) monolith := setup.Monolith{ diff --git a/build/gobind-yggdrasil/monolith.go b/build/gobind-yggdrasil/monolith.go index 609726d15..75ecd02ba 100644 --- a/build/gobind-yggdrasil/monolith.go +++ b/build/gobind-yggdrasil/monolith.go @@ -165,7 +165,7 @@ func (m *DendriteMonolith) Start() { rsAPI := roomserver.NewInternalAPI(base, &natsInstance, caches) fsAPI := federationapi.NewInternalAPI( - base, &natsInstance, federation, rsAPI, caches, keyRing, true, + base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, federation, rsAPI, caches, keyRing, true, ) userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, federation) diff --git a/clientapi/admin_test.go b/clientapi/admin_test.go index 7db1966c2..b6a39308f 100644 --- a/clientapi/admin_test.go +++ b/clientapi/admin_test.go @@ -161,7 +161,7 @@ func TestPurgeRoom(t *testing.T) { // this starts the JetStream consumers syncapi.AddPublicRoutes(base, &natsInstance, userAPI, rsAPI, caches) - federationapi.NewInternalAPI(base, &natsInstance, fedClient, rsAPI, caches, nil, true) + federationapi.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, fedClient, rsAPI, caches, nil, true) rsAPI.SetFederationAPI(nil, nil) // Create the room diff --git a/cmd/dendrite-demo-pinecone/monolith/monolith.go b/cmd/dendrite-demo-pinecone/monolith/monolith.go index 9a1e9b3f0..1e1ddb156 100644 --- a/cmd/dendrite-demo-pinecone/monolith/monolith.go +++ b/cmd/dendrite-demo-pinecone/monolith/monolith.go @@ -139,7 +139,7 @@ func (p *P2PMonolith) SetupDendrite(cfg *config.Dendrite, port int, enableRelayi natsInstance := jetstream.NATSInstance{} rsAPI := roomserver.NewInternalAPI(p.BaseDendrite, &natsInstance, caches) fsAPI := federationapi.NewInternalAPI( - p.BaseDendrite, &natsInstance, federation, rsAPI, caches, keyRing, true, + p.BaseDendrite.ProcessContext, p.BaseDendrite.Cfg, p.BaseDendrite.ConnectionManager, &natsInstance, federation, rsAPI, caches, keyRing, true, ) userAPI := userapi.NewInternalAPI(p.BaseDendrite, &natsInstance, rsAPI, federation) diff --git a/cmd/dendrite-demo-yggdrasil/main.go b/cmd/dendrite-demo-yggdrasil/main.go index cc40d620f..c17593f37 100644 --- a/cmd/dendrite-demo-yggdrasil/main.go +++ b/cmd/dendrite-demo-yggdrasil/main.go @@ -167,7 +167,7 @@ func main() { asAPI := appservice.NewInternalAPI(base.ProcessContext, base.Cfg, &natsInstance, userAPI, rsAPI) rsAPI.SetAppserviceAPI(asAPI) fsAPI := federationapi.NewInternalAPI( - base, &natsInstance, federation, rsAPI, caches, keyRing, true, + base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, federation, rsAPI, caches, keyRing, true, ) rsAPI.SetFederationAPI(fsAPI, keyRing) diff --git a/cmd/dendrite/main.go b/cmd/dendrite/main.go index 84d9adfc7..96f36bb03 100644 --- a/cmd/dendrite/main.go +++ b/cmd/dendrite/main.go @@ -78,7 +78,7 @@ func main() { natsInstance := jetstream.NATSInstance{} rsAPI := roomserver.NewInternalAPI(base, &natsInstance, caches) fsAPI := federationapi.NewInternalAPI( - base, &natsInstance, federation, rsAPI, caches, nil, false, + base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, federation, rsAPI, caches, nil, false, ) keyRing := fsAPI.KeyRing() diff --git a/federationapi/federationapi.go b/federationapi/federationapi.go index 2c0748749..3b5394a13 100644 --- a/federationapi/federationapi.go +++ b/federationapi/federationapi.go @@ -17,6 +17,10 @@ package federationapi import ( "time" + "github.com/matrix-org/dendrite/internal/httputil" + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/process" "github.com/sirupsen/logrus" "github.com/matrix-org/dendrite/federationapi/api" @@ -29,7 +33,6 @@ import ( "github.com/matrix-org/dendrite/federationapi/storage" "github.com/matrix-org/dendrite/internal/caching" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" - "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/jetstream" userapi "github.com/matrix-org/dendrite/userapi/api" @@ -40,7 +43,9 @@ import ( // AddPublicRoutes sets up and registers HTTP handlers on the base API muxes for the FederationAPI component. func AddPublicRoutes( - base *base.BaseDendrite, + processContext *process.ProcessContext, + routers httputil.Routers, + dendriteConfig *config.Dendrite, natsInstance *jetstream.NATSInstance, userAPI userapi.FederationUserAPI, federation *gomatrixserverlib.FederationClient, @@ -48,10 +53,11 @@ func AddPublicRoutes( rsAPI roomserverAPI.FederationRoomserverAPI, fedAPI federationAPI.FederationInternalAPI, servers federationAPI.ServersInRoomProvider, + enableMetrics bool, ) { - cfg := &base.Cfg.FederationAPI - mscCfg := &base.Cfg.MSCs - js, _ := natsInstance.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) + cfg := &dendriteConfig.FederationAPI + mscCfg := &dendriteConfig.MSCs + js, _ := natsInstance.Prepare(processContext, &cfg.Matrix.JetStream) producer := &producers.SyncAPIProducer{ JetStream: js, TopicReceiptEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent), @@ -76,17 +82,20 @@ func AddPublicRoutes( } routing.Setup( - base, + routers, + dendriteConfig, rsAPI, f, keyRing, federation, userAPI, mscCfg, - servers, producer, + servers, producer, enableMetrics, ) } // NewInternalAPI returns a concerete implementation of the internal API. Callers // can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes. func NewInternalAPI( - base *base.BaseDendrite, + processContext *process.ProcessContext, + dendriteCfg *config.Dendrite, + cm sqlutil.Connections, natsInstance *jetstream.NATSInstance, federation api.FederationClient, rsAPI roomserverAPI.FederationRoomserverAPI, @@ -94,9 +103,9 @@ func NewInternalAPI( keyRing *gomatrixserverlib.KeyRing, resetBlacklist bool, ) api.FederationInternalAPI { - cfg := &base.Cfg.FederationAPI + cfg := &dendriteCfg.FederationAPI - federationDB, err := storage.NewDatabase(base.ProcessContext.Context(), base.ConnectionManager, &cfg.Database, caches, base.Cfg.Global.IsLocalServerName) + federationDB, err := storage.NewDatabase(processContext.Context(), cm, &cfg.Database, caches, dendriteCfg.Global.IsLocalServerName) if err != nil { logrus.WithError(err).Panic("failed to connect to federation sender db") } @@ -110,51 +119,51 @@ func NewInternalAPI( cfg.FederationMaxRetries+1, cfg.P2PFederationRetriesUntilAssumedOffline+1) - js, nats := natsInstance.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) + js, nats := natsInstance.Prepare(processContext, &cfg.Matrix.JetStream) - signingInfo := base.Cfg.Global.SigningIdentities() + signingInfo := dendriteCfg.Global.SigningIdentities() queues := queue.NewOutgoingQueues( - federationDB, base.ProcessContext, + federationDB, processContext, cfg.Matrix.DisableFederation, cfg.Matrix.ServerName, federation, rsAPI, &stats, signingInfo, ) rsConsumer := consumers.NewOutputRoomEventConsumer( - base.ProcessContext, cfg, js, nats, queues, + processContext, cfg, js, nats, queues, federationDB, rsAPI, ) if err = rsConsumer.Start(); err != nil { logrus.WithError(err).Panic("failed to start room server consumer") } tsConsumer := consumers.NewOutputSendToDeviceConsumer( - base.ProcessContext, cfg, js, queues, federationDB, + processContext, cfg, js, queues, federationDB, ) if err = tsConsumer.Start(); err != nil { logrus.WithError(err).Panic("failed to start send-to-device consumer") } receiptConsumer := consumers.NewOutputReceiptConsumer( - base.ProcessContext, cfg, js, queues, federationDB, + processContext, cfg, js, queues, federationDB, ) if err = receiptConsumer.Start(); err != nil { logrus.WithError(err).Panic("failed to start receipt consumer") } typingConsumer := consumers.NewOutputTypingConsumer( - base.ProcessContext, cfg, js, queues, federationDB, + processContext, cfg, js, queues, federationDB, ) if err = typingConsumer.Start(); err != nil { logrus.WithError(err).Panic("failed to start typing consumer") } keyConsumer := consumers.NewKeyChangeConsumer( - base.ProcessContext, &base.Cfg.KeyServer, js, queues, federationDB, rsAPI, + processContext, &dendriteCfg.KeyServer, js, queues, federationDB, rsAPI, ) if err = keyConsumer.Start(); err != nil { logrus.WithError(err).Panic("failed to start key server consumer") } presenceConsumer := consumers.NewOutputPresenceConsumer( - base.ProcessContext, cfg, js, queues, federationDB, rsAPI, + processContext, cfg, js, queues, federationDB, rsAPI, ) if err = presenceConsumer.Start(); err != nil { logrus.WithError(err).Panic("failed to start presence consumer") @@ -163,7 +172,7 @@ func NewInternalAPI( var cleanExpiredEDUs func() cleanExpiredEDUs = func() { logrus.Infof("Cleaning expired EDUs") - if err := federationDB.DeleteExpiredEDUs(base.Context()); err != nil { + if err := federationDB.DeleteExpiredEDUs(processContext.Context()); err != nil { logrus.WithError(err).Error("Failed to clean expired EDUs") } time.AfterFunc(time.Hour, cleanExpiredEDUs) diff --git a/federationapi/federationapi_keys_test.go b/federationapi/federationapi_keys_test.go index 4ab33f639..10ba7d289 100644 --- a/federationapi/federationapi_keys_test.go +++ b/federationapi/federationapi_keys_test.go @@ -111,7 +111,7 @@ func TestMain(m *testing.M) { // Finally, build the server key APIs. sbase := base.NewBaseDendrite(cfg, base.DisableMetrics) - s.api = NewInternalAPI(sbase, &natsInstance, s.fedclient, nil, s.cache, nil, true) + s.api = NewInternalAPI(sbase.ProcessContext, sbase.Cfg, sbase.ConnectionManager, &natsInstance, s.fedclient, nil, s.cache, nil, true) } // Now that we have built our server key APIs, start the diff --git a/federationapi/federationapi_test.go b/federationapi/federationapi_test.go index f499af77c..634699439 100644 --- a/federationapi/federationapi_test.go +++ b/federationapi/federationapi_test.go @@ -215,7 +215,7 @@ func testFederationAPIJoinThenKeyUpdate(t *testing.T, dbType test.DBType) { }, }, } - fsapi := federationapi.NewInternalAPI(base, &natsInstance, fc, rsapi, caches, nil, false) + fsapi := federationapi.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, fc, rsapi, caches, nil, false) var resp api.PerformJoinResponse fsapi.PerformJoin(context.Background(), &api.PerformJoinRequest{ @@ -281,7 +281,7 @@ func TestRoomsV3URLEscapeDoNot404(t *testing.T) { natsInstance := jetstream.NATSInstance{} // TODO: This is pretty fragile, as if anything calls anything on these nils this test will break. // Unfortunately, it makes little sense to instantiate these dependencies when we just want to test routing. - federationapi.AddPublicRoutes(b, &natsInstance, nil, nil, keyRing, nil, &internal.FederationInternalAPI{}, nil) + federationapi.AddPublicRoutes(b.ProcessContext, b.Routers, b.Cfg, &natsInstance, nil, nil, keyRing, nil, &internal.FederationInternalAPI{}, nil, false) baseURL, cancel := test.ListenAndServe(t, b.Routers.Federation, true) defer cancel() serverName := gomatrixserverlib.ServerName(strings.TrimPrefix(baseURL, "https://")) diff --git a/federationapi/routing/profile_test.go b/federationapi/routing/profile_test.go index 82f64bd35..f8b6bcc9c 100644 --- a/federationapi/routing/profile_test.go +++ b/federationapi/routing/profile_test.go @@ -58,13 +58,13 @@ func TestHandleQueryProfile(t *testing.T) { fedClient := fakeFedClient{} serverKeyAPI := &signing.YggdrasilKeys{} keyRing := serverKeyAPI.KeyRing() - fedapi := fedAPI.NewInternalAPI(base, &natsInstance, &fedClient, nil, nil, keyRing, true) + fedapi := fedAPI.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, &fedClient, nil, nil, keyRing, true) userapi := fakeUserAPI{} r, ok := fedapi.(*fedInternal.FederationInternalAPI) if !ok { panic("This is a programming error.") } - routing.Setup(base, nil, r, keyRing, &fedClient, &userapi, &base.Cfg.MSCs, nil, nil) + routing.Setup(base.Routers, base.Cfg, nil, r, keyRing, &fedClient, &userapi, &base.Cfg.MSCs, nil, nil, base.EnableMetrics) handler := fedMux.Get(routing.QueryProfileRouteName).GetHandler().ServeHTTP _, sk, _ := ed25519.GenerateKey(nil) diff --git a/federationapi/routing/query_test.go b/federationapi/routing/query_test.go index d3214bf6b..3429eafec 100644 --- a/federationapi/routing/query_test.go +++ b/federationapi/routing/query_test.go @@ -58,13 +58,13 @@ func TestHandleQueryDirectory(t *testing.T) { fedClient := fakeFedClient{} serverKeyAPI := &signing.YggdrasilKeys{} keyRing := serverKeyAPI.KeyRing() - fedapi := fedAPI.NewInternalAPI(base, &natsInstance, &fedClient, nil, nil, keyRing, true) + fedapi := fedAPI.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, &fedClient, nil, nil, keyRing, true) userapi := fakeUserAPI{} r, ok := fedapi.(*fedInternal.FederationInternalAPI) if !ok { panic("This is a programming error.") } - routing.Setup(base, nil, r, keyRing, &fedClient, &userapi, &base.Cfg.MSCs, nil, nil) + routing.Setup(base.Routers, base.Cfg, nil, r, keyRing, &fedClient, &userapi, &base.Cfg.MSCs, nil, nil, base.EnableMetrics) handler := fedMux.Get(routing.QueryDirectoryRouteName).GetHandler().ServeHTTP _, sk, _ := ed25519.GenerateKey(nil) diff --git a/federationapi/routing/routing.go b/federationapi/routing/routing.go index c86d18d2f..a1f943e77 100644 --- a/federationapi/routing/routing.go +++ b/federationapi/routing/routing.go @@ -31,7 +31,6 @@ import ( "github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/roomserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" - "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" @@ -55,7 +54,8 @@ const ( // applied: // nolint: gocyclo func Setup( - base *base.BaseDendrite, + routers httputil.Routers, + dendriteCfg *config.Dendrite, rsAPI roomserverAPI.FederationRoomserverAPI, fsAPI *fedInternal.FederationInternalAPI, keys gomatrixserverlib.JSONVerifier, @@ -63,14 +63,14 @@ func Setup( userAPI userapi.FederationUserAPI, mscCfg *config.MSCs, servers federationAPI.ServersInRoomProvider, - producer *producers.SyncAPIProducer, + producer *producers.SyncAPIProducer, enableMetrics bool, ) { - fedMux := base.Routers.Federation - keyMux := base.Routers.Keys - wkMux := base.Routers.WellKnown - cfg := &base.Cfg.FederationAPI + fedMux := routers.Federation + keyMux := routers.Keys + wkMux := routers.WellKnown + cfg := &dendriteCfg.FederationAPI - if base.EnableMetrics { + if enableMetrics { prometheus.MustRegister( internal.PDUCountTotal, internal.EDUCountTotal, ) diff --git a/federationapi/routing/send_test.go b/federationapi/routing/send_test.go index c65196f17..340521dc0 100644 --- a/federationapi/routing/send_test.go +++ b/federationapi/routing/send_test.go @@ -53,14 +53,14 @@ func TestHandleSend(t *testing.T) { base.Routers.Federation = fedMux base.Cfg.FederationAPI.Matrix.SigningIdentity.ServerName = testOrigin base.Cfg.FederationAPI.Matrix.Metrics.Enabled = false - fedapi := fedAPI.NewInternalAPI(base, &natsInstance, nil, nil, nil, nil, true) + fedapi := fedAPI.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, nil, nil, nil, nil, true) serverKeyAPI := &signing.YggdrasilKeys{} keyRing := serverKeyAPI.KeyRing() r, ok := fedapi.(*fedInternal.FederationInternalAPI) if !ok { panic("This is a programming error.") } - routing.Setup(base, nil, r, keyRing, nil, nil, &base.Cfg.MSCs, nil, nil) + routing.Setup(base.Routers, base.Cfg, nil, r, keyRing, nil, nil, &base.Cfg.MSCs, nil, nil, base.EnableMetrics) handler := fedMux.Get(routing.SendRouteName).GetHandler().ServeHTTP _, sk, _ := ed25519.GenerateKey(nil) diff --git a/roomserver/roomserver_test.go b/roomserver/roomserver_test.go index 039bbc57a..02b263329 100644 --- a/roomserver/roomserver_test.go +++ b/roomserver/roomserver_test.go @@ -245,7 +245,7 @@ func TestPurgeRoom(t *testing.T) { // this starts the JetStream consumers syncapi.AddPublicRoutes(base, &natsInstance, userAPI, rsAPI, caches) - federationapi.NewInternalAPI(base, &natsInstance, fedClient, rsAPI, caches, nil, true) + federationapi.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, fedClient, rsAPI, caches, nil, true) rsAPI.SetFederationAPI(nil, nil) // Create the room diff --git a/setup/monolith.go b/setup/monolith.go index 348337c11..611b90dfe 100644 --- a/setup/monolith.go +++ b/setup/monolith.go @@ -65,7 +65,7 @@ func (m *Monolith) AddAllPublicRoutes(base *base.BaseDendrite, natsInstance *jet m.ExtPublicRoomsProvider, base.EnableMetrics, ) federationapi.AddPublicRoutes( - base, natsInstance, m.UserAPI, m.FedClient, m.KeyRing, m.RoomserverAPI, m.FederationAPI, nil, + base.ProcessContext, base.Routers, base.Cfg, natsInstance, m.UserAPI, m.FedClient, m.KeyRing, m.RoomserverAPI, m.FederationAPI, nil, base.EnableMetrics, ) mediaapi.AddPublicRoutes(base.Routers.Media, base.ConnectionManager, base.Cfg, m.UserAPI, m.Client) syncapi.AddPublicRoutes(base, natsInstance, m.UserAPI, m.RoomserverAPI, caches)