From 6d0b780922e86ccea5d3306656d8f2df48169416 Mon Sep 17 00:00:00 2001 From: Devon Hudson Date: Wed, 19 Apr 2023 17:05:39 -0600 Subject: [PATCH] Rough in federation PerformJoin for power DAGs --- federationapi/api/api.go | 1 + federationapi/internal/perform.go | 302 ++++++++++++++++++++---------- 2 files changed, 202 insertions(+), 101 deletions(-) diff --git a/federationapi/api/api.go b/federationapi/api/api.go index e23bec271..52a92845d 100644 --- a/federationapi/api/api.go +++ b/federationapi/api/api.go @@ -115,6 +115,7 @@ type FederationClient interface { Peek(ctx context.Context, origin, s gomatrixserverlib.ServerName, roomID, peekID string, roomVersions []gomatrixserverlib.RoomVersion) (res fclient.RespPeek, err error) MakeJoin(ctx context.Context, origin, s gomatrixserverlib.ServerName, roomID, userID string, roomVersions []gomatrixserverlib.RoomVersion) (res fclient.RespMakeJoin, err error) SendJoin(ctx context.Context, origin, s gomatrixserverlib.ServerName, event *gomatrixserverlib.Event) (res fclient.RespSendJoin, err error) + SendJoinPowerDAG(ctx context.Context, origin, s gomatrixserverlib.ServerName, event *gomatrixserverlib.Event) (res fclient.RespSendJoinPowerDAG, err error) MakeLeave(ctx context.Context, origin, s gomatrixserverlib.ServerName, roomID, userID string) (res fclient.RespMakeLeave, err error) SendLeave(ctx context.Context, origin, s gomatrixserverlib.ServerName, event *gomatrixserverlib.Event) (err error) SendInviteV2(ctx context.Context, origin, s gomatrixserverlib.ServerName, request gomatrixserverlib.InviteV2Request) (res fclient.RespInviteV2, err error) diff --git a/federationapi/internal/perform.go b/federationapi/internal/perform.go index c580e5275..8596bc604 100644 --- a/federationapi/internal/perform.go +++ b/federationapi/internal/perform.go @@ -16,6 +16,7 @@ import ( "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/federationapi/consumers" 
"github.com/matrix-org/dendrite/federationapi/statistics" + "github.com/matrix-org/dendrite/federationapi/types" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/version" ) @@ -198,6 +199,7 @@ func (r *FederationInternalAPI) performJoinUsingServer( // "If not provided, the room version is assumed to be either "1" or "2"." // https://matrix.org/docs/spec/server_server/unstable#get-matrix-federation-v1-make-join-roomid-userid if respMakeJoin.RoomVersion == "" { + // TODO: (PowerDAG) Handle this case for PowerEvents respMakeJoin.RoomVersion = setDefaultRoomVersionFromJoinEvent(respMakeJoin.JoinEvent) } if _, err = respMakeJoin.RoomVersion.EventFormat(); err != nil { @@ -216,96 +218,194 @@ func (r *FederationInternalAPI) performJoinUsingServer( return fmt.Errorf("respMakeJoin.JoinEvent.Build: %w", err) } - // Try to perform a send_join using the newly built event. - respSendJoin, err := r.federation.SendJoin( - context.Background(), - origin, - serverName, - event, - ) - if err != nil { - r.statistics.ForServer(serverName).Failure() - return fmt.Errorf("r.federation.SendJoin: %w", err) - } - r.statistics.ForServer(serverName).Success(statistics.SendDirect) + joinedHosts := []types.JoinedHost{} + if respMakeJoin.RoomVersion == gomatrixserverlib.RoomVersionPowerDAG { + respSendJoin, err := r.federation.SendJoinPowerDAG( + context.Background(), + origin, + serverName, + event, + ) - // If the remote server returned an event in the "event" key of - // the send_join request then we should use that instead. It may - // contain signatures that we don't know about. 
- if len(respSendJoin.Event) > 0 { - var remoteEvent *gomatrixserverlib.Event - remoteEvent, err = respSendJoin.Event.UntrustedEvent(respMakeJoin.RoomVersion) - if err == nil && isWellFormedMembershipEvent( - remoteEvent, roomID, userID, - ) { - event = remoteEvent - } - } - - // Sanity-check the join response to ensure that it has a create - // event, that the room version is known, etc. - authEvents := respSendJoin.AuthEvents.UntrustedEvents(respMakeJoin.RoomVersion) - if err = sanityCheckAuthChain(authEvents); err != nil { - return fmt.Errorf("sanityCheckAuthChain: %w", err) - } - - // Process the join response in a goroutine. The idea here is - // that we'll try and wait for as long as possible for the work - // to complete, but if the client does give up waiting, we'll - // still continue to process the join anyway so that we don't - // waste the effort. - // TODO: Can we expand Check here to return a list of missing auth - // events rather than failing one at a time? - var respState gomatrixserverlib.StateResponse - respState, err = gomatrixserverlib.CheckSendJoinResponse( - context.Background(), - respMakeJoin.RoomVersion, &respSendJoin, - r.keyRing, - event, - federatedAuthProvider(ctx, r.federation, r.keyRing, origin, serverName), - ) - if err != nil { - return fmt.Errorf("respSendJoin.Check: %w", err) - } - - // We need to immediately update our list of joined hosts for this room now as we are technically - // joined. We must do this synchronously: we cannot rely on the roomserver output events as they - // will happen asyncly. If we don't update this table, you can end up with bad failure modes like - // joining a room, waiting for 200 OK then changing device keys and have those keys not be sent - // to other servers (this was a cause of a flakey sytest "Local device key changes get to remote servers") - // The events are trusted now as we performed auth checks above. 
- joinedHosts, err := consumers.JoinedHostsFromEvents(respState.GetStateEvents().TrustedEvents(respMakeJoin.RoomVersion, false)) - if err != nil { - return fmt.Errorf("JoinedHostsFromEvents: failed to get joined hosts: %s", err) - } - logrus.WithField("room", roomID).Infof("Joined federated room with %d hosts", len(joinedHosts)) - if _, err = r.db.UpdateRoom(context.Background(), roomID, joinedHosts, nil, true); err != nil { - return fmt.Errorf("UpdatedRoom: failed to update room with joined hosts: %s", err) - } - - // If we successfully performed a send_join above then the other - // server now thinks we're a part of the room. Send the newly - // returned state to the roomserver to update our local view. - if unsigned != nil { - event, err = event.SetUnsigned(unsigned) if err != nil { - // non-fatal, log and continue - logrus.WithError(err).Errorf("Failed to set unsigned content") + r.statistics.ForServer(serverName).Failure() + return fmt.Errorf("r.federation.SendJoin: %w", err) } - } + r.statistics.ForServer(serverName).Success(statistics.SendDirect) - if err = roomserverAPI.SendEventWithState( - context.Background(), - r.rsAPI, - origin, - roomserverAPI.KindNew, - respState, - event.Headered(respMakeJoin.RoomVersion), - serverName, - nil, - false, - ); err != nil { - return fmt.Errorf("roomserverAPI.SendEventWithState: %w", err) + // If the remote server returned an event in the "event" key of + // the send_join request then we should use that instead. It may + // contain signatures that we don't know about. + if len(respSendJoin.Event) > 0 { + var remoteEvent *gomatrixserverlib.Event + remoteEvent, err = respSendJoin.Event.UntrustedEvent(respMakeJoin.RoomVersion) + if err == nil && isWellFormedMembershipEvent( + remoteEvent, roomID, userID, + ) { + event = remoteEvent + } + } + + // Sanity-check the join response to ensure that it has a create + // event, that the room version is known, etc. 
+ powerEvents := respSendJoin.PowerEvents.UntrustedEvents(respMakeJoin.RoomVersion) + if err = checkEventsContainCreateEvent(powerEvents); err != nil { + return fmt.Errorf("sanityCheckPowerDAG: %w", err) + } + + // Process the join response in a goroutine. The idea here is + // that we'll try and wait for as long as possible for the work + // to complete, but if the client does give up waiting, we'll + // still continue to process the join anyway so that we don't + // waste the effort. + // TODO: Can we expand Check here to return a list of missing auth + // events rather than failing one at a time? + var respState gomatrixserverlib.StateResponsePowerDAG + respState, err = gomatrixserverlib.CheckSendJoinResponsePowerDAG( + context.Background(), + respMakeJoin.RoomVersion, &respSendJoin, + r.keyRing, + event, + federatedEventProvider(ctx, r.federation, r.keyRing, origin, serverName), + ) + if err != nil { + return fmt.Errorf("respSendJoin.Check: %w", err) + } + + // We need to immediately update our list of joined hosts for this room now as we are technically + // joined. We must do this synchronously: we cannot rely on the roomserver output events as they + // will happen asynchronously. If we don't update this table, you can end up with bad failure modes like + // joining a room, waiting for 200 OK then changing device keys and have those keys not be sent + // to other servers (this was a cause of a flaky sytest "Local device key changes get to remote servers") + // The events are trusted now as we performed auth checks above. 
+ joinedHosts, err = consumers.JoinedHostsFromEvents(respState.GetStateEvents().TrustedEvents(respMakeJoin.RoomVersion, false)) + if err != nil { + return fmt.Errorf("JoinedHostsFromEvents: failed to get joined hosts: %s", err) + } + + logrus.WithField("room", roomID).Infof("Joined federated room with %d hosts", len(joinedHosts)) + if _, err = r.db.UpdateRoom(context.Background(), roomID, joinedHosts, nil, true); err != nil { + return fmt.Errorf("UpdatedRoom: failed to update room with joined hosts: %s", err) + } + + // If we successfully performed a send_join above then the other + // server now thinks we're a part of the room. Send the newly + // returned state to the roomserver to update our local view. + if unsigned != nil { + event, err = event.SetUnsigned(unsigned) + if err != nil { + // non-fatal, log and continue + logrus.WithError(err).Errorf("Failed to set unsigned content") + } + } + + // TODO: (PowerDAG) Send join event to roomserver + //if err = roomserverAPI.SendEventWithState( + // context.Background(), + // r.rsAPI, + // origin, + // roomserverAPI.KindNew, + // respState, + // event.Headered(respMakeJoin.RoomVersion), + // serverName, + // nil, + // false, + //); err != nil { + // return fmt.Errorf("roomserverAPI.SendEventWithState: %w", err) + //} + } else { // RoomVersionV1 -> RoomVersionV10 + // Try to perform a send_join using the newly built event. + respSendJoin, err := r.federation.SendJoin( + context.Background(), + origin, + serverName, + event, + ) + if err != nil { + r.statistics.ForServer(serverName).Failure() + return fmt.Errorf("r.federation.SendJoin: %w", err) + } + r.statistics.ForServer(serverName).Success(statistics.SendDirect) + + // If the remote server returned an event in the "event" key of + // the send_join request then we should use that instead. It may + // contain signatures that we don't know about. 
+ if len(respSendJoin.Event) > 0 { + var remoteEvent *gomatrixserverlib.Event + remoteEvent, err = respSendJoin.Event.UntrustedEvent(respMakeJoin.RoomVersion) + if err == nil && isWellFormedMembershipEvent( + remoteEvent, roomID, userID, + ) { + event = remoteEvent + } + } + + // Sanity-check the join response to ensure that it has a create + // event, that the room version is known, etc. + authEvents := respSendJoin.AuthEvents.UntrustedEvents(respMakeJoin.RoomVersion) + if err = checkEventsContainCreateEvent(authEvents); err != nil { + return fmt.Errorf("sanityCheckAuthChain: %w", err) + } + + // Process the join response in a goroutine. The idea here is + // that we'll try and wait for as long as possible for the work + // to complete, but if the client does give up waiting, we'll + // still continue to process the join anyway so that we don't + // waste the effort. + // TODO: Can we expand Check here to return a list of missing auth + // events rather than failing one at a time? + var respState gomatrixserverlib.StateResponse + respState, err = gomatrixserverlib.CheckSendJoinResponse( + context.Background(), + respMakeJoin.RoomVersion, &respSendJoin, + r.keyRing, + event, + federatedEventProvider(ctx, r.federation, r.keyRing, origin, serverName), + ) + if err != nil { + return fmt.Errorf("respSendJoin.Check: %w", err) + } + + // We need to immediately update our list of joined hosts for this room now as we are technically + // joined. We must do this synchronously: we cannot rely on the roomserver output events as they + // will happen asynchronously. If we don't update this table, you can end up with bad failure modes like + // joining a room, waiting for 200 OK then changing device keys and have those keys not be sent + // to other servers (this was a cause of a flaky sytest "Local device key changes get to remote servers") + // The events are trusted now as we performed auth checks above. 
+ joinedHosts, err = consumers.JoinedHostsFromEvents(respState.GetStateEvents().TrustedEvents(respMakeJoin.RoomVersion, false)) + if err != nil { + return fmt.Errorf("JoinedHostsFromEvents: failed to get joined hosts: %s", err) + } + + logrus.WithField("room", roomID).Infof("Joined federated room with %d hosts", len(joinedHosts)) + if _, err = r.db.UpdateRoom(context.Background(), roomID, joinedHosts, nil, true); err != nil { + return fmt.Errorf("UpdatedRoom: failed to update room with joined hosts: %s", err) + } + + // If we successfully performed a send_join above then the other + // server now thinks we're a part of the room. Send the newly + // returned state to the roomserver to update our local view. + if unsigned != nil { + event, err = event.SetUnsigned(unsigned) + if err != nil { + // non-fatal, log and continue + logrus.WithError(err).Errorf("Failed to set unsigned content") + } + } + + if err = roomserverAPI.SendEventWithState( + context.Background(), + r.rsAPI, + origin, + roomserverAPI.KindNew, + respState, + event.Headered(respMakeJoin.RoomVersion), + serverName, + nil, + false, + ); err != nil { + return fmt.Errorf("roomserverAPI.SendEventWithState: %w", err) + } } return nil @@ -473,12 +573,12 @@ func (r *FederationInternalAPI) performOutboundPeekUsingServer( // authenticate the state returned (check its auth events etc) // the equivalent of CheckSendJoinResponse() authEvents, stateEvents, err := gomatrixserverlib.CheckStateResponse( - ctx, &respPeek, respPeek.RoomVersion, r.keyRing, federatedAuthProvider(ctx, r.federation, r.keyRing, r.cfg.Matrix.ServerName, serverName), + ctx, &respPeek, respPeek.RoomVersion, r.keyRing, federatedEventProvider(ctx, r.federation, r.keyRing, r.cfg.Matrix.ServerName, serverName), ) if err != nil { return fmt.Errorf("error checking state returned from peeking: %w", err) } - if err = sanityCheckAuthChain(authEvents); err != nil { + if err = checkEventsContainCreateEvent(authEvents); err != nil { return 
fmt.Errorf("sanityCheckAuthChain: %w", err) } @@ -712,9 +812,9 @@ func (r *FederationInternalAPI) MarkServersAlive(destinations []gomatrixserverli } } -func sanityCheckAuthChain(authChain []*gomatrixserverlib.Event) error { +func checkEventsContainCreateEvent(events []*gomatrixserverlib.Event) error { // sanity check we have a create event and it has a known room version - for _, ev := range authChain { + for _, ev := range events { if ev.Type() == gomatrixserverlib.MRoomCreate && ev.StateKeyEquals("") { // make sure the room version is known content := ev.Content() @@ -732,12 +832,12 @@ func sanityCheckAuthChain(authChain []*gomatrixserverlib.Event) error { } knownVersions := gomatrixserverlib.RoomVersions() if _, ok := knownVersions[gomatrixserverlib.RoomVersion(verBody.Version)]; !ok { - return fmt.Errorf("auth chain m.room.create event has an unknown room version: %s", verBody.Version) + return fmt.Errorf("m.room.create event has an unknown room version: %s", verBody.Version) } return nil } } - return fmt.Errorf("auth chain response is missing m.room.create event") + return fmt.Errorf("response is missing m.room.create event") } func setDefaultRoomVersionFromJoinEvent( @@ -764,17 +864,17 @@ func setDefaultRoomVersionFromJoinEvent( return gomatrixserverlib.RoomVersionV4 } -// FederatedAuthProvider is an auth chain provider which fetches events from the server provided -func federatedAuthProvider( +// federatedEventProvider is an event provider which fetches events from the server provided +func federatedEventProvider( ctx context.Context, federation api.FederationClient, keyRing gomatrixserverlib.JSONVerifier, origin, server gomatrixserverlib.ServerName, -) gomatrixserverlib.AuthChainProvider { +) gomatrixserverlib.EventProvider { // A list of events that we have retried, if they were not included in - // the auth events supplied in the send_join. + // the events supplied in the send_join. 
retries := map[string][]*gomatrixserverlib.Event{} // Define a function which we can pass to Check to retrieve missing - // auth events inline. This greatly increases our chances of not having + // events inline. This greatly increases our chances of not having // to repeat the entire set of checks just for a missing event or two. return func(roomVersion gomatrixserverlib.RoomVersion, eventIDs []string) ([]*gomatrixserverlib.Event, error) { returning := []*gomatrixserverlib.Event{} @@ -785,7 +885,7 @@ func federatedAuthProvider( // just append the results. We won't retry the request. if retry, ok := retries[eventID]; ok { if retry == nil { - return nil, fmt.Errorf("missingAuth: not retrying failed event ID %q", eventID) + return nil, fmt.Errorf("missingEvent: not retrying failed event ID %q", eventID) } returning = append(returning, retry...) continue @@ -799,7 +899,7 @@ func federatedAuthProvider( // join response. tx, txerr := federation.GetEvent(ctx, origin, server, eventID) if txerr != nil { - return nil, fmt.Errorf("missingAuth r.federation.GetEvent: %w", txerr) + return nil, fmt.Errorf("missingEvent r.federation.GetEvent: %w", txerr) } // For each event returned, add it to the set of return events. We @@ -809,12 +909,12 @@ func federatedAuthProvider( // Try to parse the event. ev, everr := gomatrixserverlib.NewEventFromUntrustedJSON(pdu, roomVersion) if everr != nil { - return nil, fmt.Errorf("missingAuth gomatrixserverlib.NewEventFromUntrustedJSON: %w", everr) + return nil, fmt.Errorf("missingEvent gomatrixserverlib.NewEventFromUntrustedJSON: %w", everr) } // Check the signatures of the event. if err := ev.VerifyEventSignatures(ctx, keyRing); err != nil { - return nil, fmt.Errorf("missingAuth VerifyEventSignatures: %w", err) + return nil, fmt.Errorf("missingEvent VerifyEventSignatures: %w", err) } // If the event is OK then add it to the results and the retry map.