From a0e11e3dfc5494162ae05c4dacc1c51fd5bb4e15 Mon Sep 17 00:00:00 2001 From: Devon Hudson Date: Fri, 21 Apr 2023 16:34:57 -0600 Subject: [PATCH] Move matrix join logic to gmsl --- federationapi/internal/perform.go | 352 +++++------------------------- 1 file changed, 53 insertions(+), 299 deletions(-) diff --git a/federationapi/internal/perform.go b/federationapi/internal/perform.go index 8596bc604..e40460288 100644 --- a/federationapi/internal/perform.go +++ b/federationapi/internal/perform.go @@ -16,7 +16,6 @@ import ( "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/federationapi/consumers" "github.com/matrix-org/dendrite/federationapi/statistics" - "github.com/matrix-org/dendrite/federationapi/types" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/version" ) @@ -73,12 +72,6 @@ func (r *FederationInternalAPI) PerformJoin( r.joins.Store(j, nil) defer r.joins.Delete(j) - // Look up the supported room versions. - var supportedVersions []gomatrixserverlib.RoomVersion - for version := range version.SupportedRoomVersions() { - supportedVersions = append(supportedVersions, version) - } - // Deduplicate the server names we were provided but keep the ordering // as this encodes useful information about which servers are most likely // to respond. @@ -103,7 +96,6 @@ func (r *FederationInternalAPI) PerformJoin( request.UserID, request.Content, serverName, - supportedVersions, request.Unsigned, ); err != nil { logrus.WithError(err).WithFields(logrus.Fields{ @@ -146,288 +138,74 @@ func (r *FederationInternalAPI) performJoinUsingServer( roomID, userID string, content map[string]interface{}, serverName gomatrixserverlib.ServerName, - supportedVersions []gomatrixserverlib.RoomVersion, unsigned map[string]interface{}, ) error { if !r.shouldAttemptDirectFederation(serverName) { return fmt.Errorf("relay servers have no meaningful response for join.") } - _, origin, err := r.cfg.Matrix.SplitLocalID('@', userID) + user, err := gomatrixserverlib.NewUserID(userID, true) if err != nil { return err } - // Try to perform a make_join using the information supplied in the - // request. - respMakeJoin, err := r.federation.MakeJoin( - ctx, - origin, + sendJoinInput := fclient.SendJoinInput{ + UserID: user, + RoomID: roomID, + ServerName: serverName, + Content: content, + Unsigned: unsigned, + PrivateKey: r.cfg.Matrix.PrivateKey, + KeyID: r.cfg.Matrix.KeyID, + KeyRing: r.keyRing, + EventProvider: federatedEventProvider, + } + callbacks := fclient.SendJoinCallbacks{ + FederationFailure: func(server gomatrixserverlib.ServerName) { + r.statistics.ForServer(server).Failure() + }, + FederationSuccess: func(server gomatrixserverlib.ServerName) { + r.statistics.ForServer(server).Success(statistics.SendDirect) + }, + } + + event, respState, err := fclient.HandleSendJoin(ctx, r.federation, sendJoinInput, callbacks) + if err != nil { + return err + } + + // We need to immediately update our list of joined hosts for this room now as we are technically + // joined. We must do this synchronously: we cannot rely on the roomserver output events as they + // will happen asyncly. If we don't update this table, you can end up with bad failure modes like + // joining a room, waiting for 200 OK then changing device keys and have those keys not be sent + // to other servers (this was a cause of a flakey sytest "Local device key changes get to remote servers") + // The events are trusted now as we performed auth checks above. + joinedHosts, err := consumers.JoinedHostsFromEvents(respState.GetStateEvents().TrustedEvents(event.RoomVersion, false)) + if err != nil { + return fmt.Errorf("JoinedHostsFromEvents: failed to get joined hosts: %s", err) + } + + logrus.WithField("room", roomID).Infof("Joined federated room with %d hosts", len(joinedHosts)) + if _, err = r.db.UpdateRoom(context.Background(), roomID, joinedHosts, nil, true); err != nil { + return fmt.Errorf("UpdatedRoom: failed to update room with joined hosts: %s", err) + } + + if err = roomserverAPI.SendEventWithState( + context.Background(), + r.rsAPI, + user.Domain(), + roomserverAPI.KindNew, + respState, + event, serverName, - roomID, - userID, - supportedVersions, - ) - if err != nil { - // TODO: Check if the user was not allowed to join the room. - r.statistics.ForServer(serverName).Failure() - return fmt.Errorf("r.federation.MakeJoin: %w", err) + nil, + false, + ); err != nil { + return fmt.Errorf("roomserverAPI.SendEventWithState: %w", err) } - r.statistics.ForServer(serverName).Success(statistics.SendDirect) - - // Set all the fields to be what they should be, this should be a no-op - // but it's possible that the remote server returned us something "odd" - respMakeJoin.JoinEvent.Type = gomatrixserverlib.MRoomMember - respMakeJoin.JoinEvent.Sender = userID - respMakeJoin.JoinEvent.StateKey = &userID - respMakeJoin.JoinEvent.RoomID = roomID - respMakeJoin.JoinEvent.Redacts = "" - if content == nil { - content = map[string]interface{}{} - } - _ = json.Unmarshal(respMakeJoin.JoinEvent.Content, &content) - content["membership"] = gomatrixserverlib.Join - if err = respMakeJoin.JoinEvent.SetContent(content); err != nil { - return fmt.Errorf("respMakeJoin.JoinEvent.SetContent: %w", err) - } - if err = respMakeJoin.JoinEvent.SetUnsigned(struct{}{}); err != nil { - return fmt.Errorf("respMakeJoin.JoinEvent.SetUnsigned: %w", err) - } - - // Work out if we support the room version that has been supplied in - // the make_join response. - // "If not provided, the room version is assumed to be either "1" or "2"." - // https://matrix.org/docs/spec/server_server/unstable#get-matrix-federation-v1-make-join-roomid-userid - if respMakeJoin.RoomVersion == "" { - // TODO: (PowerDAG) Handle this case for PowerEvents - respMakeJoin.RoomVersion = setDefaultRoomVersionFromJoinEvent(respMakeJoin.JoinEvent) - } - if _, err = respMakeJoin.RoomVersion.EventFormat(); err != nil { - return fmt.Errorf("respMakeJoin.RoomVersion.EventFormat: %w", err) - } - - // Build the join event. - event, err := respMakeJoin.JoinEvent.Build( - time.Now(), - origin, - r.cfg.Matrix.KeyID, - r.cfg.Matrix.PrivateKey, - respMakeJoin.RoomVersion, - ) - if err != nil { - return fmt.Errorf("respMakeJoin.JoinEvent.Build: %w", err) - } - - joinedHosts := []types.JoinedHost{} - if respMakeJoin.RoomVersion == gomatrixserverlib.RoomVersionPowerDAG { - respSendJoin, err := r.federation.SendJoinPowerDAG( - context.Background(), - origin, - serverName, - event, - ) - - if err != nil { - r.statistics.ForServer(serverName).Failure() - return fmt.Errorf("r.federation.SendJoin: %w", err) - } - r.statistics.ForServer(serverName).Success(statistics.SendDirect) - - // If the remote server returned an event in the "event" key of - // the send_join request then we should use that instead. It may - // contain signatures that we don't know about. - if len(respSendJoin.Event) > 0 { - var remoteEvent *gomatrixserverlib.Event - remoteEvent, err = respSendJoin.Event.UntrustedEvent(respMakeJoin.RoomVersion) - if err == nil && isWellFormedMembershipEvent( - remoteEvent, roomID, userID, - ) { - event = remoteEvent - } - } - - // Sanity-check the join response to ensure that it has a create - // event, that the room version is known, etc. - powerEvents := respSendJoin.PowerEvents.UntrustedEvents(respMakeJoin.RoomVersion) - if err = checkEventsContainCreateEvent(powerEvents); err != nil { - return fmt.Errorf("sanityCheckPowerDAG: %w", err) - } - - // Process the join response in a goroutine. The idea here is - // that we'll try and wait for as long as possible for the work - // to complete, but if the client does give up waiting, we'll - // still continue to process the join anyway so that we don't - // waste the effort. - // TODO: Can we expand Check here to return a list of missing auth - // events rather than failing one at a time? - var respState gomatrixserverlib.StateResponsePowerDAG - respState, err = gomatrixserverlib.CheckSendJoinResponsePowerDAG( - context.Background(), - respMakeJoin.RoomVersion, &respSendJoin, - r.keyRing, - event, - federatedEventProvider(ctx, r.federation, r.keyRing, origin, serverName), - ) - if err != nil { - return fmt.Errorf("respSendJoin.Check: %w", err) - } - - // We need to immediately update our list of joined hosts for this room now as we are technically - // joined. We must do this synchronously: we cannot rely on the roomserver output events as they - // will happen asyncly. If we don't update this table, you can end up with bad failure modes like - // joining a room, waiting for 200 OK then changing device keys and have those keys not be sent - // to other servers (this was a cause of a flakey sytest "Local device key changes get to remote servers") - // The events are trusted now as we performed auth checks above. - joinedHosts, err = consumers.JoinedHostsFromEvents(respState.GetStateEvents().TrustedEvents(respMakeJoin.RoomVersion, false)) - if err != nil { - return fmt.Errorf("JoinedHostsFromEvents: failed to get joined hosts: %s", err) - } - - logrus.WithField("room", roomID).Infof("Joined federated room with %d hosts", len(joinedHosts)) - if _, err = r.db.UpdateRoom(context.Background(), roomID, joinedHosts, nil, true); err != nil { - return fmt.Errorf("UpdatedRoom: failed to update room with joined hosts: %s", err) - } - - // If we successfully performed a send_join above then the other - // server now thinks we're a part of the room. Send the newly - // returned state to the roomserver to update our local view. - if unsigned != nil { - event, err = event.SetUnsigned(unsigned) - if err != nil { - // non-fatal, log and continue - logrus.WithError(err).Errorf("Failed to set unsigned content") - } - } - - // TODO: (PowerDAG) Send join event to roomserver - //if err = roomserverAPI.SendEventWithState( - // context.Background(), - // r.rsAPI, - // origin, - // roomserverAPI.KindNew, - // respState, - // event.Headered(respMakeJoin.RoomVersion), - // serverName, - // nil, - // false, - //); err != nil { - // return fmt.Errorf("roomserverAPI.SendEventWithState: %w", err) - //} - } else { // RoomVersionV1 -> RoomVersionV10 - // Try to perform a send_join using the newly built event. - respSendJoin, err := r.federation.SendJoin( - context.Background(), - origin, - serverName, - event, - ) - if err != nil { - r.statistics.ForServer(serverName).Failure() - return fmt.Errorf("r.federation.SendJoin: %w", err) - } - r.statistics.ForServer(serverName).Success(statistics.SendDirect) - - // If the remote server returned an event in the "event" key of - // the send_join request then we should use that instead. It may - // contain signatures that we don't know about. - if len(respSendJoin.Event) > 0 { - var remoteEvent *gomatrixserverlib.Event - remoteEvent, err = respSendJoin.Event.UntrustedEvent(respMakeJoin.RoomVersion) - if err == nil && isWellFormedMembershipEvent( - remoteEvent, roomID, userID, - ) { - event = remoteEvent - } - } - - // Sanity-check the join response to ensure that it has a create - // event, that the room version is known, etc. - authEvents := respSendJoin.AuthEvents.UntrustedEvents(respMakeJoin.RoomVersion) - if err = checkEventsContainCreateEvent(authEvents); err != nil { - return fmt.Errorf("sanityCheckAuthChain: %w", err) - } - - // Process the join response in a goroutine. The idea here is - // that we'll try and wait for as long as possible for the work - // to complete, but if the client does give up waiting, we'll - // still continue to process the join anyway so that we don't - // waste the effort. - // TODO: Can we expand Check here to return a list of missing auth - // events rather than failing one at a time? - var respState gomatrixserverlib.StateResponse - respState, err = gomatrixserverlib.CheckSendJoinResponse( - context.Background(), - respMakeJoin.RoomVersion, &respSendJoin, - r.keyRing, - event, - federatedEventProvider(ctx, r.federation, r.keyRing, origin, serverName), - ) - if err != nil { - return fmt.Errorf("respSendJoin.Check: %w", err) - } - - // We need to immediately update our list of joined hosts for this room now as we are technically - // joined. We must do this synchronously: we cannot rely on the roomserver output events as they - // will happen asyncly. If we don't update this table, you can end up with bad failure modes like - // joining a room, waiting for 200 OK then changing device keys and have those keys not be sent - // to other servers (this was a cause of a flakey sytest "Local device key changes get to remote servers") - // The events are trusted now as we performed auth checks above. - joinedHosts, err = consumers.JoinedHostsFromEvents(respState.GetStateEvents().TrustedEvents(respMakeJoin.RoomVersion, false)) - if err != nil { - return fmt.Errorf("JoinedHostsFromEvents: failed to get joined hosts: %s", err) - } - - logrus.WithField("room", roomID).Infof("Joined federated room with %d hosts", len(joinedHosts)) - if _, err = r.db.UpdateRoom(context.Background(), roomID, joinedHosts, nil, true); err != nil { - return fmt.Errorf("UpdatedRoom: failed to update room with joined hosts: %s", err) - } - - // If we successfully performed a send_join above then the other - // server now thinks we're a part of the room. Send the newly - // returned state to the roomserver to update our local view. - if unsigned != nil { - event, err = event.SetUnsigned(unsigned) - if err != nil { - // non-fatal, log and continue - logrus.WithError(err).Errorf("Failed to set unsigned content") - } - } - - if err = roomserverAPI.SendEventWithState( - context.Background(), - r.rsAPI, - origin, - roomserverAPI.KindNew, - respState, - event.Headered(respMakeJoin.RoomVersion), - serverName, - nil, - false, - ); err != nil { - return fmt.Errorf("roomserverAPI.SendEventWithState: %w", err) - } - } - return nil } -// isWellFormedMembershipEvent returns true if the event looks like a legitimate -// membership event. -func isWellFormedMembershipEvent(event *gomatrixserverlib.Event, roomID, userID string) bool { - if membership, err := event.Membership(); err != nil { - return false - } else if membership != gomatrixserverlib.Join { - return false - } - if event.RoomID() != roomID { - return false - } - if !event.StateKeyEquals(userID) { - return false - } - return true -} - // PerformOutboundPeekRequest implements api.FederationInternalAPI func (r *FederationInternalAPI) PerformOutboundPeek( ctx context.Context, @@ -840,30 +618,6 @@ func checkEventsContainCreateEvent(events []*gomatrixserverlib.Event) error { return fmt.Errorf("response is missing m.room.create event") } -func setDefaultRoomVersionFromJoinEvent( - joinEvent gomatrixserverlib.EventBuilder, -) gomatrixserverlib.RoomVersion { - // if auth events are not event references we know it must be v3+ - // we have to do these shenanigans to satisfy sytest, specifically for: - // "Outbound federation rejects m.room.create events with an unknown room version" - hasEventRefs := true - authEvents, ok := joinEvent.AuthEvents.([]interface{}) - if ok { - if len(authEvents) > 0 { - _, ok = authEvents[0].(string) - if ok { - // event refs are objects, not strings, so we know we must be dealing with a v3+ room. - hasEventRefs = false - } - } - } - - if hasEventRefs { - return gomatrixserverlib.RoomVersionV1 - } - return gomatrixserverlib.RoomVersionV4 -} - // federatedEventProvider is an event provider which fetches events from the server provided func federatedEventProvider( ctx context.Context, federation api.FederationClient,