mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-01-18 09:54:27 -06:00
Move matrix join logic to gmsl
This commit is contained in:
parent
549d20cb61
commit
a0e11e3dfc
|
@ -16,7 +16,6 @@ import (
|
|||
"github.com/matrix-org/dendrite/federationapi/api"
|
||||
"github.com/matrix-org/dendrite/federationapi/consumers"
|
||||
"github.com/matrix-org/dendrite/federationapi/statistics"
|
||||
"github.com/matrix-org/dendrite/federationapi/types"
|
||||
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/dendrite/roomserver/version"
|
||||
)
|
||||
|
@ -73,12 +72,6 @@ func (r *FederationInternalAPI) PerformJoin(
|
|||
r.joins.Store(j, nil)
|
||||
defer r.joins.Delete(j)
|
||||
|
||||
// Look up the supported room versions.
|
||||
var supportedVersions []gomatrixserverlib.RoomVersion
|
||||
for version := range version.SupportedRoomVersions() {
|
||||
supportedVersions = append(supportedVersions, version)
|
||||
}
|
||||
|
||||
// Deduplicate the server names we were provided but keep the ordering
|
||||
// as this encodes useful information about which servers are most likely
|
||||
// to respond.
|
||||
|
@ -103,7 +96,6 @@ func (r *FederationInternalAPI) PerformJoin(
|
|||
request.UserID,
|
||||
request.Content,
|
||||
serverName,
|
||||
supportedVersions,
|
||||
request.Unsigned,
|
||||
); err != nil {
|
||||
logrus.WithError(err).WithFields(logrus.Fields{
|
||||
|
@ -146,288 +138,74 @@ func (r *FederationInternalAPI) performJoinUsingServer(
|
|||
roomID, userID string,
|
||||
content map[string]interface{},
|
||||
serverName gomatrixserverlib.ServerName,
|
||||
supportedVersions []gomatrixserverlib.RoomVersion,
|
||||
unsigned map[string]interface{},
|
||||
) error {
|
||||
if !r.shouldAttemptDirectFederation(serverName) {
|
||||
return fmt.Errorf("relay servers have no meaningful response for join.")
|
||||
}
|
||||
|
||||
_, origin, err := r.cfg.Matrix.SplitLocalID('@', userID)
|
||||
user, err := gomatrixserverlib.NewUserID(userID, true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Try to perform a make_join using the information supplied in the
|
||||
// request.
|
||||
respMakeJoin, err := r.federation.MakeJoin(
|
||||
ctx,
|
||||
origin,
|
||||
sendJoinInput := fclient.SendJoinInput{
|
||||
UserID: user,
|
||||
RoomID: roomID,
|
||||
ServerName: serverName,
|
||||
Content: content,
|
||||
Unsigned: unsigned,
|
||||
PrivateKey: r.cfg.Matrix.PrivateKey,
|
||||
KeyID: r.cfg.Matrix.KeyID,
|
||||
KeyRing: r.keyRing,
|
||||
EventProvider: federatedEventProvider,
|
||||
}
|
||||
callbacks := fclient.SendJoinCallbacks{
|
||||
FederationFailure: func(server gomatrixserverlib.ServerName) {
|
||||
r.statistics.ForServer(server).Failure()
|
||||
},
|
||||
FederationSuccess: func(server gomatrixserverlib.ServerName) {
|
||||
r.statistics.ForServer(server).Success(statistics.SendDirect)
|
||||
},
|
||||
}
|
||||
|
||||
event, respState, err := fclient.HandleSendJoin(ctx, r.federation, sendJoinInput, callbacks)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// We need to immediately update our list of joined hosts for this room now as we are technically
|
||||
// joined. We must do this synchronously: we cannot rely on the roomserver output events as they
|
||||
// will happen asyncly. If we don't update this table, you can end up with bad failure modes like
|
||||
// joining a room, waiting for 200 OK then changing device keys and have those keys not be sent
|
||||
// to other servers (this was a cause of a flakey sytest "Local device key changes get to remote servers")
|
||||
// The events are trusted now as we performed auth checks above.
|
||||
joinedHosts, err := consumers.JoinedHostsFromEvents(respState.GetStateEvents().TrustedEvents(event.RoomVersion, false))
|
||||
if err != nil {
|
||||
return fmt.Errorf("JoinedHostsFromEvents: failed to get joined hosts: %s", err)
|
||||
}
|
||||
|
||||
logrus.WithField("room", roomID).Infof("Joined federated room with %d hosts", len(joinedHosts))
|
||||
if _, err = r.db.UpdateRoom(context.Background(), roomID, joinedHosts, nil, true); err != nil {
|
||||
return fmt.Errorf("UpdatedRoom: failed to update room with joined hosts: %s", err)
|
||||
}
|
||||
|
||||
if err = roomserverAPI.SendEventWithState(
|
||||
context.Background(),
|
||||
r.rsAPI,
|
||||
user.Domain(),
|
||||
roomserverAPI.KindNew,
|
||||
respState,
|
||||
event,
|
||||
serverName,
|
||||
roomID,
|
||||
userID,
|
||||
supportedVersions,
|
||||
)
|
||||
if err != nil {
|
||||
// TODO: Check if the user was not allowed to join the room.
|
||||
r.statistics.ForServer(serverName).Failure()
|
||||
return fmt.Errorf("r.federation.MakeJoin: %w", err)
|
||||
nil,
|
||||
false,
|
||||
); err != nil {
|
||||
return fmt.Errorf("roomserverAPI.SendEventWithState: %w", err)
|
||||
}
|
||||
r.statistics.ForServer(serverName).Success(statistics.SendDirect)
|
||||
|
||||
// Set all the fields to be what they should be, this should be a no-op
|
||||
// but it's possible that the remote server returned us something "odd"
|
||||
respMakeJoin.JoinEvent.Type = gomatrixserverlib.MRoomMember
|
||||
respMakeJoin.JoinEvent.Sender = userID
|
||||
respMakeJoin.JoinEvent.StateKey = &userID
|
||||
respMakeJoin.JoinEvent.RoomID = roomID
|
||||
respMakeJoin.JoinEvent.Redacts = ""
|
||||
if content == nil {
|
||||
content = map[string]interface{}{}
|
||||
}
|
||||
_ = json.Unmarshal(respMakeJoin.JoinEvent.Content, &content)
|
||||
content["membership"] = gomatrixserverlib.Join
|
||||
if err = respMakeJoin.JoinEvent.SetContent(content); err != nil {
|
||||
return fmt.Errorf("respMakeJoin.JoinEvent.SetContent: %w", err)
|
||||
}
|
||||
if err = respMakeJoin.JoinEvent.SetUnsigned(struct{}{}); err != nil {
|
||||
return fmt.Errorf("respMakeJoin.JoinEvent.SetUnsigned: %w", err)
|
||||
}
|
||||
|
||||
// Work out if we support the room version that has been supplied in
|
||||
// the make_join response.
|
||||
// "If not provided, the room version is assumed to be either "1" or "2"."
|
||||
// https://matrix.org/docs/spec/server_server/unstable#get-matrix-federation-v1-make-join-roomid-userid
|
||||
if respMakeJoin.RoomVersion == "" {
|
||||
// TODO: (PowerDAG) Handle this case for PowerEvents
|
||||
respMakeJoin.RoomVersion = setDefaultRoomVersionFromJoinEvent(respMakeJoin.JoinEvent)
|
||||
}
|
||||
if _, err = respMakeJoin.RoomVersion.EventFormat(); err != nil {
|
||||
return fmt.Errorf("respMakeJoin.RoomVersion.EventFormat: %w", err)
|
||||
}
|
||||
|
||||
// Build the join event.
|
||||
event, err := respMakeJoin.JoinEvent.Build(
|
||||
time.Now(),
|
||||
origin,
|
||||
r.cfg.Matrix.KeyID,
|
||||
r.cfg.Matrix.PrivateKey,
|
||||
respMakeJoin.RoomVersion,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("respMakeJoin.JoinEvent.Build: %w", err)
|
||||
}
|
||||
|
||||
joinedHosts := []types.JoinedHost{}
|
||||
if respMakeJoin.RoomVersion == gomatrixserverlib.RoomVersionPowerDAG {
|
||||
respSendJoin, err := r.federation.SendJoinPowerDAG(
|
||||
context.Background(),
|
||||
origin,
|
||||
serverName,
|
||||
event,
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
r.statistics.ForServer(serverName).Failure()
|
||||
return fmt.Errorf("r.federation.SendJoin: %w", err)
|
||||
}
|
||||
r.statistics.ForServer(serverName).Success(statistics.SendDirect)
|
||||
|
||||
// If the remote server returned an event in the "event" key of
|
||||
// the send_join request then we should use that instead. It may
|
||||
// contain signatures that we don't know about.
|
||||
if len(respSendJoin.Event) > 0 {
|
||||
var remoteEvent *gomatrixserverlib.Event
|
||||
remoteEvent, err = respSendJoin.Event.UntrustedEvent(respMakeJoin.RoomVersion)
|
||||
if err == nil && isWellFormedMembershipEvent(
|
||||
remoteEvent, roomID, userID,
|
||||
) {
|
||||
event = remoteEvent
|
||||
}
|
||||
}
|
||||
|
||||
// Sanity-check the join response to ensure that it has a create
|
||||
// event, that the room version is known, etc.
|
||||
powerEvents := respSendJoin.PowerEvents.UntrustedEvents(respMakeJoin.RoomVersion)
|
||||
if err = checkEventsContainCreateEvent(powerEvents); err != nil {
|
||||
return fmt.Errorf("sanityCheckPowerDAG: %w", err)
|
||||
}
|
||||
|
||||
// Process the join response in a goroutine. The idea here is
|
||||
// that we'll try and wait for as long as possible for the work
|
||||
// to complete, but if the client does give up waiting, we'll
|
||||
// still continue to process the join anyway so that we don't
|
||||
// waste the effort.
|
||||
// TODO: Can we expand Check here to return a list of missing auth
|
||||
// events rather than failing one at a time?
|
||||
var respState gomatrixserverlib.StateResponsePowerDAG
|
||||
respState, err = gomatrixserverlib.CheckSendJoinResponsePowerDAG(
|
||||
context.Background(),
|
||||
respMakeJoin.RoomVersion, &respSendJoin,
|
||||
r.keyRing,
|
||||
event,
|
||||
federatedEventProvider(ctx, r.federation, r.keyRing, origin, serverName),
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("respSendJoin.Check: %w", err)
|
||||
}
|
||||
|
||||
// We need to immediately update our list of joined hosts for this room now as we are technically
|
||||
// joined. We must do this synchronously: we cannot rely on the roomserver output events as they
|
||||
// will happen asyncly. If we don't update this table, you can end up with bad failure modes like
|
||||
// joining a room, waiting for 200 OK then changing device keys and have those keys not be sent
|
||||
// to other servers (this was a cause of a flakey sytest "Local device key changes get to remote servers")
|
||||
// The events are trusted now as we performed auth checks above.
|
||||
joinedHosts, err = consumers.JoinedHostsFromEvents(respState.GetStateEvents().TrustedEvents(respMakeJoin.RoomVersion, false))
|
||||
if err != nil {
|
||||
return fmt.Errorf("JoinedHostsFromEvents: failed to get joined hosts: %s", err)
|
||||
}
|
||||
|
||||
logrus.WithField("room", roomID).Infof("Joined federated room with %d hosts", len(joinedHosts))
|
||||
if _, err = r.db.UpdateRoom(context.Background(), roomID, joinedHosts, nil, true); err != nil {
|
||||
return fmt.Errorf("UpdatedRoom: failed to update room with joined hosts: %s", err)
|
||||
}
|
||||
|
||||
// If we successfully performed a send_join above then the other
|
||||
// server now thinks we're a part of the room. Send the newly
|
||||
// returned state to the roomserver to update our local view.
|
||||
if unsigned != nil {
|
||||
event, err = event.SetUnsigned(unsigned)
|
||||
if err != nil {
|
||||
// non-fatal, log and continue
|
||||
logrus.WithError(err).Errorf("Failed to set unsigned content")
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: (PowerDAG) Send join event to roomserver
|
||||
//if err = roomserverAPI.SendEventWithState(
|
||||
// context.Background(),
|
||||
// r.rsAPI,
|
||||
// origin,
|
||||
// roomserverAPI.KindNew,
|
||||
// respState,
|
||||
// event.Headered(respMakeJoin.RoomVersion),
|
||||
// serverName,
|
||||
// nil,
|
||||
// false,
|
||||
//); err != nil {
|
||||
// return fmt.Errorf("roomserverAPI.SendEventWithState: %w", err)
|
||||
//}
|
||||
} else { // RoomVersionV1 -> RoomVersionV10
|
||||
// Try to perform a send_join using the newly built event.
|
||||
respSendJoin, err := r.federation.SendJoin(
|
||||
context.Background(),
|
||||
origin,
|
||||
serverName,
|
||||
event,
|
||||
)
|
||||
if err != nil {
|
||||
r.statistics.ForServer(serverName).Failure()
|
||||
return fmt.Errorf("r.federation.SendJoin: %w", err)
|
||||
}
|
||||
r.statistics.ForServer(serverName).Success(statistics.SendDirect)
|
||||
|
||||
// If the remote server returned an event in the "event" key of
|
||||
// the send_join request then we should use that instead. It may
|
||||
// contain signatures that we don't know about.
|
||||
if len(respSendJoin.Event) > 0 {
|
||||
var remoteEvent *gomatrixserverlib.Event
|
||||
remoteEvent, err = respSendJoin.Event.UntrustedEvent(respMakeJoin.RoomVersion)
|
||||
if err == nil && isWellFormedMembershipEvent(
|
||||
remoteEvent, roomID, userID,
|
||||
) {
|
||||
event = remoteEvent
|
||||
}
|
||||
}
|
||||
|
||||
// Sanity-check the join response to ensure that it has a create
|
||||
// event, that the room version is known, etc.
|
||||
authEvents := respSendJoin.AuthEvents.UntrustedEvents(respMakeJoin.RoomVersion)
|
||||
if err = checkEventsContainCreateEvent(authEvents); err != nil {
|
||||
return fmt.Errorf("sanityCheckAuthChain: %w", err)
|
||||
}
|
||||
|
||||
// Process the join response in a goroutine. The idea here is
|
||||
// that we'll try and wait for as long as possible for the work
|
||||
// to complete, but if the client does give up waiting, we'll
|
||||
// still continue to process the join anyway so that we don't
|
||||
// waste the effort.
|
||||
// TODO: Can we expand Check here to return a list of missing auth
|
||||
// events rather than failing one at a time?
|
||||
var respState gomatrixserverlib.StateResponse
|
||||
respState, err = gomatrixserverlib.CheckSendJoinResponse(
|
||||
context.Background(),
|
||||
respMakeJoin.RoomVersion, &respSendJoin,
|
||||
r.keyRing,
|
||||
event,
|
||||
federatedEventProvider(ctx, r.federation, r.keyRing, origin, serverName),
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("respSendJoin.Check: %w", err)
|
||||
}
|
||||
|
||||
// We need to immediately update our list of joined hosts for this room now as we are technically
|
||||
// joined. We must do this synchronously: we cannot rely on the roomserver output events as they
|
||||
// will happen asyncly. If we don't update this table, you can end up with bad failure modes like
|
||||
// joining a room, waiting for 200 OK then changing device keys and have those keys not be sent
|
||||
// to other servers (this was a cause of a flakey sytest "Local device key changes get to remote servers")
|
||||
// The events are trusted now as we performed auth checks above.
|
||||
joinedHosts, err = consumers.JoinedHostsFromEvents(respState.GetStateEvents().TrustedEvents(respMakeJoin.RoomVersion, false))
|
||||
if err != nil {
|
||||
return fmt.Errorf("JoinedHostsFromEvents: failed to get joined hosts: %s", err)
|
||||
}
|
||||
|
||||
logrus.WithField("room", roomID).Infof("Joined federated room with %d hosts", len(joinedHosts))
|
||||
if _, err = r.db.UpdateRoom(context.Background(), roomID, joinedHosts, nil, true); err != nil {
|
||||
return fmt.Errorf("UpdatedRoom: failed to update room with joined hosts: %s", err)
|
||||
}
|
||||
|
||||
// If we successfully performed a send_join above then the other
|
||||
// server now thinks we're a part of the room. Send the newly
|
||||
// returned state to the roomserver to update our local view.
|
||||
if unsigned != nil {
|
||||
event, err = event.SetUnsigned(unsigned)
|
||||
if err != nil {
|
||||
// non-fatal, log and continue
|
||||
logrus.WithError(err).Errorf("Failed to set unsigned content")
|
||||
}
|
||||
}
|
||||
|
||||
if err = roomserverAPI.SendEventWithState(
|
||||
context.Background(),
|
||||
r.rsAPI,
|
||||
origin,
|
||||
roomserverAPI.KindNew,
|
||||
respState,
|
||||
event.Headered(respMakeJoin.RoomVersion),
|
||||
serverName,
|
||||
nil,
|
||||
false,
|
||||
); err != nil {
|
||||
return fmt.Errorf("roomserverAPI.SendEventWithState: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// isWellFormedMembershipEvent returns true if the event looks like a legitimate
|
||||
// membership event.
|
||||
func isWellFormedMembershipEvent(event *gomatrixserverlib.Event, roomID, userID string) bool {
|
||||
if membership, err := event.Membership(); err != nil {
|
||||
return false
|
||||
} else if membership != gomatrixserverlib.Join {
|
||||
return false
|
||||
}
|
||||
if event.RoomID() != roomID {
|
||||
return false
|
||||
}
|
||||
if !event.StateKeyEquals(userID) {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// PerformOutboundPeekRequest implements api.FederationInternalAPI
|
||||
func (r *FederationInternalAPI) PerformOutboundPeek(
|
||||
ctx context.Context,
|
||||
|
@ -840,30 +618,6 @@ func checkEventsContainCreateEvent(events []*gomatrixserverlib.Event) error {
|
|||
return fmt.Errorf("response is missing m.room.create event")
|
||||
}
|
||||
|
||||
func setDefaultRoomVersionFromJoinEvent(
|
||||
joinEvent gomatrixserverlib.EventBuilder,
|
||||
) gomatrixserverlib.RoomVersion {
|
||||
// if auth events are not event references we know it must be v3+
|
||||
// we have to do these shenanigans to satisfy sytest, specifically for:
|
||||
// "Outbound federation rejects m.room.create events with an unknown room version"
|
||||
hasEventRefs := true
|
||||
authEvents, ok := joinEvent.AuthEvents.([]interface{})
|
||||
if ok {
|
||||
if len(authEvents) > 0 {
|
||||
_, ok = authEvents[0].(string)
|
||||
if ok {
|
||||
// event refs are objects, not strings, so we know we must be dealing with a v3+ room.
|
||||
hasEventRefs = false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if hasEventRefs {
|
||||
return gomatrixserverlib.RoomVersionV1
|
||||
}
|
||||
return gomatrixserverlib.RoomVersionV4
|
||||
}
|
||||
|
||||
// federatedEventProvider is an event provider which fetches events from the server provided
|
||||
func federatedEventProvider(
|
||||
ctx context.Context, federation api.FederationClient,
|
||||
|
|
Loading…
Reference in a new issue