From 3ab8ebf6b8fbc813bfb3e0e0735e76a69a8ed2dd Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Fri, 24 Apr 2020 16:30:25 +0100 Subject: [PATCH 1/3] More invite support (#979) * Update gomatixserverlib * Try to build invite stripped state if not given to us * SendInvite improvements * Transpose invite_room_state into invite_state.events for sync API * Remove syncapi debugging output * Use RespInviteV2 * Update gomatrixserverlib * Send the invite event as a normal roomserver event too, for incorporating into room (should this be done by the roomserver automatically for invite inputs?) * Federation sender use invite_room_state, room server try to insert membership state * Check supported room versions on the invite endpoint * Prevent roomserver query API from trying to handle requests for stub rooms * Adding a nolint * Replace IsRoomStub with RoomNIDExcludingStubs, fix query API to use that instead * Review comments --- clientapi/producers/roomserver.go | 3 ++ clientapi/routing/membership.go | 42 ++++++++++----- federationapi/routing/invite.go | 16 +++++- federationsender/consumers/roomserver.go | 48 +++-------------- go.mod | 3 +- go.sum | 4 +- roomserver/api/input.go | 2 + roomserver/input/events.go | 66 +++++++++++++++++++++++- roomserver/input/membership.go | 2 +- roomserver/query/query.go | 6 +-- roomserver/storage/interface.go | 4 ++ roomserver/storage/postgres/storage.go | 17 ++++++ roomserver/storage/sqlite3/storage.go | 17 ++++++ syncapi/storage/postgres/syncserver.go | 6 +-- syncapi/storage/sqlite3/syncserver.go | 6 +-- syncapi/types/types.go | 10 ++-- 16 files changed, 175 insertions(+), 77 deletions(-) diff --git a/clientapi/producers/roomserver.go b/clientapi/producers/roomserver.go index 391ea07bf..fac1e3c7c 100644 --- a/clientapi/producers/roomserver.go +++ b/clientapi/producers/roomserver.go @@ -106,12 +106,15 @@ func (c *RoomserverProducer) SendInputRoomEvents( func (c *RoomserverProducer) SendInvite( ctx context.Context, inviteEvent gomatrixserverlib.HeaderedEvent, inviteRoomState []gomatrixserverlib.InviteV2StrippedState, + sendAsServer gomatrixserverlib.ServerName, txnID *api.TransactionID, ) error { request := api.InputRoomEventsRequest{ InputInviteEvents: []api.InputInviteEvent{{ Event: inviteEvent, InviteRoomState: inviteRoomState, RoomVersion: inviteEvent.RoomVersion, + SendAsServer: string(sendAsServer), + TransactionID: txnID, }}, } var response api.InputRoomEventsResponse diff --git a/clientapi/routing/membership.go b/clientapi/routing/membership.go index 9f386b718..c597dd27d 100644 --- a/clientapi/routing/membership.go +++ b/clientapi/routing/membership.go @@ -40,6 +40,8 @@ var errMissingUserID = errors.New("'user_id' must be supplied") // SendMembership implements PUT /rooms/{roomID}/(join|kick|ban|unban|leave|invite) // by building a m.room.member event then sending it to the room server +// TODO: Can we improve the cyclo count here? Separate code paths for invites? +// nolint:gocyclo func SendMembership( req *http.Request, accountDB accounts.Database, device *authtypes.Device, roomID string, membership string, cfg *config.Dendrite, @@ -104,23 +106,39 @@ func SendMembership( return jsonerror.InternalServerError() } - if _, err := producer.SendEvents( - req.Context(), - []gomatrixserverlib.HeaderedEvent{(*event).Headered(verRes.RoomVersion)}, - cfg.Matrix.ServerName, - nil, - ); err != nil { - util.GetLogger(req.Context()).WithError(err).Error("producer.SendEvents failed") - return jsonerror.InternalServerError() - } - var returnData interface{} = struct{}{} - // The join membership requires the room id to be sent in the response - if membership == gomatrixserverlib.Join { + switch membership { + case gomatrixserverlib.Invite: + // Invites need to be handled specially + err = producer.SendInvite( + req.Context(), + event.Headered(verRes.RoomVersion), + nil, // ask the roomserver to draw up invite room state for us + cfg.Matrix.ServerName, + nil, + ) + if err != nil { + util.GetLogger(req.Context()).WithError(err).Error("producer.SendInvite failed") + return jsonerror.InternalServerError() + } + case gomatrixserverlib.Join: + // The join membership requires the room id to be sent in the response returnData = struct { RoomID string `json:"room_id"` }{roomID} + default: + } + + _, err = producer.SendEvents( + req.Context(), + []gomatrixserverlib.HeaderedEvent{event.Headered(verRes.RoomVersion)}, + cfg.Matrix.ServerName, + nil, + ) + if err != nil { + util.GetLogger(req.Context()).WithError(err).Error("producer.SendEvents failed") + return jsonerror.InternalServerError() } return util.JSONResponse{ diff --git a/federationapi/routing/invite.go b/federationapi/routing/invite.go index 4b367e004..064abe7e9 100644 --- a/federationapi/routing/invite.go +++ b/federationapi/routing/invite.go @@ -16,11 +16,13 @@ package routing import ( "encoding/json" + "fmt" "net/http" "github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/common/config" + roomserverVersion "github.com/matrix-org/dendrite/roomserver/version" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" ) @@ -44,6 +46,16 @@ func Invite( } event := inviteReq.Event() + // Check that we can accept invites for this room version. + if _, err := roomserverVersion.SupportedRoomVersion(inviteReq.RoomVersion()); err != nil { + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: jsonerror.UnsupportedRoomVersion( + fmt.Sprintf("Room version %q is not supported by this server.", inviteReq.RoomVersion()), + ), + } + } + // Check that the room ID is correct. if event.RoomID() != roomID { return util.JSONResponse{ @@ -90,6 +102,8 @@ func Invite( httpReq.Context(), signedEvent.Headered(inviteReq.RoomVersion()), inviteReq.InviteRoomState(), + event.Origin(), + nil, ); err != nil { util.GetLogger(httpReq.Context()).WithError(err).Error("producer.SendInvite failed") return jsonerror.InternalServerError() @@ -99,6 +113,6 @@ func Invite( // the other servers in the room that we have been invited. return util.JSONResponse{ Code: http.StatusOK, - JSON: gomatrixserverlib.RespInvite{Event: signedEvent}, + JSON: gomatrixserverlib.RespInviteV2{Event: signedEvent}, } } diff --git a/federationsender/consumers/roomserver.go b/federationsender/consumers/roomserver.go index a36fb3792..18c8324b4 100644 --- a/federationsender/consumers/roomserver.go +++ b/federationsender/consumers/roomserver.go @@ -28,6 +28,7 @@ import ( "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" log "github.com/sirupsen/logrus" + "github.com/tidwall/gjson" ) // OutputRoomEventConsumer consumes events that originated in the room server. @@ -187,49 +188,12 @@ func (s *OutputRoomEventConsumer) processInvite(oie api.OutputNewInviteEvent) er return nil } - // When sending a v2 invite, the inviting server should try and include - // a "stripped down" version of the room state. This is pretty much just - // enough information for the remote side to show something useful to the - // user, like the room name, aliases etc. + // Try to extract the room invite state. The roomserver will have stashed + // this for us in invite_room_state if it didn't already exist. strippedState := []gomatrixserverlib.InviteV2StrippedState{} - stateWanted := []string{ - gomatrixserverlib.MRoomName, gomatrixserverlib.MRoomCanonicalAlias, - gomatrixserverlib.MRoomAliases, gomatrixserverlib.MRoomJoinRules, - } - - // For each of the state keys that we want to try and send, ask the - // roomserver if we have a state event for that room that matches the - // state key. - for _, wanted := range stateWanted { - queryReq := api.QueryLatestEventsAndStateRequest{ - RoomID: oie.Event.RoomID(), - StateToFetch: []gomatrixserverlib.StateKeyTuple{ - gomatrixserverlib.StateKeyTuple{ - EventType: wanted, - StateKey: "", - }, - }, - } - // If this fails then we just move onto the next event - we don't - // actually know at this point whether the room even has that type - // of state. - queryRes := api.QueryLatestEventsAndStateResponse{} - if err := s.query.QueryLatestEventsAndState(context.TODO(), &queryReq, &queryRes); err != nil { - log.WithFields(log.Fields{ - "room_id": queryReq.RoomID, - "event_type": wanted, - }).WithError(err).Info("couldn't find state to strip") - continue - } - // Append the stripped down copy of the state to our list. - for _, headeredEvent := range queryRes.StateEvents { - event := headeredEvent.Unwrap() - strippedState = append(strippedState, gomatrixserverlib.NewInviteV2StrippedState(&event)) - - log.WithFields(log.Fields{ - "room_id": queryReq.RoomID, - "event_type": event.Type(), - }).Info("adding stripped state") + if inviteRoomState := gjson.GetBytes(oie.Event.Unsigned(), "invite_room_state"); inviteRoomState.Exists() { + if err := json.Unmarshal([]byte(inviteRoomState.Raw), &strippedState); err != nil { + log.WithError(err).Warn("failed to extract invite_room_state from event unsigned") } } diff --git a/go.mod b/go.mod index daf232781..37a67fcdd 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/matrix-org/go-http-js-libp2p v0.0.0-20200318135427-31631a9ef51f github.com/matrix-org/go-sqlite3-js v0.0.0-20200325174927-327088cdef10 github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 - github.com/matrix-org/gomatrixserverlib v0.0.0-20200422082552-d7b4202c47f3 + github.com/matrix-org/gomatrixserverlib v0.0.0-20200424101831-2f10e8068538 github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7 github.com/mattn/go-sqlite3 v2.0.2+incompatible @@ -27,6 +27,7 @@ require ( github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.4.1 github.com/sirupsen/logrus v1.4.2 + github.com/tidwall/gjson v1.6.0 github.com/uber/jaeger-client-go v2.15.0+incompatible github.com/uber/jaeger-lib v1.5.0 go.uber.org/atomic v1.4.0 diff --git a/go.sum b/go.sum index c8f7e43fe..517a77b3d 100644 --- a/go.sum +++ b/go.sum @@ -367,8 +367,8 @@ github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 h1:Hr3zjRsq2bh github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0= github.com/matrix-org/gomatrixserverlib v0.0.0-20200124100636-0c2ec91d1df5 h1:kmRjpmFOenVpOaV/DRlo9p6z/IbOKlUC+hhKsAAh8Qg= github.com/matrix-org/gomatrixserverlib v0.0.0-20200124100636-0c2ec91d1df5/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI= -github.com/matrix-org/gomatrixserverlib v0.0.0-20200422082552-d7b4202c47f3 h1:xis1ojN99vjygwqudzB9VQq3cM2SJ7aCAMlXj/YN+88= -github.com/matrix-org/gomatrixserverlib v0.0.0-20200422082552-d7b4202c47f3/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= +github.com/matrix-org/gomatrixserverlib v0.0.0-20200424101831-2f10e8068538 h1:kj2LdNOdg2+vydS9HrPdbECEVeusRg9VTSOkYm61reA= +github.com/matrix-org/gomatrixserverlib v0.0.0-20200424101831-2f10e8068538/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1 h1:osLoFdOy+ChQqVUn2PeTDETFftVkl4w9t/OW18g3lnk= github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1/go.mod h1:cXoYQIENbdWIQHt1SyCo6Bl3C3raHwJ0wgVrXHSqf+A= github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f h1:pRz4VTiRCO4zPlEMc3ESdUOcW4PXHH4Kj+YDz1XyE+Y= diff --git a/roomserver/api/input.go b/roomserver/api/input.go index 87e3983e3..bb4e040de 100644 --- a/roomserver/api/input.go +++ b/roomserver/api/input.go @@ -89,6 +89,8 @@ type InputInviteEvent struct { RoomVersion gomatrixserverlib.RoomVersion `json:"room_version"` Event gomatrixserverlib.HeaderedEvent `json:"event"` InviteRoomState []gomatrixserverlib.InviteV2StrippedState `json:"invite_room_state"` + SendAsServer string `json:"send_as_server"` + TransactionID *TransactionID `json:"transaction_id"` } // InputRoomEventsRequest is a request to InputRoomEvents diff --git a/roomserver/input/events.go b/roomserver/input/events.go index 393c1f419..205035d98 100644 --- a/roomserver/input/events.go +++ b/roomserver/input/events.go @@ -196,8 +196,23 @@ func processInviteEvent( event := input.Event.Unwrap() - if err = event.SetUnsignedField("invite_room_state", input.InviteRoomState); err != nil { - return err + if len(input.InviteRoomState) > 0 { + // If we were supplied with some invite room state already (which is + // most likely to be if the event came in over federation) then use + // that. + if err = event.SetUnsignedField("invite_room_state", input.InviteRoomState); err != nil { + return err + } + } else { + // There's no invite room state, so let's have a go at building it + // up from local data (which is most likely to be if the event came + // from the CS API). If we know about the room then we can insert + // the invite room state, if we don't then we just fail quietly. + if irs, ierr := buildInviteStrippedState(ctx, db, input); ierr == nil { + if err = event.SetUnsignedField("invite_room_state", irs); err != nil { + return err + } + } } outputUpdates, err := updateToInviteMembership(updater, &event, nil, input.Event.RoomVersion) @@ -212,3 +227,50 @@ func processInviteEvent( succeeded = true return nil } + +func buildInviteStrippedState( + ctx context.Context, + db storage.Database, + input api.InputInviteEvent, +) ([]gomatrixserverlib.InviteV2StrippedState, error) { + roomNID, err := db.RoomNID(ctx, input.Event.RoomID()) + if err != nil || roomNID == 0 { + return nil, fmt.Errorf("room %q unknown", input.Event.RoomID()) + } + stateWanted := []gomatrixserverlib.StateKeyTuple{} + for _, t := range []string{ + gomatrixserverlib.MRoomName, gomatrixserverlib.MRoomCanonicalAlias, + gomatrixserverlib.MRoomAliases, gomatrixserverlib.MRoomJoinRules, + } { + stateWanted = append(stateWanted, gomatrixserverlib.StateKeyTuple{ + EventType: t, + StateKey: "", + }) + } + _, currentStateSnapshotNID, _, err := db.LatestEventIDs(ctx, roomNID) + if err != nil { + return nil, err + } + roomState := state.NewStateResolution(db) + stateEntries, err := roomState.LoadStateAtSnapshotForStringTuples( + ctx, currentStateSnapshotNID, stateWanted, + ) + if err != nil { + return nil, err + } + stateNIDs := []types.EventNID{} + for _, stateNID := range stateEntries { + stateNIDs = append(stateNIDs, stateNID.EventNID) + } + stateEvents, err := db.Events(ctx, stateNIDs) + if err != nil { + return nil, err + } + inviteState := []gomatrixserverlib.InviteV2StrippedState{ + gomatrixserverlib.NewInviteV2StrippedState(&input.Event.Event), + } + for _, event := range stateEvents { + inviteState = append(inviteState, gomatrixserverlib.NewInviteV2StrippedState(&event.Event)) + } + return inviteState, nil +} diff --git a/roomserver/input/membership.go b/roomserver/input/membership.go index 8629cb238..351e63d61 100644 --- a/roomserver/input/membership.go +++ b/roomserver/input/membership.go @@ -144,7 +144,7 @@ func updateToInviteMembership( // consider a single stream of events when determining whether a user // is invited, rather than having to combine multiple streams themselves. onie := api.OutputNewInviteEvent{ - Event: (*add).Headered(roomVersion), + Event: add.Headered(roomVersion), RoomVersion: roomVersion, } updates = append(updates, api.OutputEvent{ diff --git a/roomserver/query/query.go b/roomserver/query/query.go index e9286b4e6..7508d7902 100644 --- a/roomserver/query/query.go +++ b/roomserver/query/query.go @@ -54,7 +54,7 @@ func (r *RoomserverQueryAPI) QueryLatestEventsAndState( roomState := state.NewStateResolution(r.DB) response.QueryLatestEventsAndStateRequest = *request - roomNID, err := r.DB.RoomNID(ctx, request.RoomID) + roomNID, err := r.DB.RoomNIDExcludingStubs(ctx, request.RoomID) if err != nil { return err } @@ -114,7 +114,7 @@ func (r *RoomserverQueryAPI) QueryStateAfterEvents( roomState := state.NewStateResolution(r.DB) response.QueryStateAfterEventsRequest = *request - roomNID, err := r.DB.RoomNID(ctx, request.RoomID) + roomNID, err := r.DB.RoomNIDExcludingStubs(ctx, request.RoomID) if err != nil { return err } @@ -649,7 +649,7 @@ func (r *RoomserverQueryAPI) QueryStateAndAuthChain( response *api.QueryStateAndAuthChainResponse, ) error { response.QueryStateAndAuthChainRequest = *request - roomNID, err := r.DB.RoomNID(ctx, request.RoomID) + roomNID, err := r.DB.RoomNIDExcludingStubs(ctx, request.RoomID) if err != nil { return err } diff --git a/roomserver/storage/interface.go b/roomserver/storage/interface.go index 0235e51ed..a13c44d6e 100644 --- a/roomserver/storage/interface.go +++ b/roomserver/storage/interface.go @@ -71,6 +71,10 @@ type Database interface { GetLatestEventsForUpdate(ctx context.Context, roomNID types.RoomNID) (types.RoomRecentEventsUpdater, error) GetTransactionEventID(ctx context.Context, transactionID string, sessionID int64, userID string) (string, error) RoomNID(ctx context.Context, roomID string) (types.RoomNID, error) + // RoomNIDExcludingStubs is a special variation of RoomNID that will return 0 as if the room + // does not exist if the room has no latest events. This can happen when we've received an + // invite over federation for a room that we don't know anything else about yet. + RoomNIDExcludingStubs(ctx context.Context, roomID string) (types.RoomNID, error) LatestEventIDs(ctx context.Context, roomNID types.RoomNID) ([]gomatrixserverlib.EventReference, types.StateSnapshotNID, int64, error) GetInvitesForUser(ctx context.Context, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID) (senderUserIDs []types.EventStateKeyNID, err error) SetRoomAlias(ctx context.Context, alias string, roomID string, creatorUserID string) error diff --git a/roomserver/storage/postgres/storage.go b/roomserver/storage/postgres/storage.go index 6f2b96610..5b5c61b0f 100644 --- a/roomserver/storage/postgres/storage.go +++ b/roomserver/storage/postgres/storage.go @@ -471,6 +471,23 @@ func (d *Database) RoomNID(ctx context.Context, roomID string) (types.RoomNID, e return roomNID, err } +// RoomNIDExcludingStubs implements query.RoomserverQueryAPIDB +func (d *Database) RoomNIDExcludingStubs(ctx context.Context, roomID string) (roomNID types.RoomNID, err error) { + roomNID, err = d.RoomNID(ctx, roomID) + if err != nil { + return + } + latestEvents, _, err := d.statements.selectLatestEventNIDs(ctx, roomNID) + if err != nil { + return + } + if len(latestEvents) == 0 { + roomNID = 0 + return + } + return +} + // LatestEventIDs implements query.RoomserverQueryAPIDatabase func (d *Database) LatestEventIDs( ctx context.Context, roomNID types.RoomNID, diff --git a/roomserver/storage/sqlite3/storage.go b/roomserver/storage/sqlite3/storage.go index 444a8fdd5..5df9c4e08 100644 --- a/roomserver/storage/sqlite3/storage.go +++ b/roomserver/storage/sqlite3/storage.go @@ -590,6 +590,23 @@ func (d *Database) RoomNID(ctx context.Context, roomID string) (roomNID types.Ro return } +// RoomNIDExcludingStubs implements query.RoomserverQueryAPIDB +func (d *Database) RoomNIDExcludingStubs(ctx context.Context, roomID string) (roomNID types.RoomNID, err error) { + roomNID, err = d.RoomNID(ctx, roomID) + if err != nil { + return + } + latestEvents, _, err := d.statements.selectLatestEventNIDs(ctx, nil, roomNID) + if err != nil { + return + } + if len(latestEvents) == 0 { + roomNID = 0 + return + } + return +} + // LatestEventIDs implements query.RoomserverQueryAPIDatabase func (d *Database) LatestEventIDs( ctx context.Context, roomNID types.RoomNID, diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go index 7fd75f066..1e078ef46 100644 --- a/syncapi/storage/postgres/syncserver.go +++ b/syncapi/storage/postgres/syncserver.go @@ -752,11 +752,7 @@ func (d *SyncServerDatasource) addInvitesToResponse( return err } for roomID, inviteEvent := range invites { - ir := types.NewInviteResponse() - ir.InviteState.Events = gomatrixserverlib.ToClientEvents( - []gomatrixserverlib.Event{inviteEvent.Event}, gomatrixserverlib.FormatSync, - ) - // TODO: add the invite state from the invite event. + ir := types.NewInviteResponse(inviteEvent) res.Rooms.Invite[roomID] = *ir } return nil diff --git a/syncapi/storage/sqlite3/syncserver.go b/syncapi/storage/sqlite3/syncserver.go index 29051cd06..cdfd29b84 100644 --- a/syncapi/storage/sqlite3/syncserver.go +++ b/syncapi/storage/sqlite3/syncserver.go @@ -799,11 +799,7 @@ func (d *SyncServerDatasource) addInvitesToResponse( return err } for roomID, inviteEvent := range invites { - ir := types.NewInviteResponse() - ir.InviteState.Events = gomatrixserverlib.HeaderedToClientEvents( - []gomatrixserverlib.HeaderedEvent{inviteEvent}, gomatrixserverlib.FormatSync, - ) - // TODO: add the invite state from the invite event. + ir := types.NewInviteResponse(inviteEvent) res.Rooms.Invite[roomID] = *ir } return nil diff --git a/syncapi/types/types.go b/syncapi/types/types.go index 718906ecd..cfd49ff1f 100644 --- a/syncapi/types/types.go +++ b/syncapi/types/types.go @@ -23,6 +23,7 @@ import ( "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" + "github.com/tidwall/gjson" ) var ( @@ -247,14 +248,17 @@ func NewJoinResponse() *JoinResponse { // InviteResponse represents a /sync response for a room which is under the 'invite' key. type InviteResponse struct { InviteState struct { - Events []gomatrixserverlib.ClientEvent `json:"events"` + Events json.RawMessage `json:"events"` } `json:"invite_state"` } // NewInviteResponse creates an empty response with initialised arrays. -func NewInviteResponse() *InviteResponse { +func NewInviteResponse(event gomatrixserverlib.HeaderedEvent) *InviteResponse { res := InviteResponse{} - res.InviteState.Events = make([]gomatrixserverlib.ClientEvent, 0) + res.InviteState.Events = json.RawMessage{'[', ']'} + if inviteRoomState := gjson.GetBytes(event.Unsigned(), "invite_room_state"); inviteRoomState.Exists() { + res.InviteState.Events = json.RawMessage(inviteRoomState.Raw) + } return &res } From 87f05721b07cc6d57bcb4ce1d9ad77a9e1847054 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 27 Apr 2020 15:47:36 +0100 Subject: [PATCH 2/3] Update gomatrixserverlib --- federationapi/routing/backfill.go | 5 ++++- go.mod | 2 +- go.sum | 4 ++-- syncapi/routing/messages.go | 5 ++++- 4 files changed, 11 insertions(+), 5 deletions(-) diff --git a/federationapi/routing/backfill.go b/federationapi/routing/backfill.go index 62471b8a9..6f85ba4be 100644 --- a/federationapi/routing/backfill.go +++ b/federationapi/routing/backfill.go @@ -97,7 +97,10 @@ func Backfill( } var eventJSONs []json.RawMessage - for _, e := range gomatrixserverlib.ReverseTopologicalOrdering(evs) { + for _, e := range gomatrixserverlib.ReverseTopologicalOrdering( + evs, + gomatrixserverlib.TopologicalOrderByPrevEvents, + ) { eventJSONs = append(eventJSONs, e.JSON()) } diff --git a/go.mod b/go.mod index 37a67fcdd..fd1c2de81 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/matrix-org/go-http-js-libp2p v0.0.0-20200318135427-31631a9ef51f github.com/matrix-org/go-sqlite3-js v0.0.0-20200325174927-327088cdef10 github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 - github.com/matrix-org/gomatrixserverlib v0.0.0-20200424101831-2f10e8068538 + github.com/matrix-org/gomatrixserverlib v0.0.0-20200427134702-21db6d1430e3 github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7 github.com/mattn/go-sqlite3 v2.0.2+incompatible diff --git a/go.sum b/go.sum index 517a77b3d..156f67252 100644 --- a/go.sum +++ b/go.sum @@ -367,8 +367,8 @@ github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 h1:Hr3zjRsq2bh github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0= github.com/matrix-org/gomatrixserverlib v0.0.0-20200124100636-0c2ec91d1df5 h1:kmRjpmFOenVpOaV/DRlo9p6z/IbOKlUC+hhKsAAh8Qg= github.com/matrix-org/gomatrixserverlib v0.0.0-20200124100636-0c2ec91d1df5/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI= -github.com/matrix-org/gomatrixserverlib v0.0.0-20200424101831-2f10e8068538 h1:kj2LdNOdg2+vydS9HrPdbECEVeusRg9VTSOkYm61reA= -github.com/matrix-org/gomatrixserverlib v0.0.0-20200424101831-2f10e8068538/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= +github.com/matrix-org/gomatrixserverlib v0.0.0-20200427134702-21db6d1430e3 h1:aJMAKjfXG5I8TqPxJQbQIkGSWM770oxkpgsPHE8C06E= +github.com/matrix-org/gomatrixserverlib v0.0.0-20200427134702-21db6d1430e3/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1 h1:osLoFdOy+ChQqVUn2PeTDETFftVkl4w9t/OW18g3lnk= github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1/go.mod h1:cXoYQIENbdWIQHt1SyCo6Bl3C3raHwJ0wgVrXHSqf+A= github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f h1:pRz4VTiRCO4zPlEMc3ESdUOcW4PXHH4Kj+YDz1XyE+Y= diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go index 873ee9366..53a9a963a 100644 --- a/syncapi/routing/messages.go +++ b/syncapi/routing/messages.go @@ -210,7 +210,10 @@ func (r *messagesReq) retrieveEvents() ( } // Sort the events to ensure we send them in the right order. - events = gomatrixserverlib.HeaderedReverseTopologicalOrdering(events) + events = gomatrixserverlib.HeaderedReverseTopologicalOrdering( + events, + gomatrixserverlib.TopologicalOrderByPrevEvents, + ) if r.backwardOrdering { // This reverses the array from old->new to new->old sort.SliceStable(events, func(i, j int) bool { From 3a858afca2368f588b2681de4f4816f26686f540 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Tue, 28 Apr 2020 10:53:07 +0100 Subject: [PATCH 3/3] Loopback event from invite response (#982) * Working invite v2 support * Fix copyright notice * Update gomatrixserverlib * Add fallthrough * Add missing continue * Update sytest-whitelist, gomatrixserverlib * Update gomatrixserverlib to test matrix-org/gomatrixserverlib#181 * Update gomatrixserverlib --- clientapi/routing/membership.go | 22 +++---- cmd/dendrite-demo-libp2p/main.go | 2 +- cmd/dendrite-federation-sender-server/main.go | 4 +- cmd/dendrite-monolith-server/main.go | 2 +- federationsender/federationsender.go | 6 +- federationsender/producers/roomserver.go | 66 +++++++++++++++++++ federationsender/queue/destinationqueue.go | 28 +++++++- federationsender/queue/queue.go | 22 +++++-- go.mod | 2 +- go.sum | 4 +- sytest-whitelist | 5 ++ 11 files changed, 135 insertions(+), 28 deletions(-) create mode 100644 federationsender/producers/roomserver.go diff --git a/clientapi/routing/membership.go b/clientapi/routing/membership.go index c597dd27d..dff194dd3 100644 --- a/clientapi/routing/membership.go +++ b/clientapi/routing/membership.go @@ -127,18 +127,18 @@ func SendMembership( returnData = struct { RoomID string `json:"room_id"` }{roomID} + fallthrough default: - } - - _, err = producer.SendEvents( - req.Context(), - []gomatrixserverlib.HeaderedEvent{event.Headered(verRes.RoomVersion)}, - cfg.Matrix.ServerName, - nil, - ) - if err != nil { - util.GetLogger(req.Context()).WithError(err).Error("producer.SendEvents failed") - return jsonerror.InternalServerError() + _, err = producer.SendEvents( + req.Context(), + []gomatrixserverlib.HeaderedEvent{event.Headered(verRes.RoomVersion)}, + cfg.Matrix.ServerName, + nil, + ) + if err != nil { + util.GetLogger(req.Context()).WithError(err).Error("producer.SendEvents failed") + return jsonerror.InternalServerError() + } } return util.JSONResponse{ diff --git a/cmd/dendrite-demo-libp2p/main.go b/cmd/dendrite-demo-libp2p/main.go index f280c7483..0365a6f27 100644 --- a/cmd/dendrite-demo-libp2p/main.go +++ b/cmd/dendrite-demo-libp2p/main.go @@ -153,7 +153,7 @@ func main() { asQuery := appservice.SetupAppServiceAPIComponent( &base.Base, accountDB, deviceDB, federation, alias, query, transactions.New(), ) - fedSenderAPI := federationsender.SetupFederationSenderComponent(&base.Base, federation, query) + fedSenderAPI := federationsender.SetupFederationSenderComponent(&base.Base, federation, query, input) clientapi.SetupClientAPIComponent( &base.Base, deviceDB, accountDB, diff --git a/cmd/dendrite-federation-sender-server/main.go b/cmd/dendrite-federation-sender-server/main.go index 71fc0b015..1593afaa5 100644 --- a/cmd/dendrite-federation-sender-server/main.go +++ b/cmd/dendrite-federation-sender-server/main.go @@ -26,10 +26,10 @@ func main() { federation := base.CreateFederationClient() - _, _, query := base.CreateHTTPRoomserverAPIs() + _, input, query := base.CreateHTTPRoomserverAPIs() federationsender.SetupFederationSenderComponent( - base, federation, query, + base, federation, query, input, ) base.SetupAndServeHTTP(string(base.Cfg.Bind.FederationSender), string(base.Cfg.Listen.FederationSender)) diff --git a/cmd/dendrite-monolith-server/main.go b/cmd/dendrite-monolith-server/main.go index 6b0d83ae1..70a59ed68 100644 --- a/cmd/dendrite-monolith-server/main.go +++ b/cmd/dendrite-monolith-server/main.go @@ -62,7 +62,7 @@ func main() { asQuery := appservice.SetupAppServiceAPIComponent( base, accountDB, deviceDB, federation, alias, query, transactions.New(), ) - fedSenderAPI := federationsender.SetupFederationSenderComponent(base, federation, query) + fedSenderAPI := federationsender.SetupFederationSenderComponent(base, federation, query, input) clientapi.SetupClientAPIComponent( base, deviceDB, accountDB, diff --git a/federationsender/federationsender.go b/federationsender/federationsender.go index a318d2099..a06caf402 100644 --- a/federationsender/federationsender.go +++ b/federationsender/federationsender.go @@ -20,6 +20,7 @@ import ( "github.com/matrix-org/dendrite/common/basecomponent" "github.com/matrix-org/dendrite/federationsender/api" "github.com/matrix-org/dendrite/federationsender/consumers" + "github.com/matrix-org/dendrite/federationsender/producers" "github.com/matrix-org/dendrite/federationsender/query" "github.com/matrix-org/dendrite/federationsender/queue" "github.com/matrix-org/dendrite/federationsender/storage" @@ -34,13 +35,16 @@ func SetupFederationSenderComponent( base *basecomponent.BaseDendrite, federation *gomatrixserverlib.FederationClient, rsQueryAPI roomserverAPI.RoomserverQueryAPI, + rsInputAPI roomserverAPI.RoomserverInputAPI, ) api.FederationSenderQueryAPI { federationSenderDB, err := storage.NewDatabase(string(base.Cfg.Database.FederationSender)) if err != nil { logrus.WithError(err).Panic("failed to connect to federation sender db") } - queues := queue.NewOutgoingQueues(base.Cfg.Matrix.ServerName, federation) + roomserverProducer := producers.NewRoomserverProducer(rsInputAPI, base.Cfg.Matrix.ServerName) + + queues := queue.NewOutgoingQueues(base.Cfg.Matrix.ServerName, federation, roomserverProducer) rsConsumer := consumers.NewOutputRoomEventConsumer( base.Cfg, base.KafkaConsumer, queues, diff --git a/federationsender/producers/roomserver.go b/federationsender/producers/roomserver.go new file mode 100644 index 000000000..0395f9628 --- /dev/null +++ b/federationsender/producers/roomserver.go @@ -0,0 +1,66 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package producers + +import ( + "context" + + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/gomatrixserverlib" +) + +// RoomserverProducer produces events for the roomserver to consume. +type RoomserverProducer struct { + InputAPI api.RoomserverInputAPI + serverName gomatrixserverlib.ServerName +} + +// NewRoomserverProducer creates a new RoomserverProducer +func NewRoomserverProducer( + inputAPI api.RoomserverInputAPI, serverName gomatrixserverlib.ServerName, +) *RoomserverProducer { + return &RoomserverProducer{ + InputAPI: inputAPI, + serverName: serverName, + } +} + +// SendInviteResponse drops an invite response back into the roomserver so that users +// already in the room will be notified of the new invite. The invite response is signed +// by the remote side. +func (c *RoomserverProducer) SendInviteResponse( + ctx context.Context, res gomatrixserverlib.RespInviteV2, roomVersion gomatrixserverlib.RoomVersion, +) (string, error) { + ev := res.Event.Headered(roomVersion) + ire := api.InputRoomEvent{ + Kind: api.KindNew, + Event: ev, + AuthEventIDs: ev.AuthEventIDs(), + SendAsServer: string(c.serverName), + TransactionID: nil, + } + return c.SendInputRoomEvents(ctx, []api.InputRoomEvent{ire}) +} + +// SendInputRoomEvents writes the given input room events to the roomserver input API. +func (c *RoomserverProducer) SendInputRoomEvents( + ctx context.Context, ires []api.InputRoomEvent, +) (eventID string, err error) { + request := api.InputRoomEventsRequest{InputRoomEvents: ires} + var response api.InputRoomEventsResponse + err = c.InputAPI.InputRoomEvents(ctx, &request, &response) + eventID = response.EventID + return +} diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index 7d4dc850b..89526fcfd 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -21,6 +21,7 @@ import ( "sync" "time" + "github.com/matrix-org/dendrite/federationsender/producers" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" log "github.com/sirupsen/logrus" @@ -32,6 +33,7 @@ import ( // ensures that only one request is in flight to a given destination // at a time. type destinationQueue struct { + rsProducer *producers.RoomserverProducer client *gomatrixserverlib.FederationClient origin gomatrixserverlib.ServerName destination gomatrixserverlib.ServerName @@ -165,18 +167,38 @@ func (oq *destinationQueue) nextInvites() bool { } for _, inviteReq := range oq.pendingInvites { - ev := inviteReq.Event() + ev, roomVersion := inviteReq.Event(), inviteReq.RoomVersion() - if _, err := oq.client.SendInviteV2( + log.WithFields(log.Fields{ + "event_id": ev.EventID(), + "room_version": roomVersion, + "destination": oq.destination, + }).Info("sending invite") + + inviteRes, err := oq.client.SendInviteV2( context.TODO(), oq.destination, *inviteReq, - ); err != nil { + ) + if err != nil { log.WithFields(log.Fields{ "event_id": ev.EventID(), "state_key": ev.StateKey(), "destination": oq.destination, }).WithError(err).Error("failed to send invite") + continue + } + + if _, err = oq.rsProducer.SendInviteResponse( + context.TODO(), + inviteRes, + roomVersion, + ); err != nil { + log.WithFields(log.Fields{ + "event_id": ev.EventID(), + "state_key": ev.StateKey(), + "destination": oq.destination, + }).WithError(err).Error("failed to return signed invite to roomserver") } } diff --git a/federationsender/queue/queue.go b/federationsender/queue/queue.go index 88d47f120..33abc8fdd 100644 --- a/federationsender/queue/queue.go +++ b/federationsender/queue/queue.go @@ -18,6 +18,7 @@ import ( "fmt" "sync" + "github.com/matrix-org/dendrite/federationsender/producers" "github.com/matrix-org/gomatrixserverlib" log "github.com/sirupsen/logrus" ) @@ -25,19 +26,25 @@ import ( // OutgoingQueues is a collection of queues for sending transactions to other // matrix servers type OutgoingQueues struct { - origin gomatrixserverlib.ServerName - client *gomatrixserverlib.FederationClient + rsProducer *producers.RoomserverProducer + origin gomatrixserverlib.ServerName + client *gomatrixserverlib.FederationClient // The queuesMutex protects queues queuesMutex sync.Mutex queues map[gomatrixserverlib.ServerName]*destinationQueue } // NewOutgoingQueues makes a new OutgoingQueues -func NewOutgoingQueues(origin gomatrixserverlib.ServerName, client *gomatrixserverlib.FederationClient) *OutgoingQueues { +func NewOutgoingQueues( + origin gomatrixserverlib.ServerName, + client *gomatrixserverlib.FederationClient, + rsProducer *producers.RoomserverProducer, +) *OutgoingQueues { return &OutgoingQueues{ - origin: origin, - client: client, - queues: map[gomatrixserverlib.ServerName]*destinationQueue{}, + rsProducer: rsProducer, + origin: origin, + client: client, + queues: map[gomatrixserverlib.ServerName]*destinationQueue{}, } } @@ -67,6 +74,7 @@ func (oqs *OutgoingQueues) SendEvent( oq := oqs.queues[destination] if oq == nil { oq = &destinationQueue{ + rsProducer: oqs.rsProducer, origin: oqs.origin, destination: destination, client: oqs.client, @@ -111,6 +119,7 @@ func (oqs *OutgoingQueues) SendInvite( oq := oqs.queues[destination] if oq == nil { oq = &destinationQueue{ + rsProducer: oqs.rsProducer, origin: oqs.origin, destination: destination, client: oqs.client, @@ -151,6 +160,7 @@ func (oqs *OutgoingQueues) SendEDU( oq := oqs.queues[destination] if oq == nil { oq = &destinationQueue{ + rsProducer: oqs.rsProducer, origin: oqs.origin, destination: destination, client: oqs.client, diff --git a/go.mod b/go.mod index fd1c2de81..f566e6d9a 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/matrix-org/go-http-js-libp2p v0.0.0-20200318135427-31631a9ef51f github.com/matrix-org/go-sqlite3-js v0.0.0-20200325174927-327088cdef10 github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 - github.com/matrix-org/gomatrixserverlib v0.0.0-20200427134702-21db6d1430e3 + github.com/matrix-org/gomatrixserverlib v0.0.0-20200428095012-a95e289995b1 github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7 github.com/mattn/go-sqlite3 v2.0.2+incompatible diff --git a/go.sum b/go.sum index 156f67252..535a999d3 100644 --- a/go.sum +++ b/go.sum @@ -367,8 +367,8 @@ github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 h1:Hr3zjRsq2bh github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0= github.com/matrix-org/gomatrixserverlib v0.0.0-20200124100636-0c2ec91d1df5 h1:kmRjpmFOenVpOaV/DRlo9p6z/IbOKlUC+hhKsAAh8Qg= github.com/matrix-org/gomatrixserverlib v0.0.0-20200124100636-0c2ec91d1df5/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI= -github.com/matrix-org/gomatrixserverlib v0.0.0-20200427134702-21db6d1430e3 h1:aJMAKjfXG5I8TqPxJQbQIkGSWM770oxkpgsPHE8C06E= -github.com/matrix-org/gomatrixserverlib v0.0.0-20200427134702-21db6d1430e3/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= +github.com/matrix-org/gomatrixserverlib v0.0.0-20200428095012-a95e289995b1 h1:TB4V69eOtvmHdFp0+BgLNrDCcCwq6QDUOTjmi8fjC/M= +github.com/matrix-org/gomatrixserverlib v0.0.0-20200428095012-a95e289995b1/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1 h1:osLoFdOy+ChQqVUn2PeTDETFftVkl4w9t/OW18g3lnk= github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1/go.mod h1:cXoYQIENbdWIQHt1SyCo6Bl3C3raHwJ0wgVrXHSqf+A= github.com/matrix-org/naffka v0.0.0-20200422140631-181f1ee7401f h1:pRz4VTiRCO4zPlEMc3ESdUOcW4PXHH4Kj+YDz1XyE+Y= diff --git a/sytest-whitelist b/sytest-whitelist index 7bd2a63c4..439c306c7 100644 --- a/sytest-whitelist +++ b/sytest-whitelist @@ -253,3 +253,8 @@ User can invite local user to room with version 3 User can invite local user to room with version 4 A pair of servers can establish a join in a v2 room Can logout all devices +State from remote users is included in the timeline in an incremental sync +User can invite remote user to room with version 1 +User can invite remote user to room with version 2 +User can invite remote user to room with version 3 +User can invite remote user to room with version 4