Remove BaseDendrite from syncAPI tests

This commit is contained in:
Till Faelligen 2023-03-21 09:08:45 +01:00
parent 9b06041052
commit c6738990cf
No known key found for this signature in database
GPG key ID: ACCDC9606D472758
2 changed files with 87 additions and 70 deletions

View file

@ -11,6 +11,9 @@ import (
"time" "time"
"github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
"github.com/tidwall/gjson" "github.com/tidwall/gjson"
@ -22,7 +25,6 @@ import (
"github.com/matrix-org/dendrite/roomserver" "github.com/matrix-org/dendrite/roomserver"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
rsapi "github.com/matrix-org/dendrite/roomserver/api" rsapi "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/dendrite/test" "github.com/matrix-org/dendrite/test"
@ -114,15 +116,17 @@ func testSyncAccessTokens(t *testing.T, dbType test.DBType) {
AccountType: userapi.AccountTypeUser, AccountType: userapi.AccountTypeUser,
} }
base, close := testrig.CreateBaseDendrite(t, dbType) cfg, processCtx, close := testrig.CreateConfig(t, dbType)
routers := httputil.NewRouters()
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics) caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
natsInstance := jetstream.NATSInstance{} natsInstance := jetstream.NATSInstance{}
defer close() defer close()
jsctx, _ := natsInstance.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) jsctx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream)
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream) defer jetstream.DeleteAllStreams(jsctx, &cfg.Global.JetStream)
msgs := toNATSMsgs(t, base, room.Events()...) msgs := toNATSMsgs(t, cfg, room.Events()...)
AddPublicRoutes(base.ProcessContext, base.Routers, base.Cfg, base.ConnectionManager, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches, base.EnableMetrics) AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches, caching.DisableMetrics)
testrig.MustPublishMsgs(t, jsctx, msgs...) testrig.MustPublishMsgs(t, jsctx, msgs...)
testCases := []struct { testCases := []struct {
@ -157,7 +161,7 @@ func testSyncAccessTokens(t *testing.T, dbType test.DBType) {
}, },
} }
syncUntil(t, base, alice.AccessToken, false, func(syncBody string) bool { syncUntil(t, routers, alice.AccessToken, false, func(syncBody string) bool {
// wait for the last sent eventID to come down sync // wait for the last sent eventID to come down sync
path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(event_id=="%s")`, room.ID, room.Events()[len(room.Events())-1].EventID()) path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(event_id=="%s")`, room.ID, room.Events()[len(room.Events())-1].EventID())
return gjson.Get(syncBody, path).Exists() return gjson.Get(syncBody, path).Exists()
@ -165,7 +169,7 @@ func testSyncAccessTokens(t *testing.T, dbType test.DBType) {
for _, tc := range testCases { for _, tc := range testCases {
w := httptest.NewRecorder() w := httptest.NewRecorder()
base.Routers.Client.ServeHTTP(w, tc.req) routers.Client.ServeHTTP(w, tc.req)
if w.Code != tc.wantCode { if w.Code != tc.wantCode {
t.Fatalf("%s: got HTTP %d want %d", tc.name, w.Code, tc.wantCode) t.Fatalf("%s: got HTTP %d want %d", tc.name, w.Code, tc.wantCode)
} }
@ -208,27 +212,29 @@ func testSyncAPICreateRoomSyncEarly(t *testing.T, dbType test.DBType) {
AccountType: userapi.AccountTypeUser, AccountType: userapi.AccountTypeUser,
} }
base, close := testrig.CreateBaseDendrite(t, dbType) cfg, processCtx, close := testrig.CreateConfig(t, dbType)
routers := httputil.NewRouters()
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
defer close() defer close()
natsInstance := jetstream.NATSInstance{} natsInstance := jetstream.NATSInstance{}
jsctx, _ := natsInstance.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) jsctx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream)
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream) defer jetstream.DeleteAllStreams(jsctx, &cfg.Global.JetStream)
// order is: // order is:
// m.room.create // m.room.create
// m.room.member // m.room.member
// m.room.power_levels // m.room.power_levels
// m.room.join_rules // m.room.join_rules
// m.room.history_visibility // m.room.history_visibility
msgs := toNATSMsgs(t, base, room.Events()...) msgs := toNATSMsgs(t, cfg, room.Events()...)
sinceTokens := make([]string, len(msgs)) sinceTokens := make([]string, len(msgs))
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics) AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches, caching.DisableMetrics)
AddPublicRoutes(base.ProcessContext, base.Routers, base.Cfg, base.ConnectionManager, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches, base.EnableMetrics)
for i, msg := range msgs { for i, msg := range msgs {
testrig.MustPublishMsgs(t, jsctx, msg) testrig.MustPublishMsgs(t, jsctx, msg)
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
w := httptest.NewRecorder() w := httptest.NewRecorder()
base.Routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{ routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
"access_token": alice.AccessToken, "access_token": alice.AccessToken,
"timeout": "0", "timeout": "0",
}))) })))
@ -258,7 +264,7 @@ func testSyncAPICreateRoomSyncEarly(t *testing.T, dbType test.DBType) {
t.Logf("waited for events to be consumed; syncing with %v", sinceTokens) t.Logf("waited for events to be consumed; syncing with %v", sinceTokens)
for i, since := range sinceTokens { for i, since := range sinceTokens {
w := httptest.NewRecorder() w := httptest.NewRecorder()
base.Routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{ routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
"access_token": alice.AccessToken, "access_token": alice.AccessToken,
"timeout": "0", "timeout": "0",
"since": since, "since": since,
@ -300,18 +306,20 @@ func testSyncAPIUpdatePresenceImmediately(t *testing.T, dbType test.DBType) {
AccountType: userapi.AccountTypeUser, AccountType: userapi.AccountTypeUser,
} }
base, close := testrig.CreateBaseDendrite(t, dbType) cfg, processCtx, close := testrig.CreateConfig(t, dbType)
base.Cfg.Global.Presence.EnableOutbound = true routers := httputil.NewRouters()
base.Cfg.Global.Presence.EnableInbound = true cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
cfg.Global.Presence.EnableOutbound = true
cfg.Global.Presence.EnableInbound = true
defer close() defer close()
natsInstance := jetstream.NATSInstance{} natsInstance := jetstream.NATSInstance{}
jsctx, _ := natsInstance.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) jsctx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream)
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream) defer jetstream.DeleteAllStreams(jsctx, &cfg.Global.JetStream)
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics) AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{}, caches, caching.DisableMetrics)
AddPublicRoutes(base.ProcessContext, base.Routers, base.Cfg, base.ConnectionManager, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{}, caches, base.EnableMetrics)
w := httptest.NewRecorder() w := httptest.NewRecorder()
base.Routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{ routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
"access_token": alice.AccessToken, "access_token": alice.AccessToken,
"timeout": "0", "timeout": "0",
"set_presence": "online", "set_presence": "online",
@ -417,18 +425,20 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) {
userType = "real user" userType = "real user"
} }
base, close := testrig.CreateBaseDendrite(t, dbType) cfg, processCtx, close := testrig.CreateConfig(t, dbType)
routers := httputil.NewRouters()
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
defer close() defer close()
natsInstance := jetstream.NATSInstance{} natsInstance := jetstream.NATSInstance{}
jsctx, _ := natsInstance.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) jsctx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream)
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream) defer jetstream.DeleteAllStreams(jsctx, &cfg.Global.JetStream)
// Use the actual internal roomserver API // Use the actual internal roomserver API
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics) rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
rsAPI := roomserver.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, caches, base.EnableMetrics)
rsAPI.SetFederationAPI(nil, nil) rsAPI.SetFederationAPI(nil, nil)
AddPublicRoutes(base.ProcessContext, base.Routers, base.Cfg, base.ConnectionManager, &natsInstance, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI, caches, base.EnableMetrics) AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI, caches, caching.DisableMetrics)
for _, tc := range testCases { for _, tc := range testCases {
testname := fmt.Sprintf("%s - %s", tc.historyVisibility, userType) testname := fmt.Sprintf("%s - %s", tc.historyVisibility, userType)
@ -443,7 +453,7 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) {
if err := api.SendEvents(ctx, rsAPI, api.KindNew, eventsToSend, "test", "test", "test", nil, false); err != nil { if err := api.SendEvents(ctx, rsAPI, api.KindNew, eventsToSend, "test", "test", "test", nil, false); err != nil {
t.Fatalf("failed to send events: %v", err) t.Fatalf("failed to send events: %v", err)
} }
syncUntil(t, base, aliceDev.AccessToken, false, syncUntil(t, routers, aliceDev.AccessToken, false,
func(syncBody string) bool { func(syncBody string) bool {
path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(content.body=="%s")`, room.ID, beforeJoinBody) path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(content.body=="%s")`, room.ID, beforeJoinBody)
return gjson.Get(syncBody, path).Exists() return gjson.Get(syncBody, path).Exists()
@ -452,7 +462,7 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) {
// There is only one event, we expect only to be able to see this, if the room is world_readable // There is only one event, we expect only to be able to see this, if the room is world_readable
w := httptest.NewRecorder() w := httptest.NewRecorder()
base.Routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/messages", room.ID), test.WithQueryParams(map[string]string{ routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/messages", room.ID), test.WithQueryParams(map[string]string{
"access_token": bobDev.AccessToken, "access_token": bobDev.AccessToken,
"dir": "b", "dir": "b",
"filter": `{"lazy_load_members":true}`, // check that lazy loading doesn't break history visibility "filter": `{"lazy_load_members":true}`, // check that lazy loading doesn't break history visibility
@ -483,7 +493,7 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) {
if err := api.SendEvents(ctx, rsAPI, api.KindNew, eventsToSend, "test", "test", "test", nil, false); err != nil { if err := api.SendEvents(ctx, rsAPI, api.KindNew, eventsToSend, "test", "test", "test", nil, false); err != nil {
t.Fatalf("failed to send events: %v", err) t.Fatalf("failed to send events: %v", err)
} }
syncUntil(t, base, aliceDev.AccessToken, false, syncUntil(t, routers, aliceDev.AccessToken, false,
func(syncBody string) bool { func(syncBody string) bool {
path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(content.body=="%s")`, room.ID, afterJoinBody) path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(content.body=="%s")`, room.ID, afterJoinBody)
return gjson.Get(syncBody, path).Exists() return gjson.Get(syncBody, path).Exists()
@ -492,7 +502,7 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) {
// Verify the messages after/before invite are visible or not // Verify the messages after/before invite are visible or not
w = httptest.NewRecorder() w = httptest.NewRecorder()
base.Routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/messages", room.ID), test.WithQueryParams(map[string]string{ routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/messages", room.ID), test.WithQueryParams(map[string]string{
"access_token": bobDev.AccessToken, "access_token": bobDev.AccessToken,
"dir": "b", "dir": "b",
}))) })))
@ -718,18 +728,20 @@ func TestGetMembership(t *testing.T) {
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
base, close := testrig.CreateBaseDendrite(t, dbType) cfg, processCtx, close := testrig.CreateConfig(t, dbType)
routers := httputil.NewRouters()
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
defer close() defer close()
natsInstance := jetstream.NATSInstance{} natsInstance := jetstream.NATSInstance{}
jsctx, _ := natsInstance.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) jsctx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream)
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream) defer jetstream.DeleteAllStreams(jsctx, &cfg.Global.JetStream)
// Use an actual roomserver for this // Use an actual roomserver for this
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics) rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
rsAPI := roomserver.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, caches, base.EnableMetrics)
rsAPI.SetFederationAPI(nil, nil) rsAPI.SetFederationAPI(nil, nil)
AddPublicRoutes(base.ProcessContext, base.Routers, base.Cfg, base.ConnectionManager, &natsInstance, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI, caches, base.EnableMetrics) AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI, caches, caching.DisableMetrics)
for _, tc := range testCases { for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
@ -749,7 +761,7 @@ func TestGetMembership(t *testing.T) {
if tc.useSleep { if tc.useSleep {
time.Sleep(time.Millisecond * 100) time.Sleep(time.Millisecond * 100)
} else { } else {
syncUntil(t, base, aliceDev.AccessToken, false, func(syncBody string) bool { syncUntil(t, routers, aliceDev.AccessToken, false, func(syncBody string) bool {
// wait for the last sent eventID to come down sync // wait for the last sent eventID to come down sync
path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(event_id=="%s")`, room.ID, room.Events()[len(room.Events())-1].EventID()) path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(event_id=="%s")`, room.ID, room.Events()[len(room.Events())-1].EventID())
return gjson.Get(syncBody, path).Exists() return gjson.Get(syncBody, path).Exists()
@ -757,7 +769,7 @@ func TestGetMembership(t *testing.T) {
} }
w := httptest.NewRecorder() w := httptest.NewRecorder()
base.Routers.Client.ServeHTTP(w, tc.request(t, room)) routers.Client.ServeHTTP(w, tc.request(t, room))
if w.Code != 200 && tc.wantOK { if w.Code != 200 && tc.wantOK {
t.Logf("%s", w.Body.String()) t.Logf("%s", w.Body.String())
t.Fatalf("got HTTP %d want %d", w.Code, 200) t.Fatalf("got HTTP %d want %d", w.Code, 200)
@ -790,17 +802,19 @@ func testSendToDevice(t *testing.T, dbType test.DBType) {
AccountType: userapi.AccountTypeUser, AccountType: userapi.AccountTypeUser,
} }
base, baseClose := testrig.CreateBaseDendrite(t, dbType) cfg, processCtx, close := testrig.CreateConfig(t, dbType)
defer baseClose() routers := httputil.NewRouters()
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
defer close()
natsInstance := jetstream.NATSInstance{} natsInstance := jetstream.NATSInstance{}
jsctx, _ := natsInstance.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) jsctx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream)
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream) defer jetstream.DeleteAllStreams(jsctx, &cfg.Global.JetStream)
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics) AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{}, caches, caching.DisableMetrics)
AddPublicRoutes(base.ProcessContext, base.Routers, base.Cfg, base.ConnectionManager, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{}, caches, base.EnableMetrics)
producer := producers.SyncAPIProducer{ producer := producers.SyncAPIProducer{
TopicSendToDeviceEvent: base.Cfg.Global.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent), TopicSendToDeviceEvent: cfg.Global.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
JetStream: jsctx, JetStream: jsctx,
} }
@ -886,7 +900,7 @@ func testSendToDevice(t *testing.T, dbType test.DBType) {
} }
} }
syncUntil(t, base, alice.AccessToken, syncUntil(t, routers, alice.AccessToken,
len(tc.want) == 0, len(tc.want) == 0,
func(body string) bool { func(body string) bool {
return gjson.Get(body, fmt.Sprintf(`to_device.events.#(content.dummy=="message %d")`, msgCounter)).Exists() return gjson.Get(body, fmt.Sprintf(`to_device.events.#(content.dummy=="message %d")`, msgCounter)).Exists()
@ -895,7 +909,7 @@ func testSendToDevice(t *testing.T, dbType test.DBType) {
// Execute a /sync request, recording the response // Execute a /sync request, recording the response
w := httptest.NewRecorder() w := httptest.NewRecorder()
base.Routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{ routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
"access_token": alice.AccessToken, "access_token": alice.AccessToken,
"since": tc.since, "since": tc.since,
}))) })))
@ -1009,16 +1023,18 @@ func testContext(t *testing.T, dbType test.DBType) {
AccountType: userapi.AccountTypeUser, AccountType: userapi.AccountTypeUser,
} }
base, baseClose := testrig.CreateBaseDendrite(t, dbType) cfg, processCtx, close := testrig.CreateConfig(t, dbType)
defer baseClose() routers := httputil.NewRouters()
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
defer close()
// Use an actual roomserver for this // Use an actual roomserver for this
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
natsInstance := jetstream.NATSInstance{} natsInstance := jetstream.NATSInstance{}
rsAPI := roomserver.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, caches, base.EnableMetrics) rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil) rsAPI.SetFederationAPI(nil, nil)
AddPublicRoutes(base.ProcessContext, base.Routers, base.Cfg, base.ConnectionManager, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, rsAPI, caches, base.EnableMetrics) AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, rsAPI, caches, caching.DisableMetrics)
room := test.NewRoom(t, user) room := test.NewRoom(t, user)
@ -1031,10 +1047,10 @@ func testContext(t *testing.T, dbType test.DBType) {
t.Fatalf("failed to send events: %v", err) t.Fatalf("failed to send events: %v", err)
} }
jsctx, _ := natsInstance.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) jsctx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream)
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream) defer jetstream.DeleteAllStreams(jsctx, &cfg.Global.JetStream)
syncUntil(t, base, alice.AccessToken, false, func(syncBody string) bool { syncUntil(t, routers, alice.AccessToken, false, func(syncBody string) bool {
// wait for the last sent eventID to come down sync // wait for the last sent eventID to come down sync
path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(event_id=="%s")`, room.ID, thirdMsg.EventID()) path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(event_id=="%s")`, room.ID, thirdMsg.EventID())
return gjson.Get(syncBody, path).Exists() return gjson.Get(syncBody, path).Exists()
@ -1061,7 +1077,7 @@ func testContext(t *testing.T, dbType test.DBType) {
params[k] = v params[k] = v
} }
} }
base.Routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", requestPath, test.WithQueryParams(params))) routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", requestPath, test.WithQueryParams(params)))
if tc.wantError && w.Code == 200 { if tc.wantError && w.Code == 200 {
t.Fatalf("Expected an error, but got none") t.Fatalf("Expected an error, but got none")
@ -1149,9 +1165,10 @@ func TestUpdateRelations(t *testing.T) {
room := test.NewRoom(t, alice) room := test.NewRoom(t, alice)
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
base, shutdownBase := testrig.CreateBaseDendrite(t, dbType) cfg, processCtx, close := testrig.CreateConfig(t, dbType)
t.Cleanup(shutdownBase) cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
db, err := storage.NewSyncServerDatasource(base.Context(), base.ConnectionManager, &base.Cfg.SyncAPI.Database) t.Cleanup(close)
db, err := storage.NewSyncServerDatasource(processCtx.Context(), cm, &cfg.SyncAPI.Database)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -1173,7 +1190,7 @@ func TestUpdateRelations(t *testing.T) {
} }
func syncUntil(t *testing.T, func syncUntil(t *testing.T,
base *base.BaseDendrite, accessToken string, routers httputil.Routers, accessToken string,
skip bool, skip bool,
checkFunc func(syncBody string) bool, checkFunc func(syncBody string) bool,
) { ) {
@ -1191,7 +1208,7 @@ func syncUntil(t *testing.T,
go func() { go func() {
for { for {
w := httptest.NewRecorder() w := httptest.NewRecorder()
base.Routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{ routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
"access_token": accessToken, "access_token": accessToken,
"timeout": "1000", "timeout": "1000",
}))) })))
@ -1209,14 +1226,14 @@ func syncUntil(t *testing.T,
} }
} }
func toNATSMsgs(t *testing.T, base *base.BaseDendrite, input ...*gomatrixserverlib.HeaderedEvent) []*nats.Msg { func toNATSMsgs(t *testing.T, cfg *config.Dendrite, input ...*gomatrixserverlib.HeaderedEvent) []*nats.Msg {
result := make([]*nats.Msg, len(input)) result := make([]*nats.Msg, len(input))
for i, ev := range input { for i, ev := range input {
var addsStateIDs []string var addsStateIDs []string
if ev.StateKey() != nil { if ev.StateKey() != nil {
addsStateIDs = append(addsStateIDs, ev.EventID()) addsStateIDs = append(addsStateIDs, ev.EventID())
} }
result[i] = testrig.NewOutputEventMsg(t, base, ev.RoomID(), api.OutputEvent{ result[i] = testrig.NewOutputEventMsg(t, cfg, ev.RoomID(), api.OutputEvent{
Type: rsapi.OutputTypeNewRoomEvent, Type: rsapi.OutputTypeNewRoomEvent,
NewRoomEvent: &rsapi.OutputNewRoomEvent{ NewRoomEvent: &rsapi.OutputNewRoomEvent{
Event: ev, Event: ev,

View file

@ -4,10 +4,10 @@ import (
"encoding/json" "encoding/json"
"testing" "testing"
"github.com/matrix-org/dendrite/setup/config"
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/jetstream"
) )
@ -20,9 +20,9 @@ func MustPublishMsgs(t *testing.T, jsctx nats.JetStreamContext, msgs ...*nats.Ms
} }
} }
func NewOutputEventMsg(t *testing.T, base *base.BaseDendrite, roomID string, update api.OutputEvent) *nats.Msg { func NewOutputEventMsg(t *testing.T, cfg *config.Dendrite, roomID string, update api.OutputEvent) *nats.Msg {
t.Helper() t.Helper()
msg := nats.NewMsg(base.Cfg.Global.JetStream.Prefixed(jetstream.OutputRoomEvent)) msg := nats.NewMsg(cfg.Global.JetStream.Prefixed(jetstream.OutputRoomEvent))
msg.Header.Set(jetstream.RoomEventType, string(update.Type)) msg.Header.Set(jetstream.RoomEventType, string(update.Type))
msg.Header.Set(jetstream.RoomID, roomID) msg.Header.Set(jetstream.RoomID, roomID)
var err error var err error