diff --git a/appservice/appservice.go b/appservice/appservice.go index dca600bf5..b7adf755b 100644 --- a/appservice/appservice.go +++ b/appservice/appservice.go @@ -26,7 +26,6 @@ import ( "github.com/matrix-org/dendrite/appservice/consumers" "github.com/matrix-org/dendrite/appservice/inthttp" "github.com/matrix-org/dendrite/appservice/query" - "github.com/matrix-org/dendrite/appservice/routing" "github.com/matrix-org/dendrite/appservice/storage" "github.com/matrix-org/dendrite/appservice/types" "github.com/matrix-org/dendrite/appservice/workers" @@ -35,22 +34,10 @@ import ( "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/basecomponent" "github.com/matrix-org/dendrite/internal/config" - "github.com/matrix-org/dendrite/internal/transactions" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" - "github.com/matrix-org/gomatrixserverlib" "github.com/sirupsen/logrus" ) -// AddPublicRoutes registers HTTP handlers for CS API calls -func AddPublicRoutes(router *mux.Router, cfg *config.Dendrite, rsAPI roomserverAPI.RoomserverInternalAPI, - accountsDB accounts.Database, federation *gomatrixserverlib.FederationClient, txnCache *transactions.Cache) { - - routing.Setup( - router, cfg, rsAPI, - accountsDB, federation, txnCache, - ) -} - // AddInternalRoutes registers HTTP handlers for internal API calls func AddInternalRoutes(router *mux.Router, queryAPI appserviceAPI.AppServiceQueryAPI) { inthttp.AddRoutes(queryAPI, router) diff --git a/appservice/routing/routing.go b/appservice/routing/routing.go deleted file mode 100644 index e5ed7ab40..000000000 --- a/appservice/routing/routing.go +++ /dev/null @@ -1,65 +0,0 @@ -// Copyright 2018 Vector Creations Ltd -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package routing - -import ( - "net/http" - - "github.com/gorilla/mux" - "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" - "github.com/matrix-org/dendrite/internal" - "github.com/matrix-org/dendrite/internal/config" - "github.com/matrix-org/dendrite/internal/transactions" - "github.com/matrix-org/dendrite/roomserver/api" - "github.com/matrix-org/gomatrixserverlib" - "github.com/matrix-org/util" -) - -const pathPrefixApp = "/app/v1" - -// Setup registers HTTP handlers with the given ServeMux. It also supplies the given http.Client -// to clients which need to make outbound HTTP requests. -// -// Due to Setup being used to call many other functions, a gocyclo nolint is -// applied: -// nolint: gocyclo -func Setup( - apiMux *mux.Router, cfg *config.Dendrite, // nolint: unparam - rsAPI api.RoomserverInternalAPI, // nolint: unparam - accountDB accounts.Database, // nolint: unparam - federation *gomatrixserverlib.FederationClient, // nolint: unparam - transactionsCache *transactions.Cache, // nolint: unparam -) { - appMux := apiMux.PathPrefix(pathPrefixApp).Subrouter() - - appMux.Handle("/alias", - internal.MakeExternalAPI("alias", func(req *http.Request) util.JSONResponse { - // TODO: Implement - return util.JSONResponse{ - Code: http.StatusOK, - JSON: nil, - } - }), - ).Methods(http.MethodGet, http.MethodOptions) - appMux.Handle("/user", - internal.MakeExternalAPI("user", func(req *http.Request) util.JSONResponse { - // TODO: Implement - return util.JSONResponse{ - Code: http.StatusOK, - JSON: nil, - } - }), - ).Methods(http.MethodGet, http.MethodOptions) -} diff --git a/clientapi/clientapi.go b/clientapi/clientapi.go index 90db9eead..2780f367c 100644 --- a/clientapi/clientapi.go +++ b/clientapi/clientapi.go @@ -48,9 +48,6 @@ func AddPublicRoutes( transactionsCache *transactions.Cache, fsAPI federationSenderAPI.FederationSenderInternalAPI, ) { - roomserverProducer := producers.NewRoomserverProducer(rsAPI) - eduProducer := producers.NewEDUServerProducer(eduInputAPI) - syncProducer := &producers.SyncAPIProducer{ Producer: producer, Topic: string(cfg.Kafka.Topics.OutputClientData), @@ -64,8 +61,8 @@ func AddPublicRoutes( } routing.Setup( - router, cfg, roomserverProducer, rsAPI, asAPI, + router, cfg, eduInputAPI, rsAPI, asAPI, accountsDB, deviceDB, federation, *keyRing, - syncProducer, eduProducer, transactionsCache, fsAPI, + syncProducer, transactionsCache, fsAPI, ) } diff --git a/clientapi/routing/createroom.go b/clientapi/routing/createroom.go index 1b4ff184f..89f49d356 100644 --- a/clientapi/routing/createroom.go +++ b/clientapi/routing/createroom.go @@ -29,7 +29,6 @@ import ( "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" - "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/clientapi/threepid" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/config" @@ -137,21 +136,21 @@ type fledglingEvent struct { // CreateRoom implements /createRoom func CreateRoom( req *http.Request, device *authtypes.Device, - cfg *config.Dendrite, producer *producers.RoomserverProducer, + cfg *config.Dendrite, accountDB accounts.Database, rsAPI roomserverAPI.RoomserverInternalAPI, asAPI appserviceAPI.AppServiceQueryAPI, ) util.JSONResponse { // TODO (#267): Check room ID doesn't clash with an existing one, and we // probably shouldn't be using pseudo-random strings, maybe GUIDs? roomID := fmt.Sprintf("!%s:%s", util.RandomString(16), cfg.Matrix.ServerName) - return createRoom(req, device, cfg, roomID, producer, accountDB, rsAPI, asAPI) + return createRoom(req, device, cfg, roomID, accountDB, rsAPI, asAPI) } // createRoom implements /createRoom // nolint: gocyclo func createRoom( req *http.Request, device *authtypes.Device, - cfg *config.Dendrite, roomID string, producer *producers.RoomserverProducer, + cfg *config.Dendrite, roomID string, accountDB accounts.Database, rsAPI roomserverAPI.RoomserverInternalAPI, asAPI appserviceAPI.AppServiceQueryAPI, ) util.JSONResponse { @@ -344,9 +343,9 @@ func createRoom( } // send events to the room server - _, err = producer.SendEvents(req.Context(), builtEvents, cfg.Matrix.ServerName, nil) + _, err = roomserverAPI.SendEvents(req.Context(), rsAPI, builtEvents, cfg.Matrix.ServerName, nil) if err != nil { - util.GetLogger(req.Context()).WithError(err).Error("producer.SendEvents failed") + util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed") return jsonerror.InternalServerError() } @@ -404,14 +403,14 @@ func createRoom( } } // Send the invite event to the roomserver. - if err = producer.SendInvite( - req.Context(), + if err = roomserverAPI.SendInvite( + req.Context(), rsAPI, inviteEvent.Headered(roomVersion), strippedState, // invite room state cfg.Matrix.ServerName, // send as server nil, // transaction ID ); err != nil { - util.GetLogger(req.Context()).WithError(err).Error("producer.SendEvents failed") + util.GetLogger(req.Context()).WithError(err).Error("SendInvite failed") return jsonerror.InternalServerError() } } diff --git a/clientapi/routing/membership.go b/clientapi/routing/membership.go index 74d92a059..0b2c2bc5e 100644 --- a/clientapi/routing/membership.go +++ b/clientapi/routing/membership.go @@ -25,7 +25,6 @@ import ( "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" - "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/clientapi/threepid" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/config" @@ -46,7 +45,6 @@ func SendMembership( req *http.Request, accountDB accounts.Database, device *authtypes.Device, roomID string, membership string, cfg *config.Dendrite, rsAPI roomserverAPI.RoomserverInternalAPI, asAPI appserviceAPI.AppServiceQueryAPI, - producer *producers.RoomserverProducer, ) util.JSONResponse { verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID} verRes := api.QueryRoomVersionForRoomResponse{} @@ -71,7 +69,7 @@ func SendMembership( } inviteStored, jsonErrResp := checkAndProcessThreepid( - req, device, &body, cfg, rsAPI, accountDB, producer, + req, device, &body, cfg, rsAPI, accountDB, membership, roomID, evTime, ) if jsonErrResp != nil { @@ -112,8 +110,8 @@ func SendMembership( switch membership { case gomatrixserverlib.Invite: // Invites need to be handled specially - err = producer.SendInvite( - req.Context(), + err = roomserverAPI.SendInvite( + req.Context(), rsAPI, event.Headered(verRes.RoomVersion), nil, // ask the roomserver to draw up invite room state for us cfg.Matrix.ServerName, @@ -130,14 +128,14 @@ func SendMembership( }{roomID} fallthrough default: - _, err = producer.SendEvents( - req.Context(), + _, err = roomserverAPI.SendEvents( + req.Context(), rsAPI, []gomatrixserverlib.HeaderedEvent{event.Headered(verRes.RoomVersion)}, cfg.Matrix.ServerName, nil, ) if err != nil { - util.GetLogger(req.Context()).WithError(err).Error("producer.SendEvents failed") + util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed") return jsonerror.InternalServerError() } } @@ -252,13 +250,12 @@ func checkAndProcessThreepid( cfg *config.Dendrite, rsAPI roomserverAPI.RoomserverInternalAPI, accountDB accounts.Database, - producer *producers.RoomserverProducer, membership, roomID string, evTime time.Time, ) (inviteStored bool, errRes *util.JSONResponse) { inviteStored, err := threepid.CheckAndProcessInvite( - req.Context(), device, body, cfg, rsAPI, accountDB, producer, + req.Context(), device, body, cfg, rsAPI, accountDB, membership, roomID, evTime, ) if err == threepid.ErrMissingParameter { diff --git a/clientapi/routing/profile.go b/clientapi/routing/profile.go index c0fe32a31..642a72887 100644 --- a/clientapi/routing/profile.go +++ b/clientapi/routing/profile.go @@ -24,7 +24,6 @@ import ( "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" - "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/roomserver/api" @@ -94,8 +93,7 @@ func GetAvatarURL( // nolint:gocyclo func SetAvatarURL( req *http.Request, accountDB accounts.Database, device *authtypes.Device, - userID string, cfg *config.Dendrite, - rsProducer *producers.RoomserverProducer, rsAPI api.RoomserverInternalAPI, + userID string, cfg *config.Dendrite, rsAPI api.RoomserverInternalAPI, ) util.JSONResponse { if userID != device.UserID { return util.JSONResponse{ @@ -167,8 +165,8 @@ func SetAvatarURL( return jsonerror.InternalServerError() } - if _, err := rsProducer.SendEvents(req.Context(), events, cfg.Matrix.ServerName, nil); err != nil { - util.GetLogger(req.Context()).WithError(err).Error("rsProducer.SendEvents failed") + if _, err := api.SendEvents(req.Context(), rsAPI, events, cfg.Matrix.ServerName, nil); err != nil { + util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed") return jsonerror.InternalServerError() } @@ -209,8 +207,7 @@ func GetDisplayName( // nolint:gocyclo func SetDisplayName( req *http.Request, accountDB accounts.Database, device *authtypes.Device, - userID string, cfg *config.Dendrite, - rsProducer *producers.RoomserverProducer, rsAPI api.RoomserverInternalAPI, + userID string, cfg *config.Dendrite, rsAPI api.RoomserverInternalAPI, ) util.JSONResponse { if userID != device.UserID { return util.JSONResponse{ @@ -282,8 +279,8 @@ func SetDisplayName( return jsonerror.InternalServerError() } - if _, err := rsProducer.SendEvents(req.Context(), events, cfg.Matrix.ServerName, nil); err != nil { - util.GetLogger(req.Context()).WithError(err).Error("rsProducer.SendEvents failed") + if _, err := api.SendEvents(req.Context(), rsAPI, events, cfg.Matrix.ServerName, nil); err != nil { + util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed") return jsonerror.InternalServerError() } diff --git a/clientapi/routing/routing.go b/clientapi/routing/routing.go index eb558205e..024707759 100644 --- a/clientapi/routing/routing.go +++ b/clientapi/routing/routing.go @@ -27,6 +27,7 @@ import ( "github.com/matrix-org/dendrite/clientapi/auth/storage/devices" "github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/producers" + eduServerAPI "github.com/matrix-org/dendrite/eduserver/api" federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/config" @@ -48,7 +49,7 @@ const pathPrefixUnstable = "/client/unstable" // nolint: gocyclo func Setup( publicAPIMux *mux.Router, cfg *config.Dendrite, - producer *producers.RoomserverProducer, + eduAPI eduServerAPI.EDUServerInputAPI, rsAPI roomserverAPI.RoomserverInternalAPI, asAPI appserviceAPI.AppServiceQueryAPI, accountDB accounts.Database, @@ -56,7 +57,6 @@ func Setup( federation *gomatrixserverlib.FederationClient, keyRing gomatrixserverlib.KeyRing, syncProducer *producers.SyncAPIProducer, - eduProducer *producers.EDUServerProducer, transactionsCache *transactions.Cache, federationSender federationSenderAPI.FederationSenderInternalAPI, ) { @@ -89,7 +89,7 @@ func Setup( r0mux.Handle("/createRoom", internal.MakeAuthAPI("createRoom", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse { - return CreateRoom(req, device, cfg, producer, accountDB, rsAPI, asAPI) + return CreateRoom(req, device, cfg, accountDB, rsAPI, asAPI) }), ).Methods(http.MethodPost, http.MethodOptions) r0mux.Handle("/join/{roomIDOrAlias}", @@ -125,7 +125,7 @@ func Setup( if err != nil { return util.ErrorResponse(err) } - return SendMembership(req, accountDB, device, vars["roomID"], vars["membership"], cfg, rsAPI, asAPI, producer) + return SendMembership(req, accountDB, device, vars["roomID"], vars["membership"], cfg, rsAPI, asAPI) }), ).Methods(http.MethodPost, http.MethodOptions) r0mux.Handle("/rooms/{roomID}/send/{eventType}", @@ -134,7 +134,7 @@ func Setup( if err != nil { return util.ErrorResponse(err) } - return SendEvent(req, device, vars["roomID"], vars["eventType"], nil, nil, cfg, rsAPI, producer, nil) + return SendEvent(req, device, vars["roomID"], vars["eventType"], nil, nil, cfg, rsAPI, nil) }), ).Methods(http.MethodPost, http.MethodOptions) r0mux.Handle("/rooms/{roomID}/send/{eventType}/{txnID}", @@ -145,7 +145,7 @@ func Setup( } txnID := vars["txnID"] return SendEvent(req, device, vars["roomID"], vars["eventType"], &txnID, - nil, cfg, rsAPI, producer, transactionsCache) + nil, cfg, rsAPI, transactionsCache) }), ).Methods(http.MethodPut, http.MethodOptions) r0mux.Handle("/rooms/{roomID}/event/{eventID}", @@ -194,7 +194,7 @@ func Setup( if strings.HasSuffix(eventType, "/") { eventType = eventType[:len(eventType)-1] } - return SendEvent(req, device, vars["roomID"], eventType, nil, &emptyString, cfg, rsAPI, producer, nil) + return SendEvent(req, device, vars["roomID"], eventType, nil, &emptyString, cfg, rsAPI, nil) }), ).Methods(http.MethodPut, http.MethodOptions) @@ -205,7 +205,7 @@ func Setup( return util.ErrorResponse(err) } stateKey := vars["stateKey"] - return SendEvent(req, device, vars["roomID"], vars["eventType"], nil, &stateKey, cfg, rsAPI, producer, nil) + return SendEvent(req, device, vars["roomID"], vars["eventType"], nil, &stateKey, cfg, rsAPI, nil) }), ).Methods(http.MethodPut, http.MethodOptions) @@ -269,7 +269,7 @@ func Setup( if err != nil { return util.ErrorResponse(err) } - return SendTyping(req, device, vars["roomID"], vars["userID"], accountDB, eduProducer) + return SendTyping(req, device, vars["roomID"], vars["userID"], accountDB, eduAPI) }), ).Methods(http.MethodPut, http.MethodOptions) @@ -280,7 +280,7 @@ func Setup( return util.ErrorResponse(err) } txnID := vars["txnID"] - return SendToDevice(req, device, eduProducer, transactionsCache, vars["eventType"], &txnID) + return SendToDevice(req, device, eduAPI, transactionsCache, vars["eventType"], &txnID) }), ).Methods(http.MethodPut, http.MethodOptions) @@ -294,7 +294,7 @@ func Setup( return util.ErrorResponse(err) } txnID := vars["txnID"] - return SendToDevice(req, device, eduProducer, transactionsCache, vars["eventType"], &txnID) + return SendToDevice(req, device, eduAPI, transactionsCache, vars["eventType"], &txnID) }), ).Methods(http.MethodPut, http.MethodOptions) @@ -386,7 +386,7 @@ func Setup( if err != nil { return util.ErrorResponse(err) } - return SetAvatarURL(req, accountDB, device, vars["userID"], cfg, producer, rsAPI) + return SetAvatarURL(req, accountDB, device, vars["userID"], cfg, rsAPI) }), ).Methods(http.MethodPut, http.MethodOptions) // Browsers use the OPTIONS HTTP method to check if the CORS policy allows @@ -408,7 +408,7 @@ func Setup( if err != nil { return util.ErrorResponse(err) } - return SetDisplayName(req, accountDB, device, vars["userID"], cfg, producer, rsAPI) + return SetDisplayName(req, accountDB, device, vars["userID"], cfg, rsAPI) }), ).Methods(http.MethodPut, http.MethodOptions) // Browsers use the OPTIONS HTTP method to check if the CORS policy allows diff --git a/clientapi/routing/sendevent.go b/clientapi/routing/sendevent.go index a8f8893ba..5d5507e85 100644 --- a/clientapi/routing/sendevent.go +++ b/clientapi/routing/sendevent.go @@ -20,7 +20,6 @@ import ( "github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" - "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/transactions" @@ -46,7 +45,6 @@ func SendEvent( roomID, eventType string, txnID, stateKey *string, cfg *config.Dendrite, rsAPI api.RoomserverInternalAPI, - producer *producers.RoomserverProducer, txnCache *transactions.Cache, ) util.JSONResponse { verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID} @@ -80,8 +78,8 @@ func SendEvent( // pass the new event to the roomserver and receive the correct event ID // event ID in case of duplicate transaction is discarded - eventID, err := producer.SendEvents( - req.Context(), + eventID, err := api.SendEvents( + req.Context(), rsAPI, []gomatrixserverlib.HeaderedEvent{ e.Headered(verRes.RoomVersion), }, @@ -89,7 +87,7 @@ func SendEvent( txnAndSessionID, ) if err != nil { - util.GetLogger(req.Context()).WithError(err).Error("producer.SendEvents failed") + util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed") return jsonerror.InternalServerError() } util.GetLogger(req.Context()).WithFields(logrus.Fields{ diff --git a/clientapi/routing/sendtodevice.go b/clientapi/routing/sendtodevice.go index 5d3060d74..dc0a6572b 100644 --- a/clientapi/routing/sendtodevice.go +++ b/clientapi/routing/sendtodevice.go @@ -19,7 +19,7 @@ import ( "github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" - "github.com/matrix-org/dendrite/clientapi/producers" + "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/internal/transactions" "github.com/matrix-org/util" ) @@ -28,7 +28,7 @@ import ( // sends the device events to the EDU Server func SendToDevice( req *http.Request, device *authtypes.Device, - eduProducer *producers.EDUServerProducer, + eduAPI api.EDUServerInputAPI, txnCache *transactions.Cache, eventType string, txnID *string, ) util.JSONResponse { @@ -48,8 +48,8 @@ func SendToDevice( for userID, byUser := range httpReq.Messages { for deviceID, message := range byUser { - if err := eduProducer.SendToDevice( - req.Context(), device.UserID, userID, deviceID, eventType, message, + if err := api.SendToDevice( + req.Context(), eduAPI, device.UserID, userID, deviceID, eventType, message, ); err != nil { util.GetLogger(req.Context()).WithError(err).Error("eduProducer.SendToDevice failed") return jsonerror.InternalServerError() diff --git a/clientapi/routing/sendtyping.go b/clientapi/routing/sendtyping.go index ffaa0e662..2eae16582 100644 --- a/clientapi/routing/sendtyping.go +++ b/clientapi/routing/sendtyping.go @@ -20,8 +20,8 @@ import ( "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" - "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/clientapi/userutil" + "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/util" ) @@ -35,7 +35,7 @@ type typingContentJSON struct { func SendTyping( req *http.Request, device *authtypes.Device, roomID string, userID string, accountDB accounts.Database, - eduProducer *producers.EDUServerProducer, + eduAPI api.EDUServerInputAPI, ) util.JSONResponse { if device.UserID != userID { return util.JSONResponse{ @@ -69,8 +69,8 @@ func SendTyping( return *resErr } - if err = eduProducer.SendTyping( - req.Context(), userID, roomID, r.Typing, r.Timeout, + if err = api.SendTyping( + req.Context(), eduAPI, userID, roomID, r.Typing, r.Timeout, ); err != nil { util.GetLogger(req.Context()).WithError(err).Error("eduProducer.Send failed") return jsonerror.InternalServerError() diff --git a/clientapi/threepid/invites.go b/clientapi/threepid/invites.go index 8f173bf8a..b0df0dd4d 100644 --- a/clientapi/threepid/invites.go +++ b/clientapi/threepid/invites.go @@ -26,7 +26,6 @@ import ( "github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" - "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/roomserver/api" @@ -88,7 +87,7 @@ func CheckAndProcessInvite( ctx context.Context, device *authtypes.Device, body *MembershipRequest, cfg *config.Dendrite, rsAPI api.RoomserverInternalAPI, db accounts.Database, - producer *producers.RoomserverProducer, membership string, roomID string, + membership string, roomID string, evTime time.Time, ) (inviteStoredOnIDServer bool, err error) { if membership != gomatrixserverlib.Invite || (body.Address == "" && body.IDServer == "" && body.Medium == "") { @@ -112,7 +111,7 @@ func CheckAndProcessInvite( // "m.room.third_party_invite" have to be emitted from the data in // storeInviteRes. err = emit3PIDInviteEvent( - ctx, body, storeInviteRes, device, roomID, cfg, rsAPI, producer, evTime, + ctx, body, storeInviteRes, device, roomID, cfg, rsAPI, evTime, ) inviteStoredOnIDServer = err == nil @@ -331,7 +330,7 @@ func emit3PIDInviteEvent( ctx context.Context, body *MembershipRequest, res *idServerStoreInviteResponse, device *authtypes.Device, roomID string, cfg *config.Dendrite, - rsAPI api.RoomserverInternalAPI, producer *producers.RoomserverProducer, + rsAPI api.RoomserverInternalAPI, evTime time.Time, ) error { builder := &gomatrixserverlib.EventBuilder{ @@ -359,8 +358,8 @@ func emit3PIDInviteEvent( return err } - _, err = producer.SendEvents( - ctx, + _, err = api.SendEvents( + ctx, rsAPI, []gomatrixserverlib.HeaderedEvent{ (*event).Headered(queryRes.RoomVersion), }, diff --git a/cmd/dendrite-appservice-server/main.go b/cmd/dendrite-appservice-server/main.go index 21fb1b79c..fe00a117d 100644 --- a/cmd/dendrite-appservice-server/main.go +++ b/cmd/dendrite-appservice-server/main.go @@ -17,7 +17,6 @@ package main import ( "github.com/matrix-org/dendrite/appservice" "github.com/matrix-org/dendrite/internal/basecomponent" - "github.com/matrix-org/dendrite/internal/transactions" ) func main() { @@ -27,13 +26,10 @@ func main() { defer base.Close() // nolint: errcheck accountDB := base.CreateAccountsDB() deviceDB := base.CreateDeviceDB() - federation := base.CreateFederationClient() rsAPI := base.RoomserverHTTPClient() - cache := transactions.New() intAPI := appservice.NewInternalAPI(base, accountDB, deviceDB, rsAPI) appservice.AddInternalRoutes(base.InternalAPIMux, intAPI) - appservice.AddPublicRoutes(base.PublicAPIMux, base.Cfg, rsAPI, accountDB, federation, cache) base.SetupAndServeHTTP(string(base.Cfg.Bind.AppServiceAPI), string(base.Cfg.Listen.AppServiceAPI)) diff --git a/cmd/dendrite-demo-libp2p/main.go b/cmd/dendrite-demo-libp2p/main.go index ba1af148d..b7cbcdc9c 100644 --- a/cmd/dendrite-demo-libp2p/main.go +++ b/cmd/dendrite-demo-libp2p/main.go @@ -29,7 +29,6 @@ import ( p2phttp "github.com/libp2p/go-libp2p-http" p2pdisc "github.com/libp2p/go-libp2p/p2p/discovery" "github.com/matrix-org/dendrite/appservice" - "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/cmd/dendrite-demo-libp2p/storage" "github.com/matrix-org/dendrite/eduserver" "github.com/matrix-org/dendrite/federationsender" @@ -149,7 +148,6 @@ func main() { &base.Base, federation, rsAPI, keyRing, ) rsAPI.SetFederationSenderAPI(fsAPI) - eduProducer := producers.NewEDUServerProducer(eduInputAPI) publicRoomsDB, err := storage.NewPublicRoomsServerDatabaseWithPubSub(string(base.Base.Cfg.Database.PublicRoomsAPI), base.LibP2PPubsub, cfg.Matrix.ServerName) if err != nil { logrus.WithError(err).Panicf("failed to connect to public rooms db") @@ -166,7 +164,6 @@ func main() { AppserviceAPI: asAPI, EDUInternalAPI: eduInputAPI, - EDUProducer: eduProducer, FederationSenderAPI: fsAPI, RoomserverAPI: rsAPI, ServerKeyAPI: serverKeyAPI, diff --git a/cmd/dendrite-demo-yggdrasil/convert/25519.go b/cmd/dendrite-demo-yggdrasil/convert/25519.go new file mode 100644 index 000000000..97f053ec0 --- /dev/null +++ b/cmd/dendrite-demo-yggdrasil/convert/25519.go @@ -0,0 +1,53 @@ +// Copyright 2019 Google LLC +// +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file or at +// https://developers.google.com/open-source/licenses/bsd +// +// Original code from https://github.com/FiloSottile/age/blob/bbab440e198a4d67ba78591176c7853e62d29e04/internal/age/ssh.go + +package convert + +import ( + "crypto/ed25519" + "crypto/sha512" + "math/big" + + "golang.org/x/crypto/curve25519" +) + +var curve25519P, _ = new(big.Int).SetString("57896044618658097711785492504343953926634992332820282019728792003956564819949", 10) + +func Ed25519PrivateKeyToCurve25519(pk ed25519.PrivateKey) []byte { + h := sha512.New() + _, _ = h.Write(pk.Seed()) + out := h.Sum(nil) + return out[:curve25519.ScalarSize] +} + +func Ed25519PublicKeyToCurve25519(pk ed25519.PublicKey) []byte { + // ed25519.PublicKey is a little endian representation of the y-coordinate, + // with the most significant bit set based on the sign of the x-coordinate. + bigEndianY := make([]byte, ed25519.PublicKeySize) + for i, b := range pk { + bigEndianY[ed25519.PublicKeySize-i-1] = b + } + bigEndianY[0] &= 0b0111_1111 + + // The Montgomery u-coordinate is derived through the bilinear map + // u = (1 + y) / (1 - y) + // See https://blog.filippo.io/using-ed25519-keys-for-encryption. + y := new(big.Int).SetBytes(bigEndianY) + denom := big.NewInt(1) + denom.ModInverse(denom.Sub(denom, y), curve25519P) // 1 / (1 - y) + u := y.Mul(y.Add(y, big.NewInt(1)), denom) + u.Mod(u, curve25519P) + + out := make([]byte, curve25519.PointSize) + uBytes := u.Bytes() + for i, b := range uBytes { + out[len(uBytes)-i-1] = b + } + + return out +} diff --git a/cmd/dendrite-demo-yggdrasil/convert/25519_test.go b/cmd/dendrite-demo-yggdrasil/convert/25519_test.go new file mode 100644 index 000000000..22177b8b4 --- /dev/null +++ b/cmd/dendrite-demo-yggdrasil/convert/25519_test.go @@ -0,0 +1,51 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package convert + +import ( + "bytes" + "crypto/ed25519" + "encoding/hex" + "testing" + + "golang.org/x/crypto/curve25519" +) + +func TestKeyConversion(t *testing.T) { + edPub, edPriv, err := ed25519.GenerateKey(nil) + if err != nil { + t.Fatal(err) + } + t.Log("Signing public:", hex.EncodeToString(edPub)) + t.Log("Signing private:", hex.EncodeToString(edPriv)) + + cuPriv := Ed25519PrivateKeyToCurve25519(edPriv) + t.Log("Encryption private:", hex.EncodeToString(cuPriv)) + + cuPub := Ed25519PublicKeyToCurve25519(edPub) + t.Log("Converted encryption public:", hex.EncodeToString(cuPub)) + + var realPub, realPriv [32]byte + copy(realPriv[:32], cuPriv[:32]) + curve25519.ScalarBaseMult(&realPub, &realPriv) + t.Log("Scalar-multed encryption public:", hex.EncodeToString(realPub[:])) + + if !bytes.Equal(realPriv[:], cuPriv[:]) { + t.Fatal("Private keys should be equal (this means the test is broken)") + } + if !bytes.Equal(realPub[:], cuPub[:]) { + t.Fatal("Public keys should be equal") + } +} diff --git a/cmd/dendrite-demo-yggdrasil/main.go b/cmd/dendrite-demo-yggdrasil/main.go index 040427567..9e707986b 100644 --- a/cmd/dendrite-demo-yggdrasil/main.go +++ b/cmd/dendrite-demo-yggdrasil/main.go @@ -16,7 +16,9 @@ package main import ( "context" + "crypto/ed25519" "crypto/tls" + "encoding/hex" "flag" "fmt" "net" @@ -25,7 +27,8 @@ import ( "time" "github.com/matrix-org/dendrite/appservice" - "github.com/matrix-org/dendrite/clientapi/producers" + "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/convert" + "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing" "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/yggconn" "github.com/matrix-org/dendrite/eduserver" "github.com/matrix-org/dendrite/eduserver/cache" @@ -37,7 +40,6 @@ import ( "github.com/matrix-org/dendrite/internal/setup" "github.com/matrix-org/dendrite/publicroomsapi/storage" "github.com/matrix-org/dendrite/roomserver" - "github.com/matrix-org/dendrite/serverkeyapi" "github.com/matrix-org/gomatrixserverlib" _ "github.com/matrix-org/dendrite/internal/embed" @@ -65,7 +67,13 @@ func createFederationClient( ) *gomatrixserverlib.FederationClient { yggdialer := func(_, address string) (net.Conn, error) { tokens := strings.Split(address, ":") - return n.Dial("curve25519", tokens[0]) + raw, err := hex.DecodeString(tokens[0]) + if err != nil { + return nil, fmt.Errorf("hex.DecodeString: %w", err) + } + converted := convert.Ed25519PublicKeyToCurve25519(ed25519.PublicKey(raw)) + convhex := hex.EncodeToString(converted) + return n.Dial("curve25519", convhex) } yggdialerctx := func(ctx context.Context, network, address string) (net.Conn, error) { return yggdialer(network, address) @@ -106,9 +114,9 @@ func main() { cfg := &config.Dendrite{} cfg.SetDefaults() - cfg.Matrix.ServerName = gomatrixserverlib.ServerName(ygg.EncryptionPublicKey()) + cfg.Matrix.ServerName = gomatrixserverlib.ServerName(ygg.DerivedServerName()) cfg.Matrix.PrivateKey = ygg.SigningPrivateKey() - cfg.Matrix.KeyID = gomatrixserverlib.KeyID("ed25519:auto") + cfg.Matrix.KeyID = gomatrixserverlib.KeyID(signing.KeyID) cfg.Kafka.UseNaffka = true cfg.Kafka.Topics.OutputRoomEvent = "roomserverOutput" cfg.Kafka.Topics.OutputClientData = "clientapiOutput" @@ -134,9 +142,7 @@ func main() { deviceDB := base.CreateDeviceDB() federation := createFederationClient(base, ygg) - serverKeyAPI := serverkeyapi.NewInternalAPI( - base.Cfg, federation, base.Caches, - ) + serverKeyAPI := &signing.YggdrasilKeys{} keyRing := serverKeyAPI.KeyRing() rsComponent := roomserver.NewInternalAPI( @@ -156,7 +162,6 @@ func main() { rsComponent.SetFederationSenderAPI(fsAPI) - eduProducer := producers.NewEDUServerProducer(eduInputAPI) publicRoomsDB, err := storage.NewPublicRoomsServerDatabase(string(base.Cfg.Database.PublicRoomsAPI), base.Cfg.DbProperties(), cfg.Matrix.ServerName) if err != nil { logrus.WithError(err).Panicf("failed to connect to public rooms db") @@ -175,10 +180,9 @@ func main() { AppserviceAPI: asAPI, EDUInternalAPI: eduInputAPI, - EDUProducer: eduProducer, FederationSenderAPI: fsAPI, RoomserverAPI: rsAPI, - ServerKeyAPI: serverKeyAPI, + //ServerKeyAPI: serverKeyAPI, PublicRoomsDB: publicRoomsDB, } @@ -193,7 +197,7 @@ func main() { ) go func() { - logrus.Info("Listening on ", ygg.EncryptionPublicKey()) + logrus.Info("Listening on ", ygg.DerivedServerName()) logrus.Fatal(httpServer.Serve(ygg)) }() go func() { diff --git a/cmd/dendrite-demo-yggdrasil/signing/fetcher.go b/cmd/dendrite-demo-yggdrasil/signing/fetcher.go new file mode 100644 index 000000000..bcec0cbec --- /dev/null +++ b/cmd/dendrite-demo-yggdrasil/signing/fetcher.go @@ -0,0 +1,69 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package signing + +import ( + "context" + "encoding/hex" + "fmt" + "time" + + "github.com/matrix-org/gomatrixserverlib" +) + +const KeyID = "ed25519:dendrite-demo-yggdrasil" + +type YggdrasilKeys struct { +} + +func (f *YggdrasilKeys) KeyRing() *gomatrixserverlib.KeyRing { + return &gomatrixserverlib.KeyRing{ + KeyDatabase: f, + } +} + +func (f *YggdrasilKeys) FetchKeys( + ctx context.Context, + requests map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp, +) (map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult, error) { + res := make(map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult) + for req := range requests { + if req.KeyID != KeyID { + return nil, fmt.Errorf("FetchKeys: cannot fetch key with ID %s, should be %s", req.KeyID, KeyID) + } + + hexkey, err := hex.DecodeString(string(req.ServerName)) + if err != nil { + return nil, fmt.Errorf("FetchKeys: can't decode server name %q: %w", req.ServerName, err) + } + + res[req] = gomatrixserverlib.PublicKeyLookupResult{ + VerifyKey: gomatrixserverlib.VerifyKey{ + Key: hexkey, + }, + ExpiredTS: gomatrixserverlib.PublicKeyNotExpired, + ValidUntilTS: gomatrixserverlib.AsTimestamp(time.Now().Add(24 * time.Hour * 365)), + } + } + return res, nil +} + +func (f *YggdrasilKeys) FetcherName() string { + return "YggdrasilKeys" +} + +func (f *YggdrasilKeys) StoreKeys(ctx context.Context, results map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult) error { + return nil +} diff --git a/cmd/dendrite-demo-yggdrasil/yggconn/node.go b/cmd/dendrite-demo-yggdrasil/yggconn/node.go index 517f7f794..14bbf84d7 100644 --- a/cmd/dendrite-demo-yggdrasil/yggconn/node.go +++ b/cmd/dendrite-demo-yggdrasil/yggconn/node.go @@ -24,6 +24,8 @@ import ( "os" "sync" + "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/convert" + "github.com/libp2p/go-yamux" yggdrasiladmin "github.com/yggdrasil-network/yggdrasil-go/src/admin" yggdrasilconfig "github.com/yggdrasil-network/yggdrasil-go/src/config" @@ -56,8 +58,6 @@ func Setup(instanceName, instancePeer string) (*Node, error) { log: gologme.New(os.Stdout, "YGG ", log.Flags()), incoming: make(chan *yamux.Stream), } - n.config.AdminListen = fmt.Sprintf("unix://./%s-yggdrasil.sock", instanceName) - n.config.MulticastInterfaces = []string{".*"} yggfile := fmt.Sprintf("%s-yggdrasil.conf", instanceName) if _, err := os.Stat(yggfile); !os.IsNotExist(err) { @@ -69,6 +69,11 @@ func Setup(instanceName, instancePeer string) (*Node, error) { panic(err) } } else { + n.config.AdminListen = fmt.Sprintf("unix://./%s-yggdrasil.sock", instanceName) + n.config.MulticastInterfaces = []string{".*"} + n.config.EncryptionPrivateKey = hex.EncodeToString(n.EncryptionPrivateKey()) + n.config.EncryptionPublicKey = hex.EncodeToString(n.EncryptionPublicKey()) + j, err := json.MarshalIndent(n.config, "", " ") if err != nil { panic(err) @@ -119,8 +124,23 @@ func Setup(instanceName, instancePeer string) (*Node, error) { return n, nil } -func (n *Node) EncryptionPublicKey() string { - return n.core.EncryptionPublicKey() +func (n *Node) DerivedServerName() string { + return hex.EncodeToString(n.SigningPublicKey()) +} + +func (n *Node) EncryptionPublicKey() []byte { + edkey := n.SigningPublicKey() + return convert.Ed25519PublicKeyToCurve25519(edkey) +} + +func (n *Node) EncryptionPrivateKey() []byte { + edkey := n.SigningPrivateKey() + return convert.Ed25519PrivateKeyToCurve25519(edkey) +} + +func (n *Node) SigningPublicKey() ed25519.PublicKey { + pubBytes, _ := hex.DecodeString(n.config.SigningPublicKey) + return ed25519.PublicKey(pubBytes) } func (n *Node) SigningPrivateKey() ed25519.PrivateKey { diff --git a/cmd/dendrite-demo-yggdrasil/yggconn/session.go b/cmd/dendrite-demo-yggdrasil/yggconn/session.go index 4e9a9f043..a9bb15f7e 100644 --- a/cmd/dendrite-demo-yggdrasil/yggconn/session.go +++ b/cmd/dendrite-demo-yggdrasil/yggconn/session.go @@ -40,10 +40,10 @@ func (n *Node) listenFromYgg() { return } var session *yamux.Session - if strings.Compare(n.EncryptionPublicKey(), conn.RemoteAddr().String()) < 0 { - session, err = yamux.Client(conn, n.yamuxConfig()) - } else { + if strings.Compare(conn.RemoteAddr().String(), n.DerivedServerName()) < 0 { session, err = yamux.Server(conn, n.yamuxConfig()) + } else { + session, err = yamux.Client(conn, n.yamuxConfig()) } if err != nil { return @@ -96,7 +96,7 @@ func (n *Node) DialContext(ctx context.Context, network, address string) (net.Co n.log.Println("n.dialer.DialContext:", err) return nil, err } - if strings.Compare(n.EncryptionPublicKey(), address) < 0 { + if strings.Compare(address, n.DerivedServerName()) > 0 { session, err = yamux.Client(conn, n.yamuxConfig()) } else { session, err = yamux.Server(conn, n.yamuxConfig()) diff --git a/cmd/dendrite-federation-api-server/main.go b/cmd/dendrite-federation-api-server/main.go index 7481299e9..fcdc000c4 100644 --- a/cmd/dendrite-federation-api-server/main.go +++ b/cmd/dendrite-federation-api-server/main.go @@ -15,7 +15,6 @@ package main import ( - "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/federationapi" "github.com/matrix-org/dendrite/internal/basecomponent" ) @@ -33,12 +32,10 @@ func main() { fsAPI := base.FederationSenderHTTPClient() rsAPI := base.RoomserverHTTPClient() asAPI := base.AppserviceHTTPClient() - // TODO: this isn't a producer - eduProducer := producers.NewEDUServerProducer(base.EDUServerClient()) federationapi.AddPublicRoutes( base.PublicAPIMux, base.Cfg, accountDB, deviceDB, federation, keyRing, - rsAPI, asAPI, fsAPI, eduProducer, + rsAPI, asAPI, fsAPI, base.EDUServerClient(), ) base.SetupAndServeHTTP(string(base.Cfg.Bind.FederationAPI), string(base.Cfg.Listen.FederationAPI)) diff --git a/cmd/dendrite-monolith-server/main.go b/cmd/dendrite-monolith-server/main.go index dda3ade5a..195a1ac50 100644 --- a/cmd/dendrite-monolith-server/main.go +++ b/cmd/dendrite-monolith-server/main.go @@ -19,7 +19,6 @@ import ( "net/http" "github.com/matrix-org/dendrite/appservice" - "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/eduserver" "github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/federationsender" @@ -105,7 +104,6 @@ func main() { } rsComponent.SetFederationSenderAPI(fsAPI) - eduProducer := producers.NewEDUServerProducer(eduInputAPI) publicRoomsDB, err := storage.NewPublicRoomsServerDatabase(string(base.Cfg.Database.PublicRoomsAPI), base.Cfg.DbProperties(), cfg.Matrix.ServerName) if err != nil { logrus.WithError(err).Panicf("failed to connect to public rooms db") @@ -122,7 +120,6 @@ func main() { AppserviceAPI: asAPI, EDUInternalAPI: eduInputAPI, - EDUProducer: eduProducer, FederationSenderAPI: fsAPI, RoomserverAPI: rsAPI, ServerKeyAPI: serverKeyAPI, diff --git a/cmd/dendritejs/main.go b/cmd/dendritejs/main.go index 86ae33688..0512ee5c9 100644 --- a/cmd/dendritejs/main.go +++ b/cmd/dendritejs/main.go @@ -23,7 +23,6 @@ import ( "syscall/js" "github.com/matrix-org/dendrite/appservice" - "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/eduserver" "github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/federationsender" @@ -209,7 +208,6 @@ func main() { rsAPI.SetFederationSenderAPI(fedSenderAPI) p2pPublicRoomProvider := NewLibP2PPublicRoomsProvider(node, fedSenderAPI) - eduProducer := producers.NewEDUServerProducer(eduInputAPI) publicRoomsDB, err := storage.NewPublicRoomsServerDatabase(string(base.Cfg.Database.PublicRoomsAPI), cfg.Matrix.ServerName) if err != nil { logrus.WithError(err).Panicf("failed to connect to public rooms db") @@ -226,7 +224,6 @@ func main() { AppserviceAPI: asQuery, EDUInternalAPI: eduInputAPI, - EDUProducer: eduProducer, FederationSenderAPI: fedSenderAPI, RoomserverAPI: rsAPI, //ServerKeyAPI: serverKeyAPI, diff --git a/clientapi/producers/eduserver.go b/eduserver/api/wrapper.go similarity index 56% rename from clientapi/producers/eduserver.go rename to eduserver/api/wrapper.go index 102c1fad8..c2c4596de 100644 --- a/clientapi/producers/eduserver.go +++ b/eduserver/api/wrapper.go @@ -1,3 +1,5 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -10,35 +12,22 @@ // See the License for the specific language governing permissions and // limitations under the License. -package producers +package api import ( "context" "encoding/json" "time" - "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/gomatrixserverlib" ) -// EDUServerProducer produces events for the EDU server to consume -type EDUServerProducer struct { - InputAPI api.EDUServerInputAPI -} - -// NewEDUServerProducer creates a new EDUServerProducer -func NewEDUServerProducer(inputAPI api.EDUServerInputAPI) *EDUServerProducer { - return &EDUServerProducer{ - InputAPI: inputAPI, - } -} - // SendTyping sends a typing event to EDU server -func (p *EDUServerProducer) SendTyping( - ctx context.Context, userID, roomID string, +func SendTyping( + ctx context.Context, eduAPI EDUServerInputAPI, userID, roomID string, typing bool, timeoutMS int64, ) error { - requestData := api.InputTypingEvent{ + requestData := InputTypingEvent{ UserID: userID, RoomID: roomID, Typing: typing, @@ -46,24 +35,24 @@ func (p *EDUServerProducer) SendTyping( OriginServerTS: gomatrixserverlib.AsTimestamp(time.Now()), } - var response api.InputTypingEventResponse - err := p.InputAPI.InputTypingEvent( - ctx, &api.InputTypingEventRequest{InputTypingEvent: requestData}, &response, + var response InputTypingEventResponse + err := eduAPI.InputTypingEvent( + ctx, &InputTypingEventRequest{InputTypingEvent: requestData}, &response, ) return err } // SendToDevice sends a typing event to EDU server -func (p *EDUServerProducer) SendToDevice( - ctx context.Context, sender, userID, deviceID, eventType string, +func SendToDevice( + ctx context.Context, eduAPI EDUServerInputAPI, sender, userID, deviceID, eventType string, message interface{}, ) error { js, err := json.Marshal(message) if err != nil { return err } - requestData := api.InputSendToDeviceEvent{ + requestData := InputSendToDeviceEvent{ UserID: userID, DeviceID: deviceID, SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{ @@ -72,9 +61,9 @@ func (p *EDUServerProducer) SendToDevice( Content: js, }, } - request := api.InputSendToDeviceEventRequest{ + request := InputSendToDeviceEventRequest{ InputSendToDeviceEvent: requestData, } - response := api.InputSendToDeviceEventResponse{} - return p.InputAPI.InputSendToDeviceEvent(ctx, &request, &response) + response := InputSendToDeviceEventResponse{} + return eduAPI.InputSendToDeviceEvent(ctx, &request, &response) } diff --git a/federationapi/federationapi.go b/federationapi/federationapi.go index 7aecd272b..9299b5016 100644 --- a/federationapi/federationapi.go +++ b/federationapi/federationapi.go @@ -19,12 +19,11 @@ import ( appserviceAPI "github.com/matrix-org/dendrite/appservice/api" "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/clientapi/auth/storage/devices" + eduserverAPI "github.com/matrix-org/dendrite/eduserver/api" federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" "github.com/matrix-org/dendrite/internal/config" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" - // TODO: Are we really wanting to pull in the producer from clientapi - "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/federationapi/routing" "github.com/matrix-org/gomatrixserverlib" ) @@ -40,13 +39,12 @@ func AddPublicRoutes( rsAPI roomserverAPI.RoomserverInternalAPI, asAPI appserviceAPI.AppServiceQueryAPI, federationSenderAPI federationSenderAPI.FederationSenderInternalAPI, - eduProducer *producers.EDUServerProducer, + eduAPI eduserverAPI.EDUServerInputAPI, ) { - roomserverProducer := producers.NewRoomserverProducer(rsAPI) routing.Setup( - router, cfg, rsAPI, asAPI, roomserverProducer, - eduProducer, federationSenderAPI, *keyRing, + router, cfg, rsAPI, asAPI, + eduAPI, federationSenderAPI, *keyRing, federation, accountsDB, deviceDB, ) } diff --git a/federationapi/routing/invite.go b/federationapi/routing/invite.go index 05efe5872..908a04fcb 100644 --- a/federationapi/routing/invite.go +++ b/federationapi/routing/invite.go @@ -20,8 +20,8 @@ import ( "net/http" "github.com/matrix-org/dendrite/clientapi/jsonerror" - "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/internal/config" + "github.com/matrix-org/dendrite/roomserver/api" roomserverVersion "github.com/matrix-org/dendrite/roomserver/version" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" @@ -34,7 +34,7 @@ func Invite( roomID string, eventID string, cfg *config.Dendrite, - producer *producers.RoomserverProducer, + rsAPI api.RoomserverInternalAPI, keys gomatrixserverlib.KeyRing, ) util.JSONResponse { inviteReq := gomatrixserverlib.InviteV2Request{} @@ -98,8 +98,8 @@ func Invite( ) // Add the invite event to the roomserver. - if err = producer.SendInvite( - httpReq.Context(), + if err = api.SendInvite( + httpReq.Context(), rsAPI, signedEvent.Headered(inviteReq.RoomVersion()), inviteReq.InviteRoomState(), event.Origin(), diff --git a/federationapi/routing/join.go b/federationapi/routing/join.go index 5b4e8db3e..593aa169f 100644 --- a/federationapi/routing/join.go +++ b/federationapi/routing/join.go @@ -21,7 +21,6 @@ import ( "time" "github.com/matrix-org/dendrite/clientapi/jsonerror" - "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/roomserver/api" @@ -144,7 +143,6 @@ func SendJoin( request *gomatrixserverlib.FederationRequest, cfg *config.Dendrite, rsAPI api.RoomserverInternalAPI, - producer *producers.RoomserverProducer, keys gomatrixserverlib.KeyRing, roomID, eventID string, ) util.JSONResponse { @@ -267,8 +265,8 @@ func SendJoin( // We are responsible for notifying other servers that the user has joined // the room, so set SendAsServer to cfg.Matrix.ServerName if !alreadyJoined { - _, err = producer.SendEvents( - httpReq.Context(), + _, err = api.SendEvents( + httpReq.Context(), rsAPI, []gomatrixserverlib.HeaderedEvent{ event.Headered(stateAndAuthChainResponse.RoomVersion), }, @@ -276,7 +274,7 @@ func SendJoin( nil, ) if err != nil { - util.GetLogger(httpReq.Context()).WithError(err).Error("producer.SendEvents failed") + util.GetLogger(httpReq.Context()).WithError(err).Error("SendEvents failed") return jsonerror.InternalServerError() } } diff --git a/federationapi/routing/leave.go b/federationapi/routing/leave.go index 62ca11457..f998be45a 100644 --- a/federationapi/routing/leave.go +++ b/federationapi/routing/leave.go @@ -17,7 +17,6 @@ import ( "time" "github.com/matrix-org/dendrite/clientapi/jsonerror" - "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/roomserver/api" @@ -113,13 +112,13 @@ func SendLeave( httpReq *http.Request, request *gomatrixserverlib.FederationRequest, cfg *config.Dendrite, - producer *producers.RoomserverProducer, + rsAPI api.RoomserverInternalAPI, keys gomatrixserverlib.KeyRing, roomID, eventID string, ) util.JSONResponse { verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID} verRes := api.QueryRoomVersionForRoomResponse{} - if err := producer.RsAPI.QueryRoomVersionForRoom(httpReq.Context(), &verReq, &verRes); err != nil { + if err := rsAPI.QueryRoomVersionForRoom(httpReq.Context(), &verReq, &verRes); err != nil { return util.JSONResponse{ Code: http.StatusBadRequest, JSON: jsonerror.UnsupportedRoomVersion(err.Error()), @@ -194,8 +193,8 @@ func SendLeave( // Send the events to the room server. // We are responsible for notifying other servers that the user has left // the room, so set SendAsServer to cfg.Matrix.ServerName - _, err = producer.SendEvents( - httpReq.Context(), + _, err = api.SendEvents( + httpReq.Context(), rsAPI, []gomatrixserverlib.HeaderedEvent{ event.Headered(verRes.RoomVersion), }, diff --git a/federationapi/routing/routing.go b/federationapi/routing/routing.go index e6c6df658..754dcdfb0 100644 --- a/federationapi/routing/routing.go +++ b/federationapi/routing/routing.go @@ -21,7 +21,7 @@ import ( appserviceAPI "github.com/matrix-org/dendrite/appservice/api" "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/clientapi/auth/storage/devices" - "github.com/matrix-org/dendrite/clientapi/producers" + eduserverAPI "github.com/matrix-org/dendrite/eduserver/api" federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/config" @@ -49,8 +49,7 @@ func Setup( cfg *config.Dendrite, rsAPI roomserverAPI.RoomserverInternalAPI, asAPI appserviceAPI.AppServiceQueryAPI, - producer *producers.RoomserverProducer, - eduProducer *producers.EDUServerProducer, + eduAPI eduserverAPI.EDUServerInputAPI, fsAPI federationSenderAPI.FederationSenderInternalAPI, keys gomatrixserverlib.KeyRing, federation *gomatrixserverlib.FederationClient, @@ -82,7 +81,7 @@ func Setup( func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest, vars map[string]string) util.JSONResponse { return Send( httpReq, request, gomatrixserverlib.TransactionID(vars["txnID"]), - cfg, rsAPI, producer, eduProducer, keys, federation, + cfg, rsAPI, eduAPI, keys, federation, ) }, )).Methods(http.MethodPut, http.MethodOptions) @@ -92,14 +91,14 @@ func Setup( func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest, vars map[string]string) util.JSONResponse { return Invite( httpReq, request, vars["roomID"], vars["eventID"], - cfg, producer, keys, + cfg, rsAPI, keys, ) }, )).Methods(http.MethodPut, http.MethodOptions) v1fedmux.Handle("/3pid/onbind", internal.MakeExternalAPI("3pid_onbind", func(req *http.Request) util.JSONResponse { - return CreateInvitesFrom3PIDInvites(req, rsAPI, asAPI, cfg, producer, federation, accountDB) + return CreateInvitesFrom3PIDInvites(req, rsAPI, asAPI, cfg, federation, accountDB) }, )).Methods(http.MethodPost, http.MethodOptions) @@ -107,7 +106,7 @@ func Setup( "exchange_third_party_invite", cfg.Matrix.ServerName, keys, wakeup, func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest, vars map[string]string) util.JSONResponse { return ExchangeThirdPartyInvite( - httpReq, request, vars["roomID"], rsAPI, cfg, federation, producer, + httpReq, request, vars["roomID"], rsAPI, cfg, federation, ) }, )).Methods(http.MethodPut, http.MethodOptions) @@ -206,7 +205,7 @@ func Setup( roomID := vars["roomID"] eventID := vars["eventID"] res := SendJoin( - httpReq, request, cfg, rsAPI, producer, keys, roomID, eventID, + httpReq, request, cfg, rsAPI, keys, roomID, eventID, ) return util.JSONResponse{ Headers: res.Headers, @@ -224,7 +223,7 @@ func Setup( roomID := vars["roomID"] eventID := vars["eventID"] return SendJoin( - httpReq, request, cfg, rsAPI, producer, keys, roomID, eventID, + httpReq, request, cfg, rsAPI, keys, roomID, eventID, ) }, )).Methods(http.MethodPut) @@ -246,7 +245,7 @@ func Setup( roomID := vars["roomID"] eventID := vars["eventID"] return SendLeave( - httpReq, request, cfg, producer, keys, roomID, eventID, + httpReq, request, cfg, rsAPI, keys, roomID, eventID, ) }, )).Methods(http.MethodPut) diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index a2120d81b..a057750f6 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -21,7 +21,7 @@ import ( "net/http" "github.com/matrix-org/dendrite/clientapi/jsonerror" - "github.com/matrix-org/dendrite/clientapi/producers" + eduserverAPI "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" @@ -36,20 +36,18 @@ func Send( txnID gomatrixserverlib.TransactionID, cfg *config.Dendrite, rsAPI api.RoomserverInternalAPI, - producer *producers.RoomserverProducer, - eduProducer *producers.EDUServerProducer, + eduAPI eduserverAPI.EDUServerInputAPI, keys gomatrixserverlib.KeyRing, federation *gomatrixserverlib.FederationClient, ) util.JSONResponse { t := txnReq{ - context: httpReq.Context(), - rsAPI: rsAPI, - producer: producer, - eduProducer: eduProducer, - keys: keys, - federation: federation, - haveEvents: make(map[string]*gomatrixserverlib.HeaderedEvent), - newEvents: make(map[string]bool), + context: httpReq.Context(), + rsAPI: rsAPI, + eduAPI: eduAPI, + keys: keys, + federation: federation, + haveEvents: make(map[string]*gomatrixserverlib.HeaderedEvent), + newEvents: make(map[string]bool), } var txnEvents struct { @@ -91,12 +89,11 @@ func Send( type txnReq struct { gomatrixserverlib.Transaction - context context.Context - rsAPI api.RoomserverInternalAPI - producer *producers.RoomserverProducer - eduProducer *producers.EDUServerProducer - keys gomatrixserverlib.JSONVerifier - federation txnFederationClient + context context.Context + rsAPI api.RoomserverInternalAPI + eduAPI eduserverAPI.EDUServerInputAPI + keys gomatrixserverlib.JSONVerifier + federation txnFederationClient // local cache of events for auth checks, etc - this may include events // which the roomserver is unaware of. haveEvents map[string]*gomatrixserverlib.HeaderedEvent @@ -262,7 +259,7 @@ func (t *txnReq) processEDUs(edus []gomatrixserverlib.EDU) { util.GetLogger(t.context).WithError(err).Error("Failed to unmarshal typing event") continue } - if err := t.eduProducer.SendTyping(t.context, typingPayload.UserID, typingPayload.RoomID, typingPayload.Typing, 30*1000); err != nil { + if err := eduserverAPI.SendTyping(t.context, t.eduAPI, typingPayload.UserID, typingPayload.RoomID, typingPayload.Typing, 30*1000); err != nil { util.GetLogger(t.context).WithError(err).Error("Failed to send typing event to edu server") } case gomatrixserverlib.MDirectToDevice: @@ -275,7 +272,7 @@ func (t *txnReq) processEDUs(edus []gomatrixserverlib.EDU) { for userID, byUser := range directPayload.Messages { for deviceID, message := range byUser { // TODO: check that the user and the device actually exist here - if err := t.eduProducer.SendToDevice(t.context, directPayload.Sender, userID, deviceID, directPayload.Type, message); err != nil { + if err := eduserverAPI.SendToDevice(t.context, t.eduAPI, directPayload.Sender, userID, deviceID, directPayload.Type, message); err != nil { util.GetLogger(t.context).WithError(err).WithFields(logrus.Fields{ "sender": directPayload.Sender, "user_id": userID, @@ -325,8 +322,8 @@ func (t *txnReq) processEvent(e gomatrixserverlib.Event, isInboundTxn bool) erro } // pass the event to the roomserver - _, err := t.producer.SendEvents( - t.context, + _, err := api.SendEvents( + t.context, t.rsAPI, []gomatrixserverlib.HeaderedEvent{ e.Headered(stateResp.RoomVersion), }, @@ -408,7 +405,7 @@ func (t *txnReq) processEventWithMissingState(e gomatrixserverlib.Event, roomVer // pass the event along with the state to the roomserver using a background context so we don't // needlessly expire - return t.producer.SendEventWithState(context.Background(), resolvedState, e.Headered(roomVersion), t.haveEventIDs()) + return api.SendEventWithState(context.Background(), t.rsAPI, resolvedState, e.Headered(roomVersion), t.haveEventIDs()) } // lookupStateAfterEvent returns the room state after `eventID`, which is the state before eventID with the state of `eventID` (if it's a state event) diff --git a/federationapi/routing/send_test.go b/federationapi/routing/send_test.go index 3e28a3476..b81f1c003 100644 --- a/federationapi/routing/send_test.go +++ b/federationapi/routing/send_test.go @@ -8,7 +8,6 @@ import ( "testing" "time" - "github.com/matrix-org/dendrite/clientapi/producers" eduAPI "github.com/matrix-org/dendrite/eduserver/api" fsAPI "github.com/matrix-org/dendrite/federationsender/api" "github.com/matrix-org/dendrite/roomserver/api" @@ -339,14 +338,13 @@ func (c *txnFedClient) LookupMissingEvents(ctx context.Context, s gomatrixserver func mustCreateTransaction(rsAPI api.RoomserverInternalAPI, fedClient txnFederationClient, pdus []json.RawMessage) *txnReq { t := &txnReq{ - context: context.Background(), - rsAPI: rsAPI, - producer: producers.NewRoomserverProducer(rsAPI), - eduProducer: producers.NewEDUServerProducer(&testEDUProducer{}), - keys: &testNopJSONVerifier{}, - federation: fedClient, - haveEvents: make(map[string]*gomatrixserverlib.HeaderedEvent), - newEvents: make(map[string]bool), + context: context.Background(), + rsAPI: rsAPI, + eduAPI: &testEDUProducer{}, + keys: &testNopJSONVerifier{}, + federation: fedClient, + haveEvents: make(map[string]*gomatrixserverlib.HeaderedEvent), + newEvents: make(map[string]bool), } t.PDUs = pdus t.Origin = testOrigin diff --git a/federationapi/routing/threepid.go b/federationapi/routing/threepid.go index 8053cedde..8f3193870 100644 --- a/federationapi/routing/threepid.go +++ b/federationapi/routing/threepid.go @@ -25,7 +25,6 @@ import ( "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" - "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/roomserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" @@ -60,7 +59,7 @@ var ( func CreateInvitesFrom3PIDInvites( req *http.Request, rsAPI roomserverAPI.RoomserverInternalAPI, asAPI appserviceAPI.AppServiceQueryAPI, cfg *config.Dendrite, - producer *producers.RoomserverProducer, federation *gomatrixserverlib.FederationClient, + federation *gomatrixserverlib.FederationClient, accountDB accounts.Database, ) util.JSONResponse { var body invites @@ -92,8 +91,8 @@ func CreateInvitesFrom3PIDInvites( } // Send all the events - if _, err := producer.SendEvents(req.Context(), evs, cfg.Matrix.ServerName, nil); err != nil { - util.GetLogger(req.Context()).WithError(err).Error("producer.SendEvents failed") + if _, err := api.SendEvents(req.Context(), rsAPI, evs, cfg.Matrix.ServerName, nil); err != nil { + util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed") return jsonerror.InternalServerError() } @@ -111,7 +110,6 @@ func ExchangeThirdPartyInvite( rsAPI roomserverAPI.RoomserverInternalAPI, cfg *config.Dendrite, federation *gomatrixserverlib.FederationClient, - producer *producers.RoomserverProducer, ) util.JSONResponse { var builder gomatrixserverlib.EventBuilder if err := json.Unmarshal(request.Content(), &builder); err != nil { @@ -176,15 +174,15 @@ func ExchangeThirdPartyInvite( } // Send the event to the roomserver - if _, err = producer.SendEvents( - httpReq.Context(), + if _, err = api.SendEvents( + httpReq.Context(), rsAPI, []gomatrixserverlib.HeaderedEvent{ signedEvent.Event.Headered(verRes.RoomVersion), }, cfg.Matrix.ServerName, nil, ); err != nil { - util.GetLogger(httpReq.Context()).WithError(err).Error("producer.SendEvents failed") + util.GetLogger(httpReq.Context()).WithError(err).Error("SendEvents failed") return jsonerror.InternalServerError() } diff --git a/internal/setup/monolith.go b/internal/setup/monolith.go index d9bb28395..35fcd3119 100644 --- a/internal/setup/monolith.go +++ b/internal/setup/monolith.go @@ -7,7 +7,6 @@ import ( "github.com/matrix-org/dendrite/clientapi" "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/clientapi/auth/storage/devices" - "github.com/matrix-org/dendrite/clientapi/producers" eduServerAPI "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/federationapi" federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" @@ -41,8 +40,6 @@ type Monolith struct { RoomserverAPI roomserverAPI.RoomserverInternalAPI ServerKeyAPI serverKeyAPI.ServerKeyInternalAPI - // TODO: remove, this isn't even a producer - EDUProducer *producers.EDUServerProducer // TODO: can we remove this? It's weird that we are required the database // yet every other component can do that on its own. libp2p-demo uses a custom // database though annoyingly. @@ -65,7 +62,7 @@ func (m *Monolith) AddAllPublicRoutes(publicMux *mux.Router) { federationapi.AddPublicRoutes( publicMux, m.Config, m.AccountDB, m.DeviceDB, m.FedClient, m.KeyRing, m.RoomserverAPI, m.AppserviceAPI, m.FederationSenderAPI, - m.EDUProducer, + m.EDUInternalAPI, ) mediaapi.AddPublicRoutes(publicMux, m.Config, m.DeviceDB) publicroomsapi.AddPublicRoutes( diff --git a/clientapi/producers/roomserver.go b/roomserver/api/wrapper.go similarity index 50% rename from clientapi/producers/roomserver.go rename to roomserver/api/wrapper.go index f0733db9c..97940e0c2 100644 --- a/clientapi/producers/roomserver.go +++ b/roomserver/api/wrapper.go @@ -1,4 +1,4 @@ -// Copyright 2017 Vector Creations Ltd +// Copyright 2020 The Matrix.org Foundation C.I.C. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -12,63 +12,51 @@ // See the License for the specific language governing permissions and // limitations under the License. -package producers +package api import ( "context" - "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" ) -// RoomserverProducer produces events for the roomserver to consume. -type RoomserverProducer struct { - RsAPI api.RoomserverInternalAPI -} - -// NewRoomserverProducer creates a new RoomserverProducer -func NewRoomserverProducer(rsAPI api.RoomserverInternalAPI) *RoomserverProducer { - return &RoomserverProducer{ - RsAPI: rsAPI, - } -} - -// SendEvents writes the given events to the roomserver input log. The events are written with KindNew. -func (c *RoomserverProducer) SendEvents( - ctx context.Context, events []gomatrixserverlib.HeaderedEvent, sendAsServer gomatrixserverlib.ServerName, - txnID *api.TransactionID, +// SendEvents to the roomserver The events are written with KindNew. +func SendEvents( + ctx context.Context, rsAPI RoomserverInternalAPI, events []gomatrixserverlib.HeaderedEvent, + sendAsServer gomatrixserverlib.ServerName, txnID *TransactionID, ) (string, error) { - ires := make([]api.InputRoomEvent, len(events)) + ires := make([]InputRoomEvent, len(events)) for i, event := range events { - ires[i] = api.InputRoomEvent{ - Kind: api.KindNew, + ires[i] = InputRoomEvent{ + Kind: KindNew, Event: event, AuthEventIDs: event.AuthEventIDs(), SendAsServer: string(sendAsServer), TransactionID: txnID, } } - return c.SendInputRoomEvents(ctx, ires) + return SendInputRoomEvents(ctx, rsAPI, ires) } -// SendEventWithState writes an event with KindNew to the roomserver input log +// SendEventWithState writes an event with KindNew to the roomserver // with the state at the event as KindOutlier before it. Will not send any event that is // marked as `true` in haveEventIDs -func (c *RoomserverProducer) SendEventWithState( - ctx context.Context, state *gomatrixserverlib.RespState, event gomatrixserverlib.HeaderedEvent, haveEventIDs map[string]bool, +func SendEventWithState( + ctx context.Context, rsAPI RoomserverInternalAPI, state *gomatrixserverlib.RespState, + event gomatrixserverlib.HeaderedEvent, haveEventIDs map[string]bool, ) error { outliers, err := state.Events() if err != nil { return err } - var ires []api.InputRoomEvent + var ires []InputRoomEvent for _, outlier := range outliers { if haveEventIDs[outlier.EventID()] { continue } - ires = append(ires, api.InputRoomEvent{ - Kind: api.KindOutlier, + ires = append(ires, InputRoomEvent{ + Kind: KindOutlier, Event: outlier.Headered(event.RoomVersion), AuthEventIDs: outlier.AuthEventIDs(), }) @@ -79,39 +67,40 @@ func (c *RoomserverProducer) SendEventWithState( stateEventIDs[i] = state.StateEvents[i].EventID() } - ires = append(ires, api.InputRoomEvent{ - Kind: api.KindNew, + ires = append(ires, InputRoomEvent{ + Kind: KindNew, Event: event, AuthEventIDs: event.AuthEventIDs(), HasState: true, StateEventIDs: stateEventIDs, }) - _, err = c.SendInputRoomEvents(ctx, ires) + _, err = SendInputRoomEvents(ctx, rsAPI, ires) return err } -// SendInputRoomEvents writes the given input room events to the roomserver input API. -func (c *RoomserverProducer) SendInputRoomEvents( - ctx context.Context, ires []api.InputRoomEvent, +// SendInputRoomEvents to the roomserver. +func SendInputRoomEvents( + ctx context.Context, rsAPI RoomserverInternalAPI, ires []InputRoomEvent, ) (eventID string, err error) { - request := api.InputRoomEventsRequest{InputRoomEvents: ires} - var response api.InputRoomEventsResponse - err = c.RsAPI.InputRoomEvents(ctx, &request, &response) + request := InputRoomEventsRequest{InputRoomEvents: ires} + var response InputRoomEventsResponse + err = rsAPI.InputRoomEvents(ctx, &request, &response) eventID = response.EventID return } -// SendInvite writes the invite event to the roomserver input API. +// SendInvite event to the roomserver. // This should only be needed for invite events that occur outside of a known room. // If we are in the room then the event should be sent using the SendEvents method. -func (c *RoomserverProducer) SendInvite( - ctx context.Context, inviteEvent gomatrixserverlib.HeaderedEvent, +func SendInvite( + ctx context.Context, + rsAPI RoomserverInternalAPI, inviteEvent gomatrixserverlib.HeaderedEvent, inviteRoomState []gomatrixserverlib.InviteV2StrippedState, - sendAsServer gomatrixserverlib.ServerName, txnID *api.TransactionID, + sendAsServer gomatrixserverlib.ServerName, txnID *TransactionID, ) error { - request := api.InputRoomEventsRequest{ - InputInviteEvents: []api.InputInviteEvent{{ + request := InputRoomEventsRequest{ + InputInviteEvents: []InputInviteEvent{{ Event: inviteEvent, InviteRoomState: inviteRoomState, RoomVersion: inviteEvent.RoomVersion, @@ -119,6 +108,6 @@ func (c *RoomserverProducer) SendInvite( TransactionID: txnID, }}, } - var response api.InputRoomEventsResponse - return c.RsAPI.InputRoomEvents(ctx, &request, &response) + var response InputRoomEventsResponse + return rsAPI.InputRoomEvents(ctx, &request, &response) }