Merge branch 'master' into kegan/shared-backfill

This commit is contained in:
Kegan Dougal 2020-04-29 18:35:50 +01:00
commit e6e01d7225
28 changed files with 543 additions and 258 deletions

View file

@ -44,7 +44,7 @@ func SetupClientAPIComponent(
eduInputAPI eduServerAPI.EDUServerInputAPI, eduInputAPI eduServerAPI.EDUServerInputAPI,
asAPI appserviceAPI.AppServiceQueryAPI, asAPI appserviceAPI.AppServiceQueryAPI,
transactionsCache *transactions.Cache, transactionsCache *transactions.Cache,
fedSenderAPI federationSenderAPI.FederationSenderQueryAPI, fsAPI federationSenderAPI.FederationSenderInternalAPI,
) { ) {
roomserverProducer := producers.NewRoomserverProducer(inputAPI, queryAPI) roomserverProducer := producers.NewRoomserverProducer(inputAPI, queryAPI)
eduProducer := producers.NewEDUServerProducer(eduInputAPI) eduProducer := producers.NewEDUServerProducer(eduInputAPI)
@ -69,6 +69,6 @@ func SetupClientAPIComponent(
routing.Setup( routing.Setup(
base.APIMux, base.Cfg, roomserverProducer, queryAPI, aliasAPI, asAPI, base.APIMux, base.Cfg, roomserverProducer, queryAPI, aliasAPI, asAPI,
accountsDB, deviceDB, federation, *keyRing, userUpdateProducer, accountsDB, deviceDB, federation, *keyRing, userUpdateProducer,
syncProducer, eduProducer, transactionsCache, fedSenderAPI, syncProducer, eduProducer, transactionsCache, fsAPI,
) )
} }

View file

@ -47,7 +47,7 @@ func DirectoryRoom(
federation *gomatrixserverlib.FederationClient, federation *gomatrixserverlib.FederationClient,
cfg *config.Dendrite, cfg *config.Dendrite,
rsAPI roomserverAPI.RoomserverAliasAPI, rsAPI roomserverAPI.RoomserverAliasAPI,
fedSenderAPI federationSenderAPI.FederationSenderQueryAPI, fedSenderAPI federationSenderAPI.FederationSenderInternalAPI,
) util.JSONResponse { ) util.JSONResponse {
_, domain, err := gomatrixserverlib.SplitID('#', roomAlias) _, domain, err := gomatrixserverlib.SplitID('#', roomAlias)
if err != nil { if err != nil {

View file

@ -27,12 +27,11 @@ import (
"github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/common/config" "github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/roomserver/api" federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrix" "github.com/matrix-org/gomatrix"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util" "github.com/matrix-org/util"
"github.com/sirupsen/logrus"
) )
// JoinRoomByIDOrAlias implements the "/join/{roomIDOrAlias}" API. // JoinRoomByIDOrAlias implements the "/join/{roomIDOrAlias}" API.
@ -46,6 +45,7 @@ func JoinRoomByIDOrAlias(
producer *producers.RoomserverProducer, producer *producers.RoomserverProducer,
queryAPI roomserverAPI.RoomserverQueryAPI, queryAPI roomserverAPI.RoomserverQueryAPI,
aliasAPI roomserverAPI.RoomserverAliasAPI, aliasAPI roomserverAPI.RoomserverAliasAPI,
fsAPI federationSenderAPI.FederationSenderInternalAPI,
keyRing gomatrixserverlib.KeyRing, keyRing gomatrixserverlib.KeyRing,
accountDB accounts.Database, accountDB accounts.Database,
) util.JSONResponse { ) util.JSONResponse {
@ -79,7 +79,8 @@ func JoinRoomByIDOrAlias(
content["avatar_url"] = profile.AvatarURL content["avatar_url"] = profile.AvatarURL
r := joinRoomReq{ r := joinRoomReq{
req, evTime, content, device.UserID, cfg, federation, producer, queryAPI, aliasAPI, keyRing, req, evTime, content, device.UserID, cfg, federation, producer,
queryAPI, aliasAPI, fsAPI, keyRing,
} }
if strings.HasPrefix(roomIDOrAlias, "!") { if strings.HasPrefix(roomIDOrAlias, "!") {
@ -107,6 +108,7 @@ type joinRoomReq struct {
producer *producers.RoomserverProducer producer *producers.RoomserverProducer
queryAPI roomserverAPI.RoomserverQueryAPI queryAPI roomserverAPI.RoomserverQueryAPI
aliasAPI roomserverAPI.RoomserverAliasAPI aliasAPI roomserverAPI.RoomserverAliasAPI
fsAPI federationSenderAPI.FederationSenderInternalAPI
keyRing gomatrixserverlib.KeyRing keyRing gomatrixserverlib.KeyRing
} }
@ -326,71 +328,15 @@ func (r joinRoomReq) joinRoomUsingServers(
// server was invalid this returns an error. // server was invalid this returns an error.
// Otherwise this returns a JSONResponse. // Otherwise this returns a JSONResponse.
func (r joinRoomReq) joinRoomUsingServer(roomID string, server gomatrixserverlib.ServerName) (*util.JSONResponse, error) { func (r joinRoomReq) joinRoomUsingServer(roomID string, server gomatrixserverlib.ServerName) (*util.JSONResponse, error) {
// Ask the room server for information about room versions. fedJoinReq := federationSenderAPI.PerformJoinRequest{
var request api.QueryRoomVersionCapabilitiesRequest RoomID: roomID,
var response api.QueryRoomVersionCapabilitiesResponse UserID: r.userID,
if err := r.queryAPI.QueryRoomVersionCapabilities(r.req.Context(), &request, &response); err != nil { ServerName: server,
}
fedJoinRes := federationSenderAPI.PerformJoinResponse{}
if err := r.fsAPI.PerformJoin(r.req.Context(), &fedJoinReq, &fedJoinRes); err != nil {
return nil, err return nil, err
} }
var supportedVersions []gomatrixserverlib.RoomVersion
for version := range response.AvailableRoomVersions {
supportedVersions = append(supportedVersions, version)
}
respMakeJoin, err := r.federation.MakeJoin(r.req.Context(), server, roomID, r.userID, supportedVersions)
if err != nil {
// TODO: Check if the user was not allowed to join the room.
return nil, fmt.Errorf("r.federation.MakeJoin: %w", err)
}
// Set all the fields to be what they should be, this should be a no-op
// but it's possible that the remote server returned us something "odd"
err = r.writeToBuilder(&respMakeJoin.JoinEvent, roomID)
if err != nil {
return nil, fmt.Errorf("r.writeToBuilder: %w", err)
}
if respMakeJoin.RoomVersion == "" {
respMakeJoin.RoomVersion = gomatrixserverlib.RoomVersionV1
}
if _, err = respMakeJoin.RoomVersion.EventFormat(); err != nil {
return &util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.UnsupportedRoomVersion(
fmt.Sprintf("Room version '%s' is not supported", respMakeJoin.RoomVersion),
),
}, nil
}
event, err := respMakeJoin.JoinEvent.Build(
r.evTime, r.cfg.Matrix.ServerName, r.cfg.Matrix.KeyID,
r.cfg.Matrix.PrivateKey, respMakeJoin.RoomVersion,
)
if err != nil {
return nil, fmt.Errorf("respMakeJoin.JoinEvent.Build: %w", err)
}
respSendJoin, err := r.federation.SendJoin(r.req.Context(), server, event, respMakeJoin.RoomVersion)
if err != nil {
return nil, fmt.Errorf("r.federation.SendJoin: %w", err)
}
if err = r.checkSendJoinResponse(event, server, respMakeJoin, respSendJoin); err != nil {
return nil, err
}
util.GetLogger(r.req.Context()).WithFields(logrus.Fields{
"room_id": roomID,
"num_auth_events": len(respSendJoin.AuthEvents),
"num_state_events": len(respSendJoin.StateEvents),
}).Info("Room join signature and auth verification passed")
if err = r.producer.SendEventWithState(
r.req.Context(),
respSendJoin.ToRespState(),
event.Headered(respMakeJoin.RoomVersion),
); err != nil {
util.GetLogger(r.req.Context()).WithError(err).Error("r.producer.SendEventWithState")
}
return &util.JSONResponse{ return &util.JSONResponse{
Code: http.StatusOK, Code: http.StatusOK,
@ -400,49 +346,3 @@ func (r joinRoomReq) joinRoomUsingServer(roomID string, server gomatrixserverlib
}{roomID}, }{roomID},
}, nil }, nil
} }
// checkSendJoinResponse checks that all of the signatures are correct
// and that the join is allowed by the supplied state.
func (r joinRoomReq) checkSendJoinResponse(
event gomatrixserverlib.Event,
server gomatrixserverlib.ServerName,
respMakeJoin gomatrixserverlib.RespMakeJoin,
respSendJoin gomatrixserverlib.RespSendJoin,
) error {
// A list of events that we have retried, if they were not included in
// the auth events supplied in the send_join.
retries := map[string]bool{}
retryCheck:
// TODO: Can we expand Check here to return a list of missing auth
// events rather than failing one at a time?
if err := respSendJoin.Check(r.req.Context(), r.keyRing, event); err != nil {
switch e := err.(type) {
case gomatrixserverlib.MissingAuthEventError:
// Check that we haven't already retried for this event, prevents
// us from ending up in endless loops
if !retries[e.AuthEventID] {
// Ask the server that we're talking to right now for the event
tx, txerr := r.federation.GetEvent(r.req.Context(), server, e.AuthEventID)
if txerr != nil {
return fmt.Errorf("r.federation.GetEvent: %w", txerr)
}
// For each event returned, add it to the auth events.
for _, pdu := range tx.PDUs {
ev, everr := gomatrixserverlib.NewEventFromUntrustedJSON(pdu, respMakeJoin.RoomVersion)
if everr != nil {
return fmt.Errorf("gomatrixserverlib.NewEventFromUntrustedJSON: %w", everr)
}
respSendJoin.AuthEvents = append(respSendJoin.AuthEvents, ev)
}
// Mark the event as retried and then give the check another go.
retries[e.AuthEventID] = true
goto retryCheck
}
return fmt.Errorf("respSendJoin (after retries): %w", e)
default:
return fmt.Errorf("respSendJoin: %w", err)
}
}
return nil
}

View file

@ -60,7 +60,7 @@ func Setup(
syncProducer *producers.SyncAPIProducer, syncProducer *producers.SyncAPIProducer,
eduProducer *producers.EDUServerProducer, eduProducer *producers.EDUServerProducer,
transactionsCache *transactions.Cache, transactionsCache *transactions.Cache,
federationSender federationSenderAPI.FederationSenderQueryAPI, federationSender federationSenderAPI.FederationSenderInternalAPI,
) { ) {
apiMux.Handle("/_matrix/client/versions", apiMux.Handle("/_matrix/client/versions",
@ -101,7 +101,8 @@ func Setup(
return util.ErrorResponse(err) return util.ErrorResponse(err)
} }
return JoinRoomByIDOrAlias( return JoinRoomByIDOrAlias(
req, device, vars["roomIDOrAlias"], cfg, federation, producer, queryAPI, aliasAPI, keyRing, accountDB, req, device, vars["roomIDOrAlias"], cfg, federation, producer,
queryAPI, aliasAPI, federationSender, keyRing, accountDB,
) )
}), }),
).Methods(http.MethodPost, http.MethodOptions) ).Methods(http.MethodPost, http.MethodOptions)

View file

@ -37,12 +37,12 @@ func main() {
asQuery := base.CreateHTTPAppServiceAPIs() asQuery := base.CreateHTTPAppServiceAPIs()
alias, input, query := base.CreateHTTPRoomserverAPIs() alias, input, query := base.CreateHTTPRoomserverAPIs()
fedSenderAPI := base.CreateHTTPFederationSenderAPIs() fsAPI := base.CreateHTTPFederationSenderAPIs()
eduInputAPI := eduserver.SetupEDUServerComponent(base, cache.New()) eduInputAPI := eduserver.SetupEDUServerComponent(base, cache.New())
clientapi.SetupClientAPIComponent( clientapi.SetupClientAPIComponent(
base, deviceDB, accountDB, federation, &keyRing, base, deviceDB, accountDB, federation, &keyRing,
alias, input, query, eduInputAPI, asQuery, transactions.New(), fedSenderAPI, alias, input, query, eduInputAPI, asQuery, transactions.New(), fsAPI,
) )
base.SetupAndServeHTTP(string(base.Cfg.Bind.ClientAPI), string(base.Cfg.Listen.ClientAPI)) base.SetupAndServeHTTP(string(base.Cfg.Bind.ClientAPI), string(base.Cfg.Listen.ClientAPI))

View file

@ -153,15 +153,15 @@ func main() {
asQuery := appservice.SetupAppServiceAPIComponent( asQuery := appservice.SetupAppServiceAPIComponent(
&base.Base, accountDB, deviceDB, federation, alias, query, transactions.New(), &base.Base, accountDB, deviceDB, federation, alias, query, transactions.New(),
) )
fedSenderAPI := federationsender.SetupFederationSenderComponent(&base.Base, federation, query, input) fsAPI := federationsender.SetupFederationSenderComponent(&base.Base, federation, query, input, &keyRing)
clientapi.SetupClientAPIComponent( clientapi.SetupClientAPIComponent(
&base.Base, deviceDB, accountDB, &base.Base, deviceDB, accountDB,
federation, &keyRing, alias, input, query, federation, &keyRing, alias, input, query,
eduInputAPI, asQuery, transactions.New(), fedSenderAPI, eduInputAPI, asQuery, transactions.New(), fsAPI,
) )
eduProducer := producers.NewEDUServerProducer(eduInputAPI) eduProducer := producers.NewEDUServerProducer(eduInputAPI)
federationapi.SetupFederationAPIComponent(&base.Base, accountDB, deviceDB, federation, &keyRing, alias, input, query, asQuery, fedSenderAPI, eduProducer) federationapi.SetupFederationAPIComponent(&base.Base, accountDB, deviceDB, federation, &keyRing, alias, input, query, asQuery, fsAPI, eduProducer)
mediaapi.SetupMediaAPIComponent(&base.Base, deviceDB) mediaapi.SetupMediaAPIComponent(&base.Base, deviceDB)
publicRoomsDB, err := storage.NewPublicRoomsServerDatabaseWithPubSub(string(base.Base.Cfg.Database.PublicRoomsAPI), base.LibP2PPubsub) publicRoomsDB, err := storage.NewPublicRoomsServerDatabaseWithPubSub(string(base.Base.Cfg.Database.PublicRoomsAPI), base.LibP2PPubsub)
if err != nil { if err != nil {

View file

@ -32,7 +32,7 @@ func main() {
deviceDB := base.CreateDeviceDB() deviceDB := base.CreateDeviceDB()
keyDB := base.CreateKeyDB() keyDB := base.CreateKeyDB()
federation := base.CreateFederationClient() federation := base.CreateFederationClient()
federationSender := base.CreateHTTPFederationSenderAPIs() fsAPI := base.CreateHTTPFederationSenderAPIs()
keyRing := keydb.CreateKeyRing(federation.Client, keyDB, cfg.Matrix.KeyPerspectives) keyRing := keydb.CreateKeyRing(federation.Client, keyDB, cfg.Matrix.KeyPerspectives)
alias, input, query := base.CreateHTTPRoomserverAPIs() alias, input, query := base.CreateHTTPRoomserverAPIs()
@ -42,7 +42,7 @@ func main() {
federationapi.SetupFederationAPIComponent( federationapi.SetupFederationAPIComponent(
base, accountDB, deviceDB, federation, &keyRing, base, accountDB, deviceDB, federation, &keyRing,
alias, input, query, asQuery, federationSender, eduProducer, alias, input, query, asQuery, fsAPI, eduProducer,
) )
base.SetupAndServeHTTP(string(base.Cfg.Bind.FederationAPI), string(base.Cfg.Listen.FederationAPI)) base.SetupAndServeHTTP(string(base.Cfg.Bind.FederationAPI), string(base.Cfg.Listen.FederationAPI))

View file

@ -16,6 +16,7 @@ package main
import ( import (
"github.com/matrix-org/dendrite/common/basecomponent" "github.com/matrix-org/dendrite/common/basecomponent"
"github.com/matrix-org/dendrite/common/keydb"
"github.com/matrix-org/dendrite/federationsender" "github.com/matrix-org/dendrite/federationsender"
) )
@ -25,11 +26,13 @@ func main() {
defer base.Close() // nolint: errcheck defer base.Close() // nolint: errcheck
federation := base.CreateFederationClient() federation := base.CreateFederationClient()
keyDB := base.CreateKeyDB()
keyRing := keydb.CreateKeyRing(federation.Client, keyDB, cfg.Matrix.KeyPerspectives)
_, input, query := base.CreateHTTPRoomserverAPIs() _, input, query := base.CreateHTTPRoomserverAPIs()
federationsender.SetupFederationSenderComponent( federationsender.SetupFederationSenderComponent(
base, federation, query, input, base, federation, query, input, &keyRing,
) )
base.SetupAndServeHTTP(string(base.Cfg.Bind.FederationSender), string(base.Cfg.Listen.FederationSender)) base.SetupAndServeHTTP(string(base.Cfg.Bind.FederationSender), string(base.Cfg.Listen.FederationSender))

View file

@ -62,15 +62,16 @@ func main() {
asQuery := appservice.SetupAppServiceAPIComponent( asQuery := appservice.SetupAppServiceAPIComponent(
base, accountDB, deviceDB, federation, alias, query, transactions.New(), base, accountDB, deviceDB, federation, alias, query, transactions.New(),
) )
fedSenderAPI := federationsender.SetupFederationSenderComponent(base, federation, query, input) fsAPI := federationsender.SetupFederationSenderComponent(base, federation, query, input, &keyRing)
input.SetFederationSenderAPI(fsAPI)
clientapi.SetupClientAPIComponent( clientapi.SetupClientAPIComponent(
base, deviceDB, accountDB, base, deviceDB, accountDB,
federation, &keyRing, alias, input, query, federation, &keyRing, alias, input, query,
eduInputAPI, asQuery, transactions.New(), fedSenderAPI, eduInputAPI, asQuery, transactions.New(), fsAPI,
) )
eduProducer := producers.NewEDUServerProducer(eduInputAPI) eduProducer := producers.NewEDUServerProducer(eduInputAPI)
federationapi.SetupFederationAPIComponent(base, accountDB, deviceDB, federation, &keyRing, alias, input, query, asQuery, fedSenderAPI, eduProducer) federationapi.SetupFederationAPIComponent(base, accountDB, deviceDB, federation, &keyRing, alias, input, query, asQuery, fsAPI, eduProducer)
mediaapi.SetupMediaAPIComponent(base, deviceDB) mediaapi.SetupMediaAPIComponent(base, deviceDB)
publicRoomsDB, err := storage.NewPublicRoomsServerDatabase(string(base.Cfg.Database.PublicRoomsAPI)) publicRoomsDB, err := storage.NewPublicRoomsServerDatabase(string(base.Cfg.Database.PublicRoomsAPI))
if err != nil { if err != nil {

View file

@ -30,7 +30,9 @@ func main() {
federation := base.CreateFederationClient() federation := base.CreateFederationClient()
keyRing := keydb.CreateKeyRing(federation.Client, keyDB, cfg.Matrix.KeyPerspectives) keyRing := keydb.CreateKeyRing(federation.Client, keyDB, cfg.Matrix.KeyPerspectives)
roomserver.SetupRoomServerComponent(base, keyRing, federation) fsAPI := base.CreateHTTPFederationSenderAPIs()
_, input, _ := roomserver.SetupRoomServerComponent(base, keyRing, federation)
input.SetFederationSenderAPI(fsAPI)
base.SetupAndServeHTTP(string(base.Cfg.Bind.RoomServer), string(base.Cfg.Listen.RoomServer)) base.SetupAndServeHTTP(string(base.Cfg.Bind.RoomServer), string(base.Cfg.Listen.RoomServer))

View file

@ -128,7 +128,8 @@ func main() {
asQuery := appservice.SetupAppServiceAPIComponent( asQuery := appservice.SetupAppServiceAPIComponent(
base, accountDB, deviceDB, federation, alias, query, transactions.New(), base, accountDB, deviceDB, federation, alias, query, transactions.New(),
) )
fedSenderAPI := federationsender.SetupFederationSenderComponent(base, federation, query, input) fedSenderAPI := federationsender.SetupFederationSenderComponent(base, federation, query, input, &keyRing)
input.SetFederationSenderAPI(fedSenderAPI)
clientapi.SetupClientAPIComponent( clientapi.SetupClientAPIComponent(
base, deviceDB, accountDB, base, deviceDB, accountDB,

View file

@ -149,12 +149,12 @@ func (b *BaseDendrite) CreateHTTPEDUServerAPIs() eduServerAPI.EDUServerInputAPI
return e return e
} }
// CreateHTTPFederationSenderAPIs returns FederationSenderQueryAPI for hitting // CreateHTTPFederationSenderAPIs returns FederationSenderInternalAPI for hitting
// the federation sender over HTTP // the federation sender over HTTP
func (b *BaseDendrite) CreateHTTPFederationSenderAPIs() federationSenderAPI.FederationSenderQueryAPI { func (b *BaseDendrite) CreateHTTPFederationSenderAPIs() federationSenderAPI.FederationSenderInternalAPI {
f, err := federationSenderAPI.NewFederationSenderQueryAPIHTTP(b.Cfg.FederationSenderURL(), b.httpClient) f, err := federationSenderAPI.NewFederationSenderInternalAPIHTTP(b.Cfg.FederationSenderURL(), b.httpClient)
if err != nil { if err != nil {
logrus.WithError(err).Panic("NewFederationSenderQueryAPIHTTP failed", b.httpClient) logrus.WithError(err).Panic("NewFederationSenderInternalAPIHTTP failed", b.httpClient)
} }
return f return f
} }

View file

@ -40,7 +40,7 @@ func SetupFederationAPIComponent(
inputAPI roomserverAPI.RoomserverInputAPI, inputAPI roomserverAPI.RoomserverInputAPI,
queryAPI roomserverAPI.RoomserverQueryAPI, queryAPI roomserverAPI.RoomserverQueryAPI,
asAPI appserviceAPI.AppServiceQueryAPI, asAPI appserviceAPI.AppServiceQueryAPI,
federationSenderAPI federationSenderAPI.FederationSenderQueryAPI, federationSenderAPI federationSenderAPI.FederationSenderInternalAPI,
eduProducer *producers.EDUServerProducer, eduProducer *producers.EDUServerProducer,
) { ) {
roomserverProducer := producers.NewRoomserverProducer(inputAPI, queryAPI) roomserverProducer := producers.NewRoomserverProducer(inputAPI, queryAPI)

View file

@ -33,7 +33,7 @@ func RoomAliasToID(
federation *gomatrixserverlib.FederationClient, federation *gomatrixserverlib.FederationClient,
cfg *config.Dendrite, cfg *config.Dendrite,
aliasAPI roomserverAPI.RoomserverAliasAPI, aliasAPI roomserverAPI.RoomserverAliasAPI,
senderAPI federationSenderAPI.FederationSenderQueryAPI, senderAPI federationSenderAPI.FederationSenderInternalAPI,
) util.JSONResponse { ) util.JSONResponse {
roomAlias := httpReq.FormValue("room_alias") roomAlias := httpReq.FormValue("room_alias")
if roomAlias == "" { if roomAlias == "" {

View file

@ -49,7 +49,7 @@ func Setup(
asAPI appserviceAPI.AppServiceQueryAPI, asAPI appserviceAPI.AppServiceQueryAPI,
producer *producers.RoomserverProducer, producer *producers.RoomserverProducer,
eduProducer *producers.EDUServerProducer, eduProducer *producers.EDUServerProducer,
federationSenderAPI federationSenderAPI.FederationSenderQueryAPI, federationSenderAPI federationSenderAPI.FederationSenderInternalAPI,
keys gomatrixserverlib.KeyRing, keys gomatrixserverlib.KeyRing,
federation *gomatrixserverlib.FederationClient, federation *gomatrixserverlib.FederationClient,
accountDB accounts.Database, accountDB accounts.Database,

View file

@ -0,0 +1,53 @@
package api
import (
"context"
"errors"
"net/http"
)
// FederationSenderInternalAPI is used to query information from the federation sender.
type FederationSenderInternalAPI interface {
// Query the joined hosts and the membership events accounting for their participation in a room.
// Note that if a server has multiple users in the room, it will have multiple entries in the returned slice.
// See `QueryJoinedHostServerNamesInRoom` for a de-duplicated version.
QueryJoinedHostsInRoom(
ctx context.Context,
request *QueryJoinedHostsInRoomRequest,
response *QueryJoinedHostsInRoomResponse,
) error
// Query the server names of the joined hosts in a room.
// Unlike QueryJoinedHostsInRoom, this function returns a de-duplicated slice
// containing only the server names (without information for membership events).
QueryJoinedHostServerNamesInRoom(
ctx context.Context,
request *QueryJoinedHostServerNamesInRoomRequest,
response *QueryJoinedHostServerNamesInRoomResponse,
) error
// Handle an instruction to make_join & send_join with a remote server.
PerformJoin(
ctx context.Context,
request *PerformJoinRequest,
response *PerformJoinResponse,
) error
// Handle an instruction to make_leave & send_leave with a remote server.
PerformLeave(
ctx context.Context,
request *PerformLeaveRequest,
response *PerformLeaveResponse,
) error
}
// NewFederationSenderInternalAPIHTTP creates a FederationSenderInternalAPI implemented by talking to a HTTP POST API.
// If httpClient is nil an error is returned
func NewFederationSenderInternalAPIHTTP(federationSenderURL string, httpClient *http.Client) (FederationSenderInternalAPI, error) {
if httpClient == nil {
return nil, errors.New("NewFederationSenderInternalAPIHTTP: httpClient is <nil>")
}
return &httpFederationSenderInternalAPI{federationSenderURL, httpClient}, nil
}
type httpFederationSenderInternalAPI struct {
federationSenderURL string
httpClient *http.Client
}

View file

@ -0,0 +1,60 @@
package api
import (
"context"
commonHTTP "github.com/matrix-org/dendrite/common/http"
"github.com/matrix-org/gomatrixserverlib"
"github.com/opentracing/opentracing-go"
)
const (
// FederationSenderPerformJoinRequestPath is the HTTP path for the PerformJoinRequest API.
FederationSenderPerformJoinRequestPath = "/api/federationsender/performJoinRequest"
// FederationSenderPerformLeaveRequestPath is the HTTP path for the PerformLeaveRequest API.
FederationSenderPerformLeaveRequestPath = "/api/federationsender/performLeaveRequest"
)
type PerformJoinRequest struct {
RoomID string `json:"room_id"`
UserID string `json:"user_id"`
ServerName gomatrixserverlib.ServerName `json:"server_name"`
Content map[string]interface{} `json:"content"`
}
type PerformJoinResponse struct {
}
// Handle an instruction to make_join & send_join with a remote server.
func (h *httpFederationSenderInternalAPI) PerformJoin(
ctx context.Context,
request *PerformJoinRequest,
response *PerformJoinResponse,
) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "PerformJoinRequest")
defer span.Finish()
apiURL := h.federationSenderURL + FederationSenderPerformJoinRequestPath
return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}
type PerformLeaveRequest struct {
RoomID string `json:"room_id"`
}
type PerformLeaveResponse struct {
}
// Handle an instruction to make_leave & send_leave with a remote server.
func (h *httpFederationSenderInternalAPI) PerformLeave(
ctx context.Context,
request *PerformLeaveRequest,
response *PerformLeaveResponse,
) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "PerformLeaveRequest")
defer span.Finish()
apiURL := h.federationSenderURL + FederationSenderPerformLeaveRequestPath
return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}

View file

@ -2,16 +2,20 @@ package api
import ( import (
"context" "context"
"errors"
"net/http"
commonHTTP "github.com/matrix-org/dendrite/common/http" commonHTTP "github.com/matrix-org/dendrite/common/http"
"github.com/matrix-org/dendrite/federationsender/types"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/dendrite/federationsender/types"
"github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go"
) )
// FederationSenderQueryJoinedHostsInRoomPath is the HTTP path for the QueryJoinedHostsInRoom API.
const FederationSenderQueryJoinedHostsInRoomPath = "/api/federationsender/queryJoinedHostsInRoom"
// FederationSenderQueryJoinedHostServerNamesInRoomPath is the HTTP path for the QueryJoinedHostServerNamesInRoom API.
const FederationSenderQueryJoinedHostServerNamesInRoomPath = "/api/federationsender/queryJoinedHostServerNamesInRoom"
// QueryJoinedHostsInRoomRequest is a request to QueryJoinedHostsInRoom // QueryJoinedHostsInRoomRequest is a request to QueryJoinedHostsInRoom
type QueryJoinedHostsInRoomRequest struct { type QueryJoinedHostsInRoomRequest struct {
RoomID string `json:"room_id"` RoomID string `json:"room_id"`
@ -22,6 +26,19 @@ type QueryJoinedHostsInRoomResponse struct {
JoinedHosts []types.JoinedHost `json:"joined_hosts"` JoinedHosts []types.JoinedHost `json:"joined_hosts"`
} }
// QueryJoinedHostsInRoom implements FederationSenderInternalAPI
func (h *httpFederationSenderInternalAPI) QueryJoinedHostsInRoom(
ctx context.Context,
request *QueryJoinedHostsInRoomRequest,
response *QueryJoinedHostsInRoomResponse,
) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "QueryJoinedHostsInRoom")
defer span.Finish()
apiURL := h.federationSenderURL + FederationSenderQueryJoinedHostsInRoomPath
return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}
// QueryJoinedHostServerNamesRequest is a request to QueryJoinedHostServerNames // QueryJoinedHostServerNamesRequest is a request to QueryJoinedHostServerNames
type QueryJoinedHostServerNamesInRoomRequest struct { type QueryJoinedHostServerNamesInRoomRequest struct {
RoomID string `json:"room_id"` RoomID string `json:"room_id"`
@ -32,61 +49,8 @@ type QueryJoinedHostServerNamesInRoomResponse struct {
ServerNames []gomatrixserverlib.ServerName `json:"server_names"` ServerNames []gomatrixserverlib.ServerName `json:"server_names"`
} }
// FederationSenderQueryAPI is used to query information from the federation sender. // QueryJoinedHostServerNamesInRoom implements FederationSenderInternalAPI
type FederationSenderQueryAPI interface { func (h *httpFederationSenderInternalAPI) QueryJoinedHostServerNamesInRoom(
// Query the joined hosts and the membership events accounting for their participation in a room.
// Note that if a server has multiple users in the room, it will have multiple entries in the returned slice.
// See `QueryJoinedHostServerNamesInRoom` for a de-duplicated version.
QueryJoinedHostsInRoom(
ctx context.Context,
request *QueryJoinedHostsInRoomRequest,
response *QueryJoinedHostsInRoomResponse,
) error
// Query the server names of the joined hosts in a room.
// Unlike QueryJoinedHostsInRoom, this function returns a de-duplicated slice
// containing only the server names (without information for membership events).
QueryJoinedHostServerNamesInRoom(
ctx context.Context,
request *QueryJoinedHostServerNamesInRoomRequest,
response *QueryJoinedHostServerNamesInRoomResponse,
) error
}
// FederationSenderQueryJoinedHostsInRoomPath is the HTTP path for the QueryJoinedHostsInRoom API.
const FederationSenderQueryJoinedHostsInRoomPath = "/api/federationsender/queryJoinedHostsInRoom"
// FederationSenderQueryJoinedHostServerNamesInRoomPath is the HTTP path for the QueryJoinedHostServerNamesInRoom API.
const FederationSenderQueryJoinedHostServerNamesInRoomPath = "/api/federationsender/queryJoinedHostServerNamesInRoom"
// NewFederationSenderQueryAPIHTTP creates a FederationSenderQueryAPI implemented by talking to a HTTP POST API.
// If httpClient is nil an error is returned
func NewFederationSenderQueryAPIHTTP(federationSenderURL string, httpClient *http.Client) (FederationSenderQueryAPI, error) {
if httpClient == nil {
return nil, errors.New("NewFederationSenderQueryAPIHTTP: httpClient is <nil>")
}
return &httpFederationSenderQueryAPI{federationSenderURL, httpClient}, nil
}
type httpFederationSenderQueryAPI struct {
federationSenderURL string
httpClient *http.Client
}
// QueryJoinedHostsInRoom implements FederationSenderQueryAPI
func (h *httpFederationSenderQueryAPI) QueryJoinedHostsInRoom(
ctx context.Context,
request *QueryJoinedHostsInRoomRequest,
response *QueryJoinedHostsInRoomResponse,
) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "QueryJoinedHostsInRoom")
defer span.Finish()
apiURL := h.federationSenderURL + FederationSenderQueryJoinedHostsInRoomPath
return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}
// QueryJoinedHostServerNamesInRoom implements FederationSenderQueryAPI
func (h *httpFederationSenderQueryAPI) QueryJoinedHostServerNamesInRoom(
ctx context.Context, ctx context.Context,
request *QueryJoinedHostServerNamesInRoomRequest, request *QueryJoinedHostServerNamesInRoomRequest,
response *QueryJoinedHostServerNamesInRoomResponse, response *QueryJoinedHostServerNamesInRoomResponse,

View file

@ -36,7 +36,8 @@ func SetupFederationSenderComponent(
federation *gomatrixserverlib.FederationClient, federation *gomatrixserverlib.FederationClient,
rsQueryAPI roomserverAPI.RoomserverQueryAPI, rsQueryAPI roomserverAPI.RoomserverQueryAPI,
rsInputAPI roomserverAPI.RoomserverInputAPI, rsInputAPI roomserverAPI.RoomserverInputAPI,
) api.FederationSenderQueryAPI { keyRing *gomatrixserverlib.KeyRing,
) api.FederationSenderInternalAPI {
federationSenderDB, err := storage.NewDatabase(string(base.Cfg.Database.FederationSender)) federationSenderDB, err := storage.NewDatabase(string(base.Cfg.Database.FederationSender))
if err != nil { if err != nil {
logrus.WithError(err).Panic("failed to connect to federation sender db") logrus.WithError(err).Panic("failed to connect to federation sender db")
@ -61,10 +62,10 @@ func SetupFederationSenderComponent(
logrus.WithError(err).Panic("failed to start typing server consumer") logrus.WithError(err).Panic("failed to start typing server consumer")
} }
queryAPI := query.FederationSenderQueryAPI{ queryAPI := query.NewFederationSenderInternalAPI(
DB: federationSenderDB, federationSenderDB, base.Cfg, roomserverProducer, federation, keyRing,
} )
queryAPI.SetupHTTP(http.DefaultServeMux) queryAPI.SetupHTTP(http.DefaultServeMux)
return &queryAPI return queryAPI
} }

View file

@ -54,6 +54,42 @@ func (c *RoomserverProducer) SendInviteResponse(
return c.SendInputRoomEvents(ctx, []api.InputRoomEvent{ire}) return c.SendInputRoomEvents(ctx, []api.InputRoomEvent{ire})
} }
// SendEventWithState writes an event with KindNew to the roomserver input log
// with the state at the event as KindOutlier before it.
func (c *RoomserverProducer) SendEventWithState(
ctx context.Context, state gomatrixserverlib.RespState, event gomatrixserverlib.HeaderedEvent,
) error {
outliers, err := state.Events()
if err != nil {
return err
}
var ires []api.InputRoomEvent
for _, outlier := range outliers {
ires = append(ires, api.InputRoomEvent{
Kind: api.KindOutlier,
Event: outlier.Headered(event.RoomVersion),
AuthEventIDs: outlier.AuthEventIDs(),
})
}
stateEventIDs := make([]string, len(state.StateEvents))
for i := range state.StateEvents {
stateEventIDs[i] = state.StateEvents[i].EventID()
}
ires = append(ires, api.InputRoomEvent{
Kind: api.KindNew,
Event: event,
AuthEventIDs: event.AuthEventIDs(),
HasState: true,
StateEventIDs: stateEventIDs,
})
_, err = c.SendInputRoomEvents(ctx, ires)
return err
}
// SendInputRoomEvents writes the given input room events to the roomserver input API. // SendInputRoomEvents writes the given input room events to the roomserver input API.
func (c *RoomserverProducer) SendInputRoomEvents( func (c *RoomserverProducer) SendInputRoomEvents(
ctx context.Context, ires []api.InputRoomEvent, ctx context.Context, ires []api.InputRoomEvent,

View file

@ -0,0 +1,97 @@
package query
import (
"encoding/json"
"net/http"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/federationsender/producers"
"github.com/matrix-org/dendrite/federationsender/storage"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
)
// FederationSenderInternalAPI is an implementation of api.FederationSenderInternalAPI
type FederationSenderInternalAPI struct {
api.FederationSenderInternalAPI
db storage.Database
cfg *config.Dendrite
producer *producers.RoomserverProducer
federation *gomatrixserverlib.FederationClient
keyRing *gomatrixserverlib.KeyRing
}
func NewFederationSenderInternalAPI(
db storage.Database, cfg *config.Dendrite,
producer *producers.RoomserverProducer,
federation *gomatrixserverlib.FederationClient,
keyRing *gomatrixserverlib.KeyRing,
) *FederationSenderInternalAPI {
return &FederationSenderInternalAPI{
db: db,
cfg: cfg,
producer: producer,
federation: federation,
keyRing: keyRing,
}
}
// SetupHTTP adds the FederationSenderInternalAPI handlers to the http.ServeMux.
func (f *FederationSenderInternalAPI) SetupHTTP(servMux *http.ServeMux) {
servMux.Handle(
api.FederationSenderQueryJoinedHostsInRoomPath,
common.MakeInternalAPI("QueryJoinedHostsInRoom", func(req *http.Request) util.JSONResponse {
var request api.QueryJoinedHostsInRoomRequest
var response api.QueryJoinedHostsInRoomResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.ErrorResponse(err)
}
if err := f.QueryJoinedHostsInRoom(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
servMux.Handle(
api.FederationSenderQueryJoinedHostServerNamesInRoomPath,
common.MakeInternalAPI("QueryJoinedHostServerNamesInRoom", func(req *http.Request) util.JSONResponse {
var request api.QueryJoinedHostServerNamesInRoomRequest
var response api.QueryJoinedHostServerNamesInRoomResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.ErrorResponse(err)
}
if err := f.QueryJoinedHostServerNamesInRoom(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
servMux.Handle(api.FederationSenderPerformJoinRequestPath,
common.MakeInternalAPI("PerformJoinRequest", func(req *http.Request) util.JSONResponse {
var request api.PerformJoinRequest
var response api.PerformJoinResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.MessageResponse(http.StatusBadRequest, err.Error())
}
if err := f.PerformJoin(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
servMux.Handle(api.FederationSenderPerformLeaveRequestPath,
common.MakeInternalAPI("PerformLeaveRequest", func(req *http.Request) util.JSONResponse {
var request api.PerformLeaveRequest
var response api.PerformLeaveResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.MessageResponse(http.StatusBadRequest, err.Error())
}
if err := f.PerformLeave(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
}

View file

@ -0,0 +1,120 @@
package query
import (
"context"
"fmt"
"time"
"github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/federationsender/query/perform"
"github.com/matrix-org/dendrite/roomserver/version"
"github.com/matrix-org/gomatrixserverlib"
)
// PerformJoinRequest implements api.FederationSenderInternalAPI
func (r *FederationSenderInternalAPI) PerformJoin(
ctx context.Context,
request *api.PerformJoinRequest,
response *api.PerformJoinResponse,
) (err error) {
// Look up the supported room versions.
var supportedVersions []gomatrixserverlib.RoomVersion
for version := range version.SupportedRoomVersions() {
supportedVersions = append(supportedVersions, version)
}
// Try to perform a make_join using the information supplied in the
// request.
respMakeJoin, err := r.federation.MakeJoin(
ctx,
request.ServerName,
request.RoomID,
request.UserID,
supportedVersions,
)
if err != nil {
// TODO: Check if the user was not allowed to join the room.
return fmt.Errorf("r.federation.MakeJoin: %w", err)
}
// Set all the fields to be what they should be, this should be a no-op
// but it's possible that the remote server returned us something "odd"
respMakeJoin.JoinEvent.Type = "m.room.member"
respMakeJoin.JoinEvent.Sender = request.UserID
respMakeJoin.JoinEvent.StateKey = &request.UserID
respMakeJoin.JoinEvent.RoomID = request.RoomID
respMakeJoin.JoinEvent.Redacts = ""
if request.Content == nil {
request.Content = map[string]interface{}{}
}
request.Content["membership"] = "join"
if err = respMakeJoin.JoinEvent.SetContent(request.Content); err != nil {
return fmt.Errorf("respMakeJoin.JoinEvent.SetContent: %w", err)
}
if err = respMakeJoin.JoinEvent.SetUnsigned(struct{}{}); err != nil {
return fmt.Errorf("respMakeJoin.JoinEvent.SetUnsigned: %w", err)
}
// Work out if we support the room version that has been supplied in
// the make_join response.
if respMakeJoin.RoomVersion == "" {
respMakeJoin.RoomVersion = gomatrixserverlib.RoomVersionV1
}
if _, err = respMakeJoin.RoomVersion.EventFormat(); err != nil {
return fmt.Errorf("respMakeJoin.RoomVersion.EventFormat: %w", err)
}
// Build the join event.
event, err := respMakeJoin.JoinEvent.Build(
time.Now(),
r.cfg.Matrix.ServerName,
r.cfg.Matrix.KeyID,
r.cfg.Matrix.PrivateKey,
respMakeJoin.RoomVersion,
)
if err != nil {
return fmt.Errorf("respMakeJoin.JoinEvent.Build: %w", err)
}
// Try to perform a send_join using the newly built event.
respSendJoin, err := r.federation.SendJoin(
ctx,
request.ServerName,
event,
respMakeJoin.RoomVersion,
)
if err != nil {
return fmt.Errorf("r.federation.SendJoin: %w", err)
}
// Check that the send_join response was valid.
joinCtx := perform.JoinContext(r.federation, r.keyRing)
if err = joinCtx.CheckSendJoinResponse(
ctx, event, request.ServerName, respMakeJoin, respSendJoin,
); err != nil {
return fmt.Errorf("perform.JoinRequest.CheckSendJoinResponse: %w", err)
}
// If we successfully performed a send_join above then the other
// server now thinks we're a part of the room. Send the newly
// returned state to the roomserver to update our local view.
if err = r.producer.SendEventWithState(
ctx,
respSendJoin.ToRespState(),
event.Headered(respMakeJoin.RoomVersion),
); err != nil {
return fmt.Errorf("r.producer.SendEventWithState: %w", err)
}
// Everything went to plan.
return nil
}
// PerformLeaveRequest implements api.FederationSenderInternalAPI
func (r *FederationSenderInternalAPI) PerformLeave(
ctx context.Context,
request *api.PerformLeaveRequest,
response *api.PerformLeaveResponse,
) (err error) {
return nil
}

View file

@ -0,0 +1,70 @@
package perform
import (
"context"
"fmt"
"github.com/matrix-org/gomatrixserverlib"
)
// This file contains helpers for the PerformJoin function.
type joinContext struct {
federation *gomatrixserverlib.FederationClient
keyRing *gomatrixserverlib.KeyRing
}
// Returns a new join context.
func JoinContext(f *gomatrixserverlib.FederationClient, k *gomatrixserverlib.KeyRing) *joinContext {
return &joinContext{
federation: f,
keyRing: k,
}
}
// checkSendJoinResponse checks that all of the signatures are correct
// and that the join is allowed by the supplied state.
func (r joinContext) CheckSendJoinResponse(
ctx context.Context,
event gomatrixserverlib.Event,
server gomatrixserverlib.ServerName,
respMakeJoin gomatrixserverlib.RespMakeJoin,
respSendJoin gomatrixserverlib.RespSendJoin,
) error {
// A list of events that we have retried, if they were not included in
// the auth events supplied in the send_join.
retries := map[string]bool{}
retryCheck:
// TODO: Can we expand Check here to return a list of missing auth
// events rather than failing one at a time?
if err := respSendJoin.Check(ctx, r.keyRing, event); err != nil {
switch e := err.(type) {
case gomatrixserverlib.MissingAuthEventError:
// Check that we haven't already retried for this event, prevents
// us from ending up in endless loops
if !retries[e.AuthEventID] {
// Ask the server that we're talking to right now for the event
tx, txerr := r.federation.GetEvent(ctx, server, e.AuthEventID)
if txerr != nil {
return fmt.Errorf("r.federation.GetEvent: %w", txerr)
}
// For each event returned, add it to the auth events.
for _, pdu := range tx.PDUs {
ev, everr := gomatrixserverlib.NewEventFromUntrustedJSON(pdu, respMakeJoin.RoomVersion)
if everr != nil {
return fmt.Errorf("gomatrixserverlib.NewEventFromUntrustedJSON: %w", everr)
}
respSendJoin.AuthEvents = append(respSendJoin.AuthEvents, ev)
}
// Mark the event as retried and then give the check another go.
retries[e.AuthEventID] = true
goto retryCheck
}
return fmt.Errorf("respSendJoin (after retries): %w", e)
default:
return fmt.Errorf("respSendJoin: %w", err)
}
}
return nil
}

View file

@ -2,45 +2,28 @@ package query
import ( import (
"context" "context"
"encoding/json"
"net/http"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/federationsender/api" "github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/federationsender/types"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
) )
// FederationSenderQueryDatabase has the APIs needed to implement the query API. // QueryJoinedHostsInRoom implements api.FederationSenderInternalAPI
type FederationSenderQueryDatabase interface { func (f *FederationSenderInternalAPI) QueryJoinedHostsInRoom(
GetJoinedHosts(
ctx context.Context, roomID string,
) ([]types.JoinedHost, error)
}
// FederationSenderQueryAPI is an implementation of api.FederationSenderQueryAPI
type FederationSenderQueryAPI struct {
DB FederationSenderQueryDatabase
}
// QueryJoinedHostsInRoom implements api.FederationSenderQueryAPI
func (f *FederationSenderQueryAPI) QueryJoinedHostsInRoom(
ctx context.Context, ctx context.Context,
request *api.QueryJoinedHostsInRoomRequest, request *api.QueryJoinedHostsInRoomRequest,
response *api.QueryJoinedHostsInRoomResponse, response *api.QueryJoinedHostsInRoomResponse,
) (err error) { ) (err error) {
response.JoinedHosts, err = f.DB.GetJoinedHosts(ctx, request.RoomID) response.JoinedHosts, err = f.db.GetJoinedHosts(ctx, request.RoomID)
return return
} }
// QueryJoinedHostServerNamesInRoom implements api.FederationSenderQueryAPI // QueryJoinedHostServerNamesInRoom implements api.FederationSenderInternalAPI
func (f *FederationSenderQueryAPI) QueryJoinedHostServerNamesInRoom( func (f *FederationSenderInternalAPI) QueryJoinedHostServerNamesInRoom(
ctx context.Context, ctx context.Context,
request *api.QueryJoinedHostServerNamesInRoomRequest, request *api.QueryJoinedHostServerNamesInRoomRequest,
response *api.QueryJoinedHostServerNamesInRoomResponse, response *api.QueryJoinedHostServerNamesInRoomResponse,
) (err error) { ) (err error) {
joinedHosts, err := f.DB.GetJoinedHosts(ctx, request.RoomID) joinedHosts, err := f.db.GetJoinedHosts(ctx, request.RoomID)
if err != nil { if err != nil {
return return
} }
@ -54,35 +37,3 @@ func (f *FederationSenderQueryAPI) QueryJoinedHostServerNamesInRoom(
return return
} }
// SetupHTTP adds the FederationSenderQueryAPI handlers to the http.ServeMux.
func (f *FederationSenderQueryAPI) SetupHTTP(servMux *http.ServeMux) {
servMux.Handle(
api.FederationSenderQueryJoinedHostsInRoomPath,
common.MakeInternalAPI("QueryJoinedHostsInRoom", func(req *http.Request) util.JSONResponse {
var request api.QueryJoinedHostsInRoomRequest
var response api.QueryJoinedHostsInRoomResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.ErrorResponse(err)
}
if err := f.QueryJoinedHostsInRoom(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
servMux.Handle(
api.FederationSenderQueryJoinedHostServerNamesInRoomPath,
common.MakeInternalAPI("QueryJoinedHostServerNamesInRoom", func(req *http.Request) util.JSONResponse {
var request api.QueryJoinedHostServerNamesInRoomRequest
var response api.QueryJoinedHostServerNamesInRoomResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.ErrorResponse(err)
}
if err := f.QueryJoinedHostServerNamesInRoom(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
}

2
go.sum
View file

@ -367,8 +367,6 @@ github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 h1:Hr3zjRsq2bh
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0= github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200124100636-0c2ec91d1df5 h1:kmRjpmFOenVpOaV/DRlo9p6z/IbOKlUC+hhKsAAh8Qg= github.com/matrix-org/gomatrixserverlib v0.0.0-20200124100636-0c2ec91d1df5 h1:kmRjpmFOenVpOaV/DRlo9p6z/IbOKlUC+hhKsAAh8Qg=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200124100636-0c2ec91d1df5/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI= github.com/matrix-org/gomatrixserverlib v0.0.0-20200124100636-0c2ec91d1df5/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200428112024-9f47f9bfa4b2 h1:sy2QOqJhb4WXzq8bJhsCntAUYb64Dl6txsFtXWtxxSg=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200428112024-9f47f9bfa4b2/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200429162354-392f0b1b7421 h1:4zP29YlpfEtJ9a7sZ33Mf0FJInD2N3/KzDcLa62bRKc= github.com/matrix-org/gomatrixserverlib v0.0.0-20200429162354-392f0b1b7421 h1:4zP29YlpfEtJ9a7sZ33Mf0FJInD2N3/KzDcLa62bRKc=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200429162354-392f0b1b7421/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= github.com/matrix-org/gomatrixserverlib v0.0.0-20200429162354-392f0b1b7421/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU=
github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1 h1:osLoFdOy+ChQqVUn2PeTDETFftVkl4w9t/OW18g3lnk= github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1 h1:osLoFdOy+ChQqVUn2PeTDETFftVkl4w9t/OW18g3lnk=

View file

@ -21,6 +21,7 @@ import (
"net/http" "net/http"
commonHTTP "github.com/matrix-org/dendrite/common/http" commonHTTP "github.com/matrix-org/dendrite/common/http"
fsAPI "github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
opentracing "github.com/opentracing/opentracing-go" opentracing "github.com/opentracing/opentracing-go"
) )
@ -106,6 +107,9 @@ type InputRoomEventsResponse struct {
// RoomserverInputAPI is used to write events to the room server. // RoomserverInputAPI is used to write events to the room server.
type RoomserverInputAPI interface { type RoomserverInputAPI interface {
// needed to avoid chicken and egg scenario when setting up the
// interdependencies between the roomserver and the FS input API
SetFederationSenderAPI(fsInputAPI fsAPI.FederationSenderInternalAPI)
InputRoomEvents( InputRoomEvents(
ctx context.Context, ctx context.Context,
request *InputRoomEventsRequest, request *InputRoomEventsRequest,
@ -122,12 +126,22 @@ func NewRoomserverInputAPIHTTP(roomserverURL string, httpClient *http.Client) (R
if httpClient == nil { if httpClient == nil {
return nil, errors.New("NewRoomserverInputAPIHTTP: httpClient is <nil>") return nil, errors.New("NewRoomserverInputAPIHTTP: httpClient is <nil>")
} }
return &httpRoomserverInputAPI{roomserverURL, httpClient}, nil return &httpRoomserverInputAPI{roomserverURL, httpClient, nil}, nil
} }
type httpRoomserverInputAPI struct { type httpRoomserverInputAPI struct {
roomserverURL string roomserverURL string
httpClient *http.Client httpClient *http.Client
// The federation sender API allows us to send federation
// requests from the new perform input requests, still TODO.
fsInputAPI fsAPI.FederationSenderInternalAPI
}
// SetFederationSenderInputAPI passes in a federation sender input API reference
// so that we can avoid the chicken-and-egg problem of both the roomserver input API
// and the federation sender input API being interdependent.
func (h *httpRoomserverInputAPI) SetFederationSenderAPI(fsInputAPI fsAPI.FederationSenderInternalAPI) {
h.fsInputAPI = fsInputAPI
} }
// InputRoomEvents implements RoomserverInputAPI // InputRoomEvents implements RoomserverInputAPI

View file

@ -26,6 +26,8 @@ import (
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/util" "github.com/matrix-org/util"
fsAPI "github.com/matrix-org/dendrite/federationsender/api"
) )
// RoomserverInputAPI implements api.RoomserverInputAPI // RoomserverInputAPI implements api.RoomserverInputAPI
@ -37,6 +39,16 @@ type RoomserverInputAPI struct {
OutputRoomEventTopic string OutputRoomEventTopic string
// Protects calls to processRoomEvent // Protects calls to processRoomEvent
mutex sync.Mutex mutex sync.Mutex
// The federation sender API allows us to send federation
// requests from the new perform input requests, still TODO.
fsAPI fsAPI.FederationSenderInternalAPI
}
// SetFederationSenderInputAPI passes in a federation sender input API reference
// so that we can avoid the chicken-and-egg problem of both the roomserver input API
// and the federation sender input API being interdependent.
func (r *RoomserverInputAPI) SetFederationSenderAPI(fsAPI fsAPI.FederationSenderInternalAPI) {
r.fsAPI = fsAPI
} }
// WriteOutputEvents implements OutputRoomEventWriter // WriteOutputEvents implements OutputRoomEventWriter

View file

@ -21,6 +21,7 @@ import (
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
asQuery "github.com/matrix-org/dendrite/appservice/query" asQuery "github.com/matrix-org/dendrite/appservice/query"
"github.com/matrix-org/dendrite/common/basecomponent" "github.com/matrix-org/dendrite/common/basecomponent"
"github.com/matrix-org/dendrite/roomserver/alias" "github.com/matrix-org/dendrite/roomserver/alias"
"github.com/matrix-org/dendrite/roomserver/input" "github.com/matrix-org/dendrite/roomserver/input"