Merge branch 'master' into neilalexander/yggembed

This commit is contained in:
Neil Alexander 2020-06-10 16:29:31 +01:00
commit 002ffc10de
33 changed files with 377 additions and 325 deletions

View file

@ -26,7 +26,6 @@ import (
"github.com/matrix-org/dendrite/appservice/consumers"
"github.com/matrix-org/dendrite/appservice/inthttp"
"github.com/matrix-org/dendrite/appservice/query"
"github.com/matrix-org/dendrite/appservice/routing"
"github.com/matrix-org/dendrite/appservice/storage"
"github.com/matrix-org/dendrite/appservice/types"
"github.com/matrix-org/dendrite/appservice/workers"
@ -35,22 +34,10 @@ import (
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/basecomponent"
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/internal/transactions"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
)
// AddPublicRoutes registers HTTP handlers for CS API calls
func AddPublicRoutes(router *mux.Router, cfg *config.Dendrite, rsAPI roomserverAPI.RoomserverInternalAPI,
accountsDB accounts.Database, federation *gomatrixserverlib.FederationClient, txnCache *transactions.Cache) {
routing.Setup(
router, cfg, rsAPI,
accountsDB, federation, txnCache,
)
}
// AddInternalRoutes registers HTTP handlers for internal API calls
func AddInternalRoutes(router *mux.Router, queryAPI appserviceAPI.AppServiceQueryAPI) {
inthttp.AddRoutes(queryAPI, router)

View file

@ -1,65 +0,0 @@
// Copyright 2018 Vector Creations Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package routing
import (
"net/http"
"github.com/gorilla/mux"
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/internal/transactions"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
)
const pathPrefixApp = "/app/v1"
// Setup registers HTTP handlers with the given ServeMux. It also supplies the given http.Client
// to clients which need to make outbound HTTP requests.
//
// Due to Setup being used to call many other functions, a gocyclo nolint is
// applied:
// nolint: gocyclo
func Setup(
apiMux *mux.Router, cfg *config.Dendrite, // nolint: unparam
rsAPI api.RoomserverInternalAPI, // nolint: unparam
accountDB accounts.Database, // nolint: unparam
federation *gomatrixserverlib.FederationClient, // nolint: unparam
transactionsCache *transactions.Cache, // nolint: unparam
) {
appMux := apiMux.PathPrefix(pathPrefixApp).Subrouter()
appMux.Handle("/alias",
internal.MakeExternalAPI("alias", func(req *http.Request) util.JSONResponse {
// TODO: Implement
return util.JSONResponse{
Code: http.StatusOK,
JSON: nil,
}
}),
).Methods(http.MethodGet, http.MethodOptions)
appMux.Handle("/user",
internal.MakeExternalAPI("user", func(req *http.Request) util.JSONResponse {
// TODO: Implement
return util.JSONResponse{
Code: http.StatusOK,
JSON: nil,
}
}),
).Methods(http.MethodGet, http.MethodOptions)
}

View file

@ -48,9 +48,6 @@ func AddPublicRoutes(
transactionsCache *transactions.Cache,
fsAPI federationSenderAPI.FederationSenderInternalAPI,
) {
roomserverProducer := producers.NewRoomserverProducer(rsAPI)
eduProducer := producers.NewEDUServerProducer(eduInputAPI)
syncProducer := &producers.SyncAPIProducer{
Producer: producer,
Topic: string(cfg.Kafka.Topics.OutputClientData),
@ -64,8 +61,8 @@ func AddPublicRoutes(
}
routing.Setup(
router, cfg, roomserverProducer, rsAPI, asAPI,
router, cfg, eduInputAPI, rsAPI, asAPI,
accountsDB, deviceDB, federation, *keyRing,
syncProducer, eduProducer, transactionsCache, fsAPI,
syncProducer, transactionsCache, fsAPI,
)
}

View file

@ -29,7 +29,6 @@ import (
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/clientapi/threepid"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/config"
@ -137,21 +136,21 @@ type fledglingEvent struct {
// CreateRoom implements /createRoom
func CreateRoom(
req *http.Request, device *authtypes.Device,
cfg *config.Dendrite, producer *producers.RoomserverProducer,
cfg *config.Dendrite,
accountDB accounts.Database, rsAPI roomserverAPI.RoomserverInternalAPI,
asAPI appserviceAPI.AppServiceQueryAPI,
) util.JSONResponse {
// TODO (#267): Check room ID doesn't clash with an existing one, and we
// probably shouldn't be using pseudo-random strings, maybe GUIDs?
roomID := fmt.Sprintf("!%s:%s", util.RandomString(16), cfg.Matrix.ServerName)
return createRoom(req, device, cfg, roomID, producer, accountDB, rsAPI, asAPI)
return createRoom(req, device, cfg, roomID, accountDB, rsAPI, asAPI)
}
// createRoom implements /createRoom
// nolint: gocyclo
func createRoom(
req *http.Request, device *authtypes.Device,
cfg *config.Dendrite, roomID string, producer *producers.RoomserverProducer,
cfg *config.Dendrite, roomID string,
accountDB accounts.Database, rsAPI roomserverAPI.RoomserverInternalAPI,
asAPI appserviceAPI.AppServiceQueryAPI,
) util.JSONResponse {
@ -344,9 +343,9 @@ func createRoom(
}
// send events to the room server
_, err = producer.SendEvents(req.Context(), builtEvents, cfg.Matrix.ServerName, nil)
_, err = roomserverAPI.SendEvents(req.Context(), rsAPI, builtEvents, cfg.Matrix.ServerName, nil)
if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("producer.SendEvents failed")
util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError()
}
@ -404,14 +403,14 @@ func createRoom(
}
}
// Send the invite event to the roomserver.
if err = producer.SendInvite(
req.Context(),
if err = roomserverAPI.SendInvite(
req.Context(), rsAPI,
inviteEvent.Headered(roomVersion),
strippedState, // invite room state
cfg.Matrix.ServerName, // send as server
nil, // transaction ID
); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("producer.SendEvents failed")
util.GetLogger(req.Context()).WithError(err).Error("SendInvite failed")
return jsonerror.InternalServerError()
}
}

View file

@ -25,7 +25,6 @@ import (
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/clientapi/threepid"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/config"
@ -46,7 +45,6 @@ func SendMembership(
req *http.Request, accountDB accounts.Database, device *authtypes.Device,
roomID string, membership string, cfg *config.Dendrite,
rsAPI roomserverAPI.RoomserverInternalAPI, asAPI appserviceAPI.AppServiceQueryAPI,
producer *producers.RoomserverProducer,
) util.JSONResponse {
verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID}
verRes := api.QueryRoomVersionForRoomResponse{}
@ -71,7 +69,7 @@ func SendMembership(
}
inviteStored, jsonErrResp := checkAndProcessThreepid(
req, device, &body, cfg, rsAPI, accountDB, producer,
req, device, &body, cfg, rsAPI, accountDB,
membership, roomID, evTime,
)
if jsonErrResp != nil {
@ -112,8 +110,8 @@ func SendMembership(
switch membership {
case gomatrixserverlib.Invite:
// Invites need to be handled specially
err = producer.SendInvite(
req.Context(),
err = roomserverAPI.SendInvite(
req.Context(), rsAPI,
event.Headered(verRes.RoomVersion),
nil, // ask the roomserver to draw up invite room state for us
cfg.Matrix.ServerName,
@ -130,14 +128,14 @@ func SendMembership(
}{roomID}
fallthrough
default:
_, err = producer.SendEvents(
req.Context(),
_, err = roomserverAPI.SendEvents(
req.Context(), rsAPI,
[]gomatrixserverlib.HeaderedEvent{event.Headered(verRes.RoomVersion)},
cfg.Matrix.ServerName,
nil,
)
if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("producer.SendEvents failed")
util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError()
}
}
@ -252,13 +250,12 @@ func checkAndProcessThreepid(
cfg *config.Dendrite,
rsAPI roomserverAPI.RoomserverInternalAPI,
accountDB accounts.Database,
producer *producers.RoomserverProducer,
membership, roomID string,
evTime time.Time,
) (inviteStored bool, errRes *util.JSONResponse) {
inviteStored, err := threepid.CheckAndProcessInvite(
req.Context(), device, body, cfg, rsAPI, accountDB, producer,
req.Context(), device, body, cfg, rsAPI, accountDB,
membership, roomID, evTime,
)
if err == threepid.ErrMissingParameter {

View file

@ -24,7 +24,6 @@ import (
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/roomserver/api"
@ -94,8 +93,7 @@ func GetAvatarURL(
// nolint:gocyclo
func SetAvatarURL(
req *http.Request, accountDB accounts.Database, device *authtypes.Device,
userID string, cfg *config.Dendrite,
rsProducer *producers.RoomserverProducer, rsAPI api.RoomserverInternalAPI,
userID string, cfg *config.Dendrite, rsAPI api.RoomserverInternalAPI,
) util.JSONResponse {
if userID != device.UserID {
return util.JSONResponse{
@ -167,8 +165,8 @@ func SetAvatarURL(
return jsonerror.InternalServerError()
}
if _, err := rsProducer.SendEvents(req.Context(), events, cfg.Matrix.ServerName, nil); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("rsProducer.SendEvents failed")
if _, err := api.SendEvents(req.Context(), rsAPI, events, cfg.Matrix.ServerName, nil); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError()
}
@ -209,8 +207,7 @@ func GetDisplayName(
// nolint:gocyclo
func SetDisplayName(
req *http.Request, accountDB accounts.Database, device *authtypes.Device,
userID string, cfg *config.Dendrite,
rsProducer *producers.RoomserverProducer, rsAPI api.RoomserverInternalAPI,
userID string, cfg *config.Dendrite, rsAPI api.RoomserverInternalAPI,
) util.JSONResponse {
if userID != device.UserID {
return util.JSONResponse{
@ -282,8 +279,8 @@ func SetDisplayName(
return jsonerror.InternalServerError()
}
if _, err := rsProducer.SendEvents(req.Context(), events, cfg.Matrix.ServerName, nil); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("rsProducer.SendEvents failed")
if _, err := api.SendEvents(req.Context(), rsAPI, events, cfg.Matrix.ServerName, nil); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError()
}

View file

@ -27,6 +27,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/producers"
eduServerAPI "github.com/matrix-org/dendrite/eduserver/api"
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/config"
@ -48,7 +49,7 @@ const pathPrefixUnstable = "/client/unstable"
// nolint: gocyclo
func Setup(
publicAPIMux *mux.Router, cfg *config.Dendrite,
producer *producers.RoomserverProducer,
eduAPI eduServerAPI.EDUServerInputAPI,
rsAPI roomserverAPI.RoomserverInternalAPI,
asAPI appserviceAPI.AppServiceQueryAPI,
accountDB accounts.Database,
@ -56,7 +57,6 @@ func Setup(
federation *gomatrixserverlib.FederationClient,
keyRing gomatrixserverlib.KeyRing,
syncProducer *producers.SyncAPIProducer,
eduProducer *producers.EDUServerProducer,
transactionsCache *transactions.Cache,
federationSender federationSenderAPI.FederationSenderInternalAPI,
) {
@ -89,7 +89,7 @@ func Setup(
r0mux.Handle("/createRoom",
internal.MakeAuthAPI("createRoom", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
return CreateRoom(req, device, cfg, producer, accountDB, rsAPI, asAPI)
return CreateRoom(req, device, cfg, accountDB, rsAPI, asAPI)
}),
).Methods(http.MethodPost, http.MethodOptions)
r0mux.Handle("/join/{roomIDOrAlias}",
@ -125,7 +125,7 @@ func Setup(
if err != nil {
return util.ErrorResponse(err)
}
return SendMembership(req, accountDB, device, vars["roomID"], vars["membership"], cfg, rsAPI, asAPI, producer)
return SendMembership(req, accountDB, device, vars["roomID"], vars["membership"], cfg, rsAPI, asAPI)
}),
).Methods(http.MethodPost, http.MethodOptions)
r0mux.Handle("/rooms/{roomID}/send/{eventType}",
@ -134,7 +134,7 @@ func Setup(
if err != nil {
return util.ErrorResponse(err)
}
return SendEvent(req, device, vars["roomID"], vars["eventType"], nil, nil, cfg, rsAPI, producer, nil)
return SendEvent(req, device, vars["roomID"], vars["eventType"], nil, nil, cfg, rsAPI, nil)
}),
).Methods(http.MethodPost, http.MethodOptions)
r0mux.Handle("/rooms/{roomID}/send/{eventType}/{txnID}",
@ -145,7 +145,7 @@ func Setup(
}
txnID := vars["txnID"]
return SendEvent(req, device, vars["roomID"], vars["eventType"], &txnID,
nil, cfg, rsAPI, producer, transactionsCache)
nil, cfg, rsAPI, transactionsCache)
}),
).Methods(http.MethodPut, http.MethodOptions)
r0mux.Handle("/rooms/{roomID}/event/{eventID}",
@ -194,7 +194,7 @@ func Setup(
if strings.HasSuffix(eventType, "/") {
eventType = eventType[:len(eventType)-1]
}
return SendEvent(req, device, vars["roomID"], eventType, nil, &emptyString, cfg, rsAPI, producer, nil)
return SendEvent(req, device, vars["roomID"], eventType, nil, &emptyString, cfg, rsAPI, nil)
}),
).Methods(http.MethodPut, http.MethodOptions)
@ -205,7 +205,7 @@ func Setup(
return util.ErrorResponse(err)
}
stateKey := vars["stateKey"]
return SendEvent(req, device, vars["roomID"], vars["eventType"], nil, &stateKey, cfg, rsAPI, producer, nil)
return SendEvent(req, device, vars["roomID"], vars["eventType"], nil, &stateKey, cfg, rsAPI, nil)
}),
).Methods(http.MethodPut, http.MethodOptions)
@ -269,7 +269,7 @@ func Setup(
if err != nil {
return util.ErrorResponse(err)
}
return SendTyping(req, device, vars["roomID"], vars["userID"], accountDB, eduProducer)
return SendTyping(req, device, vars["roomID"], vars["userID"], accountDB, eduAPI)
}),
).Methods(http.MethodPut, http.MethodOptions)
@ -280,7 +280,7 @@ func Setup(
return util.ErrorResponse(err)
}
txnID := vars["txnID"]
return SendToDevice(req, device, eduProducer, transactionsCache, vars["eventType"], &txnID)
return SendToDevice(req, device, eduAPI, transactionsCache, vars["eventType"], &txnID)
}),
).Methods(http.MethodPut, http.MethodOptions)
@ -294,7 +294,7 @@ func Setup(
return util.ErrorResponse(err)
}
txnID := vars["txnID"]
return SendToDevice(req, device, eduProducer, transactionsCache, vars["eventType"], &txnID)
return SendToDevice(req, device, eduAPI, transactionsCache, vars["eventType"], &txnID)
}),
).Methods(http.MethodPut, http.MethodOptions)
@ -386,7 +386,7 @@ func Setup(
if err != nil {
return util.ErrorResponse(err)
}
return SetAvatarURL(req, accountDB, device, vars["userID"], cfg, producer, rsAPI)
return SetAvatarURL(req, accountDB, device, vars["userID"], cfg, rsAPI)
}),
).Methods(http.MethodPut, http.MethodOptions)
// Browsers use the OPTIONS HTTP method to check if the CORS policy allows
@ -408,7 +408,7 @@ func Setup(
if err != nil {
return util.ErrorResponse(err)
}
return SetDisplayName(req, accountDB, device, vars["userID"], cfg, producer, rsAPI)
return SetDisplayName(req, accountDB, device, vars["userID"], cfg, rsAPI)
}),
).Methods(http.MethodPut, http.MethodOptions)
// Browsers use the OPTIONS HTTP method to check if the CORS policy allows

View file

@ -20,7 +20,6 @@ import (
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/internal/transactions"
@ -46,7 +45,6 @@ func SendEvent(
roomID, eventType string, txnID, stateKey *string,
cfg *config.Dendrite,
rsAPI api.RoomserverInternalAPI,
producer *producers.RoomserverProducer,
txnCache *transactions.Cache,
) util.JSONResponse {
verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID}
@ -80,8 +78,8 @@ func SendEvent(
// pass the new event to the roomserver and receive the correct event ID
// event ID in case of duplicate transaction is discarded
eventID, err := producer.SendEvents(
req.Context(),
eventID, err := api.SendEvents(
req.Context(), rsAPI,
[]gomatrixserverlib.HeaderedEvent{
e.Headered(verRes.RoomVersion),
},
@ -89,7 +87,7 @@ func SendEvent(
txnAndSessionID,
)
if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("producer.SendEvents failed")
util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError()
}
util.GetLogger(req.Context()).WithFields(logrus.Fields{

View file

@ -19,7 +19,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/internal/transactions"
"github.com/matrix-org/util"
)
@ -28,7 +28,7 @@ import (
// sends the device events to the EDU Server
func SendToDevice(
req *http.Request, device *authtypes.Device,
eduProducer *producers.EDUServerProducer,
eduAPI api.EDUServerInputAPI,
txnCache *transactions.Cache,
eventType string, txnID *string,
) util.JSONResponse {
@ -48,8 +48,8 @@ func SendToDevice(
for userID, byUser := range httpReq.Messages {
for deviceID, message := range byUser {
if err := eduProducer.SendToDevice(
req.Context(), device.UserID, userID, deviceID, eventType, message,
if err := api.SendToDevice(
req.Context(), eduAPI, device.UserID, userID, deviceID, eventType, message,
); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("eduProducer.SendToDevice failed")
return jsonerror.InternalServerError()

View file

@ -20,8 +20,8 @@ import (
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/clientapi/userutil"
"github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/util"
)
@ -35,7 +35,7 @@ type typingContentJSON struct {
func SendTyping(
req *http.Request, device *authtypes.Device, roomID string,
userID string, accountDB accounts.Database,
eduProducer *producers.EDUServerProducer,
eduAPI api.EDUServerInputAPI,
) util.JSONResponse {
if device.UserID != userID {
return util.JSONResponse{
@ -69,8 +69,8 @@ func SendTyping(
return *resErr
}
if err = eduProducer.SendTyping(
req.Context(), userID, roomID, r.Typing, r.Timeout,
if err = api.SendTyping(
req.Context(), eduAPI, userID, roomID, r.Typing, r.Timeout,
); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("eduProducer.Send failed")
return jsonerror.InternalServerError()

View file

@ -26,7 +26,6 @@ import (
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/roomserver/api"
@ -88,7 +87,7 @@ func CheckAndProcessInvite(
ctx context.Context,
device *authtypes.Device, body *MembershipRequest, cfg *config.Dendrite,
rsAPI api.RoomserverInternalAPI, db accounts.Database,
producer *producers.RoomserverProducer, membership string, roomID string,
membership string, roomID string,
evTime time.Time,
) (inviteStoredOnIDServer bool, err error) {
if membership != gomatrixserverlib.Invite || (body.Address == "" && body.IDServer == "" && body.Medium == "") {
@ -112,7 +111,7 @@ func CheckAndProcessInvite(
// "m.room.third_party_invite" have to be emitted from the data in
// storeInviteRes.
err = emit3PIDInviteEvent(
ctx, body, storeInviteRes, device, roomID, cfg, rsAPI, producer, evTime,
ctx, body, storeInviteRes, device, roomID, cfg, rsAPI, evTime,
)
inviteStoredOnIDServer = err == nil
@ -331,7 +330,7 @@ func emit3PIDInviteEvent(
ctx context.Context,
body *MembershipRequest, res *idServerStoreInviteResponse,
device *authtypes.Device, roomID string, cfg *config.Dendrite,
rsAPI api.RoomserverInternalAPI, producer *producers.RoomserverProducer,
rsAPI api.RoomserverInternalAPI,
evTime time.Time,
) error {
builder := &gomatrixserverlib.EventBuilder{
@ -359,8 +358,8 @@ func emit3PIDInviteEvent(
return err
}
_, err = producer.SendEvents(
ctx,
_, err = api.SendEvents(
ctx, rsAPI,
[]gomatrixserverlib.HeaderedEvent{
(*event).Headered(queryRes.RoomVersion),
},

View file

@ -17,7 +17,6 @@ package main
import (
"github.com/matrix-org/dendrite/appservice"
"github.com/matrix-org/dendrite/internal/basecomponent"
"github.com/matrix-org/dendrite/internal/transactions"
)
func main() {
@ -27,13 +26,10 @@ func main() {
defer base.Close() // nolint: errcheck
accountDB := base.CreateAccountsDB()
deviceDB := base.CreateDeviceDB()
federation := base.CreateFederationClient()
rsAPI := base.RoomserverHTTPClient()
cache := transactions.New()
intAPI := appservice.NewInternalAPI(base, accountDB, deviceDB, rsAPI)
appservice.AddInternalRoutes(base.InternalAPIMux, intAPI)
appservice.AddPublicRoutes(base.PublicAPIMux, base.Cfg, rsAPI, accountDB, federation, cache)
base.SetupAndServeHTTP(string(base.Cfg.Bind.AppServiceAPI), string(base.Cfg.Listen.AppServiceAPI))

View file

@ -29,7 +29,6 @@ import (
p2phttp "github.com/libp2p/go-libp2p-http"
p2pdisc "github.com/libp2p/go-libp2p/p2p/discovery"
"github.com/matrix-org/dendrite/appservice"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-libp2p/storage"
"github.com/matrix-org/dendrite/eduserver"
"github.com/matrix-org/dendrite/federationsender"
@ -149,7 +148,6 @@ func main() {
&base.Base, federation, rsAPI, keyRing,
)
rsAPI.SetFederationSenderAPI(fsAPI)
eduProducer := producers.NewEDUServerProducer(eduInputAPI)
publicRoomsDB, err := storage.NewPublicRoomsServerDatabaseWithPubSub(string(base.Base.Cfg.Database.PublicRoomsAPI), base.LibP2PPubsub, cfg.Matrix.ServerName)
if err != nil {
logrus.WithError(err).Panicf("failed to connect to public rooms db")
@ -166,7 +164,6 @@ func main() {
AppserviceAPI: asAPI,
EDUInternalAPI: eduInputAPI,
EDUProducer: eduProducer,
FederationSenderAPI: fsAPI,
RoomserverAPI: rsAPI,
ServerKeyAPI: serverKeyAPI,

View file

@ -0,0 +1,53 @@
// Copyright 2019 Google LLC
//
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file or at
// https://developers.google.com/open-source/licenses/bsd
//
// Original code from https://github.com/FiloSottile/age/blob/bbab440e198a4d67ba78591176c7853e62d29e04/internal/age/ssh.go
package convert
import (
"crypto/ed25519"
"crypto/sha512"
"math/big"
"golang.org/x/crypto/curve25519"
)
var curve25519P, _ = new(big.Int).SetString("57896044618658097711785492504343953926634992332820282019728792003956564819949", 10)
func Ed25519PrivateKeyToCurve25519(pk ed25519.PrivateKey) []byte {
h := sha512.New()
_, _ = h.Write(pk.Seed())
out := h.Sum(nil)
return out[:curve25519.ScalarSize]
}
func Ed25519PublicKeyToCurve25519(pk ed25519.PublicKey) []byte {
// ed25519.PublicKey is a little endian representation of the y-coordinate,
// with the most significant bit set based on the sign of the x-coordinate.
bigEndianY := make([]byte, ed25519.PublicKeySize)
for i, b := range pk {
bigEndianY[ed25519.PublicKeySize-i-1] = b
}
bigEndianY[0] &= 0b0111_1111
// The Montgomery u-coordinate is derived through the bilinear map
// u = (1 + y) / (1 - y)
// See https://blog.filippo.io/using-ed25519-keys-for-encryption.
y := new(big.Int).SetBytes(bigEndianY)
denom := big.NewInt(1)
denom.ModInverse(denom.Sub(denom, y), curve25519P) // 1 / (1 - y)
u := y.Mul(y.Add(y, big.NewInt(1)), denom)
u.Mod(u, curve25519P)
out := make([]byte, curve25519.PointSize)
uBytes := u.Bytes()
for i, b := range uBytes {
out[len(uBytes)-i-1] = b
}
return out
}

View file

@ -0,0 +1,51 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package convert
import (
"bytes"
"crypto/ed25519"
"encoding/hex"
"testing"
"golang.org/x/crypto/curve25519"
)
func TestKeyConversion(t *testing.T) {
edPub, edPriv, err := ed25519.GenerateKey(nil)
if err != nil {
t.Fatal(err)
}
t.Log("Signing public:", hex.EncodeToString(edPub))
t.Log("Signing private:", hex.EncodeToString(edPriv))
cuPriv := Ed25519PrivateKeyToCurve25519(edPriv)
t.Log("Encryption private:", hex.EncodeToString(cuPriv))
cuPub := Ed25519PublicKeyToCurve25519(edPub)
t.Log("Converted encryption public:", hex.EncodeToString(cuPub))
var realPub, realPriv [32]byte
copy(realPriv[:32], cuPriv[:32])
curve25519.ScalarBaseMult(&realPub, &realPriv)
t.Log("Scalar-multed encryption public:", hex.EncodeToString(realPub[:]))
if !bytes.Equal(realPriv[:], cuPriv[:]) {
t.Fatal("Private keys should be equal (this means the test is broken)")
}
if !bytes.Equal(realPub[:], cuPub[:]) {
t.Fatal("Public keys should be equal")
}
}

View file

@ -16,7 +16,9 @@ package main
import (
"context"
"crypto/ed25519"
"crypto/tls"
"encoding/hex"
"flag"
"fmt"
"net"
@ -25,7 +27,8 @@ import (
"time"
"github.com/matrix-org/dendrite/appservice"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/convert"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/yggconn"
"github.com/matrix-org/dendrite/eduserver"
"github.com/matrix-org/dendrite/eduserver/cache"
@ -37,7 +40,6 @@ import (
"github.com/matrix-org/dendrite/internal/setup"
"github.com/matrix-org/dendrite/publicroomsapi/storage"
"github.com/matrix-org/dendrite/roomserver"
"github.com/matrix-org/dendrite/serverkeyapi"
"github.com/matrix-org/gomatrixserverlib"
_ "github.com/matrix-org/dendrite/internal/embed"
@ -65,7 +67,13 @@ func createFederationClient(
) *gomatrixserverlib.FederationClient {
yggdialer := func(_, address string) (net.Conn, error) {
tokens := strings.Split(address, ":")
return n.Dial("curve25519", tokens[0])
raw, err := hex.DecodeString(tokens[0])
if err != nil {
return nil, fmt.Errorf("hex.DecodeString: %w", err)
}
converted := convert.Ed25519PublicKeyToCurve25519(ed25519.PublicKey(raw))
convhex := hex.EncodeToString(converted)
return n.Dial("curve25519", convhex)
}
yggdialerctx := func(ctx context.Context, network, address string) (net.Conn, error) {
return yggdialer(network, address)
@ -106,9 +114,9 @@ func main() {
cfg := &config.Dendrite{}
cfg.SetDefaults()
cfg.Matrix.ServerName = gomatrixserverlib.ServerName(ygg.EncryptionPublicKey())
cfg.Matrix.ServerName = gomatrixserverlib.ServerName(ygg.DerivedServerName())
cfg.Matrix.PrivateKey = ygg.SigningPrivateKey()
cfg.Matrix.KeyID = gomatrixserverlib.KeyID("ed25519:auto")
cfg.Matrix.KeyID = gomatrixserverlib.KeyID(signing.KeyID)
cfg.Kafka.UseNaffka = true
cfg.Kafka.Topics.OutputRoomEvent = "roomserverOutput"
cfg.Kafka.Topics.OutputClientData = "clientapiOutput"
@ -134,9 +142,7 @@ func main() {
deviceDB := base.CreateDeviceDB()
federation := createFederationClient(base, ygg)
serverKeyAPI := serverkeyapi.NewInternalAPI(
base.Cfg, federation, base.Caches,
)
serverKeyAPI := &signing.YggdrasilKeys{}
keyRing := serverKeyAPI.KeyRing()
rsComponent := roomserver.NewInternalAPI(
@ -156,7 +162,6 @@ func main() {
rsComponent.SetFederationSenderAPI(fsAPI)
eduProducer := producers.NewEDUServerProducer(eduInputAPI)
publicRoomsDB, err := storage.NewPublicRoomsServerDatabase(string(base.Cfg.Database.PublicRoomsAPI), base.Cfg.DbProperties(), cfg.Matrix.ServerName)
if err != nil {
logrus.WithError(err).Panicf("failed to connect to public rooms db")
@ -175,10 +180,9 @@ func main() {
AppserviceAPI: asAPI,
EDUInternalAPI: eduInputAPI,
EDUProducer: eduProducer,
FederationSenderAPI: fsAPI,
RoomserverAPI: rsAPI,
ServerKeyAPI: serverKeyAPI,
//ServerKeyAPI: serverKeyAPI,
PublicRoomsDB: publicRoomsDB,
}
@ -193,7 +197,7 @@ func main() {
)
go func() {
logrus.Info("Listening on ", ygg.EncryptionPublicKey())
logrus.Info("Listening on ", ygg.DerivedServerName())
logrus.Fatal(httpServer.Serve(ygg))
}()
go func() {

View file

@ -0,0 +1,69 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package signing
import (
"context"
"encoding/hex"
"fmt"
"time"
"github.com/matrix-org/gomatrixserverlib"
)
const KeyID = "ed25519:dendrite-demo-yggdrasil"
type YggdrasilKeys struct {
}
func (f *YggdrasilKeys) KeyRing() *gomatrixserverlib.KeyRing {
return &gomatrixserverlib.KeyRing{
KeyDatabase: f,
}
}
func (f *YggdrasilKeys) FetchKeys(
ctx context.Context,
requests map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp,
) (map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult, error) {
res := make(map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult)
for req := range requests {
if req.KeyID != KeyID {
return nil, fmt.Errorf("FetchKeys: cannot fetch key with ID %s, should be %s", req.KeyID, KeyID)
}
hexkey, err := hex.DecodeString(string(req.ServerName))
if err != nil {
return nil, fmt.Errorf("FetchKeys: can't decode server name %q: %w", req.ServerName, err)
}
res[req] = gomatrixserverlib.PublicKeyLookupResult{
VerifyKey: gomatrixserverlib.VerifyKey{
Key: hexkey,
},
ExpiredTS: gomatrixserverlib.PublicKeyNotExpired,
ValidUntilTS: gomatrixserverlib.AsTimestamp(time.Now().Add(24 * time.Hour * 365)),
}
}
return res, nil
}
func (f *YggdrasilKeys) FetcherName() string {
return "YggdrasilKeys"
}
func (f *YggdrasilKeys) StoreKeys(ctx context.Context, results map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult) error {
return nil
}

View file

@ -24,6 +24,8 @@ import (
"os"
"sync"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/convert"
"github.com/libp2p/go-yamux"
yggdrasiladmin "github.com/yggdrasil-network/yggdrasil-go/src/admin"
yggdrasilconfig "github.com/yggdrasil-network/yggdrasil-go/src/config"
@ -56,8 +58,6 @@ func Setup(instanceName, instancePeer string) (*Node, error) {
log: gologme.New(os.Stdout, "YGG ", log.Flags()),
incoming: make(chan *yamux.Stream),
}
n.config.AdminListen = fmt.Sprintf("unix://./%s-yggdrasil.sock", instanceName)
n.config.MulticastInterfaces = []string{".*"}
yggfile := fmt.Sprintf("%s-yggdrasil.conf", instanceName)
if _, err := os.Stat(yggfile); !os.IsNotExist(err) {
@ -69,6 +69,11 @@ func Setup(instanceName, instancePeer string) (*Node, error) {
panic(err)
}
} else {
n.config.AdminListen = fmt.Sprintf("unix://./%s-yggdrasil.sock", instanceName)
n.config.MulticastInterfaces = []string{".*"}
n.config.EncryptionPrivateKey = hex.EncodeToString(n.EncryptionPrivateKey())
n.config.EncryptionPublicKey = hex.EncodeToString(n.EncryptionPublicKey())
j, err := json.MarshalIndent(n.config, "", " ")
if err != nil {
panic(err)
@ -119,8 +124,23 @@ func Setup(instanceName, instancePeer string) (*Node, error) {
return n, nil
}
func (n *Node) EncryptionPublicKey() string {
return n.core.EncryptionPublicKey()
func (n *Node) DerivedServerName() string {
return hex.EncodeToString(n.SigningPublicKey())
}
func (n *Node) EncryptionPublicKey() []byte {
edkey := n.SigningPublicKey()
return convert.Ed25519PublicKeyToCurve25519(edkey)
}
func (n *Node) EncryptionPrivateKey() []byte {
edkey := n.SigningPrivateKey()
return convert.Ed25519PrivateKeyToCurve25519(edkey)
}
func (n *Node) SigningPublicKey() ed25519.PublicKey {
pubBytes, _ := hex.DecodeString(n.config.SigningPublicKey)
return ed25519.PublicKey(pubBytes)
}
func (n *Node) SigningPrivateKey() ed25519.PrivateKey {

View file

@ -40,10 +40,10 @@ func (n *Node) listenFromYgg() {
return
}
var session *yamux.Session
if strings.Compare(n.EncryptionPublicKey(), conn.RemoteAddr().String()) < 0 {
session, err = yamux.Client(conn, n.yamuxConfig())
} else {
if strings.Compare(conn.RemoteAddr().String(), n.DerivedServerName()) < 0 {
session, err = yamux.Server(conn, n.yamuxConfig())
} else {
session, err = yamux.Client(conn, n.yamuxConfig())
}
if err != nil {
return
@ -96,7 +96,7 @@ func (n *Node) DialContext(ctx context.Context, network, address string) (net.Co
n.log.Println("n.dialer.DialContext:", err)
return nil, err
}
if strings.Compare(n.EncryptionPublicKey(), address) < 0 {
if strings.Compare(address, n.DerivedServerName()) > 0 {
session, err = yamux.Client(conn, n.yamuxConfig())
} else {
session, err = yamux.Server(conn, n.yamuxConfig())

View file

@ -15,7 +15,6 @@
package main
import (
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/federationapi"
"github.com/matrix-org/dendrite/internal/basecomponent"
)
@ -33,12 +32,10 @@ func main() {
fsAPI := base.FederationSenderHTTPClient()
rsAPI := base.RoomserverHTTPClient()
asAPI := base.AppserviceHTTPClient()
// TODO: this isn't a producer
eduProducer := producers.NewEDUServerProducer(base.EDUServerClient())
federationapi.AddPublicRoutes(
base.PublicAPIMux, base.Cfg, accountDB, deviceDB, federation, keyRing,
rsAPI, asAPI, fsAPI, eduProducer,
rsAPI, asAPI, fsAPI, base.EDUServerClient(),
)
base.SetupAndServeHTTP(string(base.Cfg.Bind.FederationAPI), string(base.Cfg.Listen.FederationAPI))

View file

@ -19,7 +19,6 @@ import (
"net/http"
"github.com/matrix-org/dendrite/appservice"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/eduserver"
"github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/federationsender"
@ -105,7 +104,6 @@ func main() {
}
rsComponent.SetFederationSenderAPI(fsAPI)
eduProducer := producers.NewEDUServerProducer(eduInputAPI)
publicRoomsDB, err := storage.NewPublicRoomsServerDatabase(string(base.Cfg.Database.PublicRoomsAPI), base.Cfg.DbProperties(), cfg.Matrix.ServerName)
if err != nil {
logrus.WithError(err).Panicf("failed to connect to public rooms db")
@ -122,7 +120,6 @@ func main() {
AppserviceAPI: asAPI,
EDUInternalAPI: eduInputAPI,
EDUProducer: eduProducer,
FederationSenderAPI: fsAPI,
RoomserverAPI: rsAPI,
ServerKeyAPI: serverKeyAPI,

View file

@ -23,7 +23,6 @@ import (
"syscall/js"
"github.com/matrix-org/dendrite/appservice"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/eduserver"
"github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/federationsender"
@ -209,7 +208,6 @@ func main() {
rsAPI.SetFederationSenderAPI(fedSenderAPI)
p2pPublicRoomProvider := NewLibP2PPublicRoomsProvider(node, fedSenderAPI)
eduProducer := producers.NewEDUServerProducer(eduInputAPI)
publicRoomsDB, err := storage.NewPublicRoomsServerDatabase(string(base.Cfg.Database.PublicRoomsAPI), cfg.Matrix.ServerName)
if err != nil {
logrus.WithError(err).Panicf("failed to connect to public rooms db")
@ -226,7 +224,6 @@ func main() {
AppserviceAPI: asQuery,
EDUInternalAPI: eduInputAPI,
EDUProducer: eduProducer,
FederationSenderAPI: fedSenderAPI,
RoomserverAPI: rsAPI,
//ServerKeyAPI: serverKeyAPI,

View file

@ -1,3 +1,5 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@ -10,35 +12,22 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package producers
package api
import (
"context"
"encoding/json"
"time"
"github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/gomatrixserverlib"
)
// EDUServerProducer produces events for the EDU server to consume
type EDUServerProducer struct {
InputAPI api.EDUServerInputAPI
}
// NewEDUServerProducer creates a new EDUServerProducer
func NewEDUServerProducer(inputAPI api.EDUServerInputAPI) *EDUServerProducer {
return &EDUServerProducer{
InputAPI: inputAPI,
}
}
// SendTyping sends a typing event to EDU server
func (p *EDUServerProducer) SendTyping(
ctx context.Context, userID, roomID string,
func SendTyping(
ctx context.Context, eduAPI EDUServerInputAPI, userID, roomID string,
typing bool, timeoutMS int64,
) error {
requestData := api.InputTypingEvent{
requestData := InputTypingEvent{
UserID: userID,
RoomID: roomID,
Typing: typing,
@ -46,24 +35,24 @@ func (p *EDUServerProducer) SendTyping(
OriginServerTS: gomatrixserverlib.AsTimestamp(time.Now()),
}
var response api.InputTypingEventResponse
err := p.InputAPI.InputTypingEvent(
ctx, &api.InputTypingEventRequest{InputTypingEvent: requestData}, &response,
var response InputTypingEventResponse
err := eduAPI.InputTypingEvent(
ctx, &InputTypingEventRequest{InputTypingEvent: requestData}, &response,
)
return err
}
// SendToDevice sends a typing event to EDU server
func (p *EDUServerProducer) SendToDevice(
ctx context.Context, sender, userID, deviceID, eventType string,
func SendToDevice(
ctx context.Context, eduAPI EDUServerInputAPI, sender, userID, deviceID, eventType string,
message interface{},
) error {
js, err := json.Marshal(message)
if err != nil {
return err
}
requestData := api.InputSendToDeviceEvent{
requestData := InputSendToDeviceEvent{
UserID: userID,
DeviceID: deviceID,
SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{
@ -72,9 +61,9 @@ func (p *EDUServerProducer) SendToDevice(
Content: js,
},
}
request := api.InputSendToDeviceEventRequest{
request := InputSendToDeviceEventRequest{
InputSendToDeviceEvent: requestData,
}
response := api.InputSendToDeviceEventResponse{}
return p.InputAPI.InputSendToDeviceEvent(ctx, &request, &response)
response := InputSendToDeviceEventResponse{}
return eduAPI.InputSendToDeviceEvent(ctx, &request, &response)
}

View file

@ -19,12 +19,11 @@ import (
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
eduserverAPI "github.com/matrix-org/dendrite/eduserver/api"
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/internal/config"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
// TODO: Are we really wanting to pull in the producer from clientapi
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/federationapi/routing"
"github.com/matrix-org/gomatrixserverlib"
)
@ -40,13 +39,12 @@ func AddPublicRoutes(
rsAPI roomserverAPI.RoomserverInternalAPI,
asAPI appserviceAPI.AppServiceQueryAPI,
federationSenderAPI federationSenderAPI.FederationSenderInternalAPI,
eduProducer *producers.EDUServerProducer,
eduAPI eduserverAPI.EDUServerInputAPI,
) {
roomserverProducer := producers.NewRoomserverProducer(rsAPI)
routing.Setup(
router, cfg, rsAPI, asAPI, roomserverProducer,
eduProducer, federationSenderAPI, *keyRing,
router, cfg, rsAPI, asAPI,
eduAPI, federationSenderAPI, *keyRing,
federation, accountsDB, deviceDB,
)
}

View file

@ -20,8 +20,8 @@ import (
"net/http"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/roomserver/api"
roomserverVersion "github.com/matrix-org/dendrite/roomserver/version"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
@ -34,7 +34,7 @@ func Invite(
roomID string,
eventID string,
cfg *config.Dendrite,
producer *producers.RoomserverProducer,
rsAPI api.RoomserverInternalAPI,
keys gomatrixserverlib.KeyRing,
) util.JSONResponse {
inviteReq := gomatrixserverlib.InviteV2Request{}
@ -98,8 +98,8 @@ func Invite(
)
// Add the invite event to the roomserver.
if err = producer.SendInvite(
httpReq.Context(),
if err = api.SendInvite(
httpReq.Context(), rsAPI,
signedEvent.Headered(inviteReq.RoomVersion()),
inviteReq.InviteRoomState(),
event.Origin(),

View file

@ -21,7 +21,6 @@ import (
"time"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/roomserver/api"
@ -144,7 +143,6 @@ func SendJoin(
request *gomatrixserverlib.FederationRequest,
cfg *config.Dendrite,
rsAPI api.RoomserverInternalAPI,
producer *producers.RoomserverProducer,
keys gomatrixserverlib.KeyRing,
roomID, eventID string,
) util.JSONResponse {
@ -267,8 +265,8 @@ func SendJoin(
// We are responsible for notifying other servers that the user has joined
// the room, so set SendAsServer to cfg.Matrix.ServerName
if !alreadyJoined {
_, err = producer.SendEvents(
httpReq.Context(),
_, err = api.SendEvents(
httpReq.Context(), rsAPI,
[]gomatrixserverlib.HeaderedEvent{
event.Headered(stateAndAuthChainResponse.RoomVersion),
},
@ -276,7 +274,7 @@ func SendJoin(
nil,
)
if err != nil {
util.GetLogger(httpReq.Context()).WithError(err).Error("producer.SendEvents failed")
util.GetLogger(httpReq.Context()).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError()
}
}

View file

@ -17,7 +17,6 @@ import (
"time"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/roomserver/api"
@ -113,13 +112,13 @@ func SendLeave(
httpReq *http.Request,
request *gomatrixserverlib.FederationRequest,
cfg *config.Dendrite,
producer *producers.RoomserverProducer,
rsAPI api.RoomserverInternalAPI,
keys gomatrixserverlib.KeyRing,
roomID, eventID string,
) util.JSONResponse {
verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID}
verRes := api.QueryRoomVersionForRoomResponse{}
if err := producer.RsAPI.QueryRoomVersionForRoom(httpReq.Context(), &verReq, &verRes); err != nil {
if err := rsAPI.QueryRoomVersionForRoom(httpReq.Context(), &verReq, &verRes); err != nil {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.UnsupportedRoomVersion(err.Error()),
@ -194,8 +193,8 @@ func SendLeave(
// Send the events to the room server.
// We are responsible for notifying other servers that the user has left
// the room, so set SendAsServer to cfg.Matrix.ServerName
_, err = producer.SendEvents(
httpReq.Context(),
_, err = api.SendEvents(
httpReq.Context(), rsAPI,
[]gomatrixserverlib.HeaderedEvent{
event.Headered(verRes.RoomVersion),
},

View file

@ -21,7 +21,7 @@ import (
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
"github.com/matrix-org/dendrite/clientapi/producers"
eduserverAPI "github.com/matrix-org/dendrite/eduserver/api"
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/config"
@ -49,8 +49,7 @@ func Setup(
cfg *config.Dendrite,
rsAPI roomserverAPI.RoomserverInternalAPI,
asAPI appserviceAPI.AppServiceQueryAPI,
producer *producers.RoomserverProducer,
eduProducer *producers.EDUServerProducer,
eduAPI eduserverAPI.EDUServerInputAPI,
fsAPI federationSenderAPI.FederationSenderInternalAPI,
keys gomatrixserverlib.KeyRing,
federation *gomatrixserverlib.FederationClient,
@ -82,7 +81,7 @@ func Setup(
func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest, vars map[string]string) util.JSONResponse {
return Send(
httpReq, request, gomatrixserverlib.TransactionID(vars["txnID"]),
cfg, rsAPI, producer, eduProducer, keys, federation,
cfg, rsAPI, eduAPI, keys, federation,
)
},
)).Methods(http.MethodPut, http.MethodOptions)
@ -92,14 +91,14 @@ func Setup(
func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest, vars map[string]string) util.JSONResponse {
return Invite(
httpReq, request, vars["roomID"], vars["eventID"],
cfg, producer, keys,
cfg, rsAPI, keys,
)
},
)).Methods(http.MethodPut, http.MethodOptions)
v1fedmux.Handle("/3pid/onbind", internal.MakeExternalAPI("3pid_onbind",
func(req *http.Request) util.JSONResponse {
return CreateInvitesFrom3PIDInvites(req, rsAPI, asAPI, cfg, producer, federation, accountDB)
return CreateInvitesFrom3PIDInvites(req, rsAPI, asAPI, cfg, federation, accountDB)
},
)).Methods(http.MethodPost, http.MethodOptions)
@ -107,7 +106,7 @@ func Setup(
"exchange_third_party_invite", cfg.Matrix.ServerName, keys, wakeup,
func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest, vars map[string]string) util.JSONResponse {
return ExchangeThirdPartyInvite(
httpReq, request, vars["roomID"], rsAPI, cfg, federation, producer,
httpReq, request, vars["roomID"], rsAPI, cfg, federation,
)
},
)).Methods(http.MethodPut, http.MethodOptions)
@ -206,7 +205,7 @@ func Setup(
roomID := vars["roomID"]
eventID := vars["eventID"]
res := SendJoin(
httpReq, request, cfg, rsAPI, producer, keys, roomID, eventID,
httpReq, request, cfg, rsAPI, keys, roomID, eventID,
)
return util.JSONResponse{
Headers: res.Headers,
@ -224,7 +223,7 @@ func Setup(
roomID := vars["roomID"]
eventID := vars["eventID"]
return SendJoin(
httpReq, request, cfg, rsAPI, producer, keys, roomID, eventID,
httpReq, request, cfg, rsAPI, keys, roomID, eventID,
)
},
)).Methods(http.MethodPut)
@ -246,7 +245,7 @@ func Setup(
roomID := vars["roomID"]
eventID := vars["eventID"]
return SendLeave(
httpReq, request, cfg, producer, keys, roomID, eventID,
httpReq, request, cfg, rsAPI, keys, roomID, eventID,
)
},
)).Methods(http.MethodPut)

View file

@ -21,7 +21,7 @@ import (
"net/http"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/producers"
eduserverAPI "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
@ -36,16 +36,14 @@ func Send(
txnID gomatrixserverlib.TransactionID,
cfg *config.Dendrite,
rsAPI api.RoomserverInternalAPI,
producer *producers.RoomserverProducer,
eduProducer *producers.EDUServerProducer,
eduAPI eduserverAPI.EDUServerInputAPI,
keys gomatrixserverlib.KeyRing,
federation *gomatrixserverlib.FederationClient,
) util.JSONResponse {
t := txnReq{
context: httpReq.Context(),
rsAPI: rsAPI,
producer: producer,
eduProducer: eduProducer,
eduAPI: eduAPI,
keys: keys,
federation: federation,
haveEvents: make(map[string]*gomatrixserverlib.HeaderedEvent),
@ -93,8 +91,7 @@ type txnReq struct {
gomatrixserverlib.Transaction
context context.Context
rsAPI api.RoomserverInternalAPI
producer *producers.RoomserverProducer
eduProducer *producers.EDUServerProducer
eduAPI eduserverAPI.EDUServerInputAPI
keys gomatrixserverlib.JSONVerifier
federation txnFederationClient
// local cache of events for auth checks, etc - this may include events
@ -262,7 +259,7 @@ func (t *txnReq) processEDUs(edus []gomatrixserverlib.EDU) {
util.GetLogger(t.context).WithError(err).Error("Failed to unmarshal typing event")
continue
}
if err := t.eduProducer.SendTyping(t.context, typingPayload.UserID, typingPayload.RoomID, typingPayload.Typing, 30*1000); err != nil {
if err := eduserverAPI.SendTyping(t.context, t.eduAPI, typingPayload.UserID, typingPayload.RoomID, typingPayload.Typing, 30*1000); err != nil {
util.GetLogger(t.context).WithError(err).Error("Failed to send typing event to edu server")
}
case gomatrixserverlib.MDirectToDevice:
@ -275,7 +272,7 @@ func (t *txnReq) processEDUs(edus []gomatrixserverlib.EDU) {
for userID, byUser := range directPayload.Messages {
for deviceID, message := range byUser {
// TODO: check that the user and the device actually exist here
if err := t.eduProducer.SendToDevice(t.context, directPayload.Sender, userID, deviceID, directPayload.Type, message); err != nil {
if err := eduserverAPI.SendToDevice(t.context, t.eduAPI, directPayload.Sender, userID, deviceID, directPayload.Type, message); err != nil {
util.GetLogger(t.context).WithError(err).WithFields(logrus.Fields{
"sender": directPayload.Sender,
"user_id": userID,
@ -325,8 +322,8 @@ func (t *txnReq) processEvent(e gomatrixserverlib.Event, isInboundTxn bool) erro
}
// pass the event to the roomserver
_, err := t.producer.SendEvents(
t.context,
_, err := api.SendEvents(
t.context, t.rsAPI,
[]gomatrixserverlib.HeaderedEvent{
e.Headered(stateResp.RoomVersion),
},
@ -408,7 +405,7 @@ func (t *txnReq) processEventWithMissingState(e gomatrixserverlib.Event, roomVer
// pass the event along with the state to the roomserver using a background context so we don't
// needlessly expire
return t.producer.SendEventWithState(context.Background(), resolvedState, e.Headered(roomVersion), t.haveEventIDs())
return api.SendEventWithState(context.Background(), t.rsAPI, resolvedState, e.Headered(roomVersion), t.haveEventIDs())
}
// lookupStateAfterEvent returns the room state after `eventID`, which is the state before eventID with the state of `eventID` (if it's a state event)

View file

@ -8,7 +8,6 @@ import (
"testing"
"time"
"github.com/matrix-org/dendrite/clientapi/producers"
eduAPI "github.com/matrix-org/dendrite/eduserver/api"
fsAPI "github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/roomserver/api"
@ -341,8 +340,7 @@ func mustCreateTransaction(rsAPI api.RoomserverInternalAPI, fedClient txnFederat
t := &txnReq{
context: context.Background(),
rsAPI: rsAPI,
producer: producers.NewRoomserverProducer(rsAPI),
eduProducer: producers.NewEDUServerProducer(&testEDUProducer{}),
eduAPI: &testEDUProducer{},
keys: &testNopJSONVerifier{},
federation: fedClient,
haveEvents: make(map[string]*gomatrixserverlib.HeaderedEvent),

View file

@ -25,7 +25,6 @@ import (
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/roomserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
@ -60,7 +59,7 @@ var (
func CreateInvitesFrom3PIDInvites(
req *http.Request, rsAPI roomserverAPI.RoomserverInternalAPI,
asAPI appserviceAPI.AppServiceQueryAPI, cfg *config.Dendrite,
producer *producers.RoomserverProducer, federation *gomatrixserverlib.FederationClient,
federation *gomatrixserverlib.FederationClient,
accountDB accounts.Database,
) util.JSONResponse {
var body invites
@ -92,8 +91,8 @@ func CreateInvitesFrom3PIDInvites(
}
// Send all the events
if _, err := producer.SendEvents(req.Context(), evs, cfg.Matrix.ServerName, nil); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("producer.SendEvents failed")
if _, err := api.SendEvents(req.Context(), rsAPI, evs, cfg.Matrix.ServerName, nil); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError()
}
@ -111,7 +110,6 @@ func ExchangeThirdPartyInvite(
rsAPI roomserverAPI.RoomserverInternalAPI,
cfg *config.Dendrite,
federation *gomatrixserverlib.FederationClient,
producer *producers.RoomserverProducer,
) util.JSONResponse {
var builder gomatrixserverlib.EventBuilder
if err := json.Unmarshal(request.Content(), &builder); err != nil {
@ -176,15 +174,15 @@ func ExchangeThirdPartyInvite(
}
// Send the event to the roomserver
if _, err = producer.SendEvents(
httpReq.Context(),
if _, err = api.SendEvents(
httpReq.Context(), rsAPI,
[]gomatrixserverlib.HeaderedEvent{
signedEvent.Event.Headered(verRes.RoomVersion),
},
cfg.Matrix.ServerName,
nil,
); err != nil {
util.GetLogger(httpReq.Context()).WithError(err).Error("producer.SendEvents failed")
util.GetLogger(httpReq.Context()).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError()
}

View file

@ -7,7 +7,6 @@ import (
"github.com/matrix-org/dendrite/clientapi"
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
"github.com/matrix-org/dendrite/clientapi/producers"
eduServerAPI "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/federationapi"
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
@ -41,8 +40,6 @@ type Monolith struct {
RoomserverAPI roomserverAPI.RoomserverInternalAPI
ServerKeyAPI serverKeyAPI.ServerKeyInternalAPI
// TODO: remove, this isn't even a producer
EDUProducer *producers.EDUServerProducer
// TODO: can we remove this? It's weird that we are required the database
// yet every other component can do that on its own. libp2p-demo uses a custom
// database though annoyingly.
@ -65,7 +62,7 @@ func (m *Monolith) AddAllPublicRoutes(publicMux *mux.Router) {
federationapi.AddPublicRoutes(
publicMux, m.Config, m.AccountDB, m.DeviceDB, m.FedClient,
m.KeyRing, m.RoomserverAPI, m.AppserviceAPI, m.FederationSenderAPI,
m.EDUProducer,
m.EDUInternalAPI,
)
mediaapi.AddPublicRoutes(publicMux, m.Config, m.DeviceDB)
publicroomsapi.AddPublicRoutes(

View file

@ -1,4 +1,4 @@
// Copyright 2017 Vector Creations Ltd
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -12,63 +12,51 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package producers
package api
import (
"context"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
)
// RoomserverProducer produces events for the roomserver to consume.
type RoomserverProducer struct {
RsAPI api.RoomserverInternalAPI
}
// NewRoomserverProducer creates a new RoomserverProducer
func NewRoomserverProducer(rsAPI api.RoomserverInternalAPI) *RoomserverProducer {
return &RoomserverProducer{
RsAPI: rsAPI,
}
}
// SendEvents writes the given events to the roomserver input log. The events are written with KindNew.
func (c *RoomserverProducer) SendEvents(
ctx context.Context, events []gomatrixserverlib.HeaderedEvent, sendAsServer gomatrixserverlib.ServerName,
txnID *api.TransactionID,
// SendEvents to the roomserver The events are written with KindNew.
func SendEvents(
ctx context.Context, rsAPI RoomserverInternalAPI, events []gomatrixserverlib.HeaderedEvent,
sendAsServer gomatrixserverlib.ServerName, txnID *TransactionID,
) (string, error) {
ires := make([]api.InputRoomEvent, len(events))
ires := make([]InputRoomEvent, len(events))
for i, event := range events {
ires[i] = api.InputRoomEvent{
Kind: api.KindNew,
ires[i] = InputRoomEvent{
Kind: KindNew,
Event: event,
AuthEventIDs: event.AuthEventIDs(),
SendAsServer: string(sendAsServer),
TransactionID: txnID,
}
}
return c.SendInputRoomEvents(ctx, ires)
return SendInputRoomEvents(ctx, rsAPI, ires)
}
// SendEventWithState writes an event with KindNew to the roomserver input log
// SendEventWithState writes an event with KindNew to the roomserver
// with the state at the event as KindOutlier before it. Will not send any event that is
// marked as `true` in haveEventIDs
func (c *RoomserverProducer) SendEventWithState(
ctx context.Context, state *gomatrixserverlib.RespState, event gomatrixserverlib.HeaderedEvent, haveEventIDs map[string]bool,
func SendEventWithState(
ctx context.Context, rsAPI RoomserverInternalAPI, state *gomatrixserverlib.RespState,
event gomatrixserverlib.HeaderedEvent, haveEventIDs map[string]bool,
) error {
outliers, err := state.Events()
if err != nil {
return err
}
var ires []api.InputRoomEvent
var ires []InputRoomEvent
for _, outlier := range outliers {
if haveEventIDs[outlier.EventID()] {
continue
}
ires = append(ires, api.InputRoomEvent{
Kind: api.KindOutlier,
ires = append(ires, InputRoomEvent{
Kind: KindOutlier,
Event: outlier.Headered(event.RoomVersion),
AuthEventIDs: outlier.AuthEventIDs(),
})
@ -79,39 +67,40 @@ func (c *RoomserverProducer) SendEventWithState(
stateEventIDs[i] = state.StateEvents[i].EventID()
}
ires = append(ires, api.InputRoomEvent{
Kind: api.KindNew,
ires = append(ires, InputRoomEvent{
Kind: KindNew,
Event: event,
AuthEventIDs: event.AuthEventIDs(),
HasState: true,
StateEventIDs: stateEventIDs,
})
_, err = c.SendInputRoomEvents(ctx, ires)
_, err = SendInputRoomEvents(ctx, rsAPI, ires)
return err
}
// SendInputRoomEvents writes the given input room events to the roomserver input API.
func (c *RoomserverProducer) SendInputRoomEvents(
ctx context.Context, ires []api.InputRoomEvent,
// SendInputRoomEvents to the roomserver.
func SendInputRoomEvents(
ctx context.Context, rsAPI RoomserverInternalAPI, ires []InputRoomEvent,
) (eventID string, err error) {
request := api.InputRoomEventsRequest{InputRoomEvents: ires}
var response api.InputRoomEventsResponse
err = c.RsAPI.InputRoomEvents(ctx, &request, &response)
request := InputRoomEventsRequest{InputRoomEvents: ires}
var response InputRoomEventsResponse
err = rsAPI.InputRoomEvents(ctx, &request, &response)
eventID = response.EventID
return
}
// SendInvite writes the invite event to the roomserver input API.
// SendInvite event to the roomserver.
// This should only be needed for invite events that occur outside of a known room.
// If we are in the room then the event should be sent using the SendEvents method.
func (c *RoomserverProducer) SendInvite(
ctx context.Context, inviteEvent gomatrixserverlib.HeaderedEvent,
func SendInvite(
ctx context.Context,
rsAPI RoomserverInternalAPI, inviteEvent gomatrixserverlib.HeaderedEvent,
inviteRoomState []gomatrixserverlib.InviteV2StrippedState,
sendAsServer gomatrixserverlib.ServerName, txnID *api.TransactionID,
sendAsServer gomatrixserverlib.ServerName, txnID *TransactionID,
) error {
request := api.InputRoomEventsRequest{
InputInviteEvents: []api.InputInviteEvent{{
request := InputRoomEventsRequest{
InputInviteEvents: []InputInviteEvent{{
Event: inviteEvent,
InviteRoomState: inviteRoomState,
RoomVersion: inviteEvent.RoomVersion,
@ -119,6 +108,6 @@ func (c *RoomserverProducer) SendInvite(
TransactionID: txnID,
}},
}
var response api.InputRoomEventsResponse
return c.RsAPI.InputRoomEvents(ctx, &request, &response)
var response InputRoomEventsResponse
return rsAPI.InputRoomEvents(ctx, &request, &response)
}