Factor out how monolith routes get added (#1107)

Previously we had 3 monoliths:
 - dendrite-monolith-server
 - dendrite-demo-libp2p
 - dendritejs

which all had their own of setting up public routes. Factor this
out into a new `setup.Monolith` struct which gets all dependencies
set as fields. This is different to `basecomponent.Base` which
doesn't provide any way to set configured deps (e.g public rooms db)

Part of a larger process to clean up how we initialise Dendrite.
This commit is contained in:
Kegsay 2020-06-09 12:07:33 +01:00 committed by GitHub
parent 4f171c56a8
commit 85ac8a3f5b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 170 additions and 76 deletions

View file

@ -15,6 +15,7 @@
package clientapi package clientapi
import ( import (
"github.com/Shopify/sarama"
"github.com/gorilla/mux" "github.com/gorilla/mux"
appserviceAPI "github.com/matrix-org/dendrite/appservice/api" appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
@ -24,7 +25,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/routing" "github.com/matrix-org/dendrite/clientapi/routing"
eduServerAPI "github.com/matrix-org/dendrite/eduserver/api" eduServerAPI "github.com/matrix-org/dendrite/eduserver/api"
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/internal/basecomponent" "github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/internal/transactions" "github.com/matrix-org/dendrite/internal/transactions"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
@ -34,7 +35,9 @@ import (
// AddPublicRoutes sets up and registers HTTP handlers for the ClientAPI component. // AddPublicRoutes sets up and registers HTTP handlers for the ClientAPI component.
func AddPublicRoutes( func AddPublicRoutes(
router *mux.Router, router *mux.Router,
base *basecomponent.BaseDendrite, cfg *config.Dendrite,
consumer sarama.Consumer,
producer sarama.SyncProducer,
deviceDB devices.Database, deviceDB devices.Database,
accountsDB accounts.Database, accountsDB accounts.Database,
federation *gomatrixserverlib.FederationClient, federation *gomatrixserverlib.FederationClient,
@ -49,24 +52,24 @@ func AddPublicRoutes(
eduProducer := producers.NewEDUServerProducer(eduInputAPI) eduProducer := producers.NewEDUServerProducer(eduInputAPI)
userUpdateProducer := &producers.UserUpdateProducer{ userUpdateProducer := &producers.UserUpdateProducer{
Producer: base.KafkaProducer, Producer: producer,
Topic: string(base.Cfg.Kafka.Topics.UserUpdates), Topic: string(cfg.Kafka.Topics.UserUpdates),
} }
syncProducer := &producers.SyncAPIProducer{ syncProducer := &producers.SyncAPIProducer{
Producer: base.KafkaProducer, Producer: producer,
Topic: string(base.Cfg.Kafka.Topics.OutputClientData), Topic: string(cfg.Kafka.Topics.OutputClientData),
} }
consumer := consumers.NewOutputRoomEventConsumer( roomEventConsumer := consumers.NewOutputRoomEventConsumer(
base.Cfg, base.KafkaConsumer, accountsDB, rsAPI, cfg, consumer, accountsDB, rsAPI,
) )
if err := consumer.Start(); err != nil { if err := roomEventConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start room server consumer") logrus.WithError(err).Panicf("failed to start room server consumer")
} }
routing.Setup( routing.Setup(
router, base.Cfg, roomserverProducer, rsAPI, asAPI, router, cfg, roomserverProducer, rsAPI, asAPI,
accountsDB, deviceDB, federation, *keyRing, userUpdateProducer, accountsDB, deviceDB, federation, *keyRing, userUpdateProducer,
syncProducer, eduProducer, transactionsCache, fsAPI, syncProducer, eduProducer, transactionsCache, fsAPI,
) )

View file

@ -39,7 +39,7 @@ func main() {
eduInputAPI := base.EDUServerClient() eduInputAPI := base.EDUServerClient()
clientapi.AddPublicRoutes( clientapi.AddPublicRoutes(
base.PublicAPIMux, base, deviceDB, accountDB, federation, keyRing, base.PublicAPIMux, base.Cfg, base.KafkaConsumer, base.KafkaProducer, deviceDB, accountDB, federation, keyRing,
rsAPI, eduInputAPI, asQuery, transactions.New(), fsAPI, rsAPI, eduInputAPI, asQuery, transactions.New(), fsAPI,
) )

View file

@ -29,20 +29,15 @@ import (
p2phttp "github.com/libp2p/go-libp2p-http" p2phttp "github.com/libp2p/go-libp2p-http"
p2pdisc "github.com/libp2p/go-libp2p/p2p/discovery" p2pdisc "github.com/libp2p/go-libp2p/p2p/discovery"
"github.com/matrix-org/dendrite/appservice" "github.com/matrix-org/dendrite/appservice"
"github.com/matrix-org/dendrite/clientapi"
"github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-libp2p/storage" "github.com/matrix-org/dendrite/cmd/dendrite-demo-libp2p/storage"
"github.com/matrix-org/dendrite/eduserver" "github.com/matrix-org/dendrite/eduserver"
"github.com/matrix-org/dendrite/federationapi"
"github.com/matrix-org/dendrite/federationsender" "github.com/matrix-org/dendrite/federationsender"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/internal/transactions" "github.com/matrix-org/dendrite/internal/setup"
"github.com/matrix-org/dendrite/mediaapi"
"github.com/matrix-org/dendrite/publicroomsapi"
"github.com/matrix-org/dendrite/roomserver" "github.com/matrix-org/dendrite/roomserver"
"github.com/matrix-org/dendrite/serverkeyapi" "github.com/matrix-org/dendrite/serverkeyapi"
"github.com/matrix-org/dendrite/syncapi"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/eduserver/cache"
@ -151,26 +146,35 @@ func main() {
&base.Base, cache.New(), deviceDB, &base.Base, cache.New(), deviceDB,
) )
asAPI := appservice.NewInternalAPI(&base.Base, accountDB, deviceDB, rsAPI) asAPI := appservice.NewInternalAPI(&base.Base, accountDB, deviceDB, rsAPI)
appservice.AddPublicRoutes(base.Base.PublicAPIMux, &cfg, rsAPI, accountDB, federation, transactions.New())
fsAPI := federationsender.NewInternalAPI( fsAPI := federationsender.NewInternalAPI(
&base.Base, federation, rsAPI, keyRing, &base.Base, federation, rsAPI, keyRing,
) )
rsAPI.SetFederationSenderAPI(fsAPI) rsAPI.SetFederationSenderAPI(fsAPI)
clientapi.AddPublicRoutes(
base.Base.PublicAPIMux, &base.Base, deviceDB, accountDB,
federation, keyRing, rsAPI,
eduInputAPI, asAPI, transactions.New(), fsAPI,
)
eduProducer := producers.NewEDUServerProducer(eduInputAPI) eduProducer := producers.NewEDUServerProducer(eduInputAPI)
federationapi.AddPublicRoutes(base.Base.PublicAPIMux, base.Base.Cfg, accountDB, deviceDB, federation, keyRing, rsAPI, asAPI, fsAPI, eduProducer)
mediaapi.AddPublicRoutes(base.Base.PublicAPIMux, base.Base.Cfg, deviceDB)
publicRoomsDB, err := storage.NewPublicRoomsServerDatabaseWithPubSub(string(base.Base.Cfg.Database.PublicRoomsAPI), base.LibP2PPubsub, cfg.Matrix.ServerName) publicRoomsDB, err := storage.NewPublicRoomsServerDatabaseWithPubSub(string(base.Base.Cfg.Database.PublicRoomsAPI), base.LibP2PPubsub, cfg.Matrix.ServerName)
if err != nil { if err != nil {
logrus.WithError(err).Panicf("failed to connect to public rooms db") logrus.WithError(err).Panicf("failed to connect to public rooms db")
} }
publicroomsapi.AddPublicRoutes(base.Base.PublicAPIMux, &base.Base, deviceDB, publicRoomsDB, rsAPI, federation, nil) // Check this later
syncapi.AddPublicRoutes(base.Base.PublicAPIMux, &base.Base, deviceDB, accountDB, rsAPI, federation, &cfg) monolith := setup.Monolith{
Config: base.Base.Cfg,
AccountDB: accountDB,
DeviceDB: deviceDB,
FedClient: federation,
KeyRing: keyRing,
KafkaConsumer: base.Base.KafkaConsumer,
KafkaProducer: base.Base.KafkaProducer,
AppserviceAPI: asAPI,
EDUInternalAPI: eduInputAPI,
EDUProducer: eduProducer,
FederationSenderAPI: fsAPI,
RoomserverAPI: rsAPI,
ServerKeyAPI: serverKeyAPI,
PublicRoomsDB: publicRoomsDB,
}
monolith.AddAllPublicRoutes(base.Base.PublicAPIMux)
internal.SetupHTTPAPI( internal.SetupHTTPAPI(
http.DefaultServeMux, http.DefaultServeMux,

View file

@ -19,23 +19,17 @@ import (
"net/http" "net/http"
"github.com/matrix-org/dendrite/appservice" "github.com/matrix-org/dendrite/appservice"
"github.com/matrix-org/dendrite/clientapi"
"github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/eduserver" "github.com/matrix-org/dendrite/eduserver"
"github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/federationapi"
"github.com/matrix-org/dendrite/federationsender" "github.com/matrix-org/dendrite/federationsender"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/basecomponent" "github.com/matrix-org/dendrite/internal/basecomponent"
"github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/internal/transactions" "github.com/matrix-org/dendrite/internal/setup"
"github.com/matrix-org/dendrite/keyserver"
"github.com/matrix-org/dendrite/mediaapi"
"github.com/matrix-org/dendrite/publicroomsapi"
"github.com/matrix-org/dendrite/publicroomsapi/storage" "github.com/matrix-org/dendrite/publicroomsapi/storage"
"github.com/matrix-org/dendrite/roomserver" "github.com/matrix-org/dendrite/roomserver"
"github.com/matrix-org/dendrite/serverkeyapi" "github.com/matrix-org/dendrite/serverkeyapi"
"github.com/matrix-org/dendrite/syncapi"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
@ -97,7 +91,6 @@ func main() {
} }
asAPI := appservice.NewInternalAPI(base, accountDB, deviceDB, rsAPI) asAPI := appservice.NewInternalAPI(base, accountDB, deviceDB, rsAPI)
appservice.AddPublicRoutes(base.PublicAPIMux, cfg, rsAPI, accountDB, federation, transactions.New())
if base.UseHTTPAPIs { if base.UseHTTPAPIs {
appservice.AddInternalRoutes(base.InternalAPIMux, asAPI) appservice.AddInternalRoutes(base.InternalAPIMux, asAPI)
asAPI = base.AppserviceHTTPClient() asAPI = base.AppserviceHTTPClient()
@ -112,24 +105,31 @@ func main() {
} }
rsComponent.SetFederationSenderAPI(fsAPI) rsComponent.SetFederationSenderAPI(fsAPI)
clientapi.AddPublicRoutes(
base.PublicAPIMux, base, deviceDB, accountDB,
federation, keyRing, rsAPI,
eduInputAPI, asAPI, transactions.New(), fsAPI,
)
keyserver.AddPublicRoutes(
base.PublicAPIMux, base.Cfg, deviceDB, accountDB,
)
eduProducer := producers.NewEDUServerProducer(eduInputAPI) eduProducer := producers.NewEDUServerProducer(eduInputAPI)
federationapi.AddPublicRoutes(base.PublicAPIMux, base.Cfg, accountDB, deviceDB, federation, keyRing, rsAPI, asAPI, fsAPI, eduProducer)
mediaapi.AddPublicRoutes(base.PublicAPIMux, base.Cfg, deviceDB)
publicRoomsDB, err := storage.NewPublicRoomsServerDatabase(string(base.Cfg.Database.PublicRoomsAPI), base.Cfg.DbProperties(), cfg.Matrix.ServerName) publicRoomsDB, err := storage.NewPublicRoomsServerDatabase(string(base.Cfg.Database.PublicRoomsAPI), base.Cfg.DbProperties(), cfg.Matrix.ServerName)
if err != nil { if err != nil {
logrus.WithError(err).Panicf("failed to connect to public rooms db") logrus.WithError(err).Panicf("failed to connect to public rooms db")
} }
publicroomsapi.AddPublicRoutes(base.PublicAPIMux, base, deviceDB, publicRoomsDB, rsAPI, federation, nil)
syncapi.AddPublicRoutes(base.PublicAPIMux, base, deviceDB, accountDB, rsAPI, federation, cfg) monolith := setup.Monolith{
Config: base.Cfg,
AccountDB: accountDB,
DeviceDB: deviceDB,
FedClient: federation,
KeyRing: keyRing,
KafkaConsumer: base.KafkaConsumer,
KafkaProducer: base.KafkaProducer,
AppserviceAPI: asAPI,
EDUInternalAPI: eduInputAPI,
EDUProducer: eduProducer,
FederationSenderAPI: fsAPI,
RoomserverAPI: rsAPI,
ServerKeyAPI: serverKeyAPI,
PublicRoomsDB: publicRoomsDB,
}
monolith.AddAllPublicRoutes(base.PublicAPIMux)
internal.SetupHTTPAPI( internal.SetupHTTPAPI(
http.DefaultServeMux, http.DefaultServeMux,

View file

@ -34,7 +34,7 @@ func main() {
if err != nil { if err != nil {
logrus.WithError(err).Panicf("failed to connect to public rooms db") logrus.WithError(err).Panicf("failed to connect to public rooms db")
} }
publicroomsapi.AddPublicRoutes(base.PublicAPIMux, base, deviceDB, publicRoomsDB, rsAPI, nil, nil) publicroomsapi.AddPublicRoutes(base.PublicAPIMux, base.Cfg, base.KafkaConsumer, deviceDB, publicRoomsDB, rsAPI, nil, nil)
base.SetupAndServeHTTP(string(base.Cfg.Bind.PublicRoomsAPI), string(base.Cfg.Listen.PublicRoomsAPI)) base.SetupAndServeHTTP(string(base.Cfg.Bind.PublicRoomsAPI), string(base.Cfg.Listen.PublicRoomsAPI))

View file

@ -30,7 +30,7 @@ func main() {
rsAPI := base.RoomserverHTTPClient() rsAPI := base.RoomserverHTTPClient()
syncapi.AddPublicRoutes(base.PublicAPIMux, base, deviceDB, accountDB, rsAPI, federation, cfg) syncapi.AddPublicRoutes(base.PublicAPIMux, base.KafkaConsumer, deviceDB, accountDB, rsAPI, federation, cfg)
base.SetupAndServeHTTP(string(base.Cfg.Bind.SyncAPI), string(base.Cfg.Listen.SyncAPI)) base.SetupAndServeHTTP(string(base.Cfg.Bind.SyncAPI), string(base.Cfg.Listen.SyncAPI))

View file

@ -23,21 +23,16 @@ import (
"syscall/js" "syscall/js"
"github.com/matrix-org/dendrite/appservice" "github.com/matrix-org/dendrite/appservice"
"github.com/matrix-org/dendrite/clientapi"
"github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/eduserver" "github.com/matrix-org/dendrite/eduserver"
"github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/federationapi"
"github.com/matrix-org/dendrite/federationsender" "github.com/matrix-org/dendrite/federationsender"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/basecomponent" "github.com/matrix-org/dendrite/internal/basecomponent"
"github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/internal/transactions" "github.com/matrix-org/dendrite/internal/setup"
"github.com/matrix-org/dendrite/mediaapi"
"github.com/matrix-org/dendrite/publicroomsapi"
"github.com/matrix-org/dendrite/publicroomsapi/storage" "github.com/matrix-org/dendrite/publicroomsapi/storage"
"github.com/matrix-org/dendrite/roomserver" "github.com/matrix-org/dendrite/roomserver"
"github.com/matrix-org/dendrite/syncapi"
go_http_js_libp2p "github.com/matrix-org/go-http-js-libp2p" go_http_js_libp2p "github.com/matrix-org/go-http-js-libp2p"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
@ -215,20 +210,32 @@ func main() {
rsAPI.SetFederationSenderAPI(fedSenderAPI) rsAPI.SetFederationSenderAPI(fedSenderAPI)
p2pPublicRoomProvider := NewLibP2PPublicRoomsProvider(node, fedSenderAPI) p2pPublicRoomProvider := NewLibP2PPublicRoomsProvider(node, fedSenderAPI)
clientapi.AddPublicRoutes(
base.PublicAPIMux, base, deviceDB, accountDB,
federation, &keyRing, rsAPI,
eduInputAPI, asQuery, transactions.New(), fedSenderAPI,
)
eduProducer := producers.NewEDUServerProducer(eduInputAPI) eduProducer := producers.NewEDUServerProducer(eduInputAPI)
federationapi.AddPublicRoutes(base.PublicAPIMux, base.Cfg, accountDB, deviceDB, federation, &keyRing, rsAPI, asQuery, fedSenderAPI, eduProducer)
mediaapi.AddPublicRoutes(base.PublicAPIMux, base.Cfg, deviceDB)
publicRoomsDB, err := storage.NewPublicRoomsServerDatabase(string(base.Cfg.Database.PublicRoomsAPI), cfg.Matrix.ServerName) publicRoomsDB, err := storage.NewPublicRoomsServerDatabase(string(base.Cfg.Database.PublicRoomsAPI), cfg.Matrix.ServerName)
if err != nil { if err != nil {
logrus.WithError(err).Panicf("failed to connect to public rooms db") logrus.WithError(err).Panicf("failed to connect to public rooms db")
} }
publicroomsapi.AddPublicRoutes(base.PublicAPIMux, base, deviceDB, publicRoomsDB, rsAPI, federation, p2pPublicRoomProvider)
syncapi.AddPublicRoutes(base.PublicAPIMux, base, deviceDB, accountDB, rsAPI, federation, cfg) monolith := setup.Monolith{
Config: base.Cfg,
AccountDB: accountDB,
DeviceDB: deviceDB,
FedClient: federation,
KeyRing: &keyRing,
KafkaConsumer: base.KafkaConsumer,
KafkaProducer: base.KafkaProducer,
AppserviceAPI: asQuery,
EDUInternalAPI: eduInputAPI,
EDUProducer: eduProducer,
FederationSenderAPI: fedSenderAPI,
RoomserverAPI: rsAPI,
//ServerKeyAPI: serverKeyAPI,
PublicRoomsDB: publicRoomsDB,
ExtPublicRoomsProvider: p2pPublicRoomProvider,
}
monolith.AddAllPublicRoutes(base.PublicAPIMux)
internal.SetupHTTPAPI( internal.SetupHTTPAPI(
http.DefaultServeMux, http.DefaultServeMux,

View file

@ -0,0 +1,78 @@
package setup
import (
"github.com/Shopify/sarama"
"github.com/gorilla/mux"
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
"github.com/matrix-org/dendrite/clientapi"
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
"github.com/matrix-org/dendrite/clientapi/producers"
eduServerAPI "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/federationapi"
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/internal/transactions"
"github.com/matrix-org/dendrite/keyserver"
"github.com/matrix-org/dendrite/mediaapi"
"github.com/matrix-org/dendrite/publicroomsapi"
"github.com/matrix-org/dendrite/publicroomsapi/storage"
"github.com/matrix-org/dendrite/publicroomsapi/types"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
serverKeyAPI "github.com/matrix-org/dendrite/serverkeyapi/api"
"github.com/matrix-org/dendrite/syncapi"
"github.com/matrix-org/gomatrixserverlib"
)
// Monolith represents an instantiation of all dependencies required to build
// all components of Dendrite, for use in monolith mode.
type Monolith struct {
Config *config.Dendrite
DeviceDB devices.Database
AccountDB accounts.Database
KeyRing *gomatrixserverlib.KeyRing
FedClient *gomatrixserverlib.FederationClient
KafkaConsumer sarama.Consumer
KafkaProducer sarama.SyncProducer
AppserviceAPI appserviceAPI.AppServiceQueryAPI
EDUInternalAPI eduServerAPI.EDUServerInputAPI
FederationSenderAPI federationSenderAPI.FederationSenderInternalAPI
RoomserverAPI roomserverAPI.RoomserverInternalAPI
ServerKeyAPI serverKeyAPI.ServerKeyInternalAPI
// TODO: remove, this isn't even a producer
EDUProducer *producers.EDUServerProducer
// TODO: can we remove this? It's weird that we are required the database
// yet every other component can do that on its own. libp2p-demo uses a custom
// database though annoyingly.
PublicRoomsDB storage.Database
// Optional
ExtPublicRoomsProvider types.ExternalPublicRoomsProvider
}
// AddAllPublicRoutes attaches all public paths to the given router
func (m *Monolith) AddAllPublicRoutes(publicMux *mux.Router) {
clientapi.AddPublicRoutes(
publicMux, m.Config, m.KafkaConsumer, m.KafkaProducer, m.DeviceDB, m.AccountDB,
m.FedClient, m.KeyRing, m.RoomserverAPI,
m.EDUInternalAPI, m.AppserviceAPI, transactions.New(),
m.FederationSenderAPI,
)
keyserver.AddPublicRoutes(publicMux, m.Config, m.DeviceDB, m.AccountDB)
federationapi.AddPublicRoutes(
publicMux, m.Config, m.AccountDB, m.DeviceDB, m.FedClient,
m.KeyRing, m.RoomserverAPI, m.AppserviceAPI, m.FederationSenderAPI,
m.EDUProducer,
)
mediaapi.AddPublicRoutes(publicMux, m.Config, m.DeviceDB)
publicroomsapi.AddPublicRoutes(
publicMux, m.Config, m.KafkaConsumer, m.DeviceDB, m.PublicRoomsDB, m.RoomserverAPI, m.FedClient,
m.ExtPublicRoomsProvider,
)
syncapi.AddPublicRoutes(
publicMux, m.KafkaConsumer, m.DeviceDB, m.AccountDB, m.RoomserverAPI, m.FedClient, m.Config,
)
}

View file

@ -15,9 +15,10 @@
package publicroomsapi package publicroomsapi
import ( import (
"github.com/Shopify/sarama"
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices" "github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
"github.com/matrix-org/dendrite/internal/basecomponent" "github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/publicroomsapi/consumers" "github.com/matrix-org/dendrite/publicroomsapi/consumers"
"github.com/matrix-org/dendrite/publicroomsapi/routing" "github.com/matrix-org/dendrite/publicroomsapi/routing"
"github.com/matrix-org/dendrite/publicroomsapi/storage" "github.com/matrix-org/dendrite/publicroomsapi/storage"
@ -31,7 +32,8 @@ import (
// component. // component.
func AddPublicRoutes( func AddPublicRoutes(
router *mux.Router, router *mux.Router,
base *basecomponent.BaseDendrite, cfg *config.Dendrite,
consumer sarama.Consumer,
deviceDB devices.Database, deviceDB devices.Database,
publicRoomsDB storage.Database, publicRoomsDB storage.Database,
rsAPI roomserverAPI.RoomserverInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI,
@ -39,7 +41,7 @@ func AddPublicRoutes(
extRoomsProvider types.ExternalPublicRoomsProvider, extRoomsProvider types.ExternalPublicRoomsProvider,
) { ) {
rsConsumer := consumers.NewOutputRoomEventConsumer( rsConsumer := consumers.NewOutputRoomEventConsumer(
base.Cfg, base.KafkaConsumer, publicRoomsDB, rsAPI, cfg, consumer, publicRoomsDB, rsAPI,
) )
if err := rsConsumer.Start(); err != nil { if err := rsConsumer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start public rooms server consumer") logrus.WithError(err).Panic("failed to start public rooms server consumer")

View file

@ -17,11 +17,11 @@ package syncapi
import ( import (
"context" "context"
"github.com/Shopify/sarama"
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/internal/basecomponent"
"github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
@ -37,14 +37,14 @@ import (
// component. // component.
func AddPublicRoutes( func AddPublicRoutes(
router *mux.Router, router *mux.Router,
base *basecomponent.BaseDendrite, consumer sarama.Consumer,
deviceDB devices.Database, deviceDB devices.Database,
accountsDB accounts.Database, accountsDB accounts.Database,
rsAPI api.RoomserverInternalAPI, rsAPI api.RoomserverInternalAPI,
federation *gomatrixserverlib.FederationClient, federation *gomatrixserverlib.FederationClient,
cfg *config.Dendrite, cfg *config.Dendrite,
) { ) {
syncDB, err := storage.NewSyncServerDatasource(string(base.Cfg.Database.SyncAPI), base.Cfg.DbProperties()) syncDB, err := storage.NewSyncServerDatasource(string(cfg.Database.SyncAPI), cfg.DbProperties())
if err != nil { if err != nil {
logrus.WithError(err).Panicf("failed to connect to sync db") logrus.WithError(err).Panicf("failed to connect to sync db")
} }
@ -63,28 +63,28 @@ func AddPublicRoutes(
requestPool := sync.NewRequestPool(syncDB, notifier, accountsDB) requestPool := sync.NewRequestPool(syncDB, notifier, accountsDB)
roomConsumer := consumers.NewOutputRoomEventConsumer( roomConsumer := consumers.NewOutputRoomEventConsumer(
base.Cfg, base.KafkaConsumer, notifier, syncDB, rsAPI, cfg, consumer, notifier, syncDB, rsAPI,
) )
if err = roomConsumer.Start(); err != nil { if err = roomConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start room server consumer") logrus.WithError(err).Panicf("failed to start room server consumer")
} }
clientConsumer := consumers.NewOutputClientDataConsumer( clientConsumer := consumers.NewOutputClientDataConsumer(
base.Cfg, base.KafkaConsumer, notifier, syncDB, cfg, consumer, notifier, syncDB,
) )
if err = clientConsumer.Start(); err != nil { if err = clientConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start client data consumer") logrus.WithError(err).Panicf("failed to start client data consumer")
} }
typingConsumer := consumers.NewOutputTypingEventConsumer( typingConsumer := consumers.NewOutputTypingEventConsumer(
base.Cfg, base.KafkaConsumer, notifier, syncDB, cfg, consumer, notifier, syncDB,
) )
if err = typingConsumer.Start(); err != nil { if err = typingConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start typing consumer") logrus.WithError(err).Panicf("failed to start typing consumer")
} }
sendToDeviceConsumer := consumers.NewOutputSendToDeviceEventConsumer( sendToDeviceConsumer := consumers.NewOutputSendToDeviceEventConsumer(
base.Cfg, base.KafkaConsumer, notifier, syncDB, cfg, consumer, notifier, syncDB,
) )
if err = sendToDeviceConsumer.Start(); err != nil { if err = sendToDeviceConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start send-to-device consumer") logrus.WithError(err).Panicf("failed to start send-to-device consumer")