Rough in federation PerformJoin for power DAGs

This commit is contained in:
Devon Hudson 2023-04-19 17:05:39 -06:00
parent f66862958d
commit 6d0b780922
No known key found for this signature in database
GPG key ID: CD06B18E77F6A628
2 changed files with 202 additions and 101 deletions

View file

@ -115,6 +115,7 @@ type FederationClient interface {
Peek(ctx context.Context, origin, s gomatrixserverlib.ServerName, roomID, peekID string, roomVersions []gomatrixserverlib.RoomVersion) (res fclient.RespPeek, err error) Peek(ctx context.Context, origin, s gomatrixserverlib.ServerName, roomID, peekID string, roomVersions []gomatrixserverlib.RoomVersion) (res fclient.RespPeek, err error)
MakeJoin(ctx context.Context, origin, s gomatrixserverlib.ServerName, roomID, userID string, roomVersions []gomatrixserverlib.RoomVersion) (res fclient.RespMakeJoin, err error) MakeJoin(ctx context.Context, origin, s gomatrixserverlib.ServerName, roomID, userID string, roomVersions []gomatrixserverlib.RoomVersion) (res fclient.RespMakeJoin, err error)
SendJoin(ctx context.Context, origin, s gomatrixserverlib.ServerName, event *gomatrixserverlib.Event) (res fclient.RespSendJoin, err error) SendJoin(ctx context.Context, origin, s gomatrixserverlib.ServerName, event *gomatrixserverlib.Event) (res fclient.RespSendJoin, err error)
SendJoinPowerDAG(ctx context.Context, origin, s gomatrixserverlib.ServerName, event *gomatrixserverlib.Event) (res fclient.RespSendJoinPowerDAG, err error)
MakeLeave(ctx context.Context, origin, s gomatrixserverlib.ServerName, roomID, userID string) (res fclient.RespMakeLeave, err error) MakeLeave(ctx context.Context, origin, s gomatrixserverlib.ServerName, roomID, userID string) (res fclient.RespMakeLeave, err error)
SendLeave(ctx context.Context, origin, s gomatrixserverlib.ServerName, event *gomatrixserverlib.Event) (err error) SendLeave(ctx context.Context, origin, s gomatrixserverlib.ServerName, event *gomatrixserverlib.Event) (err error)
SendInviteV2(ctx context.Context, origin, s gomatrixserverlib.ServerName, request gomatrixserverlib.InviteV2Request) (res fclient.RespInviteV2, err error) SendInviteV2(ctx context.Context, origin, s gomatrixserverlib.ServerName, request gomatrixserverlib.InviteV2Request) (res fclient.RespInviteV2, err error)

View file

@ -16,6 +16,7 @@ import (
"github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/federationapi/consumers" "github.com/matrix-org/dendrite/federationapi/consumers"
"github.com/matrix-org/dendrite/federationapi/statistics" "github.com/matrix-org/dendrite/federationapi/statistics"
"github.com/matrix-org/dendrite/federationapi/types"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/version" "github.com/matrix-org/dendrite/roomserver/version"
) )
@ -198,6 +199,7 @@ func (r *FederationInternalAPI) performJoinUsingServer(
// "If not provided, the room version is assumed to be either "1" or "2"." // "If not provided, the room version is assumed to be either "1" or "2"."
// https://matrix.org/docs/spec/server_server/unstable#get-matrix-federation-v1-make-join-roomid-userid // https://matrix.org/docs/spec/server_server/unstable#get-matrix-federation-v1-make-join-roomid-userid
if respMakeJoin.RoomVersion == "" { if respMakeJoin.RoomVersion == "" {
// TODO: (PowerDAG) Handle this case for PowerEvents
respMakeJoin.RoomVersion = setDefaultRoomVersionFromJoinEvent(respMakeJoin.JoinEvent) respMakeJoin.RoomVersion = setDefaultRoomVersionFromJoinEvent(respMakeJoin.JoinEvent)
} }
if _, err = respMakeJoin.RoomVersion.EventFormat(); err != nil { if _, err = respMakeJoin.RoomVersion.EventFormat(); err != nil {
@ -216,6 +218,102 @@ func (r *FederationInternalAPI) performJoinUsingServer(
return fmt.Errorf("respMakeJoin.JoinEvent.Build: %w", err) return fmt.Errorf("respMakeJoin.JoinEvent.Build: %w", err)
} }
joinedHosts := []types.JoinedHost{}
if respMakeJoin.RoomVersion == gomatrixserverlib.RoomVersionPowerDAG {
respSendJoin, err := r.federation.SendJoinPowerDAG(
context.Background(),
origin,
serverName,
event,
)
if err != nil {
r.statistics.ForServer(serverName).Failure()
return fmt.Errorf("r.federation.SendJoin: %w", err)
}
r.statistics.ForServer(serverName).Success(statistics.SendDirect)
// If the remote server returned an event in the "event" key of
// the send_join request then we should use that instead. It may
// contain signatures that we don't know about.
if len(respSendJoin.Event) > 0 {
var remoteEvent *gomatrixserverlib.Event
remoteEvent, err = respSendJoin.Event.UntrustedEvent(respMakeJoin.RoomVersion)
if err == nil && isWellFormedMembershipEvent(
remoteEvent, roomID, userID,
) {
event = remoteEvent
}
}
// Sanity-check the join response to ensure that it has a create
// event, that the room version is known, etc.
powerEvents := respSendJoin.PowerEvents.UntrustedEvents(respMakeJoin.RoomVersion)
if err = checkEventsContainCreateEvent(powerEvents); err != nil {
return fmt.Errorf("sanityCheckPowerDAG: %w", err)
}
// Process the join response in a goroutine. The idea here is
// that we'll try and wait for as long as possible for the work
// to complete, but if the client does give up waiting, we'll
// still continue to process the join anyway so that we don't
// waste the effort.
// TODO: Can we expand Check here to return a list of missing auth
// events rather than failing one at a time?
var respState gomatrixserverlib.StateResponsePowerDAG
respState, err = gomatrixserverlib.CheckSendJoinResponsePowerDAG(
context.Background(),
respMakeJoin.RoomVersion, &respSendJoin,
r.keyRing,
event,
federatedEventProvider(ctx, r.federation, r.keyRing, origin, serverName),
)
if err != nil {
return fmt.Errorf("respSendJoin.Check: %w", err)
}
// We need to immediately update our list of joined hosts for this room now as we are technically
// joined. We must do this synchronously: we cannot rely on the roomserver output events as they
// will happen asyncly. If we don't update this table, you can end up with bad failure modes like
// joining a room, waiting for 200 OK then changing device keys and have those keys not be sent
// to other servers (this was a cause of a flakey sytest "Local device key changes get to remote servers")
// The events are trusted now as we performed auth checks above.
joinedHosts, err = consumers.JoinedHostsFromEvents(respState.GetStateEvents().TrustedEvents(respMakeJoin.RoomVersion, false))
if err != nil {
return fmt.Errorf("JoinedHostsFromEvents: failed to get joined hosts: %s", err)
}
logrus.WithField("room", roomID).Infof("Joined federated room with %d hosts", len(joinedHosts))
if _, err = r.db.UpdateRoom(context.Background(), roomID, joinedHosts, nil, true); err != nil {
return fmt.Errorf("UpdatedRoom: failed to update room with joined hosts: %s", err)
}
// If we successfully performed a send_join above then the other
// server now thinks we're a part of the room. Send the newly
// returned state to the roomserver to update our local view.
if unsigned != nil {
event, err = event.SetUnsigned(unsigned)
if err != nil {
// non-fatal, log and continue
logrus.WithError(err).Errorf("Failed to set unsigned content")
}
}
// TODO: (PowerDAG) Send join event to roomserver
//if err = roomserverAPI.SendEventWithState(
// context.Background(),
// r.rsAPI,
// origin,
// roomserverAPI.KindNew,
// respState,
// event.Headered(respMakeJoin.RoomVersion),
// serverName,
// nil,
// false,
//); err != nil {
// return fmt.Errorf("roomserverAPI.SendEventWithState: %w", err)
//}
} else { // RoomVersionV1 -> RoomVersionV10
// Try to perform a send_join using the newly built event. // Try to perform a send_join using the newly built event.
respSendJoin, err := r.federation.SendJoin( respSendJoin, err := r.federation.SendJoin(
context.Background(), context.Background(),
@ -245,7 +343,7 @@ func (r *FederationInternalAPI) performJoinUsingServer(
// Sanity-check the join response to ensure that it has a create // Sanity-check the join response to ensure that it has a create
// event, that the room version is known, etc. // event, that the room version is known, etc.
authEvents := respSendJoin.AuthEvents.UntrustedEvents(respMakeJoin.RoomVersion) authEvents := respSendJoin.AuthEvents.UntrustedEvents(respMakeJoin.RoomVersion)
if err = sanityCheckAuthChain(authEvents); err != nil { if err = checkEventsContainCreateEvent(authEvents); err != nil {
return fmt.Errorf("sanityCheckAuthChain: %w", err) return fmt.Errorf("sanityCheckAuthChain: %w", err)
} }
@ -262,7 +360,7 @@ func (r *FederationInternalAPI) performJoinUsingServer(
respMakeJoin.RoomVersion, &respSendJoin, respMakeJoin.RoomVersion, &respSendJoin,
r.keyRing, r.keyRing,
event, event,
federatedAuthProvider(ctx, r.federation, r.keyRing, origin, serverName), federatedEventProvider(ctx, r.federation, r.keyRing, origin, serverName),
) )
if err != nil { if err != nil {
return fmt.Errorf("respSendJoin.Check: %w", err) return fmt.Errorf("respSendJoin.Check: %w", err)
@ -274,10 +372,11 @@ func (r *FederationInternalAPI) performJoinUsingServer(
// joining a room, waiting for 200 OK then changing device keys and have those keys not be sent // joining a room, waiting for 200 OK then changing device keys and have those keys not be sent
// to other servers (this was a cause of a flakey sytest "Local device key changes get to remote servers") // to other servers (this was a cause of a flakey sytest "Local device key changes get to remote servers")
// The events are trusted now as we performed auth checks above. // The events are trusted now as we performed auth checks above.
joinedHosts, err := consumers.JoinedHostsFromEvents(respState.GetStateEvents().TrustedEvents(respMakeJoin.RoomVersion, false)) joinedHosts, err = consumers.JoinedHostsFromEvents(respState.GetStateEvents().TrustedEvents(respMakeJoin.RoomVersion, false))
if err != nil { if err != nil {
return fmt.Errorf("JoinedHostsFromEvents: failed to get joined hosts: %s", err) return fmt.Errorf("JoinedHostsFromEvents: failed to get joined hosts: %s", err)
} }
logrus.WithField("room", roomID).Infof("Joined federated room with %d hosts", len(joinedHosts)) logrus.WithField("room", roomID).Infof("Joined federated room with %d hosts", len(joinedHosts))
if _, err = r.db.UpdateRoom(context.Background(), roomID, joinedHosts, nil, true); err != nil { if _, err = r.db.UpdateRoom(context.Background(), roomID, joinedHosts, nil, true); err != nil {
return fmt.Errorf("UpdatedRoom: failed to update room with joined hosts: %s", err) return fmt.Errorf("UpdatedRoom: failed to update room with joined hosts: %s", err)
@ -307,6 +406,7 @@ func (r *FederationInternalAPI) performJoinUsingServer(
); err != nil { ); err != nil {
return fmt.Errorf("roomserverAPI.SendEventWithState: %w", err) return fmt.Errorf("roomserverAPI.SendEventWithState: %w", err)
} }
}
return nil return nil
} }
@ -473,12 +573,12 @@ func (r *FederationInternalAPI) performOutboundPeekUsingServer(
// authenticate the state returned (check its auth events etc) // authenticate the state returned (check its auth events etc)
// the equivalent of CheckSendJoinResponse() // the equivalent of CheckSendJoinResponse()
authEvents, stateEvents, err := gomatrixserverlib.CheckStateResponse( authEvents, stateEvents, err := gomatrixserverlib.CheckStateResponse(
ctx, &respPeek, respPeek.RoomVersion, r.keyRing, federatedAuthProvider(ctx, r.federation, r.keyRing, r.cfg.Matrix.ServerName, serverName), ctx, &respPeek, respPeek.RoomVersion, r.keyRing, federatedEventProvider(ctx, r.federation, r.keyRing, r.cfg.Matrix.ServerName, serverName),
) )
if err != nil { if err != nil {
return fmt.Errorf("error checking state returned from peeking: %w", err) return fmt.Errorf("error checking state returned from peeking: %w", err)
} }
if err = sanityCheckAuthChain(authEvents); err != nil { if err = checkEventsContainCreateEvent(authEvents); err != nil {
return fmt.Errorf("sanityCheckAuthChain: %w", err) return fmt.Errorf("sanityCheckAuthChain: %w", err)
} }
@ -712,9 +812,9 @@ func (r *FederationInternalAPI) MarkServersAlive(destinations []gomatrixserverli
} }
} }
func sanityCheckAuthChain(authChain []*gomatrixserverlib.Event) error { func checkEventsContainCreateEvent(events []*gomatrixserverlib.Event) error {
// sanity check we have a create event and it has a known room version // sanity check we have a create event and it has a known room version
for _, ev := range authChain { for _, ev := range events {
if ev.Type() == gomatrixserverlib.MRoomCreate && ev.StateKeyEquals("") { if ev.Type() == gomatrixserverlib.MRoomCreate && ev.StateKeyEquals("") {
// make sure the room version is known // make sure the room version is known
content := ev.Content() content := ev.Content()
@ -732,12 +832,12 @@ func sanityCheckAuthChain(authChain []*gomatrixserverlib.Event) error {
} }
knownVersions := gomatrixserverlib.RoomVersions() knownVersions := gomatrixserverlib.RoomVersions()
if _, ok := knownVersions[gomatrixserverlib.RoomVersion(verBody.Version)]; !ok { if _, ok := knownVersions[gomatrixserverlib.RoomVersion(verBody.Version)]; !ok {
return fmt.Errorf("auth chain m.room.create event has an unknown room version: %s", verBody.Version) return fmt.Errorf("m.room.create event has an unknown room version: %s", verBody.Version)
} }
return nil return nil
} }
} }
return fmt.Errorf("auth chain response is missing m.room.create event") return fmt.Errorf("response is missing m.room.create event")
} }
func setDefaultRoomVersionFromJoinEvent( func setDefaultRoomVersionFromJoinEvent(
@ -764,17 +864,17 @@ func setDefaultRoomVersionFromJoinEvent(
return gomatrixserverlib.RoomVersionV4 return gomatrixserverlib.RoomVersionV4
} }
// FederatedAuthProvider is an auth chain provider which fetches events from the server provided // federatedEventProvider is an event provider which fetches events from the server provided
func federatedAuthProvider( func federatedEventProvider(
ctx context.Context, federation api.FederationClient, ctx context.Context, federation api.FederationClient,
keyRing gomatrixserverlib.JSONVerifier, origin, server gomatrixserverlib.ServerName, keyRing gomatrixserverlib.JSONVerifier, origin, server gomatrixserverlib.ServerName,
) gomatrixserverlib.AuthChainProvider { ) gomatrixserverlib.EventProvider {
// A list of events that we have retried, if they were not included in // A list of events that we have retried, if they were not included in
// the auth events supplied in the send_join. // the events supplied in the send_join.
retries := map[string][]*gomatrixserverlib.Event{} retries := map[string][]*gomatrixserverlib.Event{}
// Define a function which we can pass to Check to retrieve missing // Define a function which we can pass to Check to retrieve missing
// auth events inline. This greatly increases our chances of not having // events inline. This greatly increases our chances of not having
// to repeat the entire set of checks just for a missing event or two. // to repeat the entire set of checks just for a missing event or two.
return func(roomVersion gomatrixserverlib.RoomVersion, eventIDs []string) ([]*gomatrixserverlib.Event, error) { return func(roomVersion gomatrixserverlib.RoomVersion, eventIDs []string) ([]*gomatrixserverlib.Event, error) {
returning := []*gomatrixserverlib.Event{} returning := []*gomatrixserverlib.Event{}
@ -785,7 +885,7 @@ func federatedAuthProvider(
// just append the results. We won't retry the request. // just append the results. We won't retry the request.
if retry, ok := retries[eventID]; ok { if retry, ok := retries[eventID]; ok {
if retry == nil { if retry == nil {
return nil, fmt.Errorf("missingAuth: not retrying failed event ID %q", eventID) return nil, fmt.Errorf("missingEvent: not retrying failed event ID %q", eventID)
} }
returning = append(returning, retry...) returning = append(returning, retry...)
continue continue
@ -799,7 +899,7 @@ func federatedAuthProvider(
// join response. // join response.
tx, txerr := federation.GetEvent(ctx, origin, server, eventID) tx, txerr := federation.GetEvent(ctx, origin, server, eventID)
if txerr != nil { if txerr != nil {
return nil, fmt.Errorf("missingAuth r.federation.GetEvent: %w", txerr) return nil, fmt.Errorf("missingEvent r.federation.GetEvent: %w", txerr)
} }
// For each event returned, add it to the set of return events. We // For each event returned, add it to the set of return events. We
@ -809,12 +909,12 @@ func federatedAuthProvider(
// Try to parse the event. // Try to parse the event.
ev, everr := gomatrixserverlib.NewEventFromUntrustedJSON(pdu, roomVersion) ev, everr := gomatrixserverlib.NewEventFromUntrustedJSON(pdu, roomVersion)
if everr != nil { if everr != nil {
return nil, fmt.Errorf("missingAuth gomatrixserverlib.NewEventFromUntrustedJSON: %w", everr) return nil, fmt.Errorf("missingEvent gomatrixserverlib.NewEventFromUntrustedJSON: %w", everr)
} }
// Check the signatures of the event. // Check the signatures of the event.
if err := ev.VerifyEventSignatures(ctx, keyRing); err != nil { if err := ev.VerifyEventSignatures(ctx, keyRing); err != nil {
return nil, fmt.Errorf("missingAuth VerifyEventSignatures: %w", err) return nil, fmt.Errorf("missingEvent VerifyEventSignatures: %w", err)
} }
// If the event is OK then add it to the results and the retry map. // If the event is OK then add it to the results and the retry map.