Glue it all together

This commit is contained in:
Kegan Dougal 2020-08-04 10:40:38 +01:00
parent 1ed7102f07
commit cfcd1cfd99
7 changed files with 20 additions and 17 deletions

View file

@ -131,17 +131,15 @@ func (m *DendriteMonolith) Start() {
) )
asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI) asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI)
stateAPI := currentstateserver.NewInternalAPI(base.Cfg, base.KafkaConsumer)
fsAPI := federationsender.NewInternalAPI( fsAPI := federationsender.NewInternalAPI(
base, federation, rsAPI, keyRing, base, federation, rsAPI, stateAPI, keyRing,
) )
// The underlying roomserver implementation needs to be able to call the fedsender. // The underlying roomserver implementation needs to be able to call the fedsender.
// This is different to rsAPI which can be the http client which doesn't need this dependency // This is different to rsAPI which can be the http client which doesn't need this dependency
rsAPI.SetFederationSenderAPI(fsAPI) rsAPI.SetFederationSenderAPI(fsAPI)
stateAPI := currentstateserver.NewInternalAPI(base.Cfg, base.KafkaConsumer)
monolith := setup.Monolith{ monolith := setup.Monolith{
Config: base.Cfg, Config: base.Cfg,
AccountDB: accountDB, AccountDB: accountDB,

View file

@ -153,6 +153,7 @@ func main() {
base, serverKeyAPI, base, serverKeyAPI,
) )
stateAPI := currentstateserver.NewInternalAPI(base.Base.Cfg, base.Base.KafkaConsumer)
rsAPI := roomserver.NewInternalAPI( rsAPI := roomserver.NewInternalAPI(
&base.Base, keyRing, federation, &base.Base, keyRing, federation,
) )
@ -161,10 +162,9 @@ func main() {
) )
asAPI := appservice.NewInternalAPI(&base.Base, userAPI, rsAPI) asAPI := appservice.NewInternalAPI(&base.Base, userAPI, rsAPI)
fsAPI := federationsender.NewInternalAPI( fsAPI := federationsender.NewInternalAPI(
&base.Base, federation, rsAPI, keyRing, &base.Base, federation, rsAPI, stateAPI, keyRing,
) )
rsAPI.SetFederationSenderAPI(fsAPI) rsAPI.SetFederationSenderAPI(fsAPI)
stateAPI := currentstateserver.NewInternalAPI(base.Base.Cfg, base.Base.KafkaConsumer)
provider := newPublicRoomsProvider(base.LibP2PPubsub, rsAPI, stateAPI) provider := newPublicRoomsProvider(base.LibP2PPubsub, rsAPI, stateAPI)
err = provider.Start() err = provider.Start()
if err != nil { if err != nil {

View file

@ -117,17 +117,15 @@ func main() {
) )
asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI) asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI)
stateAPI := currentstateserver.NewInternalAPI(base.Cfg, base.KafkaConsumer)
fsAPI := federationsender.NewInternalAPI( fsAPI := federationsender.NewInternalAPI(
base, federation, rsAPI, keyRing, base, federation, rsAPI, stateAPI, keyRing,
) )
rsComponent.SetFederationSenderAPI(fsAPI) rsComponent.SetFederationSenderAPI(fsAPI)
embed.Embed(base.BaseMux, *instancePort, "Yggdrasil Demo") embed.Embed(base.BaseMux, *instancePort, "Yggdrasil Demo")
stateAPI := currentstateserver.NewInternalAPI(base.Cfg, base.KafkaConsumer)
monolith := setup.Monolith{ monolith := setup.Monolith{
Config: base.Cfg, Config: base.Cfg,
AccountDB: accountDB, AccountDB: accountDB,

View file

@ -31,7 +31,7 @@ func main() {
rsAPI := base.RoomserverHTTPClient() rsAPI := base.RoomserverHTTPClient()
fsAPI := federationsender.NewInternalAPI( fsAPI := federationsender.NewInternalAPI(
base, federation, rsAPI, keyRing, base, federation, rsAPI, base.CurrentStateAPIClient(), keyRing,
) )
federationsender.AddInternalRoutes(base.InternalAPIMux, fsAPI) federationsender.AddInternalRoutes(base.InternalAPIMux, fsAPI)

View file

@ -109,8 +109,10 @@ func main() {
asAPI = base.AppserviceHTTPClient() asAPI = base.AppserviceHTTPClient()
} }
stateAPI := currentstateserver.NewInternalAPI(base.Cfg, base.KafkaConsumer)
fsAPI := federationsender.NewInternalAPI( fsAPI := federationsender.NewInternalAPI(
base, federation, rsAPI, keyRing, base, federation, rsAPI, stateAPI, keyRing,
) )
if base.UseHTTPAPIs { if base.UseHTTPAPIs {
federationsender.AddInternalRoutes(base.InternalAPIMux, fsAPI) federationsender.AddInternalRoutes(base.InternalAPIMux, fsAPI)
@ -120,8 +122,6 @@ func main() {
// This is different to rsAPI which can be the http client which doesn't need this dependency // This is different to rsAPI which can be the http client which doesn't need this dependency
rsImpl.SetFederationSenderAPI(fsAPI) rsImpl.SetFederationSenderAPI(fsAPI)
stateAPI := currentstateserver.NewInternalAPI(base.Cfg, base.KafkaConsumer)
monolith := setup.Monolith{ monolith := setup.Monolith{
Config: base.Cfg, Config: base.Cfg,
AccountDB: accountDB, AccountDB: accountDB,

View file

@ -208,17 +208,16 @@ func main() {
KeyDatabase: fetcher, KeyDatabase: fetcher,
} }
stateAPI := currentstateserver.NewInternalAPI(base.Cfg, base.KafkaConsumer)
rsAPI := roomserver.NewInternalAPI(base, keyRing, federation) rsAPI := roomserver.NewInternalAPI(base, keyRing, federation)
eduInputAPI := eduserver.NewInternalAPI(base, cache.New(), userAPI) eduInputAPI := eduserver.NewInternalAPI(base, cache.New(), userAPI)
asQuery := appservice.NewInternalAPI( asQuery := appservice.NewInternalAPI(
base, userAPI, rsAPI, base, userAPI, rsAPI,
) )
fedSenderAPI := federationsender.NewInternalAPI(base, federation, rsAPI, &keyRing) fedSenderAPI := federationsender.NewInternalAPI(base, federation, rsAPI, stateAPI, &keyRing)
rsAPI.SetFederationSenderAPI(fedSenderAPI) rsAPI.SetFederationSenderAPI(fedSenderAPI)
p2pPublicRoomProvider := NewLibP2PPublicRoomsProvider(node, fedSenderAPI, federation) p2pPublicRoomProvider := NewLibP2PPublicRoomsProvider(node, fedSenderAPI, federation)
stateAPI := currentstateserver.NewInternalAPI(base.Cfg, base.KafkaConsumer)
monolith := setup.Monolith{ monolith := setup.Monolith{
Config: base.Cfg, Config: base.Cfg,
AccountDB: accountDB, AccountDB: accountDB,

View file

@ -16,6 +16,7 @@ package federationsender
import ( import (
"github.com/gorilla/mux" "github.com/gorilla/mux"
stateapi "github.com/matrix-org/dendrite/currentstateserver/api"
"github.com/matrix-org/dendrite/federationsender/api" "github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/federationsender/consumers" "github.com/matrix-org/dendrite/federationsender/consumers"
"github.com/matrix-org/dendrite/federationsender/internal" "github.com/matrix-org/dendrite/federationsender/internal"
@ -41,6 +42,7 @@ func NewInternalAPI(
base *setup.BaseDendrite, base *setup.BaseDendrite,
federation *gomatrixserverlib.FederationClient, federation *gomatrixserverlib.FederationClient,
rsAPI roomserverAPI.RoomserverInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI,
stateAPI stateapi.CurrentStateInternalAPI,
keyRing *gomatrixserverlib.KeyRing, keyRing *gomatrixserverlib.KeyRing,
) api.FederationSenderInternalAPI { ) api.FederationSenderInternalAPI {
federationSenderDB, err := storage.NewDatabase(string(base.Cfg.Database.FederationSender), base.Cfg.DbProperties()) federationSenderDB, err := storage.NewDatabase(string(base.Cfg.Database.FederationSender), base.Cfg.DbProperties())
@ -76,6 +78,12 @@ func NewInternalAPI(
if err := tsConsumer.Start(); err != nil { if err := tsConsumer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start typing server consumer") logrus.WithError(err).Panic("failed to start typing server consumer")
} }
keyConsumer := consumers.NewKeyChangeConsumer(
base.Cfg, base.KafkaConsumer, queues, federationSenderDB, stateAPI,
)
if err := keyConsumer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start key server consumer")
}
return internal.NewFederationSenderInternalAPI(federationSenderDB, base.Cfg, rsAPI, federation, keyRing, stats, queues) return internal.NewFederationSenderInternalAPI(federationSenderDB, base.Cfg, rsAPI, federation, keyRing, stats, queues)
} }