Wiring for rooomserver input API and federation sender input API

This commit is contained in:
Neil Alexander 2020-04-28 16:58:42 +01:00
parent 328804e858
commit 9171b28b9a
11 changed files with 58 additions and 24 deletions

View file

@ -37,12 +37,12 @@ func main() {
asQuery := base.CreateHTTPAppServiceAPIs() asQuery := base.CreateHTTPAppServiceAPIs()
alias, input, query := base.CreateHTTPRoomserverAPIs() alias, input, query := base.CreateHTTPRoomserverAPIs()
fedSenderAPI := base.CreateHTTPFederationSenderAPIs() fedSenderQueryAPI, _ := base.CreateHTTPFederationSenderAPIs()
eduInputAPI := eduserver.SetupEDUServerComponent(base, cache.New()) eduInputAPI := eduserver.SetupEDUServerComponent(base, cache.New())
clientapi.SetupClientAPIComponent( clientapi.SetupClientAPIComponent(
base, deviceDB, accountDB, federation, &keyRing, base, deviceDB, accountDB, federation, &keyRing,
alias, input, query, eduInputAPI, asQuery, transactions.New(), fedSenderAPI, alias, input, query, eduInputAPI, asQuery, transactions.New(), fedSenderQueryAPI,
) )
base.SetupAndServeHTTP(string(base.Cfg.Bind.ClientAPI), string(base.Cfg.Listen.ClientAPI)) base.SetupAndServeHTTP(string(base.Cfg.Bind.ClientAPI), string(base.Cfg.Listen.ClientAPI))

View file

@ -153,15 +153,15 @@ func main() {
asQuery := appservice.SetupAppServiceAPIComponent( asQuery := appservice.SetupAppServiceAPIComponent(
&base.Base, accountDB, deviceDB, federation, alias, query, transactions.New(), &base.Base, accountDB, deviceDB, federation, alias, query, transactions.New(),
) )
fedSenderAPI := federationsender.SetupFederationSenderComponent(&base.Base, federation, query, input) fedSenderQueryAPI, _ := federationsender.SetupFederationSenderComponent(&base.Base, federation, query, input)
clientapi.SetupClientAPIComponent( clientapi.SetupClientAPIComponent(
&base.Base, deviceDB, accountDB, &base.Base, deviceDB, accountDB,
federation, &keyRing, alias, input, query, federation, &keyRing, alias, input, query,
eduInputAPI, asQuery, transactions.New(), fedSenderAPI, eduInputAPI, asQuery, transactions.New(), fedSenderQueryAPI,
) )
eduProducer := producers.NewEDUServerProducer(eduInputAPI) eduProducer := producers.NewEDUServerProducer(eduInputAPI)
federationapi.SetupFederationAPIComponent(&base.Base, accountDB, deviceDB, federation, &keyRing, alias, input, query, asQuery, fedSenderAPI, eduProducer) federationapi.SetupFederationAPIComponent(&base.Base, accountDB, deviceDB, federation, &keyRing, alias, input, query, asQuery, fedSenderQueryAPI, eduProducer)
mediaapi.SetupMediaAPIComponent(&base.Base, deviceDB) mediaapi.SetupMediaAPIComponent(&base.Base, deviceDB)
publicRoomsDB, err := storage.NewPublicRoomsServerDatabaseWithPubSub(string(base.Base.Cfg.Database.PublicRoomsAPI), base.LibP2PPubsub) publicRoomsDB, err := storage.NewPublicRoomsServerDatabaseWithPubSub(string(base.Base.Cfg.Database.PublicRoomsAPI), base.LibP2PPubsub)
if err != nil { if err != nil {

View file

@ -32,7 +32,7 @@ func main() {
deviceDB := base.CreateDeviceDB() deviceDB := base.CreateDeviceDB()
keyDB := base.CreateKeyDB() keyDB := base.CreateKeyDB()
federation := base.CreateFederationClient() federation := base.CreateFederationClient()
federationSender := base.CreateHTTPFederationSenderAPIs() fedSenderQueryAPI, _ := base.CreateHTTPFederationSenderAPIs()
keyRing := keydb.CreateKeyRing(federation.Client, keyDB, cfg.Matrix.KeyPerspectives) keyRing := keydb.CreateKeyRing(federation.Client, keyDB, cfg.Matrix.KeyPerspectives)
alias, input, query := base.CreateHTTPRoomserverAPIs() alias, input, query := base.CreateHTTPRoomserverAPIs()
@ -42,7 +42,7 @@ func main() {
federationapi.SetupFederationAPIComponent( federationapi.SetupFederationAPIComponent(
base, accountDB, deviceDB, federation, &keyRing, base, accountDB, deviceDB, federation, &keyRing,
alias, input, query, asQuery, federationSender, eduProducer, alias, input, query, asQuery, fedSenderQueryAPI, eduProducer,
) )
base.SetupAndServeHTTP(string(base.Cfg.Bind.FederationAPI), string(base.Cfg.Listen.FederationAPI)) base.SetupAndServeHTTP(string(base.Cfg.Bind.FederationAPI), string(base.Cfg.Listen.FederationAPI))

View file

@ -62,15 +62,16 @@ func main() {
asQuery := appservice.SetupAppServiceAPIComponent( asQuery := appservice.SetupAppServiceAPIComponent(
base, accountDB, deviceDB, federation, alias, query, transactions.New(), base, accountDB, deviceDB, federation, alias, query, transactions.New(),
) )
fedSenderAPI := federationsender.SetupFederationSenderComponent(base, federation, query, input) fedSenderQueryAPI, fedSenderInputAPI := federationsender.SetupFederationSenderComponent(base, federation, query, input)
input.SetFederationSenderInputAPI(fedSenderInputAPI)
clientapi.SetupClientAPIComponent( clientapi.SetupClientAPIComponent(
base, deviceDB, accountDB, base, deviceDB, accountDB,
federation, &keyRing, alias, input, query, federation, &keyRing, alias, input, query,
eduInputAPI, asQuery, transactions.New(), fedSenderAPI, eduInputAPI, asQuery, transactions.New(), fedSenderQueryAPI,
) )
eduProducer := producers.NewEDUServerProducer(eduInputAPI) eduProducer := producers.NewEDUServerProducer(eduInputAPI)
federationapi.SetupFederationAPIComponent(base, accountDB, deviceDB, federation, &keyRing, alias, input, query, asQuery, fedSenderAPI, eduProducer) federationapi.SetupFederationAPIComponent(base, accountDB, deviceDB, federation, &keyRing, alias, input, query, asQuery, fedSenderQueryAPI, eduProducer)
mediaapi.SetupMediaAPIComponent(base, deviceDB) mediaapi.SetupMediaAPIComponent(base, deviceDB)
publicRoomsDB, err := storage.NewPublicRoomsServerDatabase(string(base.Cfg.Database.PublicRoomsAPI)) publicRoomsDB, err := storage.NewPublicRoomsServerDatabase(string(base.Cfg.Database.PublicRoomsAPI))
if err != nil { if err != nil {

View file

@ -30,7 +30,9 @@ func main() {
federation := base.CreateFederationClient() federation := base.CreateFederationClient()
keyRing := keydb.CreateKeyRing(federation.Client, keyDB, cfg.Matrix.KeyPerspectives) keyRing := keydb.CreateKeyRing(federation.Client, keyDB, cfg.Matrix.KeyPerspectives)
roomserver.SetupRoomServerComponent(base, keyRing) _, fsInputAPI := base.CreateHTTPFederationSenderAPIs()
_, input, _ := roomserver.SetupRoomServerComponent(base, keyRing)
input.SetFederationSenderInputAPI(fsInputAPI)
base.SetupAndServeHTTP(string(base.Cfg.Bind.RoomServer), string(base.Cfg.Listen.RoomServer)) base.SetupAndServeHTTP(string(base.Cfg.Bind.RoomServer), string(base.Cfg.Listen.RoomServer))

View file

@ -128,15 +128,16 @@ func main() {
asQuery := appservice.SetupAppServiceAPIComponent( asQuery := appservice.SetupAppServiceAPIComponent(
base, accountDB, deviceDB, federation, alias, query, transactions.New(), base, accountDB, deviceDB, federation, alias, query, transactions.New(),
) )
fedSenderAPI := federationsender.SetupFederationSenderComponent(base, federation, query, input) fedSenderQueryAPI, fedSenderInputAPI := federationsender.SetupFederationSenderComponent(base, federation, query, input)
input.SetFederationSenderInputAPI(fedSenderInputAPI)
clientapi.SetupClientAPIComponent( clientapi.SetupClientAPIComponent(
base, deviceDB, accountDB, base, deviceDB, accountDB,
federation, &keyRing, alias, input, query, federation, &keyRing, alias, input, query,
eduInputAPI, asQuery, transactions.New(), fedSenderAPI, eduInputAPI, asQuery, transactions.New(), fedSenderQueryAPI,
) )
eduProducer := producers.NewEDUServerProducer(eduInputAPI) eduProducer := producers.NewEDUServerProducer(eduInputAPI)
federationapi.SetupFederationAPIComponent(base, accountDB, deviceDB, federation, &keyRing, alias, input, query, asQuery, fedSenderAPI, eduProducer) federationapi.SetupFederationAPIComponent(base, accountDB, deviceDB, federation, &keyRing, alias, input, query, asQuery, fedSenderQueryAPI, eduProducer)
mediaapi.SetupMediaAPIComponent(base, deviceDB) mediaapi.SetupMediaAPIComponent(base, deviceDB)
publicRoomsDB, err := storage.NewPublicRoomsServerDatabase(string(base.Cfg.Database.PublicRoomsAPI)) publicRoomsDB, err := storage.NewPublicRoomsServerDatabase(string(base.Cfg.Database.PublicRoomsAPI))
if err != nil { if err != nil {

View file

@ -20,6 +20,7 @@ import (
"github.com/matrix-org/dendrite/common/basecomponent" "github.com/matrix-org/dendrite/common/basecomponent"
"github.com/matrix-org/dendrite/federationsender/api" "github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/federationsender/consumers" "github.com/matrix-org/dendrite/federationsender/consumers"
"github.com/matrix-org/dendrite/federationsender/input"
"github.com/matrix-org/dendrite/federationsender/producers" "github.com/matrix-org/dendrite/federationsender/producers"
"github.com/matrix-org/dendrite/federationsender/query" "github.com/matrix-org/dendrite/federationsender/query"
"github.com/matrix-org/dendrite/federationsender/queue" "github.com/matrix-org/dendrite/federationsender/queue"
@ -36,7 +37,7 @@ func SetupFederationSenderComponent(
federation *gomatrixserverlib.FederationClient, federation *gomatrixserverlib.FederationClient,
rsQueryAPI roomserverAPI.RoomserverQueryAPI, rsQueryAPI roomserverAPI.RoomserverQueryAPI,
rsInputAPI roomserverAPI.RoomserverInputAPI, rsInputAPI roomserverAPI.RoomserverInputAPI,
) api.FederationSenderQueryAPI { ) (api.FederationSenderQueryAPI, api.FederationSenderInputAPI) {
federationSenderDB, err := storage.NewDatabase(string(base.Cfg.Database.FederationSender)) federationSenderDB, err := storage.NewDatabase(string(base.Cfg.Database.FederationSender))
if err != nil { if err != nil {
logrus.WithError(err).Panic("failed to connect to federation sender db") logrus.WithError(err).Panic("failed to connect to federation sender db")
@ -61,10 +62,16 @@ func SetupFederationSenderComponent(
logrus.WithError(err).Panic("failed to start typing server consumer") logrus.WithError(err).Panic("failed to start typing server consumer")
} }
inputAPI := input.FederationSenderInputAPI{
RoomserverInputAPI: rsInputAPI,
}
inputAPI.SetupHTTP(http.DefaultServeMux)
queryAPI := query.FederationSenderQueryAPI{ queryAPI := query.FederationSenderQueryAPI{
DB: federationSenderDB, DB: federationSenderDB,
} }
queryAPI.SetupHTTP(http.DefaultServeMux) queryAPI.SetupHTTP(http.DefaultServeMux)
return &queryAPI return &queryAPI, &inputAPI
} }

View file

@ -19,20 +19,16 @@ import (
"encoding/json" "encoding/json"
"net/http" "net/http"
"github.com/Shopify/sarama"
"github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/federationsender/api" "github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/federationsender/storage"
"github.com/matrix-org/util" "github.com/matrix-org/util"
rsAPI "github.com/matrix-org/dendrite/roomserver/api"
) )
// FederationSenderInputAPI implements api.FederationSenderInputAPI // FederationSenderInputAPI implements api.FederationSenderInputAPI
type FederationSenderInputAPI struct { type FederationSenderInputAPI struct {
DB storage.Database RoomserverInputAPI rsAPI.RoomserverInputAPI
Producer sarama.SyncProducer
// The kafkaesque topic to output new room events to.
// This is the name used in kafka to identify the stream to write events to.
OutputRoomEventTopic string
} }
// SetupHTTP adds the FederationSenderInputAPI handlers to the http.ServeMux. // SetupHTTP adds the FederationSenderInputAPI handlers to the http.ServeMux.

View file

@ -21,6 +21,7 @@ import (
"net/http" "net/http"
commonHTTP "github.com/matrix-org/dendrite/common/http" commonHTTP "github.com/matrix-org/dendrite/common/http"
fsAPI "github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
opentracing "github.com/opentracing/opentracing-go" opentracing "github.com/opentracing/opentracing-go"
) )
@ -106,6 +107,9 @@ type InputRoomEventsResponse struct {
// RoomserverInputAPI is used to write events to the room server. // RoomserverInputAPI is used to write events to the room server.
type RoomserverInputAPI interface { type RoomserverInputAPI interface {
// needed to avoid chicken and egg scenario when setting up the
// interdependencies between the roomserver and the FS input API
SetFederationSenderInputAPI(fsInputAPI fsAPI.FederationSenderInputAPI)
InputRoomEvents( InputRoomEvents(
ctx context.Context, ctx context.Context,
request *InputRoomEventsRequest, request *InputRoomEventsRequest,
@ -122,12 +126,22 @@ func NewRoomserverInputAPIHTTP(roomserverURL string, httpClient *http.Client) (R
if httpClient == nil { if httpClient == nil {
return nil, errors.New("NewRoomserverInputAPIHTTP: httpClient is <nil>") return nil, errors.New("NewRoomserverInputAPIHTTP: httpClient is <nil>")
} }
return &httpRoomserverInputAPI{roomserverURL, httpClient}, nil return &httpRoomserverInputAPI{roomserverURL, httpClient, nil}, nil
} }
type httpRoomserverInputAPI struct { type httpRoomserverInputAPI struct {
roomserverURL string roomserverURL string
httpClient *http.Client httpClient *http.Client
// The federation sender API allows us to send federation
// requests from the new perform input requests, still TODO.
fsInputAPI fsAPI.FederationSenderInputAPI
}
// SetFederationSenderInputAPI passes in a federation sender input API reference
// so that we can avoid the chicken-and-egg problem of both the roomserver input API
// and the federation sender input API being interdependent.
func (h *httpRoomserverInputAPI) SetFederationSenderInputAPI(fsInputAPI fsAPI.FederationSenderInputAPI) {
h.fsInputAPI = fsInputAPI
} }
// InputRoomEvents implements RoomserverInputAPI // InputRoomEvents implements RoomserverInputAPI

View file

@ -26,6 +26,8 @@ import (
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/util" "github.com/matrix-org/util"
fsAPI "github.com/matrix-org/dendrite/federationsender/api"
) )
// RoomserverInputAPI implements api.RoomserverInputAPI // RoomserverInputAPI implements api.RoomserverInputAPI
@ -37,6 +39,16 @@ type RoomserverInputAPI struct {
OutputRoomEventTopic string OutputRoomEventTopic string
// Protects calls to processRoomEvent // Protects calls to processRoomEvent
mutex sync.Mutex mutex sync.Mutex
// The federation sender API allows us to send federation
// requests from the new perform input requests, still TODO.
fsInputAPI fsAPI.FederationSenderInputAPI
}
// SetFederationSenderInputAPI passes in a federation sender input API reference
// so that we can avoid the chicken-and-egg problem of both the roomserver input API
// and the federation sender input API being interdependent.
func (r *RoomserverInputAPI) SetFederationSenderInputAPI(fsInputAPI fsAPI.FederationSenderInputAPI) {
r.fsInputAPI = fsInputAPI
} }
// WriteOutputEvents implements OutputRoomEventWriter // WriteOutputEvents implements OutputRoomEventWriter

View file

@ -21,6 +21,7 @@ import (
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
asQuery "github.com/matrix-org/dendrite/appservice/query" asQuery "github.com/matrix-org/dendrite/appservice/query"
"github.com/matrix-org/dendrite/common/basecomponent" "github.com/matrix-org/dendrite/common/basecomponent"
"github.com/matrix-org/dendrite/roomserver/alias" "github.com/matrix-org/dendrite/roomserver/alias"
"github.com/matrix-org/dendrite/roomserver/input" "github.com/matrix-org/dendrite/roomserver/input"