diff --git a/clientapi/admin_test.go b/clientapi/admin_test.go index ec62a4b30..07407f2ac 100644 --- a/clientapi/admin_test.go +++ b/clientapi/admin_test.go @@ -160,7 +160,7 @@ func TestPurgeRoom(t *testing.T) { userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil) // this starts the JetStream consumers - syncapi.AddPublicRoutes(base, &natsInstance, userAPI, rsAPI, caches) + syncapi.AddPublicRoutes(base.ProcessContext, base.Routers, base.Cfg, base.ConnectionManager, &natsInstance, userAPI, rsAPI, caches, base.EnableMetrics) federationapi.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, fedClient, rsAPI, caches, nil, true) rsAPI.SetFederationAPI(nil, nil) diff --git a/roomserver/roomserver_test.go b/roomserver/roomserver_test.go index 5783a45d1..03782cf62 100644 --- a/roomserver/roomserver_test.go +++ b/roomserver/roomserver_test.go @@ -244,7 +244,7 @@ func TestPurgeRoom(t *testing.T) { userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil) // this starts the JetStream consumers - syncapi.AddPublicRoutes(base, &natsInstance, userAPI, rsAPI, caches) + syncapi.AddPublicRoutes(base.ProcessContext, base.Routers, base.Cfg, base.ConnectionManager, &natsInstance, userAPI, rsAPI, caches, base.EnableMetrics) federationapi.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, fedClient, rsAPI, caches, nil, true) rsAPI.SetFederationAPI(nil, nil) diff --git a/setup/monolith.go b/setup/monolith.go index 8eed606e2..8c2ebe307 100644 --- a/setup/monolith.go +++ b/setup/monolith.go @@ -68,7 +68,7 @@ func (m *Monolith) AddAllPublicRoutes(base *base.BaseDendrite, natsInstance *jet base.ProcessContext, base.Routers, base.Cfg, natsInstance, m.UserAPI, m.FedClient, m.KeyRing, m.RoomserverAPI, m.FederationAPI, nil, base.EnableMetrics, ) mediaapi.AddPublicRoutes(base.Routers.Media, base.ConnectionManager, base.Cfg, m.UserAPI, m.Client) - syncapi.AddPublicRoutes(base, natsInstance, m.UserAPI, m.RoomserverAPI, caches) + syncapi.AddPublicRoutes(base.ProcessContext, base.Routers, base.Cfg, base.ConnectionManager, natsInstance, m.UserAPI, m.RoomserverAPI, caches, base.EnableMetrics) if m.RelayAPI != nil { relayapi.AddPublicRoutes(base.Routers, base.Cfg, m.KeyRing, m.RelayAPI) diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index 3dc64ea38..9a27b954a 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -18,12 +18,15 @@ import ( "context" "github.com/matrix-org/dendrite/internal/fulltext" + "github.com/matrix-org/dendrite/internal/httputil" + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/process" "github.com/sirupsen/logrus" "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/roomserver/api" - "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/jetstream" userapi "github.com/matrix-org/dendrite/userapi/api" @@ -39,17 +42,19 @@ import ( // AddPublicRoutes sets up and registers HTTP handlers for the SyncAPI // component. func AddPublicRoutes( - base *base.BaseDendrite, + processContext *process.ProcessContext, + routers httputil.Routers, + dendriteCfg *config.Dendrite, + cm sqlutil.Connections, natsInstance *jetstream.NATSInstance, userAPI userapi.SyncUserAPI, rsAPI api.SyncRoomserverAPI, caches caching.LazyLoadCache, + enableMetrics bool, ) { - cfg := &base.Cfg.SyncAPI + js, natsClient := natsInstance.Prepare(processContext, &dendriteCfg.Global.JetStream) - js, natsClient := natsInstance.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) - - syncDB, err := storage.NewSyncServerDatasource(base.Context(), base.ConnectionManager, &cfg.Database) + syncDB, err := storage.NewSyncServerDatasource(processContext.Context(), cm, &dendriteCfg.SyncAPI.Database) if err != nil { logrus.WithError(err).Panicf("failed to connect to sync db") } @@ -63,32 +68,32 @@ func AddPublicRoutes( } var fts *fulltext.Search - if cfg.Fulltext.Enabled { - fts, err = fulltext.New(base.ProcessContext.Context(), cfg.Fulltext) + if dendriteCfg.SyncAPI.Fulltext.Enabled { + fts, err = fulltext.New(processContext.Context(), dendriteCfg.SyncAPI.Fulltext) if err != nil { logrus.WithError(err).Panicf("failed to create full text") } - base.ProcessContext.ComponentStarted() + processContext.ComponentStarted() } federationPresenceProducer := &producers.FederationAPIPresenceProducer{ - Topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent), + Topic: dendriteCfg.Global.JetStream.Prefixed(jetstream.OutputPresenceEvent), JetStream: js, } presenceConsumer := consumers.NewPresenceConsumer( - base.ProcessContext, cfg, js, natsClient, syncDB, + processContext, &dendriteCfg.SyncAPI, js, natsClient, syncDB, notifier, streams.PresenceStreamProvider, userAPI, ) - requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, rsAPI, streams, notifier, federationPresenceProducer, presenceConsumer, base.EnableMetrics) + requestPool := sync.NewRequestPool(syncDB, &dendriteCfg.SyncAPI, userAPI, rsAPI, streams, notifier, federationPresenceProducer, presenceConsumer, enableMetrics) if err = presenceConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start presence consumer") } keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer( - base.ProcessContext, cfg, cfg.Matrix.JetStream.Prefixed(jetstream.OutputKeyChangeEvent), + processContext, &dendriteCfg.SyncAPI, dendriteCfg.Global.JetStream.Prefixed(jetstream.OutputKeyChangeEvent), js, rsAPI, syncDB, notifier, streams.DeviceListStreamProvider, ) @@ -97,7 +102,7 @@ func AddPublicRoutes( } roomConsumer := consumers.NewOutputRoomEventConsumer( - base.ProcessContext, cfg, js, syncDB, notifier, streams.PDUStreamProvider, + processContext, &dendriteCfg.SyncAPI, js, syncDB, notifier, streams.PDUStreamProvider, streams.InviteStreamProvider, rsAPI, fts, ) if err = roomConsumer.Start(); err != nil { @@ -105,7 +110,7 @@ func AddPublicRoutes( } clientConsumer := consumers.NewOutputClientDataConsumer( - base.ProcessContext, cfg, js, natsClient, syncDB, notifier, + processContext, &dendriteCfg.SyncAPI, js, natsClient, syncDB, notifier, streams.AccountDataStreamProvider, fts, ) if err = clientConsumer.Start(); err != nil { @@ -113,35 +118,35 @@ func AddPublicRoutes( } notificationConsumer := consumers.NewOutputNotificationDataConsumer( - base.ProcessContext, cfg, js, syncDB, notifier, streams.NotificationDataStreamProvider, + processContext, &dendriteCfg.SyncAPI, js, syncDB, notifier, streams.NotificationDataStreamProvider, ) if err = notificationConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start notification data consumer") } typingConsumer := consumers.NewOutputTypingEventConsumer( - base.ProcessContext, cfg, js, eduCache, notifier, streams.TypingStreamProvider, + processContext, &dendriteCfg.SyncAPI, js, eduCache, notifier, streams.TypingStreamProvider, ) if err = typingConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start typing consumer") } sendToDeviceConsumer := consumers.NewOutputSendToDeviceEventConsumer( - base.ProcessContext, cfg, js, syncDB, userAPI, notifier, streams.SendToDeviceStreamProvider, + processContext, &dendriteCfg.SyncAPI, js, syncDB, userAPI, notifier, streams.SendToDeviceStreamProvider, ) if err = sendToDeviceConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start send-to-device consumer") } receiptConsumer := consumers.NewOutputReceiptEventConsumer( - base.ProcessContext, cfg, js, syncDB, notifier, streams.ReceiptStreamProvider, + processContext, &dendriteCfg.SyncAPI, js, syncDB, notifier, streams.ReceiptStreamProvider, ) if err = receiptConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start receipts consumer") } routing.Setup( - base.Routers.Client, requestPool, syncDB, userAPI, - rsAPI, cfg, caches, fts, + routers.Client, requestPool, syncDB, userAPI, + rsAPI, &dendriteCfg.SyncAPI, caches, fts, ) } diff --git a/syncapi/syncapi_test.go b/syncapi/syncapi_test.go index 7b15a6b9e..1c4e2cc88 100644 --- a/syncapi/syncapi_test.go +++ b/syncapi/syncapi_test.go @@ -122,7 +122,7 @@ func testSyncAccessTokens(t *testing.T, dbType test.DBType) { jsctx, _ := natsInstance.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream) msgs := toNATSMsgs(t, base, room.Events()...) - AddPublicRoutes(base, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches) + AddPublicRoutes(base.ProcessContext, base.Routers, base.Cfg, base.ConnectionManager, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches, base.EnableMetrics) testrig.MustPublishMsgs(t, jsctx, msgs...) testCases := []struct { @@ -223,7 +223,7 @@ func testSyncAPICreateRoomSyncEarly(t *testing.T, dbType test.DBType) { msgs := toNATSMsgs(t, base, room.Events()...) sinceTokens := make([]string, len(msgs)) caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) - AddPublicRoutes(base, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches) + AddPublicRoutes(base.ProcessContext, base.Routers, base.Cfg, base.ConnectionManager, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches, base.EnableMetrics) for i, msg := range msgs { testrig.MustPublishMsgs(t, jsctx, msg) time.Sleep(100 * time.Millisecond) @@ -309,7 +309,7 @@ func testSyncAPIUpdatePresenceImmediately(t *testing.T, dbType test.DBType) { jsctx, _ := natsInstance.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream) caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) - AddPublicRoutes(base, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{}, caches) + AddPublicRoutes(base.ProcessContext, base.Routers, base.Cfg, base.ConnectionManager, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{}, caches, base.EnableMetrics) w := httptest.NewRecorder() base.Routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{ "access_token": alice.AccessToken, @@ -428,7 +428,7 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) { caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) rsAPI := roomserver.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, caches, base.EnableMetrics) rsAPI.SetFederationAPI(nil, nil) - AddPublicRoutes(base, &natsInstance, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI, caches) + AddPublicRoutes(base.ProcessContext, base.Routers, base.Cfg, base.ConnectionManager, &natsInstance, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI, caches, base.EnableMetrics) for _, tc := range testCases { testname := fmt.Sprintf("%s - %s", tc.historyVisibility, userType) @@ -729,7 +729,7 @@ func TestGetMembership(t *testing.T) { rsAPI := roomserver.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, caches, base.EnableMetrics) rsAPI.SetFederationAPI(nil, nil) - AddPublicRoutes(base, &natsInstance, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI, caches) + AddPublicRoutes(base.ProcessContext, base.Routers, base.Cfg, base.ConnectionManager, &natsInstance, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI, caches, base.EnableMetrics) for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { @@ -797,7 +797,7 @@ func testSendToDevice(t *testing.T, dbType test.DBType) { jsctx, _ := natsInstance.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream) caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) - AddPublicRoutes(base, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{}, caches) + AddPublicRoutes(base.ProcessContext, base.Routers, base.Cfg, base.ConnectionManager, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{}, caches, base.EnableMetrics) producer := producers.SyncAPIProducer{ TopicSendToDeviceEvent: base.Cfg.Global.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent), @@ -1018,7 +1018,7 @@ func testContext(t *testing.T, dbType test.DBType) { rsAPI := roomserver.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, caches, base.EnableMetrics) rsAPI.SetFederationAPI(nil, nil) - AddPublicRoutes(base, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, rsAPI, caches) + AddPublicRoutes(base.ProcessContext, base.Routers, base.Cfg, base.ConnectionManager, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, rsAPI, caches, base.EnableMetrics) room := test.NewRoom(t, user)