Remove BaseDendrite from roomserver

This commit is contained in:
Till Faelligen 2023-03-20 14:05:36 +01:00
parent 16fb22cad5
commit b7947c38e8
No known key found for this signature in database
GPG key ID: ACCDC9606D472758
16 changed files with 61 additions and 59 deletions

View file

@ -128,7 +128,7 @@ func TestAppserviceInternalAPI(t *testing.T) {
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
// Create required internal APIs // Create required internal APIs
natsInstance := jetstream.NATSInstance{} natsInstance := jetstream.NATSInstance{}
rsAPI := roomserver.NewInternalAPI(base, &natsInstance, caches) rsAPI := roomserver.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, caches, base.EnableMetrics)
usrAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil) usrAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil)
asAPI := appservice.NewInternalAPI(base.ProcessContext, base.Cfg, &natsInstance, usrAPI, rsAPI) asAPI := appservice.NewInternalAPI(base.ProcessContext, base.Cfg, &natsInstance, usrAPI, rsAPI)

View file

@ -182,7 +182,7 @@ func startup() {
defer base.Close() // nolint: errcheck defer base.Close() // nolint: errcheck
natsInstance := jetstream.NATSInstance{} natsInstance := jetstream.NATSInstance{}
caches := caching.NewRistrettoCache(cfg.Global.Cache.EstimatedMaxSize, cfg.Global.Cache.MaxAge, caching.EnableMetrics) caches := caching.NewRistrettoCache(cfg.Global.Cache.EstimatedMaxSize, cfg.Global.Cache.MaxAge, caching.EnableMetrics)
rsAPI := roomserver.NewInternalAPI(base, &natsInstance, caches) rsAPI := roomserver.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, caches, base.EnableMetrics)
federation := conn.CreateFederationClient(base, pSessions) federation := conn.CreateFederationClient(base, pSessions)

View file

@ -162,7 +162,7 @@ func (m *DendriteMonolith) Start() {
caches := caching.NewRistrettoCache(cfg.Global.Cache.EstimatedMaxSize, cfg.Global.Cache.MaxAge, caching.DisableMetrics) caches := caching.NewRistrettoCache(cfg.Global.Cache.EstimatedMaxSize, cfg.Global.Cache.MaxAge, caching.DisableMetrics)
natsInstance := jetstream.NATSInstance{} natsInstance := jetstream.NATSInstance{}
rsAPI := roomserver.NewInternalAPI(base, &natsInstance, caches) rsAPI := roomserver.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, caches, base.EnableMetrics)
fsAPI := federationapi.NewInternalAPI( fsAPI := federationapi.NewInternalAPI(
base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, federation, rsAPI, caches, keyRing, true, base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, federation, rsAPI, caches, keyRing, true,

View file

@ -40,7 +40,7 @@ func TestAdminResetPassword(t *testing.T) {
}) })
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
rsAPI := roomserver.NewInternalAPI(base, &natsInstance, caches) rsAPI := roomserver.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, caches, base.EnableMetrics)
// Needed for changing the password/login // Needed for changing the password/login
userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil) userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil)
// We mostly need the userAPI for this test, so nil for other APIs/caches etc. // We mostly need the userAPI for this test, so nil for other APIs/caches etc.
@ -156,7 +156,7 @@ func TestPurgeRoom(t *testing.T) {
defer baseClose() defer baseClose()
fedClient := base.CreateFederationClient() fedClient := base.CreateFederationClient()
rsAPI := roomserver.NewInternalAPI(base, &natsInstance, caches) rsAPI := roomserver.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, caches, base.EnableMetrics)
userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil) userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil)
// this starts the JetStream consumers // this starts the JetStream consumers

View file

@ -31,7 +31,7 @@ func TestJoinRoomByIDOrAlias(t *testing.T) {
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
natsInstance := jetstream.NATSInstance{} natsInstance := jetstream.NATSInstance{}
rsAPI := roomserver.NewInternalAPI(base, &natsInstance, caches) rsAPI := roomserver.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, caches, base.EnableMetrics)
userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil) userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil)
asAPI := appservice.NewInternalAPI(base.ProcessContext, base.Cfg, &natsInstance, userAPI, rsAPI) asAPI := appservice.NewInternalAPI(base.ProcessContext, base.Cfg, &natsInstance, userAPI, rsAPI)
rsAPI.SetFederationAPI(nil, nil) // creates the rs.Inputer etc rsAPI.SetFederationAPI(nil, nil) // creates the rs.Inputer etc

View file

@ -40,7 +40,7 @@ func TestLogin(t *testing.T) {
}) })
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
rsAPI := roomserver.NewInternalAPI(base, &natsInstance, caches) rsAPI := roomserver.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, caches, base.EnableMetrics)
// Needed for /login // Needed for /login
userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil) userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil)

View file

@ -411,7 +411,7 @@ func Test_register(t *testing.T) {
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
natsInstance := jetstream.NATSInstance{} natsInstance := jetstream.NATSInstance{}
rsAPI := roomserver.NewInternalAPI(base, &natsInstance, caches) rsAPI := roomserver.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, caches, base.EnableMetrics)
userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil) userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil)
for _, tc := range testCases { for _, tc := range testCases {
@ -584,7 +584,7 @@ func TestRegisterUserWithDisplayName(t *testing.T) {
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
natsInstance := jetstream.NATSInstance{} natsInstance := jetstream.NATSInstance{}
rsAPI := roomserver.NewInternalAPI(base, &natsInstance, caches) rsAPI := roomserver.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, caches, base.EnableMetrics)
userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil) userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil)
deviceName, deviceID := "deviceName", "deviceID" deviceName, deviceID := "deviceName", "deviceID"
expectedDisplayName := "DisplayName" expectedDisplayName := "DisplayName"
@ -624,7 +624,7 @@ func TestRegisterAdminUsingSharedSecret(t *testing.T) {
sharedSecret := "dendritetest" sharedSecret := "dendritetest"
base.Cfg.ClientAPI.RegistrationSharedSecret = sharedSecret base.Cfg.ClientAPI.RegistrationSharedSecret = sharedSecret
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
rsAPI := roomserver.NewInternalAPI(base, &natsInstance, caches) rsAPI := roomserver.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, caches, base.EnableMetrics)
userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil) userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil)
expectedDisplayName := "rabbit" expectedDisplayName := "rabbit"

View file

@ -137,7 +137,7 @@ func (p *P2PMonolith) SetupDendrite(cfg *config.Dendrite, port int, enableRelayi
caches := caching.NewRistrettoCache(cfg.Global.Cache.EstimatedMaxSize, cfg.Global.Cache.MaxAge, enableMetrics) caches := caching.NewRistrettoCache(cfg.Global.Cache.EstimatedMaxSize, cfg.Global.Cache.MaxAge, enableMetrics)
natsInstance := jetstream.NATSInstance{} natsInstance := jetstream.NATSInstance{}
rsAPI := roomserver.NewInternalAPI(p.BaseDendrite, &natsInstance, caches) rsAPI := roomserver.NewInternalAPI(p.BaseDendrite.ProcessContext, p.BaseDendrite.Cfg, p.BaseDendrite.ConnectionManager, &natsInstance, caches, p.BaseDendrite.EnableMetrics)
fsAPI := federationapi.NewInternalAPI( fsAPI := federationapi.NewInternalAPI(
p.BaseDendrite.ProcessContext, p.BaseDendrite.Cfg, p.BaseDendrite.ConnectionManager, &natsInstance, federation, rsAPI, caches, keyRing, true, p.BaseDendrite.ProcessContext, p.BaseDendrite.Cfg, p.BaseDendrite.ConnectionManager, &natsInstance, federation, rsAPI, caches, keyRing, true,
) )

View file

@ -160,7 +160,7 @@ func main() {
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.EnableMetrics) caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.EnableMetrics)
natsInstance := jetstream.NATSInstance{} natsInstance := jetstream.NATSInstance{}
rsAPI := roomserver.NewInternalAPI(base, &natsInstance, caches) rsAPI := roomserver.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, caches, base.EnableMetrics)
userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, federation) userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, federation)

View file

@ -76,7 +76,7 @@ func main() {
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.EnableMetrics) caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.EnableMetrics)
natsInstance := jetstream.NATSInstance{} natsInstance := jetstream.NATSInstance{}
rsAPI := roomserver.NewInternalAPI(base, &natsInstance, caches) rsAPI := roomserver.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, caches, base.EnableMetrics)
fsAPI := federationapi.NewInternalAPI( fsAPI := federationapi.NewInternalAPI(
base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, federation, rsAPI, caches, nil, false, base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, federation, rsAPI, caches, nil, false,
) )

View file

@ -117,7 +117,7 @@ func (r *RoomserverInternalAPI) RemoveRoomAlias(
request *api.RemoveRoomAliasRequest, request *api.RemoveRoomAliasRequest,
response *api.RemoveRoomAliasResponse, response *api.RemoveRoomAliasResponse,
) error { ) error {
_, virtualHost, err := r.Cfg.Matrix.SplitLocalID('@', request.UserID) _, virtualHost, err := r.Cfg.Global.SplitLocalID('@', request.UserID)
if err != nil { if err != nil {
return err return err
} }
@ -175,12 +175,12 @@ func (r *RoomserverInternalAPI) RemoveRoomAlias(
sender = ev.Sender() sender = ev.Sender()
} }
_, senderDomain, err := r.Cfg.Matrix.SplitLocalID('@', sender) _, senderDomain, err := r.Cfg.Global.SplitLocalID('@', sender)
if err != nil { if err != nil {
return err return err
} }
identity, err := r.Cfg.Matrix.SigningIdentityFor(senderDomain) identity, err := r.Cfg.Global.SigningIdentityFor(senderDomain)
if err != nil { if err != nil {
return err return err
} }
@ -206,7 +206,7 @@ func (r *RoomserverInternalAPI) RemoveRoomAlias(
return err return err
} }
newEvent, err := eventutil.BuildEvent(ctx, builder, r.Cfg.Matrix, identity, time.Now(), &eventsNeeded, stateRes) newEvent, err := eventutil.BuildEvent(ctx, builder, &r.Cfg.Global, identity, time.Now(), &eventsNeeded, stateRes)
if err != nil { if err != nil {
return err return err
} }

View file

@ -43,7 +43,7 @@ type RoomserverInternalAPI struct {
ProcessContext *process.ProcessContext ProcessContext *process.ProcessContext
Base *base.BaseDendrite Base *base.BaseDendrite
DB storage.Database DB storage.Database
Cfg *config.RoomServer Cfg *config.Dendrite
Cache caching.RoomServerCaches Cache caching.RoomServerCaches
ServerName gomatrixserverlib.ServerName ServerName gomatrixserverlib.ServerName
KeyRing gomatrixserverlib.JSONVerifier KeyRing gomatrixserverlib.JSONVerifier
@ -56,43 +56,44 @@ type RoomserverInternalAPI struct {
InputRoomEventTopic string // JetStream topic for new input room events InputRoomEventTopic string // JetStream topic for new input room events
OutputProducer *producers.RoomEventProducer OutputProducer *producers.RoomEventProducer
PerspectiveServerNames []gomatrixserverlib.ServerName PerspectiveServerNames []gomatrixserverlib.ServerName
enableMetrics bool
} }
func NewRoomserverAPI( func NewRoomserverAPI(
base *base.BaseDendrite, roomserverDB storage.Database, processContext *process.ProcessContext, dendriteCfg *config.Dendrite, roomserverDB storage.Database,
js nats.JetStreamContext, nc *nats.Conn, caches caching.RoomServerCaches, js nats.JetStreamContext, nc *nats.Conn, caches caching.RoomServerCaches, enableMetrics bool,
) *RoomserverInternalAPI { ) *RoomserverInternalAPI {
var perspectiveServerNames []gomatrixserverlib.ServerName var perspectiveServerNames []gomatrixserverlib.ServerName
for _, kp := range base.Cfg.FederationAPI.KeyPerspectives { for _, kp := range dendriteCfg.FederationAPI.KeyPerspectives {
perspectiveServerNames = append(perspectiveServerNames, kp.ServerName) perspectiveServerNames = append(perspectiveServerNames, kp.ServerName)
} }
serverACLs := acls.NewServerACLs(roomserverDB) serverACLs := acls.NewServerACLs(roomserverDB)
producer := &producers.RoomEventProducer{ producer := &producers.RoomEventProducer{
Topic: string(base.Cfg.Global.JetStream.Prefixed(jetstream.OutputRoomEvent)), Topic: string(dendriteCfg.Global.JetStream.Prefixed(jetstream.OutputRoomEvent)),
JetStream: js, JetStream: js,
ACLs: serverACLs, ACLs: serverACLs,
} }
a := &RoomserverInternalAPI{ a := &RoomserverInternalAPI{
ProcessContext: base.ProcessContext, ProcessContext: processContext,
DB: roomserverDB, DB: roomserverDB,
Base: base, Cfg: dendriteCfg,
Cfg: &base.Cfg.RoomServer,
Cache: caches, Cache: caches,
ServerName: base.Cfg.Global.ServerName, ServerName: dendriteCfg.Global.ServerName,
PerspectiveServerNames: perspectiveServerNames, PerspectiveServerNames: perspectiveServerNames,
InputRoomEventTopic: base.Cfg.Global.JetStream.Prefixed(jetstream.InputRoomEvent), InputRoomEventTopic: dendriteCfg.Global.JetStream.Prefixed(jetstream.InputRoomEvent),
OutputProducer: producer, OutputProducer: producer,
JetStream: js, JetStream: js,
NATSClient: nc, NATSClient: nc,
Durable: base.Cfg.Global.JetStream.Durable("RoomserverInputConsumer"), Durable: dendriteCfg.Global.JetStream.Durable("RoomserverInputConsumer"),
ServerACLs: serverACLs, ServerACLs: serverACLs,
Queryer: &query.Queryer{ Queryer: &query.Queryer{
DB: roomserverDB, DB: roomserverDB,
Cache: caches, Cache: caches,
IsLocalServerName: base.Cfg.Global.IsLocalServerName, IsLocalServerName: dendriteCfg.Global.IsLocalServerName,
ServerACLs: serverACLs, ServerACLs: serverACLs,
}, },
enableMetrics: enableMetrics,
// perform-er structs get initialised when we have a federation sender to use // perform-er structs get initialised when we have a federation sender to use
} }
return a return a
@ -105,15 +106,14 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.RoomserverFederatio
r.fsAPI = fsAPI r.fsAPI = fsAPI
r.KeyRing = keyRing r.KeyRing = keyRing
identity, err := r.Cfg.Matrix.SigningIdentityFor(r.ServerName) identity, err := r.Cfg.Global.SigningIdentityFor(r.ServerName)
if err != nil { if err != nil {
logrus.Panic(err) logrus.Panic(err)
} }
r.Inputer = &input.Inputer{ r.Inputer = &input.Inputer{
Cfg: &r.Base.Cfg.RoomServer, Cfg: &r.Cfg.RoomServer,
Base: r.Base, ProcessContext: r.ProcessContext,
ProcessContext: r.Base.ProcessContext,
DB: r.DB, DB: r.DB,
InputRoomEventTopic: r.InputRoomEventTopic, InputRoomEventTopic: r.InputRoomEventTopic,
OutputProducer: r.OutputProducer, OutputProducer: r.OutputProducer,
@ -129,12 +129,12 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.RoomserverFederatio
} }
r.Inviter = &perform.Inviter{ r.Inviter = &perform.Inviter{
DB: r.DB, DB: r.DB,
Cfg: r.Cfg, Cfg: &r.Cfg.RoomServer,
FSAPI: r.fsAPI, FSAPI: r.fsAPI,
Inputer: r.Inputer, Inputer: r.Inputer,
} }
r.Joiner = &perform.Joiner{ r.Joiner = &perform.Joiner{
Cfg: r.Cfg, Cfg: &r.Cfg.RoomServer,
DB: r.DB, DB: r.DB,
FSAPI: r.fsAPI, FSAPI: r.fsAPI,
RSAPI: r, RSAPI: r,
@ -143,7 +143,7 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.RoomserverFederatio
} }
r.Peeker = &perform.Peeker{ r.Peeker = &perform.Peeker{
ServerName: r.ServerName, ServerName: r.ServerName,
Cfg: r.Cfg, Cfg: &r.Cfg.RoomServer,
DB: r.DB, DB: r.DB,
FSAPI: r.fsAPI, FSAPI: r.fsAPI,
Inputer: r.Inputer, Inputer: r.Inputer,
@ -154,12 +154,12 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.RoomserverFederatio
} }
r.Unpeeker = &perform.Unpeeker{ r.Unpeeker = &perform.Unpeeker{
ServerName: r.ServerName, ServerName: r.ServerName,
Cfg: r.Cfg, Cfg: &r.Cfg.RoomServer,
FSAPI: r.fsAPI, FSAPI: r.fsAPI,
Inputer: r.Inputer, Inputer: r.Inputer,
} }
r.Leaver = &perform.Leaver{ r.Leaver = &perform.Leaver{
Cfg: r.Cfg, Cfg: &r.Cfg.RoomServer,
DB: r.DB, DB: r.DB,
FSAPI: r.fsAPI, FSAPI: r.fsAPI,
Inputer: r.Inputer, Inputer: r.Inputer,
@ -168,7 +168,7 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.RoomserverFederatio
DB: r.DB, DB: r.DB,
} }
r.Backfiller = &perform.Backfiller{ r.Backfiller = &perform.Backfiller{
IsLocalServerName: r.Cfg.Matrix.IsLocalServerName, IsLocalServerName: r.Cfg.Global.IsLocalServerName,
DB: r.DB, DB: r.DB,
FSAPI: r.fsAPI, FSAPI: r.fsAPI,
KeyRing: r.KeyRing, KeyRing: r.KeyRing,
@ -181,12 +181,12 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.RoomserverFederatio
DB: r.DB, DB: r.DB,
} }
r.Upgrader = &perform.Upgrader{ r.Upgrader = &perform.Upgrader{
Cfg: r.Cfg, Cfg: &r.Cfg.RoomServer,
URSAPI: r, URSAPI: r,
} }
r.Admin = &perform.Admin{ r.Admin = &perform.Admin{
DB: r.DB, DB: r.DB,
Cfg: r.Cfg, Cfg: &r.Cfg.RoomServer,
Inputer: r.Inputer, Inputer: r.Inputer,
Queryer: r.Queryer, Queryer: r.Queryer,
Leaver: r.Leaver, Leaver: r.Leaver,

View file

@ -39,7 +39,6 @@ import (
"github.com/matrix-org/dendrite/roomserver/producers" "github.com/matrix-org/dendrite/roomserver/producers"
"github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/setup/process"
@ -74,7 +73,6 @@ import (
// or C. // or C.
type Inputer struct { type Inputer struct {
Cfg *config.RoomServer Cfg *config.RoomServer
Base *base.BaseDendrite
ProcessContext *process.ProcessContext ProcessContext *process.ProcessContext
DB storage.RoomDatabase DB storage.RoomDatabase
NATSClient *nats.Conn NATSClient *nats.Conn
@ -89,8 +87,9 @@ type Inputer struct {
OutputProducer *producers.RoomEventProducer OutputProducer *producers.RoomEventProducer
workers sync.Map // room ID -> *worker workers sync.Map // room ID -> *worker
Queryer *query.Queryer Queryer *query.Queryer
UserAPI userapi.RoomserverUserAPI UserAPI userapi.RoomserverUserAPI
enableMetrics bool
} }
// If a room consumer is inactive for a while then we will allow NATS // If a room consumer is inactive for a while then we will allow NATS
@ -177,7 +176,7 @@ func (r *Inputer) startWorkerForRoom(roomID string) {
// will look to see if we have a worker for that room which has its // will look to see if we have a worker for that room which has its
// own consumer. If we don't, we'll start one. // own consumer. If we don't, we'll start one.
func (r *Inputer) Start() error { func (r *Inputer) Start() error {
if r.Base.EnableMetrics { if r.enableMetrics {
prometheus.MustRegister(roomserverInputBackpressure, processRoomEventDuration) prometheus.MustRegister(roomserverInputBackpressure, processRoomEventDuration)
} }
_, err := r.JetStream.Subscribe( _, err := r.JetStream.Subscribe(

View file

@ -16,31 +16,34 @@ package roomserver
import ( import (
"github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/internal" "github.com/matrix-org/dendrite/roomserver/internal"
"github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/dendrite/setup/base"
) )
// NewInternalAPI returns a concrete implementation of the internal API. // NewInternalAPI returns a concrete implementation of the internal API.
func NewInternalAPI( func NewInternalAPI(
base *base.BaseDendrite, processContext *process.ProcessContext,
cfg *config.Dendrite,
cm sqlutil.Connections,
natsInstance *jetstream.NATSInstance, natsInstance *jetstream.NATSInstance,
caches caching.RoomServerCaches, caches caching.RoomServerCaches,
enableMetrics bool,
) api.RoomserverInternalAPI { ) api.RoomserverInternalAPI {
cfg := &base.Cfg.RoomServer roomserverDB, err := storage.Open(processContext.Context(), cm, &cfg.RoomServer.Database, caches)
roomserverDB, err := storage.Open(base.ProcessContext.Context(), base.ConnectionManager, &cfg.Database, caches)
if err != nil { if err != nil {
logrus.WithError(err).Panicf("failed to connect to room server db") logrus.WithError(err).Panicf("failed to connect to room server db")
} }
js, nc := natsInstance.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) js, nc := natsInstance.Prepare(processContext, &cfg.Global.JetStream)
return internal.NewRoomserverAPI( return internal.NewRoomserverAPI(
base, roomserverDB, js, nc, caches, processContext, cfg, roomserverDB, js, nc, caches, enableMetrics,
) )
} }

View file

@ -47,7 +47,7 @@ func TestUsers(t *testing.T) {
defer close() defer close()
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
natsInstance := jetstream.NATSInstance{} natsInstance := jetstream.NATSInstance{}
rsAPI := roomserver.NewInternalAPI(base, &natsInstance, caches) rsAPI := roomserver.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, caches, base.EnableMetrics)
// SetFederationAPI starts the room event input consumer // SetFederationAPI starts the room event input consumer
rsAPI.SetFederationAPI(nil, nil) rsAPI.SetFederationAPI(nil, nil)
@ -187,7 +187,7 @@ func Test_QueryLeftUsers(t *testing.T) {
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
natsInstance := jetstream.NATSInstance{} natsInstance := jetstream.NATSInstance{}
rsAPI := roomserver.NewInternalAPI(base, &natsInstance, caches) rsAPI := roomserver.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, caches, base.EnableMetrics)
// SetFederationAPI starts the room event input consumer // SetFederationAPI starts the room event input consumer
rsAPI.SetFederationAPI(nil, nil) rsAPI.SetFederationAPI(nil, nil)
// Create the room // Create the room
@ -240,7 +240,7 @@ func TestPurgeRoom(t *testing.T) {
fedClient := base.CreateFederationClient() fedClient := base.CreateFederationClient()
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
rsAPI := roomserver.NewInternalAPI(base, &natsInstance, caches) rsAPI := roomserver.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, caches, base.EnableMetrics)
userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil) userAPI := userapi.NewInternalAPI(base, &natsInstance, rsAPI, nil)
// this starts the JetStream consumers // this starts the JetStream consumers

View file

@ -426,7 +426,7 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) {
// Use the actual internal roomserver API // Use the actual internal roomserver API
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
rsAPI := roomserver.NewInternalAPI(base, &natsInstance, caches) rsAPI := roomserver.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, caches, base.EnableMetrics)
rsAPI.SetFederationAPI(nil, nil) rsAPI.SetFederationAPI(nil, nil)
AddPublicRoutes(base, &natsInstance, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI, caches) AddPublicRoutes(base, &natsInstance, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI, caches)
@ -726,7 +726,7 @@ func TestGetMembership(t *testing.T) {
// Use an actual roomserver for this // Use an actual roomserver for this
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
rsAPI := roomserver.NewInternalAPI(base, &natsInstance, caches) rsAPI := roomserver.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, caches, base.EnableMetrics)
rsAPI.SetFederationAPI(nil, nil) rsAPI.SetFederationAPI(nil, nil)
AddPublicRoutes(base, &natsInstance, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI, caches) AddPublicRoutes(base, &natsInstance, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI, caches)
@ -1015,7 +1015,7 @@ func testContext(t *testing.T, dbType test.DBType) {
// Use an actual roomserver for this // Use an actual roomserver for this
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
natsInstance := jetstream.NATSInstance{} natsInstance := jetstream.NATSInstance{}
rsAPI := roomserver.NewInternalAPI(base, &natsInstance, caches) rsAPI := roomserver.NewInternalAPI(base.ProcessContext, base.Cfg, base.ConnectionManager, &natsInstance, caches, base.EnableMetrics)
rsAPI.SetFederationAPI(nil, nil) rsAPI.SetFederationAPI(nil, nil)
AddPublicRoutes(base, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, rsAPI, caches) AddPublicRoutes(base, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, rsAPI, caches)