Remove BaseDendrite from almost all roomserver tests

This commit is contained in:
Till Faelligen 2023-03-21 10:17:33 +01:00
parent b9b1838044
commit 85f76d7491
No known key found for this signature in database
GPG key ID: ACCDC9606D472758
3 changed files with 81 additions and 98 deletions

View file

@ -18,7 +18,6 @@ import (
"github.com/matrix-org/dendrite/roomserver/internal/query" "github.com/matrix-org/dendrite/roomserver/internal/query"
"github.com/matrix-org/dendrite/roomserver/producers" "github.com/matrix-org/dendrite/roomserver/producers"
"github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/setup/process"
@ -41,7 +40,6 @@ type RoomserverInternalAPI struct {
*perform.Upgrader *perform.Upgrader
*perform.Admin *perform.Admin
ProcessContext *process.ProcessContext ProcessContext *process.ProcessContext
Base *base.BaseDendrite
DB storage.Database DB storage.Database
Cfg *config.Dendrite Cfg *config.Dendrite
Cache caching.RoomServerCaches Cache caching.RoomServerCaches

View file

@ -2,84 +2,69 @@ package input_test
import ( import (
"context" "context"
"os"
"testing" "testing"
"time" "time"
"github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/internal/input" "github.com/matrix-org/dendrite/roomserver/internal/input"
"github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/test"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/test/testrig" "github.com/matrix-org/dendrite/test/testrig"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
) )
var js nats.JetStreamContext
var jc *nats.Conn
func TestMain(m *testing.M) {
var b *base.BaseDendrite
b, js, jc = testrig.Base(nil)
code := m.Run()
b.ShutdownDendrite()
b.WaitForComponentsToFinish()
os.Exit(code)
}
func TestSingleTransactionOnInput(t *testing.T) { func TestSingleTransactionOnInput(t *testing.T) {
deadline, _ := t.Deadline() test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
if max := time.Now().Add(time.Second * 3); deadline.After(max) { cfg, processCtx, close := testrig.CreateConfig(t, dbType)
deadline = max defer close()
} cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
ctx, cancel := context.WithDeadline(context.Background(), deadline)
defer cancel()
event, err := gomatrixserverlib.NewEventFromTrustedJSON( natsInstance := &jetstream.NATSInstance{}
[]byte(`{"auth_events":[],"content":{"creator":"@neilalexander:dendrite.matrix.org","room_version":"6"},"depth":1,"hashes":{"sha256":"jqOqdNEH5r0NiN3xJtj0u5XUVmRqq9YvGbki1wxxuuM"},"origin":"dendrite.matrix.org","origin_server_ts":1644595362726,"prev_events":[],"prev_state":[],"room_id":"!jSZZRknA6GkTBXNP:dendrite.matrix.org","sender":"@neilalexander:dendrite.matrix.org","signatures":{"dendrite.matrix.org":{"ed25519:6jB2aB":"bsQXO1wketf1OSe9xlndDIWe71W9KIundc6rBw4KEZdGPW7x4Tv4zDWWvbxDsG64sS2IPWfIm+J0OOozbrWIDw"}},"state_key":"","type":"m.room.create"}`), js, jc := natsInstance.Prepare(processCtx, &cfg.Global.JetStream)
false, gomatrixserverlib.RoomVersionV6, caches := caching.NewRistrettoCache(8*1024*1024, time.Hour, caching.DisableMetrics)
) rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, natsInstance, caches, caching.DisableMetrics)
if err != nil { rsAPI.SetFederationAPI(nil, nil)
t.Fatal(err)
} deadline, _ := t.Deadline()
in := api.InputRoomEvent{ if max := time.Now().Add(time.Second * 3); deadline.Before(max) {
Kind: api.KindOutlier, // don't panic if we generate an output event deadline = max
Event: event.Headered(gomatrixserverlib.RoomVersionV6), }
} ctx, cancel := context.WithDeadline(processCtx.Context(), deadline)
cm := sqlutil.NewConnectionManager(nil, config.DatabaseOptions{}) defer cancel()
db, err := storage.Open(
context.Background(), cm, event, err := gomatrixserverlib.NewEventFromTrustedJSON(
&config.DatabaseOptions{ []byte(`{"auth_events":[],"content":{"creator":"@neilalexander:dendrite.matrix.org","room_version":"6"},"depth":1,"hashes":{"sha256":"jqOqdNEH5r0NiN3xJtj0u5XUVmRqq9YvGbki1wxxuuM"},"origin":"dendrite.matrix.org","origin_server_ts":1644595362726,"prev_events":[],"prev_state":[],"room_id":"!jSZZRknA6GkTBXNP:dendrite.matrix.org","sender":"@neilalexander:dendrite.matrix.org","signatures":{"dendrite.matrix.org":{"ed25519:6jB2aB":"bsQXO1wketf1OSe9xlndDIWe71W9KIundc6rBw4KEZdGPW7x4Tv4zDWWvbxDsG64sS2IPWfIm+J0OOozbrWIDw"}},"state_key":"","type":"m.room.create"}`),
ConnectionString: "", false, gomatrixserverlib.RoomVersionV6,
MaxOpenConnections: 1, )
MaxIdleConnections: 1, if err != nil {
}, t.Fatal(err)
caching.NewRistrettoCache(8*1024*1024, time.Hour, caching.DisableMetrics), }
) in := api.InputRoomEvent{
if err != nil { Kind: api.KindOutlier, // don't panic if we generate an output event
t.Logf("PostgreSQL not available (%s), skipping", err) Event: event.Headered(gomatrixserverlib.RoomVersionV6),
t.SkipNow() }
}
inputter := &input.Inputer{ inputter := &input.Inputer{
DB: db, JetStream: js,
JetStream: js, NATSClient: jc,
NATSClient: jc, Cfg: &cfg.RoomServer,
} }
res := &api.InputRoomEventsResponse{} res := &api.InputRoomEventsResponse{}
inputter.InputRoomEvents( inputter.InputRoomEvents(
ctx, ctx,
&api.InputRoomEventsRequest{ &api.InputRoomEventsRequest{
InputRoomEvents: []api.InputRoomEvent{in}, InputRoomEvents: []api.InputRoomEvent{in},
Asynchronous: false, Asynchronous: false,
}, },
res, res,
) )
// If we fail here then it's because we've hit the test deadline, // If we fail here then it's because we've hit the test deadline,
// so we probably deadlocked // so we probably deadlocked
if err := res.Err(); err != nil { if err := res.Err(); err != nil {
t.Fatal(err) t.Fatal(err)
} }
})
} }

View file

@ -8,11 +8,12 @@ import (
"time" "time"
"github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/matrix-org/dendrite/roomserver/state" "github.com/matrix-org/dendrite/roomserver/state"
"github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/userapi" "github.com/matrix-org/dendrite/userapi"
userAPI "github.com/matrix-org/dendrite/userapi/api" userAPI "github.com/matrix-org/dendrite/userapi/api"
@ -30,24 +31,14 @@ import (
"github.com/matrix-org/dendrite/test/testrig" "github.com/matrix-org/dendrite/test/testrig"
) )
func mustCreateDatabase(t *testing.T, dbType test.DBType) (*base.BaseDendrite, storage.Database, func()) {
t.Helper()
base, close := testrig.CreateBaseDendrite(t, dbType)
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
db, err := storage.Open(base.ProcessContext.Context(), base.ConnectionManager, &base.Cfg.RoomServer.Database, caches)
if err != nil {
t.Fatalf("failed to create Database: %v", err)
}
return base, db, close
}
func TestUsers(t *testing.T) { func TestUsers(t *testing.T) {
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
base, close := testrig.CreateBaseDendrite(t, dbType) cfg, processCtx, close := testrig.CreateConfig(t, dbType)
defer close() defer close()
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics) caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
natsInstance := jetstream.NATSInstance{} natsInstance := jetstream.NATSInstance{}
rsAPI := roomserver.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, caches, base.EnableMetrics) cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
// SetFederationAPI starts the room event input consumer // SetFederationAPI starts the room event input consumer
rsAPI.SetFederationAPI(nil, nil) rsAPI.SetFederationAPI(nil, nil)
@ -56,7 +47,7 @@ func TestUsers(t *testing.T) {
}) })
t.Run("kick users", func(t *testing.T) { t.Run("kick users", func(t *testing.T) {
usrAPI := userapi.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, rsAPI, nil) usrAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil)
rsAPI.SetUserAPI(usrAPI) rsAPI.SetUserAPI(usrAPI)
testKickUsers(t, rsAPI, usrAPI) testKickUsers(t, rsAPI, usrAPI)
}) })
@ -182,12 +173,13 @@ func Test_QueryLeftUsers(t *testing.T) {
ctx := context.Background() ctx := context.Background()
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
base, _, close := mustCreateDatabase(t, dbType) cfg, processCtx, close := testrig.CreateConfig(t, dbType)
defer close() defer close()
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics) caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
natsInstance := jetstream.NATSInstance{} natsInstance := jetstream.NATSInstance{}
rsAPI := roomserver.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, caches, base.EnableMetrics) cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
// SetFederationAPI starts the room event input consumer // SetFederationAPI starts the room event input consumer
rsAPI.SetFederationAPI(nil, nil) rsAPI.SetFederationAPI(nil, nil)
// Create the room // Create the room
@ -231,21 +223,23 @@ func TestPurgeRoom(t *testing.T) {
ctx := context.Background() ctx := context.Background()
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
base, db, close := mustCreateDatabase(t, dbType) cfg, processCtx, close := testrig.CreateConfig(t, dbType)
natsInstance := jetstream.NATSInstance{} natsInstance := jetstream.NATSInstance{}
defer close() defer close()
routers := httputil.NewRouters()
jsCtx, _ := natsInstance.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
defer jetstream.DeleteAllStreams(jsCtx, &base.Cfg.Global.JetStream)
fedClient := base.CreateFederationClient()
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics) caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
rsAPI := roomserver.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, caches, base.EnableMetrics) db, err := storage.Open(processCtx.Context(), cm, &cfg.RoomServer.Database, caches)
userAPI := userapi.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, rsAPI, nil)
jsCtx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream)
defer jetstream.DeleteAllStreams(jsCtx, &cfg.Global.JetStream)
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil)
// this starts the JetStream consumers // this starts the JetStream consumers
syncapi.AddPublicRoutes(base.ProcessContext, base.Routers, base.Cfg, base.ConnectionManager, &natsInstance, userAPI, rsAPI, caches, base.EnableMetrics) syncapi.AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, userAPI, rsAPI, caches, caching.DisableMetrics)
federationapi.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, fedClient, rsAPI, caches, nil, true) federationapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, nil, rsAPI, caches, nil, true)
rsAPI.SetFederationAPI(nil, nil) rsAPI.SetFederationAPI(nil, nil)
// Create the room // Create the room
@ -343,7 +337,7 @@ func TestPurgeRoom(t *testing.T) {
t.Fatalf("test timed out after %s", timeout) t.Fatalf("test timed out after %s", timeout)
} }
sum = 0 sum = 0
consumerCh := jsCtx.Consumers(base.Cfg.Global.JetStream.Prefixed(jetstream.OutputRoomEvent)) consumerCh := jsCtx.Consumers(cfg.Global.JetStream.Prefixed(jetstream.OutputRoomEvent))
for x := range consumerCh { for x := range consumerCh {
sum += x.NumAckPending sum += x.NumAckPending
} }
@ -511,8 +505,14 @@ func TestRedaction(t *testing.T) {
ctx := context.Background() ctx := context.Background()
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
_, db, close := mustCreateDatabase(t, dbType) cfg, processCtx, close := testrig.CreateConfig(t, dbType)
defer close() defer close()
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
db, err := storage.Open(processCtx.Context(), cm, &cfg.RoomServer.Database, caches)
if err != nil {
t.Fatal(err)
}
for _, tc := range testCases { for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {