mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-01-16 18:43:10 -06:00
Remove BaseDendrite from SyncAPI
This commit is contained in:
parent
5ef86d9679
commit
72d48335cc
|
|
@ -160,7 +160,7 @@ func TestPurgeRoom(t *testing.T) {
|
||||||
userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil)
|
userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil)
|
||||||
|
|
||||||
// this starts the JetStream consumers
|
// this starts the JetStream consumers
|
||||||
syncapi.AddPublicRoutes(base, &natsInstance, userAPI, rsAPI, caches)
|
syncapi.AddPublicRoutes(base.ProcessContext, base.Routers, base.Cfg, base.ConnectionManager, &natsInstance, userAPI, rsAPI, caches, base.EnableMetrics)
|
||||||
federationapi.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, fedClient, rsAPI, caches, nil, true)
|
federationapi.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, fedClient, rsAPI, caches, nil, true)
|
||||||
rsAPI.SetFederationAPI(nil, nil)
|
rsAPI.SetFederationAPI(nil, nil)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -244,7 +244,7 @@ func TestPurgeRoom(t *testing.T) {
|
||||||
userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil)
|
userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil)
|
||||||
|
|
||||||
// this starts the JetStream consumers
|
// this starts the JetStream consumers
|
||||||
syncapi.AddPublicRoutes(base, &natsInstance, userAPI, rsAPI, caches)
|
syncapi.AddPublicRoutes(base.ProcessContext, base.Routers, base.Cfg, base.ConnectionManager, &natsInstance, userAPI, rsAPI, caches, base.EnableMetrics)
|
||||||
federationapi.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, fedClient, rsAPI, caches, nil, true)
|
federationapi.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, fedClient, rsAPI, caches, nil, true)
|
||||||
rsAPI.SetFederationAPI(nil, nil)
|
rsAPI.SetFederationAPI(nil, nil)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -68,7 +68,7 @@ func (m *Monolith) AddAllPublicRoutes(base *base.BaseDendrite, natsInstance *jet
|
||||||
base.ProcessContext, base.Routers, base.Cfg, natsInstance, m.UserAPI, m.FedClient, m.KeyRing, m.RoomserverAPI, m.FederationAPI, nil, base.EnableMetrics,
|
base.ProcessContext, base.Routers, base.Cfg, natsInstance, m.UserAPI, m.FedClient, m.KeyRing, m.RoomserverAPI, m.FederationAPI, nil, base.EnableMetrics,
|
||||||
)
|
)
|
||||||
mediaapi.AddPublicRoutes(base.Routers.Media, base.ConnectionManager, base.Cfg, m.UserAPI, m.Client)
|
mediaapi.AddPublicRoutes(base.Routers.Media, base.ConnectionManager, base.Cfg, m.UserAPI, m.Client)
|
||||||
syncapi.AddPublicRoutes(base, natsInstance, m.UserAPI, m.RoomserverAPI, caches)
|
syncapi.AddPublicRoutes(base.ProcessContext, base.Routers, base.Cfg, base.ConnectionManager, natsInstance, m.UserAPI, m.RoomserverAPI, caches, base.EnableMetrics)
|
||||||
|
|
||||||
if m.RelayAPI != nil {
|
if m.RelayAPI != nil {
|
||||||
relayapi.AddPublicRoutes(base.Routers, base.Cfg, m.KeyRing, m.RelayAPI)
|
relayapi.AddPublicRoutes(base.Routers, base.Cfg, m.KeyRing, m.RelayAPI)
|
||||||
|
|
|
||||||
|
|
@ -18,12 +18,15 @@ import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/internal/fulltext"
|
"github.com/matrix-org/dendrite/internal/fulltext"
|
||||||
|
"github.com/matrix-org/dendrite/internal/httputil"
|
||||||
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
|
"github.com/matrix-org/dendrite/setup/config"
|
||||||
|
"github.com/matrix-org/dendrite/setup/process"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/internal/caching"
|
"github.com/matrix-org/dendrite/internal/caching"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/dendrite/setup/base"
|
|
||||||
"github.com/matrix-org/dendrite/setup/jetstream"
|
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||||
|
|
||||||
|
|
@ -39,17 +42,19 @@ import (
|
||||||
// AddPublicRoutes sets up and registers HTTP handlers for the SyncAPI
|
// AddPublicRoutes sets up and registers HTTP handlers for the SyncAPI
|
||||||
// component.
|
// component.
|
||||||
func AddPublicRoutes(
|
func AddPublicRoutes(
|
||||||
base *base.BaseDendrite,
|
processContext *process.ProcessContext,
|
||||||
|
routers httputil.Routers,
|
||||||
|
dendriteCfg *config.Dendrite,
|
||||||
|
cm sqlutil.Connections,
|
||||||
natsInstance *jetstream.NATSInstance,
|
natsInstance *jetstream.NATSInstance,
|
||||||
userAPI userapi.SyncUserAPI,
|
userAPI userapi.SyncUserAPI,
|
||||||
rsAPI api.SyncRoomserverAPI,
|
rsAPI api.SyncRoomserverAPI,
|
||||||
caches caching.LazyLoadCache,
|
caches caching.LazyLoadCache,
|
||||||
|
enableMetrics bool,
|
||||||
) {
|
) {
|
||||||
cfg := &base.Cfg.SyncAPI
|
js, natsClient := natsInstance.Prepare(processContext, &dendriteCfg.Global.JetStream)
|
||||||
|
|
||||||
js, natsClient := natsInstance.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
|
syncDB, err := storage.NewSyncServerDatasource(processContext.Context(), cm, &dendriteCfg.SyncAPI.Database)
|
||||||
|
|
||||||
syncDB, err := storage.NewSyncServerDatasource(base.Context(), base.ConnectionManager, &cfg.Database)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithError(err).Panicf("failed to connect to sync db")
|
logrus.WithError(err).Panicf("failed to connect to sync db")
|
||||||
}
|
}
|
||||||
|
|
@ -63,32 +68,32 @@ func AddPublicRoutes(
|
||||||
}
|
}
|
||||||
|
|
||||||
var fts *fulltext.Search
|
var fts *fulltext.Search
|
||||||
if cfg.Fulltext.Enabled {
|
if dendriteCfg.SyncAPI.Fulltext.Enabled {
|
||||||
fts, err = fulltext.New(base.ProcessContext.Context(), cfg.Fulltext)
|
fts, err = fulltext.New(processContext.Context(), dendriteCfg.SyncAPI.Fulltext)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithError(err).Panicf("failed to create full text")
|
logrus.WithError(err).Panicf("failed to create full text")
|
||||||
}
|
}
|
||||||
base.ProcessContext.ComponentStarted()
|
processContext.ComponentStarted()
|
||||||
}
|
}
|
||||||
|
|
||||||
federationPresenceProducer := &producers.FederationAPIPresenceProducer{
|
federationPresenceProducer := &producers.FederationAPIPresenceProducer{
|
||||||
Topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent),
|
Topic: dendriteCfg.Global.JetStream.Prefixed(jetstream.OutputPresenceEvent),
|
||||||
JetStream: js,
|
JetStream: js,
|
||||||
}
|
}
|
||||||
presenceConsumer := consumers.NewPresenceConsumer(
|
presenceConsumer := consumers.NewPresenceConsumer(
|
||||||
base.ProcessContext, cfg, js, natsClient, syncDB,
|
processContext, &dendriteCfg.SyncAPI, js, natsClient, syncDB,
|
||||||
notifier, streams.PresenceStreamProvider,
|
notifier, streams.PresenceStreamProvider,
|
||||||
userAPI,
|
userAPI,
|
||||||
)
|
)
|
||||||
|
|
||||||
requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, rsAPI, streams, notifier, federationPresenceProducer, presenceConsumer, base.EnableMetrics)
|
requestPool := sync.NewRequestPool(syncDB, &dendriteCfg.SyncAPI, userAPI, rsAPI, streams, notifier, federationPresenceProducer, presenceConsumer, enableMetrics)
|
||||||
|
|
||||||
if err = presenceConsumer.Start(); err != nil {
|
if err = presenceConsumer.Start(); err != nil {
|
||||||
logrus.WithError(err).Panicf("failed to start presence consumer")
|
logrus.WithError(err).Panicf("failed to start presence consumer")
|
||||||
}
|
}
|
||||||
|
|
||||||
keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer(
|
keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer(
|
||||||
base.ProcessContext, cfg, cfg.Matrix.JetStream.Prefixed(jetstream.OutputKeyChangeEvent),
|
processContext, &dendriteCfg.SyncAPI, dendriteCfg.Global.JetStream.Prefixed(jetstream.OutputKeyChangeEvent),
|
||||||
js, rsAPI, syncDB, notifier,
|
js, rsAPI, syncDB, notifier,
|
||||||
streams.DeviceListStreamProvider,
|
streams.DeviceListStreamProvider,
|
||||||
)
|
)
|
||||||
|
|
@ -97,7 +102,7 @@ func AddPublicRoutes(
|
||||||
}
|
}
|
||||||
|
|
||||||
roomConsumer := consumers.NewOutputRoomEventConsumer(
|
roomConsumer := consumers.NewOutputRoomEventConsumer(
|
||||||
base.ProcessContext, cfg, js, syncDB, notifier, streams.PDUStreamProvider,
|
processContext, &dendriteCfg.SyncAPI, js, syncDB, notifier, streams.PDUStreamProvider,
|
||||||
streams.InviteStreamProvider, rsAPI, fts,
|
streams.InviteStreamProvider, rsAPI, fts,
|
||||||
)
|
)
|
||||||
if err = roomConsumer.Start(); err != nil {
|
if err = roomConsumer.Start(); err != nil {
|
||||||
|
|
@ -105,7 +110,7 @@ func AddPublicRoutes(
|
||||||
}
|
}
|
||||||
|
|
||||||
clientConsumer := consumers.NewOutputClientDataConsumer(
|
clientConsumer := consumers.NewOutputClientDataConsumer(
|
||||||
base.ProcessContext, cfg, js, natsClient, syncDB, notifier,
|
processContext, &dendriteCfg.SyncAPI, js, natsClient, syncDB, notifier,
|
||||||
streams.AccountDataStreamProvider, fts,
|
streams.AccountDataStreamProvider, fts,
|
||||||
)
|
)
|
||||||
if err = clientConsumer.Start(); err != nil {
|
if err = clientConsumer.Start(); err != nil {
|
||||||
|
|
@ -113,35 +118,35 @@ func AddPublicRoutes(
|
||||||
}
|
}
|
||||||
|
|
||||||
notificationConsumer := consumers.NewOutputNotificationDataConsumer(
|
notificationConsumer := consumers.NewOutputNotificationDataConsumer(
|
||||||
base.ProcessContext, cfg, js, syncDB, notifier, streams.NotificationDataStreamProvider,
|
processContext, &dendriteCfg.SyncAPI, js, syncDB, notifier, streams.NotificationDataStreamProvider,
|
||||||
)
|
)
|
||||||
if err = notificationConsumer.Start(); err != nil {
|
if err = notificationConsumer.Start(); err != nil {
|
||||||
logrus.WithError(err).Panicf("failed to start notification data consumer")
|
logrus.WithError(err).Panicf("failed to start notification data consumer")
|
||||||
}
|
}
|
||||||
|
|
||||||
typingConsumer := consumers.NewOutputTypingEventConsumer(
|
typingConsumer := consumers.NewOutputTypingEventConsumer(
|
||||||
base.ProcessContext, cfg, js, eduCache, notifier, streams.TypingStreamProvider,
|
processContext, &dendriteCfg.SyncAPI, js, eduCache, notifier, streams.TypingStreamProvider,
|
||||||
)
|
)
|
||||||
if err = typingConsumer.Start(); err != nil {
|
if err = typingConsumer.Start(); err != nil {
|
||||||
logrus.WithError(err).Panicf("failed to start typing consumer")
|
logrus.WithError(err).Panicf("failed to start typing consumer")
|
||||||
}
|
}
|
||||||
|
|
||||||
sendToDeviceConsumer := consumers.NewOutputSendToDeviceEventConsumer(
|
sendToDeviceConsumer := consumers.NewOutputSendToDeviceEventConsumer(
|
||||||
base.ProcessContext, cfg, js, syncDB, userAPI, notifier, streams.SendToDeviceStreamProvider,
|
processContext, &dendriteCfg.SyncAPI, js, syncDB, userAPI, notifier, streams.SendToDeviceStreamProvider,
|
||||||
)
|
)
|
||||||
if err = sendToDeviceConsumer.Start(); err != nil {
|
if err = sendToDeviceConsumer.Start(); err != nil {
|
||||||
logrus.WithError(err).Panicf("failed to start send-to-device consumer")
|
logrus.WithError(err).Panicf("failed to start send-to-device consumer")
|
||||||
}
|
}
|
||||||
|
|
||||||
receiptConsumer := consumers.NewOutputReceiptEventConsumer(
|
receiptConsumer := consumers.NewOutputReceiptEventConsumer(
|
||||||
base.ProcessContext, cfg, js, syncDB, notifier, streams.ReceiptStreamProvider,
|
processContext, &dendriteCfg.SyncAPI, js, syncDB, notifier, streams.ReceiptStreamProvider,
|
||||||
)
|
)
|
||||||
if err = receiptConsumer.Start(); err != nil {
|
if err = receiptConsumer.Start(); err != nil {
|
||||||
logrus.WithError(err).Panicf("failed to start receipts consumer")
|
logrus.WithError(err).Panicf("failed to start receipts consumer")
|
||||||
}
|
}
|
||||||
|
|
||||||
routing.Setup(
|
routing.Setup(
|
||||||
base.Routers.Client, requestPool, syncDB, userAPI,
|
routers.Client, requestPool, syncDB, userAPI,
|
||||||
rsAPI, cfg, caches, fts,
|
rsAPI, &dendriteCfg.SyncAPI, caches, fts,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -122,7 +122,7 @@ func testSyncAccessTokens(t *testing.T, dbType test.DBType) {
|
||||||
jsctx, _ := natsInstance.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
|
jsctx, _ := natsInstance.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
|
||||||
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
|
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
|
||||||
msgs := toNATSMsgs(t, base, room.Events()...)
|
msgs := toNATSMsgs(t, base, room.Events()...)
|
||||||
AddPublicRoutes(base, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches)
|
AddPublicRoutes(base.ProcessContext, base.Routers, base.Cfg, base.ConnectionManager, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches, base.EnableMetrics)
|
||||||
testrig.MustPublishMsgs(t, jsctx, msgs...)
|
testrig.MustPublishMsgs(t, jsctx, msgs...)
|
||||||
|
|
||||||
testCases := []struct {
|
testCases := []struct {
|
||||||
|
|
@ -223,7 +223,7 @@ func testSyncAPICreateRoomSyncEarly(t *testing.T, dbType test.DBType) {
|
||||||
msgs := toNATSMsgs(t, base, room.Events()...)
|
msgs := toNATSMsgs(t, base, room.Events()...)
|
||||||
sinceTokens := make([]string, len(msgs))
|
sinceTokens := make([]string, len(msgs))
|
||||||
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
|
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
|
||||||
AddPublicRoutes(base, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches)
|
AddPublicRoutes(base.ProcessContext, base.Routers, base.Cfg, base.ConnectionManager, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches, base.EnableMetrics)
|
||||||
for i, msg := range msgs {
|
for i, msg := range msgs {
|
||||||
testrig.MustPublishMsgs(t, jsctx, msg)
|
testrig.MustPublishMsgs(t, jsctx, msg)
|
||||||
time.Sleep(100 * time.Millisecond)
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
|
@ -309,7 +309,7 @@ func testSyncAPIUpdatePresenceImmediately(t *testing.T, dbType test.DBType) {
|
||||||
jsctx, _ := natsInstance.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
|
jsctx, _ := natsInstance.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
|
||||||
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
|
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
|
||||||
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
|
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
|
||||||
AddPublicRoutes(base, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{}, caches)
|
AddPublicRoutes(base.ProcessContext, base.Routers, base.Cfg, base.ConnectionManager, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{}, caches, base.EnableMetrics)
|
||||||
w := httptest.NewRecorder()
|
w := httptest.NewRecorder()
|
||||||
base.Routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
|
base.Routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
|
||||||
"access_token": alice.AccessToken,
|
"access_token": alice.AccessToken,
|
||||||
|
|
@ -428,7 +428,7 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) {
|
||||||
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
|
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
|
||||||
rsAPI := roomserver.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, caches, base.EnableMetrics)
|
rsAPI := roomserver.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, caches, base.EnableMetrics)
|
||||||
rsAPI.SetFederationAPI(nil, nil)
|
rsAPI.SetFederationAPI(nil, nil)
|
||||||
AddPublicRoutes(base, &natsInstance, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI, caches)
|
AddPublicRoutes(base.ProcessContext, base.Routers, base.Cfg, base.ConnectionManager, &natsInstance, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI, caches, base.EnableMetrics)
|
||||||
|
|
||||||
for _, tc := range testCases {
|
for _, tc := range testCases {
|
||||||
testname := fmt.Sprintf("%s - %s", tc.historyVisibility, userType)
|
testname := fmt.Sprintf("%s - %s", tc.historyVisibility, userType)
|
||||||
|
|
@ -729,7 +729,7 @@ func TestGetMembership(t *testing.T) {
|
||||||
rsAPI := roomserver.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, caches, base.EnableMetrics)
|
rsAPI := roomserver.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, caches, base.EnableMetrics)
|
||||||
rsAPI.SetFederationAPI(nil, nil)
|
rsAPI.SetFederationAPI(nil, nil)
|
||||||
|
|
||||||
AddPublicRoutes(base, &natsInstance, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI, caches)
|
AddPublicRoutes(base.ProcessContext, base.Routers, base.Cfg, base.ConnectionManager, &natsInstance, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI, caches, base.EnableMetrics)
|
||||||
|
|
||||||
for _, tc := range testCases {
|
for _, tc := range testCases {
|
||||||
t.Run(tc.name, func(t *testing.T) {
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
|
@ -797,7 +797,7 @@ func testSendToDevice(t *testing.T, dbType test.DBType) {
|
||||||
jsctx, _ := natsInstance.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
|
jsctx, _ := natsInstance.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
|
||||||
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
|
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
|
||||||
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
|
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
|
||||||
AddPublicRoutes(base, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{}, caches)
|
AddPublicRoutes(base.ProcessContext, base.Routers, base.Cfg, base.ConnectionManager, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{}, caches, base.EnableMetrics)
|
||||||
|
|
||||||
producer := producers.SyncAPIProducer{
|
producer := producers.SyncAPIProducer{
|
||||||
TopicSendToDeviceEvent: base.Cfg.Global.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
|
TopicSendToDeviceEvent: base.Cfg.Global.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
|
||||||
|
|
@ -1018,7 +1018,7 @@ func testContext(t *testing.T, dbType test.DBType) {
|
||||||
rsAPI := roomserver.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, caches, base.EnableMetrics)
|
rsAPI := roomserver.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, caches, base.EnableMetrics)
|
||||||
rsAPI.SetFederationAPI(nil, nil)
|
rsAPI.SetFederationAPI(nil, nil)
|
||||||
|
|
||||||
AddPublicRoutes(base, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, rsAPI, caches)
|
AddPublicRoutes(base.ProcessContext, base.Routers, base.Cfg, base.ConnectionManager, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, rsAPI, caches, base.EnableMetrics)
|
||||||
|
|
||||||
room := test.NewRoom(t, user)
|
room := test.NewRoom(t, user)
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue