diff --git a/build/gobind/monolith.go b/build/gobind/monolith.go index e2ff79c34..84103f385 100644 --- a/build/gobind/monolith.go +++ b/build/gobind/monolith.go @@ -131,17 +131,15 @@ func (m *DendriteMonolith) Start() { ) asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI) - + stateAPI := currentstateserver.NewInternalAPI(base.Cfg, base.KafkaConsumer) fsAPI := federationsender.NewInternalAPI( - base, federation, rsAPI, keyRing, + base, federation, rsAPI, stateAPI, keyRing, ) // The underlying roomserver implementation needs to be able to call the fedsender. // This is different to rsAPI which can be the http client which doesn't need this dependency rsAPI.SetFederationSenderAPI(fsAPI) - stateAPI := currentstateserver.NewInternalAPI(base.Cfg, base.KafkaConsumer) - monolith := setup.Monolith{ Config: base.Cfg, AccountDB: accountDB, diff --git a/cmd/dendrite-demo-libp2p/main.go b/cmd/dendrite-demo-libp2p/main.go index 4e774acc9..7333e8b44 100644 --- a/cmd/dendrite-demo-libp2p/main.go +++ b/cmd/dendrite-demo-libp2p/main.go @@ -153,6 +153,7 @@ func main() { base, serverKeyAPI, ) + stateAPI := currentstateserver.NewInternalAPI(base.Base.Cfg, base.Base.KafkaConsumer) rsAPI := roomserver.NewInternalAPI( &base.Base, keyRing, federation, ) @@ -161,10 +162,9 @@ func main() { ) asAPI := appservice.NewInternalAPI(&base.Base, userAPI, rsAPI) fsAPI := federationsender.NewInternalAPI( - &base.Base, federation, rsAPI, keyRing, + &base.Base, federation, rsAPI, stateAPI, keyRing, ) rsAPI.SetFederationSenderAPI(fsAPI) - stateAPI := currentstateserver.NewInternalAPI(base.Base.Cfg, base.Base.KafkaConsumer) provider := newPublicRoomsProvider(base.LibP2PPubsub, rsAPI, stateAPI) err = provider.Start() if err != nil { diff --git a/cmd/dendrite-demo-yggdrasil/main.go b/cmd/dendrite-demo-yggdrasil/main.go index 0655a2a3b..8f6b0eaf4 100644 --- a/cmd/dendrite-demo-yggdrasil/main.go +++ b/cmd/dendrite-demo-yggdrasil/main.go @@ -117,17 +117,15 @@ func main() { ) asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI) - + stateAPI := currentstateserver.NewInternalAPI(base.Cfg, base.KafkaConsumer) fsAPI := federationsender.NewInternalAPI( - base, federation, rsAPI, keyRing, + base, federation, rsAPI, stateAPI, keyRing, ) rsComponent.SetFederationSenderAPI(fsAPI) embed.Embed(base.BaseMux, *instancePort, "Yggdrasil Demo") - stateAPI := currentstateserver.NewInternalAPI(base.Cfg, base.KafkaConsumer) - monolith := setup.Monolith{ Config: base.Cfg, AccountDB: accountDB, diff --git a/cmd/dendrite-federation-sender-server/main.go b/cmd/dendrite-federation-sender-server/main.go index 20bc1070f..fa6cf7ab9 100644 --- a/cmd/dendrite-federation-sender-server/main.go +++ b/cmd/dendrite-federation-sender-server/main.go @@ -31,7 +31,7 @@ func main() { rsAPI := base.RoomserverHTTPClient() fsAPI := federationsender.NewInternalAPI( - base, federation, rsAPI, keyRing, + base, federation, rsAPI, base.CurrentStateAPIClient(), keyRing, ) federationsender.AddInternalRoutes(base.InternalAPIMux, fsAPI) diff --git a/cmd/dendrite-monolith-server/main.go b/cmd/dendrite-monolith-server/main.go index bce5fce08..c75ef8fbb 100644 --- a/cmd/dendrite-monolith-server/main.go +++ b/cmd/dendrite-monolith-server/main.go @@ -109,8 +109,10 @@ func main() { asAPI = base.AppserviceHTTPClient() } + stateAPI := currentstateserver.NewInternalAPI(base.Cfg, base.KafkaConsumer) + fsAPI := federationsender.NewInternalAPI( - base, federation, rsAPI, keyRing, + base, federation, rsAPI, stateAPI, keyRing, ) if base.UseHTTPAPIs { federationsender.AddInternalRoutes(base.InternalAPIMux, fsAPI) @@ -120,8 +122,6 @@ func main() { // This is different to rsAPI which can be the http client which doesn't need this dependency rsImpl.SetFederationSenderAPI(fsAPI) - stateAPI := currentstateserver.NewInternalAPI(base.Cfg, base.KafkaConsumer) - monolith := setup.Monolith{ Config: base.Cfg, AccountDB: accountDB, diff --git a/cmd/dendritejs/main.go b/cmd/dendritejs/main.go index 0df53e06d..fd407e6ed 100644 --- a/cmd/dendritejs/main.go +++ b/cmd/dendritejs/main.go @@ -208,17 +208,16 @@ func main() { KeyDatabase: fetcher, } + stateAPI := currentstateserver.NewInternalAPI(base.Cfg, base.KafkaConsumer) rsAPI := roomserver.NewInternalAPI(base, keyRing, federation) eduInputAPI := eduserver.NewInternalAPI(base, cache.New(), userAPI) asQuery := appservice.NewInternalAPI( base, userAPI, rsAPI, ) - fedSenderAPI := federationsender.NewInternalAPI(base, federation, rsAPI, &keyRing) + fedSenderAPI := federationsender.NewInternalAPI(base, federation, rsAPI, stateAPI, &keyRing) rsAPI.SetFederationSenderAPI(fedSenderAPI) p2pPublicRoomProvider := NewLibP2PPublicRoomsProvider(node, fedSenderAPI, federation) - stateAPI := currentstateserver.NewInternalAPI(base.Cfg, base.KafkaConsumer) - monolith := setup.Monolith{ Config: base.Cfg, AccountDB: accountDB, diff --git a/federationsender/federationsender.go b/federationsender/federationsender.go index 9e14f6ec5..fbf506aa1 100644 --- a/federationsender/federationsender.go +++ b/federationsender/federationsender.go @@ -16,6 +16,7 @@ package federationsender import ( "github.com/gorilla/mux" + stateapi "github.com/matrix-org/dendrite/currentstateserver/api" "github.com/matrix-org/dendrite/federationsender/api" "github.com/matrix-org/dendrite/federationsender/consumers" "github.com/matrix-org/dendrite/federationsender/internal" @@ -41,6 +42,7 @@ func NewInternalAPI( base *setup.BaseDendrite, federation *gomatrixserverlib.FederationClient, rsAPI roomserverAPI.RoomserverInternalAPI, + stateAPI stateapi.CurrentStateInternalAPI, keyRing *gomatrixserverlib.KeyRing, ) api.FederationSenderInternalAPI { federationSenderDB, err := storage.NewDatabase(string(base.Cfg.Database.FederationSender), base.Cfg.DbProperties()) @@ -76,6 +78,12 @@ func NewInternalAPI( if err := tsConsumer.Start(); err != nil { logrus.WithError(err).Panic("failed to start typing server consumer") } + keyConsumer := consumers.NewKeyChangeConsumer( + base.Cfg, base.KafkaConsumer, queues, federationSenderDB, stateAPI, + ) + if err := keyConsumer.Start(); err != nil { + logrus.WithError(err).Panic("failed to start key server consumer") + } return internal.NewFederationSenderInternalAPI(federationSenderDB, base.Cfg, rsAPI, federation, keyRing, stats, queues) }