diff --git a/cmd/dendrite-client-api-server/main.go b/cmd/dendrite-client-api-server/main.go index 815a978a8..9d545d434 100644 --- a/cmd/dendrite-client-api-server/main.go +++ b/cmd/dendrite-client-api-server/main.go @@ -37,12 +37,12 @@ func main() { asQuery := base.CreateHTTPAppServiceAPIs() alias, input, query := base.CreateHTTPRoomserverAPIs() - fedSenderAPI := base.CreateHTTPFederationSenderAPIs() + fedSenderQueryAPI, _ := base.CreateHTTPFederationSenderAPIs() eduInputAPI := eduserver.SetupEDUServerComponent(base, cache.New()) clientapi.SetupClientAPIComponent( base, deviceDB, accountDB, federation, &keyRing, - alias, input, query, eduInputAPI, asQuery, transactions.New(), fedSenderAPI, + alias, input, query, eduInputAPI, asQuery, transactions.New(), fedSenderQueryAPI, ) base.SetupAndServeHTTP(string(base.Cfg.Bind.ClientAPI), string(base.Cfg.Listen.ClientAPI)) diff --git a/cmd/dendrite-demo-libp2p/main.go b/cmd/dendrite-demo-libp2p/main.go index f1d18d0e0..57ec37597 100644 --- a/cmd/dendrite-demo-libp2p/main.go +++ b/cmd/dendrite-demo-libp2p/main.go @@ -153,15 +153,15 @@ func main() { asQuery := appservice.SetupAppServiceAPIComponent( &base.Base, accountDB, deviceDB, federation, alias, query, transactions.New(), ) - fedSenderAPI := federationsender.SetupFederationSenderComponent(&base.Base, federation, query, input) + fedSenderQueryAPI, _ := federationsender.SetupFederationSenderComponent(&base.Base, federation, query, input) clientapi.SetupClientAPIComponent( &base.Base, deviceDB, accountDB, federation, &keyRing, alias, input, query, - eduInputAPI, asQuery, transactions.New(), fedSenderAPI, + eduInputAPI, asQuery, transactions.New(), fedSenderQueryAPI, ) eduProducer := producers.NewEDUServerProducer(eduInputAPI) - federationapi.SetupFederationAPIComponent(&base.Base, accountDB, deviceDB, federation, &keyRing, alias, input, query, asQuery, fedSenderAPI, eduProducer) + federationapi.SetupFederationAPIComponent(&base.Base, accountDB, deviceDB, federation, &keyRing, alias, input, query, asQuery, fedSenderQueryAPI, eduProducer) mediaapi.SetupMediaAPIComponent(&base.Base, deviceDB) publicRoomsDB, err := storage.NewPublicRoomsServerDatabaseWithPubSub(string(base.Base.Cfg.Database.PublicRoomsAPI), base.LibP2PPubsub) if err != nil { diff --git a/cmd/dendrite-federation-api-server/main.go b/cmd/dendrite-federation-api-server/main.go index dd06cd3f9..2f045ece9 100644 --- a/cmd/dendrite-federation-api-server/main.go +++ b/cmd/dendrite-federation-api-server/main.go @@ -32,7 +32,7 @@ func main() { deviceDB := base.CreateDeviceDB() keyDB := base.CreateKeyDB() federation := base.CreateFederationClient() - federationSender := base.CreateHTTPFederationSenderAPIs() + fedSenderQueryAPI, _ := base.CreateHTTPFederationSenderAPIs() keyRing := keydb.CreateKeyRing(federation.Client, keyDB, cfg.Matrix.KeyPerspectives) alias, input, query := base.CreateHTTPRoomserverAPIs() @@ -42,7 +42,7 @@ func main() { federationapi.SetupFederationAPIComponent( base, accountDB, deviceDB, federation, &keyRing, - alias, input, query, asQuery, federationSender, eduProducer, + alias, input, query, asQuery, fedSenderQueryAPI, eduProducer, ) base.SetupAndServeHTTP(string(base.Cfg.Bind.FederationAPI), string(base.Cfg.Listen.FederationAPI)) diff --git a/cmd/dendrite-monolith-server/main.go b/cmd/dendrite-monolith-server/main.go index 8cfb75665..4e10e995b 100644 --- a/cmd/dendrite-monolith-server/main.go +++ b/cmd/dendrite-monolith-server/main.go @@ -62,15 +62,16 @@ func main() { asQuery := appservice.SetupAppServiceAPIComponent( base, accountDB, deviceDB, federation, alias, query, transactions.New(), ) - fedSenderAPI := federationsender.SetupFederationSenderComponent(base, federation, query, input) + fedSenderQueryAPI, fedSenderInputAPI := federationsender.SetupFederationSenderComponent(base, federation, query, input) + input.SetFederationSenderInputAPI(fedSenderInputAPI) clientapi.SetupClientAPIComponent( base, deviceDB, accountDB, federation, &keyRing, alias, input, query, - eduInputAPI, asQuery, transactions.New(), fedSenderAPI, + eduInputAPI, asQuery, transactions.New(), fedSenderQueryAPI, ) eduProducer := producers.NewEDUServerProducer(eduInputAPI) - federationapi.SetupFederationAPIComponent(base, accountDB, deviceDB, federation, &keyRing, alias, input, query, asQuery, fedSenderAPI, eduProducer) + federationapi.SetupFederationAPIComponent(base, accountDB, deviceDB, federation, &keyRing, alias, input, query, asQuery, fedSenderQueryAPI, eduProducer) mediaapi.SetupMediaAPIComponent(base, deviceDB) publicRoomsDB, err := storage.NewPublicRoomsServerDatabase(string(base.Cfg.Database.PublicRoomsAPI)) if err != nil { diff --git a/cmd/dendrite-room-server/main.go b/cmd/dendrite-room-server/main.go index f33a2b88f..4980f879f 100644 --- a/cmd/dendrite-room-server/main.go +++ b/cmd/dendrite-room-server/main.go @@ -30,7 +30,9 @@ func main() { federation := base.CreateFederationClient() keyRing := keydb.CreateKeyRing(federation.Client, keyDB, cfg.Matrix.KeyPerspectives) - roomserver.SetupRoomServerComponent(base, keyRing) + _, fsInputAPI := base.CreateHTTPFederationSenderAPIs() + _, input, _ := roomserver.SetupRoomServerComponent(base, keyRing) + input.SetFederationSenderInputAPI(fsInputAPI) base.SetupAndServeHTTP(string(base.Cfg.Bind.RoomServer), string(base.Cfg.Listen.RoomServer)) diff --git a/cmd/dendritejs/main.go b/cmd/dendritejs/main.go index 23d4327d7..aa19aa677 100644 --- a/cmd/dendritejs/main.go +++ b/cmd/dendritejs/main.go @@ -128,15 +128,16 @@ func main() { asQuery := appservice.SetupAppServiceAPIComponent( base, accountDB, deviceDB, federation, alias, query, transactions.New(), ) - fedSenderAPI := federationsender.SetupFederationSenderComponent(base, federation, query, input) + fedSenderQueryAPI, fedSenderInputAPI := federationsender.SetupFederationSenderComponent(base, federation, query, input) + input.SetFederationSenderInputAPI(fedSenderInputAPI) clientapi.SetupClientAPIComponent( base, deviceDB, accountDB, federation, &keyRing, alias, input, query, - eduInputAPI, asQuery, transactions.New(), fedSenderAPI, + eduInputAPI, asQuery, transactions.New(), fedSenderQueryAPI, ) eduProducer := producers.NewEDUServerProducer(eduInputAPI) - federationapi.SetupFederationAPIComponent(base, accountDB, deviceDB, federation, &keyRing, alias, input, query, asQuery, fedSenderAPI, eduProducer) + federationapi.SetupFederationAPIComponent(base, accountDB, deviceDB, federation, &keyRing, alias, input, query, asQuery, fedSenderQueryAPI, eduProducer) mediaapi.SetupMediaAPIComponent(base, deviceDB) publicRoomsDB, err := storage.NewPublicRoomsServerDatabase(string(base.Cfg.Database.PublicRoomsAPI)) if err != nil { diff --git a/federationsender/federationsender.go b/federationsender/federationsender.go index a06caf402..8053e8f06 100644 --- a/federationsender/federationsender.go +++ b/federationsender/federationsender.go @@ -20,6 +20,7 @@ import ( "github.com/matrix-org/dendrite/common/basecomponent" "github.com/matrix-org/dendrite/federationsender/api" "github.com/matrix-org/dendrite/federationsender/consumers" + "github.com/matrix-org/dendrite/federationsender/input" "github.com/matrix-org/dendrite/federationsender/producers" "github.com/matrix-org/dendrite/federationsender/query" "github.com/matrix-org/dendrite/federationsender/queue" @@ -36,7 +37,7 @@ func SetupFederationSenderComponent( federation *gomatrixserverlib.FederationClient, rsQueryAPI roomserverAPI.RoomserverQueryAPI, rsInputAPI roomserverAPI.RoomserverInputAPI, -) api.FederationSenderQueryAPI { +) (api.FederationSenderQueryAPI, api.FederationSenderInputAPI) { federationSenderDB, err := storage.NewDatabase(string(base.Cfg.Database.FederationSender)) if err != nil { logrus.WithError(err).Panic("failed to connect to federation sender db") @@ -61,10 +62,16 @@ func SetupFederationSenderComponent( logrus.WithError(err).Panic("failed to start typing server consumer") } + inputAPI := input.FederationSenderInputAPI{ + RoomserverInputAPI: rsInputAPI, + } + + inputAPI.SetupHTTP(http.DefaultServeMux) + queryAPI := query.FederationSenderQueryAPI{ DB: federationSenderDB, } queryAPI.SetupHTTP(http.DefaultServeMux) - return &queryAPI + return &queryAPI, &inputAPI } diff --git a/federationsender/input/input.go b/federationsender/input/input.go index 4a323af60..2219329fb 100644 --- a/federationsender/input/input.go +++ b/federationsender/input/input.go @@ -19,20 +19,16 @@ import ( "encoding/json" "net/http" - "github.com/Shopify/sarama" "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/federationsender/api" - "github.com/matrix-org/dendrite/federationsender/storage" "github.com/matrix-org/util" + + rsAPI "github.com/matrix-org/dendrite/roomserver/api" ) // FederationSenderInputAPI implements api.FederationSenderInputAPI type FederationSenderInputAPI struct { - DB storage.Database - Producer sarama.SyncProducer - // The kafkaesque topic to output new room events to. - // This is the name used in kafka to identify the stream to write events to. - OutputRoomEventTopic string + RoomserverInputAPI rsAPI.RoomserverInputAPI } // SetupHTTP adds the FederationSenderInputAPI handlers to the http.ServeMux. diff --git a/roomserver/api/input.go b/roomserver/api/input.go index bb4e040de..001a63943 100644 --- a/roomserver/api/input.go +++ b/roomserver/api/input.go @@ -21,6 +21,7 @@ import ( "net/http" commonHTTP "github.com/matrix-org/dendrite/common/http" + fsAPI "github.com/matrix-org/dendrite/federationsender/api" "github.com/matrix-org/gomatrixserverlib" opentracing "github.com/opentracing/opentracing-go" ) @@ -106,6 +107,9 @@ type InputRoomEventsResponse struct { // RoomserverInputAPI is used to write events to the room server. type RoomserverInputAPI interface { + // needed to avoid chicken and egg scenario when setting up the + // interdependencies between the roomserver and the FS input API + SetFederationSenderInputAPI(fsInputAPI fsAPI.FederationSenderInputAPI) InputRoomEvents( ctx context.Context, request *InputRoomEventsRequest, @@ -122,12 +126,22 @@ func NewRoomserverInputAPIHTTP(roomserverURL string, httpClient *http.Client) (R if httpClient == nil { return nil, errors.New("NewRoomserverInputAPIHTTP: httpClient is ") } - return &httpRoomserverInputAPI{roomserverURL, httpClient}, nil + return &httpRoomserverInputAPI{roomserverURL, httpClient, nil}, nil } type httpRoomserverInputAPI struct { roomserverURL string httpClient *http.Client + // The federation sender API allows us to send federation + // requests from the new perform input requests, still TODO. + fsInputAPI fsAPI.FederationSenderInputAPI +} + +// SetFederationSenderInputAPI passes in a federation sender input API reference +// so that we can avoid the chicken-and-egg problem of both the roomserver input API +// and the federation sender input API being interdependent. +func (h *httpRoomserverInputAPI) SetFederationSenderInputAPI(fsInputAPI fsAPI.FederationSenderInputAPI) { + h.fsInputAPI = fsInputAPI } // InputRoomEvents implements RoomserverInputAPI diff --git a/roomserver/input/input.go b/roomserver/input/input.go index cb588380a..b190ae598 100644 --- a/roomserver/input/input.go +++ b/roomserver/input/input.go @@ -26,6 +26,8 @@ import ( "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/util" + + fsAPI "github.com/matrix-org/dendrite/federationsender/api" ) // RoomserverInputAPI implements api.RoomserverInputAPI @@ -37,6 +39,16 @@ type RoomserverInputAPI struct { OutputRoomEventTopic string // Protects calls to processRoomEvent mutex sync.Mutex + // The federation sender API allows us to send federation + // requests from the new perform input requests, still TODO. + fsInputAPI fsAPI.FederationSenderInputAPI +} + +// SetFederationSenderInputAPI passes in a federation sender input API reference +// so that we can avoid the chicken-and-egg problem of both the roomserver input API +// and the federation sender input API being interdependent. +func (r *RoomserverInputAPI) SetFederationSenderInputAPI(fsInputAPI fsAPI.FederationSenderInputAPI) { + r.fsInputAPI = fsInputAPI } // WriteOutputEvents implements OutputRoomEventWriter diff --git a/roomserver/roomserver.go b/roomserver/roomserver.go index ea1c5c4c3..31b95ba7d 100644 --- a/roomserver/roomserver.go +++ b/roomserver/roomserver.go @@ -21,6 +21,7 @@ import ( "github.com/matrix-org/gomatrixserverlib" asQuery "github.com/matrix-org/dendrite/appservice/query" + "github.com/matrix-org/dendrite/common/basecomponent" "github.com/matrix-org/dendrite/roomserver/alias" "github.com/matrix-org/dendrite/roomserver/input"