From f17de49c6b4a2e0746f992fad40bcad6fad08034 Mon Sep 17 00:00:00 2001 From: Devon Hudson Date: Thu, 12 Oct 2023 17:41:45 -0600 Subject: [PATCH] CryptoID joins --- clientapi/routing/createroom.go | 47 --- clientapi/routing/joinroom.go | 125 +++++++ clientapi/routing/routing.go | 30 +- clientapi/routing/send_pdus.go | 59 +++- federationapi/api/api.go | 11 + federationapi/internal/perform.go | 314 +++++++++++++++++ roomserver/api/api.go | 3 + roomserver/api/perform.go | 10 + roomserver/internal/perform/perform_join.go | 368 ++++++++++++++++++++ 9 files changed, 910 insertions(+), 57 deletions(-) diff --git a/clientapi/routing/createroom.go b/clientapi/routing/createroom.go index a1d9004db..8d9a235c9 100644 --- a/clientapi/routing/createroom.go +++ b/clientapi/routing/createroom.go @@ -259,53 +259,6 @@ func ToProtoEvents(ctx context.Context, events []gomatrixserverlib.PDU, rsAPI ro result := make([]json.RawMessage, len(events)) for i, event := range events { result[i] = json.RawMessage(event.JSON()) - //fmt.Printf("\nProcessing %s event (%s)\n", events[i].Type(), events[i].EventID()) - //var rawJson interface{} - //json.Unmarshal(events[i].JSON(), &rawJson) - //fmt.Printf("JSON: %+v\n", rawJson) - //result[i] = gomatrixserverlib.ProtoEvent{ - // SenderID: string(events[i].SenderID()), - // RoomID: events[i].RoomID().String(), - // Type: events[i].Type(), - // StateKey: events[i].StateKey(), - // PrevEvents: events[i].PrevEventIDs(), - // AuthEvents: events[i].AuthEventIDs(), - // Redacts: events[i].Redacts(), - // Depth: events[i].Depth(), - // Content: events[i].Content(), - // Unsigned: events[i].Unsigned(), - // Hashes: events[i].Hashes(), - // OriginServerTimestamp: events[i].OriginServerTS(), - //} - - //roomVersion, _ := rsAPI.QueryRoomVersionForRoom(ctx, events[i].RoomID().String()) - //verImpl, _ := gomatrixserverlib.GetRoomVersion(roomVersion) - //eventJSON, err := json.Marshal(result[i]) - //if err != nil { - // util.GetLogger(ctx).WithError(err).Error("failed marshalling event") - // continue - //} - //pdu, err := verImpl.NewEventFromUntrustedJSON(eventJSON) - //if err != nil { - // util.GetLogger(ctx).WithError(err).Error("failed making event from json") - // continue - //} - //fmt.Printf("\nProcessing %s event (%s) - PDU\n", result[i].Type, pdu.EventID()) - //fmt.Printf(" EventID: %v - %v\n", events[i].EventID(), pdu.EventID()) - //fmt.Printf(" SenderID: %s - %s\n", events[i].SenderID(), pdu.SenderID()) - //fmt.Printf(" RoomID: %s - %s\n", events[i].RoomID().String(), pdu.RoomID().String()) - //fmt.Printf(" Type: %s - %s\n", events[i].Type(), pdu.Type()) - //fmt.Printf(" StateKey: %s - %s\n", *events[i].StateKey(), *pdu.StateKey()) - //fmt.Printf(" PrevEvents: %v - %v\n", events[i].PrevEventIDs(), pdu.PrevEventIDs()) - //fmt.Printf(" AuthEvents: %v - %v\n", events[i].AuthEventIDs(), pdu.AuthEventIDs()) - //fmt.Printf(" Redacts: %s - %s\n", events[i].Redacts(), pdu.Redacts()) - //fmt.Printf(" Depth: %d - %d\n", events[i].Depth(), pdu.Depth()) - //fmt.Printf(" Content: %v - %v\n", events[i].Content(), pdu.Content()) - //fmt.Printf(" Unsigned: %v - %v\n", events[i].Unsigned(), pdu.Unsigned()) - //fmt.Printf(" Hashes: %v - %v\n", events[i].Hashes(), pdu.Hashes()) - //fmt.Printf(" OriginServerTS: %d - %d\n", events[i].OriginServerTS(), pdu.OriginServerTS()) - //json.Unmarshal(eventJSON, &rawJson) - //fmt.Printf("JSON: %+v\n", rawJson) } return result } diff --git a/clientapi/routing/joinroom.go b/clientapi/routing/joinroom.go index 43331b42a..5768d14c5 100644 --- a/clientapi/routing/joinroom.go +++ b/clientapi/routing/joinroom.go @@ -144,3 +144,128 @@ func JoinRoomByIDOrAlias( return result } } + +type joinRoomCryptoIDsResponse struct { + RoomID string `json:"room_id"` + Version string `json:"room_version"` + ViaServer string `json:"via_server"` + PDU json.RawMessage `json:"pdu"` +} + +func JoinRoomByIDOrAliasCryptoIDs( + req *http.Request, + device *api.Device, + rsAPI roomserverAPI.ClientRoomserverAPI, + profileAPI api.ClientUserAPI, + roomIDOrAlias string, +) util.JSONResponse { + // Prepare to ask the roomserver to perform the room join. + joinReq := roomserverAPI.PerformJoinRequest{ + RoomIDOrAlias: roomIDOrAlias, + UserID: device.UserID, + IsGuest: device.AccountType == api.AccountTypeGuest, + Content: map[string]interface{}{}, + } + + // Check to see if any ?server_name= query parameters were + // given in the request. + if serverNames, ok := req.URL.Query()["server_name"]; ok { + for _, serverName := range serverNames { + joinReq.ServerNames = append( + joinReq.ServerNames, + spec.ServerName(serverName), + ) + } + } + + // If content was provided in the request then include that + // in the request. It'll get used as a part of the membership + // event content. + _ = httputil.UnmarshalJSONRequest(req, &joinReq.Content) + + // Work out our localpart for the client profile request. + + // Request our profile content to populate the request content with. + profile, err := profileAPI.QueryProfile(req.Context(), device.UserID) + + switch err { + case nil: + joinReq.Content["displayname"] = profile.DisplayName + joinReq.Content["avatar_url"] = profile.AvatarURL + case appserviceAPI.ErrProfileNotExists: + util.GetLogger(req.Context()).Error("Unable to query user profile, no profile found.") + return util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: spec.Unknown("Unable to query user profile, no profile found."), + } + default: + } + + // Ask the roomserver to perform the join. + done := make(chan util.JSONResponse, 1) + go func() { + defer close(done) + joinEvent, roomID, version, serverName, err := rsAPI.PerformJoinCryptoIDs(req.Context(), &joinReq) + var response util.JSONResponse + + switch e := err.(type) { + case nil: // success case + response = util.JSONResponse{ + Code: http.StatusOK, + JSON: joinRoomCryptoIDsResponse{ + RoomID: roomID, + Version: string(version), + ViaServer: string(serverName), + PDU: json.RawMessage(joinEvent.JSON()), + }, + } + case roomserverAPI.ErrInvalidID: + response = util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: spec.Unknown(e.Error()), + } + case roomserverAPI.ErrNotAllowed: + jsonErr := spec.Forbidden(e.Error()) + if device.AccountType == api.AccountTypeGuest { + jsonErr = spec.GuestAccessForbidden(e.Error()) + } + response = util.JSONResponse{ + Code: http.StatusForbidden, + JSON: jsonErr, + } + case *gomatrix.HTTPError: // this ensures we proxy responses over federation to the client + response = util.JSONResponse{ + Code: e.Code, + JSON: json.RawMessage(e.Message), + } + case eventutil.ErrRoomNoExists: + response = util.JSONResponse{ + Code: http.StatusNotFound, + JSON: spec.NotFound(e.Error()), + } + default: + response = util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: spec.InternalServerError{}, + } + } + done <- response + }() + + // Wait either for the join to finish, or for us to hit a reasonable + // timeout, at which point we'll just return a 200 to placate clients. + timer := time.NewTimer(time.Second * 20) + select { + case <-timer.C: + return util.JSONResponse{ + Code: http.StatusRequestTimeout, + JSON: spec.Unknown("Failed creating join event with the remote server."), + } + case result := <-done: + // Stop and drain the timer + if !timer.Stop() { + <-timer.C + } + return result + } +} diff --git a/clientapi/routing/routing.go b/clientapi/routing/routing.go index 10b036815..2fac3df91 100644 --- a/clientapi/routing/routing.go +++ b/clientapi/routing/routing.go @@ -329,7 +329,7 @@ func Setup( return SendPDUs(req, device, cfg, userAPI, rsAPI, asAPI) }), ).Methods(http.MethodPost, http.MethodOptions) - // TODO: update for cryptoIDs + v3mux.Handle("/join/{roomIDOrAlias}", httputil.MakeAuthAPI(spec.Join, userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse { if r := rateLimits.Limit(req, device); r != nil { @@ -352,6 +352,28 @@ func Setup( return resp.(util.JSONResponse) }, httputil.WithAllowGuests()), ).Methods(http.MethodPost, http.MethodOptions) + unstableMux.Handle("/org.matrix.msc_cryptoids/join/{roomIDOrAlias}", + httputil.MakeAuthAPI(spec.Join, userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse { + if r := rateLimits.Limit(req, device); r != nil { + return *r + } + vars, err := httputil.URLDecodeMapValues(mux.Vars(req)) + if err != nil { + return util.ErrorResponse(err) + } + // Only execute a join for roomIDOrAlias and UserID once. If there is a join in progress + // it waits for it to complete and returns that result for subsequent requests. + resp, _, _ := sf.Do(vars["roomIDOrAlias"]+device.UserID, func() (any, error) { + return JoinRoomByIDOrAliasCryptoIDs( + req, device, rsAPI, userAPI, vars["roomIDOrAlias"], + ), nil + }) + // once all joins are processed, drop them from the cache. Further requests + // will be processed as usual. + sf.Forget(vars["roomIDOrAlias"] + device.UserID) + return resp.(util.JSONResponse) + }, httputil.WithAllowGuests()), + ).Methods(http.MethodPost, http.MethodOptions) if mscCfg.Enabled("msc2753") { // TODO: update for cryptoIDs @@ -620,7 +642,6 @@ func Setup( }), ).Methods(http.MethodGet, http.MethodOptions) - // TODO: update for cryptoIDs v3mux.Handle("/directory/list/room/{roomID}", httputil.MakeAuthAPI("directory_list", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse { vars, err := httputil.URLDecodeMapValues(mux.Vars(req)) @@ -630,7 +651,6 @@ func Setup( return SetVisibility(req, rsAPI, device, vars["roomID"]) }), ).Methods(http.MethodPut, http.MethodOptions) - // TODO: update for cryptoIDs v3mux.Handle("/directory/list/appservice/{networkID}/{roomID}", httputil.MakeAuthAPI("directory_list", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse { vars, err := httputil.URLDecodeMapValues(mux.Vars(req)) @@ -642,7 +662,6 @@ func Setup( ).Methods(http.MethodPut, http.MethodOptions) // Undocumented endpoint - // TODO: update for cryptoIDs v3mux.Handle("/directory/list/appservice/{networkID}/{roomID}", httputil.MakeAuthAPI("directory_list", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse { vars, err := httputil.URLDecodeMapValues(mux.Vars(req)) @@ -671,6 +690,7 @@ func Setup( }), ).Methods(http.MethodPost, http.MethodOptions) + // TODO: update for cryptoIDs v3mux.Handle("/rooms/{roomID}/typing/{userID}", httputil.MakeAuthAPI("rooms_typing", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse { if r := rateLimits.Limit(req, device); r != nil { @@ -1156,6 +1176,7 @@ func Setup( }), ).Methods(http.MethodPost, http.MethodOptions) + // TODO: update for cryptoIDs v3mux.Handle("/rooms/{roomID}/read_markers", httputil.MakeAuthAPI("rooms_read_markers", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse { if r := rateLimits.Limit(req, device); r != nil { @@ -1534,6 +1555,7 @@ func Setup( return SetReceipt(req, userAPI, syncProducer, device, vars["roomId"], vars["receiptType"], vars["eventId"]) }), ).Methods(http.MethodPost, http.MethodOptions) + // TODO: update for cryptoIDs v3mux.Handle("/presence/{userId}/status", httputil.MakeAuthAPI("set_presence", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse { vars, err := httputil.URLDecodeMapValues(mux.Vars(req)) diff --git a/clientapi/routing/send_pdus.go b/clientapi/routing/send_pdus.go index f073f0099..ad2a7119e 100644 --- a/clientapi/routing/send_pdus.go +++ b/clientapi/routing/send_pdus.go @@ -30,8 +30,9 @@ import ( ) type sendPDUsRequest struct { - Version string `json:"room_version"` - PDUs []json.RawMessage `json:"pdus"` + Version string `json:"room_version"` + ViaServer string `json:"via_server,omitempty"` + PDUs []json.RawMessage `json:"pdus"` } // SendPDUs implements /sendPDUs @@ -106,8 +107,53 @@ func SendPDUs( } pdu = pdu.Sign(string(pdu.SenderID()), "ed25519:1", key) util.GetLogger(req.Context()).Infof("Processing %s event (%s)", pdu.Type(), pdu.EventID()) + switch pdu.Type() { case spec.MRoomCreate: + case spec.MRoomMember: + var membership gomatrixserverlib.MemberContent + err = json.Unmarshal(pdu.Content(), &membership) + switch { + case err != nil: + util.GetLogger(req.Context()).Errorf("m.room.member event content invalid", pdu.Content(), pdu.EventID()) + continue + case membership.Membership == spec.Join: + deviceUserID, err := spec.NewUserID(device.UserID, true) + if err != nil { + return util.JSONResponse{ + Code: http.StatusForbidden, + JSON: spec.Forbidden("userID doesn't have power level to change visibility"), + } + } + queryReq := roomserverAPI.QueryMembershipForUserRequest{ + RoomID: pdu.RoomID().String(), + UserID: *deviceUserID, + } + var queryRes roomserverAPI.QueryMembershipForUserResponse + if err := rsAPI.QueryMembershipForUser(req.Context(), &queryReq, &queryRes); err != nil { + util.GetLogger(req.Context()).WithError(err).Error("rsAPI.QueryMembershipsForRoom failed") + return util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: spec.InternalServerError{}, + } + } + if !queryRes.IsInRoom { + // This is a join event to a remote room + // TODO: cryptoIDs - figure out how to obtain unsigned contents for outstanding federated invites + joinReq := roomserverAPI.PerformJoinRequestCryptoIDs{ + RoomID: pdu.RoomID().String(), + UserID: device.UserID, + IsGuest: device.AccountType == api.AccountTypeGuest, + ServerNames: []spec.ServerName{spec.ServerName(pdus.ViaServer)}, + JoinEvent: pdu, + } + err := rsAPI.PerformSendJoinCryptoIDs(req.Context(), &joinReq) + if err != nil { + util.GetLogger(req.Context()).Errorf("Failed processing %s event (%s): %v", pdu.Type(), pdu.EventID(), err) + } + continue // NOTE: don't send this event on to the roomserver + } + } } // TODO: cryptoIDs - does it matter which order these are added? @@ -118,10 +164,11 @@ func SendPDUs( // We should be doing this already as part of `SendInputRoomEvents`, but how should we pass this // failure back to the client? inputs = append(inputs, roomserverAPI.InputRoomEvent{ - Kind: roomserverAPI.KindNew, - Event: &types.HeaderedEvent{PDU: pdu}, - Origin: userID.Domain(), - SendAsServer: roomserverAPI.DoNotSendToOtherServers, + Kind: roomserverAPI.KindNew, + Event: &types.HeaderedEvent{PDU: pdu}, + Origin: userID.Domain(), + // TODO: cryptoIDs - what to do with this field? + //SendAsServer: roomserverAPI.DoNotSendToOtherServers, }) } diff --git a/federationapi/api/api.go b/federationapi/api/api.go index efe0547df..a846bcf39 100644 --- a/federationapi/api/api.go +++ b/federationapi/api/api.go @@ -58,6 +58,8 @@ type RoomserverFederationAPI interface { PerformDirectoryLookup(ctx context.Context, request *PerformDirectoryLookupRequest, response *PerformDirectoryLookupResponse) error // Handle an instruction to make_join & send_join with a remote server. PerformJoin(ctx context.Context, request *PerformJoinRequest, response *PerformJoinResponse) + PerformMakeJoin(ctx context.Context, request *PerformJoinRequest) (gomatrixserverlib.PDU, gomatrixserverlib.RoomVersion, spec.ServerName, error) + PerformSendJoin(ctx context.Context, request *PerformSendJoinRequestCryptoIDs, response *PerformJoinResponse) // Handle an instruction to make_leave & send_leave with a remote server. PerformLeave(ctx context.Context, request *PerformLeaveRequest, response *PerformLeaveResponse) error // Handle sending an invite to a remote server. @@ -168,6 +170,15 @@ type PerformJoinRequest struct { Unsigned map[string]interface{} `json:"unsigned"` } +type PerformSendJoinRequestCryptoIDs struct { + RoomID string `json:"room_id"` + UserID string `json:"user_id"` + // The sorted list of servers to try. Servers will be tried sequentially, after de-duplication. + ServerNames types.ServerNames `json:"server_names"` + Unsigned map[string]interface{} `json:"unsigned"` + Event gomatrixserverlib.PDU +} + type PerformJoinResponse struct { JoinedVia spec.ServerName LastError *gomatrix.HTTPError diff --git a/federationapi/internal/perform.go b/federationapi/internal/perform.go index 0200cf69b..4d083f56a 100644 --- a/federationapi/internal/perform.go +++ b/federationapi/internal/perform.go @@ -239,6 +239,320 @@ func (r *FederationInternalAPI) performJoinUsingServer( return nil } +// PerformMakeJoin implements api.FederationInternalAPI +func (r *FederationInternalAPI) PerformMakeJoin( + ctx context.Context, + request *api.PerformJoinRequest, +) (gomatrixserverlib.PDU, gomatrixserverlib.RoomVersion, spec.ServerName, error) { + // Check that a join isn't already in progress for this user/room. + j := federatedJoin{request.UserID, request.RoomID} + if _, found := r.joins.Load(j); found { + return nil, "", "", &gomatrix.HTTPError{ + Code: 429, + Message: `{ + "errcode": "M_LIMIT_EXCEEDED", + "error": "There is already a federated join to this room in progress. Please wait for it to finish." + }`, // TODO: Why do none of our error types play nicely with each other? + } + } + r.joins.Store(j, nil) + defer r.joins.Delete(j) + + // Deduplicate the server names we were provided but keep the ordering + // as this encodes useful information about which servers are most likely + // to respond. + seenSet := make(map[spec.ServerName]bool) + var uniqueList []spec.ServerName + for _, srv := range request.ServerNames { + if seenSet[srv] || r.cfg.Matrix.IsLocalServerName(srv) { + continue + } + seenSet[srv] = true + uniqueList = append(uniqueList, srv) + } + request.ServerNames = uniqueList + + // Try each server that we were provided until we land on one that + // successfully completes the make-join send-join dance. + var lastErr error + for _, serverName := range request.ServerNames { + var joinEvent gomatrixserverlib.PDU + var roomVersion gomatrixserverlib.RoomVersion + var err error + if joinEvent, roomVersion, _, err = r.performMakeJoinUsingServer( + ctx, + request.RoomID, + request.UserID, + request.Content, + serverName, + ); err != nil { + logrus.WithError(err).WithFields(logrus.Fields{ + "server_name": serverName, + "room_id": request.RoomID, + }).Warnf("Failed to join room through server") + lastErr = err + continue + } + + // We're all good. + return joinEvent, roomVersion, serverName, err + } + + // If we reach here then we didn't complete a join for some reason. + var httpErr gomatrix.HTTPError + var lastError *gomatrix.HTTPError + if ok := errors.As(lastErr, &httpErr); ok { + httpErr.Message = string(httpErr.Contents) + lastError = &httpErr + } else { + lastError = &gomatrix.HTTPError{ + Code: 0, + WrappedError: nil, + Message: "Unknown HTTP error", + } + if lastError != nil { + lastError.Message = lastErr.Error() + } + } + + logrus.Errorf( + "failed to join user %q to room %q through %d server(s): last error %s", + request.UserID, request.RoomID, len(request.ServerNames), lastError, + ) + return nil, "", "", lastError +} + +func (r *FederationInternalAPI) performMakeJoinUsingServer( + ctx context.Context, + roomID, userID string, + content map[string]interface{}, + serverName spec.ServerName, +) (gomatrixserverlib.PDU, gomatrixserverlib.RoomVersion, spec.SenderID, error) { + if !r.shouldAttemptDirectFederation(serverName) { + return nil, "", "", fmt.Errorf("relay servers have no meaningful response for join.") + } + + user, err := spec.NewUserID(userID, true) + if err != nil { + return nil, "", "", err + } + room, err := spec.NewRoomID(roomID) + if err != nil { + return nil, "", "", err + } + + joinInput := gomatrixserverlib.PerformMakeJoinInput{ + UserID: user, + RoomID: room, + ServerName: serverName, + Content: content, + PrivateKey: r.cfg.Matrix.PrivateKey, + KeyID: r.cfg.Matrix.KeyID, + KeyRing: r.keyRing, + GetOrCreateSenderID: func(ctx context.Context, userID spec.UserID, roomID spec.RoomID, roomVersion string) (spec.SenderID, ed25519.PrivateKey, error) { + // assign a roomNID, otherwise we can't create a private key for the user + _, nidErr := r.rsAPI.AssignRoomNID(ctx, roomID, gomatrixserverlib.RoomVersion(roomVersion)) + if nidErr != nil { + return "", nil, nidErr + } + key, keyErr := r.rsAPI.GetOrCreateUserRoomPrivateKey(ctx, userID, roomID) + if keyErr != nil { + return "", nil, keyErr + } + return spec.SenderIDFromPseudoIDKey(key), key, nil + }, + } + joinEvent, version, senderID, joinErr := gomatrixserverlib.PerformMakeJoin(ctx, r, joinInput) + + if joinErr != nil { + if !joinErr.Reachable { + r.statistics.ForServer(joinErr.ServerName).Failure() + } else { + r.statistics.ForServer(joinErr.ServerName).Success(statistics.SendDirect) + } + return nil, "", "", joinErr.Err + } + r.statistics.ForServer(serverName).Success(statistics.SendDirect) + if joinEvent == nil { + return nil, "", "", fmt.Errorf("Received nil joinEvent response from gomatrixserverlib.PerformJoin") + } + + return joinEvent, version, senderID, nil +} + +// PerformSendJoin implements api.FederationInternalAPI +func (r *FederationInternalAPI) PerformSendJoin( + ctx context.Context, + request *api.PerformSendJoinRequestCryptoIDs, + response *api.PerformJoinResponse, +) { + // Check that a join isn't already in progress for this user/room. + j := federatedJoin{request.UserID, request.RoomID} + if _, found := r.joins.Load(j); found { + response.LastError = &gomatrix.HTTPError{ + Code: 429, + Message: `{ + "errcode": "M_LIMIT_EXCEEDED", + "error": "There is already a federated join to this room in progress. Please wait for it to finish." + }`, + } + return + } + r.joins.Store(j, nil) + defer r.joins.Delete(j) + + // Deduplicate the server names we were provided but keep the ordering + // as this encodes useful information about which servers are most likely + // to respond. + seenSet := make(map[spec.ServerName]bool) + var uniqueList []spec.ServerName + for _, srv := range request.ServerNames { + if seenSet[srv] || r.cfg.Matrix.IsLocalServerName(srv) { + continue + } + seenSet[srv] = true + uniqueList = append(uniqueList, srv) + } + request.ServerNames = uniqueList + + // Try each server that we were provided until we land on one that + // successfully completes the make-join send-join dance. + var lastErr error + for _, serverName := range request.ServerNames { + if err := r.performSendJoinUsingServer( + ctx, + request.RoomID, + request.UserID, + request.Unsigned, + request.Event, + serverName, + ); err != nil { + logrus.WithError(err).WithFields(logrus.Fields{ + "server_name": serverName, + "room_id": request.RoomID, + }).Warnf("Failed to join room through server") + lastErr = err + continue + } + + // We're all good. + return + } + + // If we reach here then we didn't complete a join for some reason. + var httpErr gomatrix.HTTPError + var lastError *gomatrix.HTTPError + if ok := errors.As(lastErr, &httpErr); ok { + httpErr.Message = string(httpErr.Contents) + lastError = &httpErr + } else { + lastError = &gomatrix.HTTPError{ + Code: 0, + WrappedError: nil, + Message: "Unknown HTTP error", + } + if lastError != nil { + lastError.Message = lastErr.Error() + } + } + + logrus.Errorf( + "failed to join user %q to room %q through %d server(s): last error %s", + request.UserID, request.RoomID, len(request.ServerNames), lastError, + ) + return +} + +func (r *FederationInternalAPI) performSendJoinUsingServer( + ctx context.Context, + roomID, userID string, + unsigned map[string]interface{}, + event gomatrixserverlib.PDU, + serverName spec.ServerName, +) error { + user, err := spec.NewUserID(userID, true) + if err != nil { + return err + } + room, err := spec.NewRoomID(roomID) + if err != nil { + return err + } + senderID, err := r.rsAPI.QuerySenderIDForUser(ctx, *room, *user) + if err != nil { + return err + } + + joinInput := gomatrixserverlib.PerformSendJoinInput{ + RoomID: room, + ServerName: serverName, + Unsigned: unsigned, + Origin: user.Domain(), + SenderID: *senderID, + KeyRing: r.keyRing, + Event: event, + RoomVersion: event.Version(), + EventProvider: federatedEventProvider(ctx, r.federation, r.keyRing, user.Domain(), serverName, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) { + return r.rsAPI.QueryUserIDForSender(ctx, roomID, senderID) + }), + UserIDQuerier: func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) { + return r.rsAPI.QueryUserIDForSender(ctx, roomID, senderID) + }, + StoreSenderIDFromPublicID: func(ctx context.Context, senderID spec.SenderID, userIDRaw string, roomID spec.RoomID) error { + storeUserID, userErr := spec.NewUserID(userIDRaw, true) + if userErr != nil { + return userErr + } + return r.rsAPI.StoreUserRoomPublicKey(ctx, senderID, *storeUserID, roomID) + }, + } + response, joinErr := gomatrixserverlib.PerformSendJoin(ctx, r, joinInput) + if joinErr != nil { + if !joinErr.Reachable { + r.statistics.ForServer(joinErr.ServerName).Failure() + } else { + r.statistics.ForServer(joinErr.ServerName).Success(statistics.SendDirect) + } + return joinErr.Err + } + r.statistics.ForServer(serverName).Success(statistics.SendDirect) + if response == nil { + return fmt.Errorf("Received nil response from gomatrixserverlib.PerformSendJoin") + } + + // We need to immediately update our list of joined hosts for this room now as we are technically + // joined. We must do this synchronously: we cannot rely on the roomserver output events as they + // will happen asyncly. If we don't update this table, you can end up with bad failure modes like + // joining a room, waiting for 200 OK then changing device keys and have those keys not be sent + // to other servers (this was a cause of a flakey sytest "Local device key changes get to remote servers") + // The events are trusted now as we performed auth checks above. + joinedHosts, err := consumers.JoinedHostsFromEvents(ctx, response.StateSnapshot.GetStateEvents().TrustedEvents(response.JoinEvent.Version(), false), r.rsAPI) + if err != nil { + return fmt.Errorf("JoinedHostsFromEvents: failed to get joined hosts: %s", err) + } + + logrus.WithField("room", roomID).Infof("Joined federated room with %d hosts", len(joinedHosts)) + if _, err = r.db.UpdateRoom(context.Background(), roomID, joinedHosts, nil, true); err != nil { + return fmt.Errorf("UpdatedRoom: failed to update room with joined hosts: %s", err) + } + + // TODO: Can I change this to not take respState but instead just take an opaque list of events? + if err = roomserverAPI.SendEventWithState( + context.Background(), + r.rsAPI, + user.Domain(), + roomserverAPI.KindNew, + response.StateSnapshot, + &types.HeaderedEvent{PDU: response.JoinEvent}, + serverName, + nil, + false, + ); err != nil { + return fmt.Errorf("roomserverAPI.SendEventWithState: %w", err) + } + return nil +} + // PerformOutboundPeekRequest implements api.FederationInternalAPI func (r *FederationInternalAPI) PerformOutboundPeek( ctx context.Context, diff --git a/roomserver/api/api.go b/roomserver/api/api.go index d8f0bf8d7..6e02550f0 100644 --- a/roomserver/api/api.go +++ b/roomserver/api/api.go @@ -222,6 +222,7 @@ type ClientRoomserverAPI interface { DefaultRoomVersionAPI QueryMembershipForUser(ctx context.Context, req *QueryMembershipForUserRequest, res *QueryMembershipForUserResponse) error QueryMembershipsForRoom(ctx context.Context, req *QueryMembershipsForRoomRequest, res *QueryMembershipsForRoomResponse) error + InvitePending(ctx context.Context, roomID spec.RoomID, senderID spec.SenderID) (bool, error) QueryRoomsForUser(ctx context.Context, userID spec.UserID, desiredMembership string) ([]spec.RoomID, error) QueryStateAfterEvents(ctx context.Context, req *QueryStateAfterEventsRequest, res *QueryStateAfterEventsResponse) error // QueryKnownUsers returns a list of users that we know about from our joined rooms. @@ -244,6 +245,8 @@ type ClientRoomserverAPI interface { PerformUnpeek(ctx context.Context, roomID, userID, deviceID string) error PerformInvite(ctx context.Context, req *PerformInviteRequest) error PerformJoin(ctx context.Context, req *PerformJoinRequest) (roomID string, joinedVia spec.ServerName, err error) + PerformSendJoinCryptoIDs(ctx context.Context, req *PerformJoinRequestCryptoIDs) error + PerformJoinCryptoIDs(ctx context.Context, req *PerformJoinRequest) (join gomatrixserverlib.PDU, roomID string, version gomatrixserverlib.RoomVersion, serverName spec.ServerName, err error) PerformLeave(ctx context.Context, req *PerformLeaveRequest, res *PerformLeaveResponse) error PerformPublish(ctx context.Context, req *PerformPublishRequest) error // PerformForget forgets a rooms history for a specific user diff --git a/roomserver/api/perform.go b/roomserver/api/perform.go index c24728da8..8c72e6910 100644 --- a/roomserver/api/perform.go +++ b/roomserver/api/perform.go @@ -42,6 +42,16 @@ type PerformJoinRequest struct { Unsigned map[string]interface{} `json:"unsigned"` } +type PerformJoinRequestCryptoIDs struct { + RoomID string `json:"room_id"` + UserID string `json:"user_id"` + IsGuest bool `json:"is_guest"` + Content map[string]interface{} `json:"content"` + ServerNames []spec.ServerName `json:"server_names"` + Unsigned map[string]interface{} `json:"unsigned"` + JoinEvent gomatrixserverlib.PDU +} + type PerformLeaveRequest struct { RoomID string Leaver spec.UserID diff --git a/roomserver/internal/perform/perform_join.go b/roomserver/internal/perform/perform_join.go index 23988e467..e23173cf4 100644 --- a/roomserver/internal/perform/perform_join.go +++ b/roomserver/internal/perform/perform_join.go @@ -406,6 +406,374 @@ func (r *Joiner) performJoinRoomByID( return req.RoomIDOrAlias, userDomain, nil } +func (r *Joiner) PerformSendJoinCryptoIDs( + ctx context.Context, + req *rsAPI.PerformJoinRequestCryptoIDs, +) error { + logger := logrus.WithContext(ctx).WithFields(logrus.Fields{ + "room_id": req.RoomID, + "user_id": req.UserID, + "servers": req.ServerNames, + }) + logger.Info("performing send join") + res := fsAPI.PerformJoinResponse{} + r.FSAPI.PerformSendJoin(ctx, &fsAPI.PerformSendJoinRequestCryptoIDs{ + RoomID: req.RoomID, + UserID: req.UserID, + ServerNames: req.ServerNames, + Unsigned: req.Unsigned, + Event: req.JoinEvent, + }, &res) + if res.LastError != nil { + return res.LastError + } + + return nil +} + +// PerformJoin handles joining matrix rooms, including over federation by talking to the federationapi. +func (r *Joiner) PerformJoinCryptoIDs( + ctx context.Context, + req *rsAPI.PerformJoinRequest, +) (joinEvent gomatrixserverlib.PDU, roomID string, version gomatrixserverlib.RoomVersion, serverName spec.ServerName, err error) { + logger := logrus.WithContext(ctx).WithFields(logrus.Fields{ + "room_id": req.RoomIDOrAlias, + "user_id": req.UserID, + "servers": req.ServerNames, + }) + logger.Info("User requested to room join") + join, roomID, version, serverName, err := r.makeJoinEvent(context.Background(), req) + if err != nil { + logger.WithError(err).Error("Failed to make join room event") + sentry.CaptureException(err) + return nil, "", "", "", err + } + + return join, roomID, version, serverName, nil +} + +func (r *Joiner) makeJoinEvent( + ctx context.Context, + req *rsAPI.PerformJoinRequest, +) (gomatrixserverlib.PDU, string, gomatrixserverlib.RoomVersion, spec.ServerName, error) { + _, domain, err := gomatrixserverlib.SplitID('@', req.UserID) + if err != nil { + return nil, "", "", "", rsAPI.ErrInvalidID{Err: fmt.Errorf("supplied user ID %q in incorrect format", req.UserID)} + } + if !r.Cfg.Matrix.IsLocalServerName(domain) { + return nil, "", "", "", rsAPI.ErrInvalidID{Err: fmt.Errorf("user %q does not belong to this homeserver", req.UserID)} + } + if strings.HasPrefix(req.RoomIDOrAlias, "!") { + return r.performJoinRoomByIDCryptoIDs(ctx, req) + } + if strings.HasPrefix(req.RoomIDOrAlias, "#") { + return r.performJoinRoomByAliasCryptoIDs(ctx, req) + } + return nil, "", "", "", rsAPI.ErrInvalidID{Err: fmt.Errorf("room ID or alias %q is invalid", req.RoomIDOrAlias)} +} + +func (r *Joiner) performJoinRoomByAliasCryptoIDs( + ctx context.Context, + req *rsAPI.PerformJoinRequest, +) (gomatrixserverlib.PDU, string, gomatrixserverlib.RoomVersion, spec.ServerName, error) { + // Get the domain part of the room alias. + _, domain, err := gomatrixserverlib.SplitID('#', req.RoomIDOrAlias) + if err != nil { + return nil, "", "", "", fmt.Errorf("alias %q is not in the correct format", req.RoomIDOrAlias) + } + req.ServerNames = append(req.ServerNames, domain) + + // Check if this alias matches our own server configuration. If it + // doesn't then we'll need to try a federated join. + var roomID string + if !r.Cfg.Matrix.IsLocalServerName(domain) { + // The alias isn't owned by us, so we will need to try joining using + // a remote server. + dirReq := fsAPI.PerformDirectoryLookupRequest{ + RoomAlias: req.RoomIDOrAlias, // the room alias to lookup + ServerName: domain, // the server to ask + } + dirRes := fsAPI.PerformDirectoryLookupResponse{} + err = r.FSAPI.PerformDirectoryLookup(ctx, &dirReq, &dirRes) + if err != nil { + logrus.WithError(err).Errorf("error looking up alias %q", req.RoomIDOrAlias) + return nil, "", "", "", fmt.Errorf("looking up alias %q over federation failed: %w", req.RoomIDOrAlias, err) + } + roomID = dirRes.RoomID + req.ServerNames = append(req.ServerNames, dirRes.ServerNames...) + } else { + var getRoomReq = rsAPI.GetRoomIDForAliasRequest{ + Alias: req.RoomIDOrAlias, + IncludeAppservices: true, + } + var getRoomRes = rsAPI.GetRoomIDForAliasResponse{} + // Otherwise, look up if we know this room alias locally. + err = r.RSAPI.GetRoomIDForAlias(ctx, &getRoomReq, &getRoomRes) + if err != nil { + return nil, "", "", "", fmt.Errorf("lookup room alias %q failed: %w", req.RoomIDOrAlias, err) + } + roomID = getRoomRes.RoomID + } + + // If the room ID is empty then we failed to look up the alias. + if roomID == "" { + return nil, "", "", "", fmt.Errorf("alias %q not found", req.RoomIDOrAlias) + } + + // If we do, then pluck out the room ID and continue the join. + req.RoomIDOrAlias = roomID + return r.performJoinRoomByIDCryptoIDs(ctx, req) +} + +// TODO: Break this function up a bit & move to GMSL +// nolint:gocyclo +func (r *Joiner) performJoinRoomByIDCryptoIDs( + ctx context.Context, + req *rsAPI.PerformJoinRequest, +) (gomatrixserverlib.PDU, string, gomatrixserverlib.RoomVersion, spec.ServerName, error) { + // The original client request ?server_name=... may include this HS so filter that out so we + // don't attempt to make_join with ourselves + for i := 0; i < len(req.ServerNames); i++ { + if r.Cfg.Matrix.IsLocalServerName(req.ServerNames[i]) { + // delete this entry + req.ServerNames = append(req.ServerNames[:i], req.ServerNames[i+1:]...) + i-- + } + } + + // Get the domain part of the room ID. + roomID, err := spec.NewRoomID(req.RoomIDOrAlias) + if err != nil { + return nil, "", "", "", rsAPI.ErrInvalidID{Err: fmt.Errorf("room ID %q is invalid: %w", req.RoomIDOrAlias, err)} + } + + // If the server name in the room ID isn't ours then it's a + // possible candidate for finding the room via federation. Add + // it to the list of servers to try. + if !r.Cfg.Matrix.IsLocalServerName(roomID.Domain()) { + req.ServerNames = append(req.ServerNames, roomID.Domain()) + } + + // Force a federated join if we aren't in the room and we've been + // given some server names to try joining by. + inRoomReq := &rsAPI.QueryServerJoinedToRoomRequest{ + RoomID: req.RoomIDOrAlias, + } + inRoomRes := &rsAPI.QueryServerJoinedToRoomResponse{} + if err = r.Queryer.QueryServerJoinedToRoom(ctx, inRoomReq, inRoomRes); err != nil { + return nil, "", "", "", fmt.Errorf("r.Queryer.QueryServerJoinedToRoom: %w", err) + } + serverInRoom := inRoomRes.IsInRoom + forceFederatedJoin := len(req.ServerNames) > 0 && !serverInRoom + + userID, err := spec.NewUserID(req.UserID, true) + if err != nil { + return nil, "", "", "", rsAPI.ErrInvalidID{Err: fmt.Errorf("user ID %q is invalid: %w", req.UserID, err)} + } + + // Look up the room NID for the supplied room ID. + var senderID spec.SenderID + checkInvitePending := false + info, err := r.DB.RoomInfo(ctx, req.RoomIDOrAlias) + if err == nil && info != nil { + switch info.RoomVersion { + case gomatrixserverlib.RoomVersionPseudoIDs: + senderIDPtr, queryErr := r.Queryer.QuerySenderIDForUser(ctx, *roomID, *userID) + if queryErr == nil { + checkInvitePending = true + } + if senderIDPtr == nil { + // create user room key if needed + key, keyErr := r.RSAPI.GetOrCreateUserRoomPrivateKey(ctx, *userID, *roomID) + if keyErr != nil { + util.GetLogger(ctx).WithError(keyErr).Error("GetOrCreateUserRoomPrivateKey failed") + return nil, "", "", "", fmt.Errorf("GetOrCreateUserRoomPrivateKey failed: %w", keyErr) + } + senderID = spec.SenderIDFromPseudoIDKey(key) + } else { + senderID = *senderIDPtr + } + default: + checkInvitePending = true + senderID = spec.SenderID(userID.String()) + } + } + + // Force a federated join if we're dealing with a pending invite + // and we aren't in the room. + if checkInvitePending { + isInvitePending, inviteSender, _, inviteEvent, inviteErr := helpers.IsInvitePending(ctx, r.DB, req.RoomIDOrAlias, senderID) + if inviteErr == nil && !serverInRoom && isInvitePending { + inviter, queryErr := r.RSAPI.QueryUserIDForSender(ctx, *roomID, inviteSender) + if queryErr != nil { + return nil, "", "", "", fmt.Errorf("r.RSAPI.QueryUserIDForSender: %w", queryErr) + } + + // If we were invited by someone from another server then we can + // assume they are in the room so we can join via them. + if inviter != nil && !r.Cfg.Matrix.IsLocalServerName(inviter.Domain()) { + req.ServerNames = append(req.ServerNames, inviter.Domain()) + forceFederatedJoin = true + memberEvent := gjson.Parse(string(inviteEvent.JSON())) + // only set unsigned if we've got a content.membership, which we _should_ + if memberEvent.Get("content.membership").Exists() { + req.Unsigned = map[string]interface{}{ + "prev_sender": memberEvent.Get("sender").Str, + "prev_content": map[string]interface{}{ + "is_direct": memberEvent.Get("content.is_direct").Bool(), + "membership": memberEvent.Get("content.membership").Str, + }, + } + } + } + } + } + + // If a guest is trying to join a room, check that the room has a m.room.guest_access event + if req.IsGuest { + var guestAccessEvent *types.HeaderedEvent + guestAccess := "forbidden" + guestAccessEvent, err = r.DB.GetStateEvent(ctx, req.RoomIDOrAlias, spec.MRoomGuestAccess, "") + if (err != nil && !errors.Is(err, sql.ErrNoRows)) || guestAccessEvent == nil { + logrus.WithError(err).Warn("unable to get m.room.guest_access event, defaulting to 'forbidden'") + } + if guestAccessEvent != nil { + guestAccess = gjson.GetBytes(guestAccessEvent.Content(), "guest_access").String() + } + + // Servers MUST only allow guest users to join rooms if the m.room.guest_access state event + // is present on the room and has the guest_access value can_join. + if guestAccess != "can_join" { + return nil, "", "", "", rsAPI.ErrNotAllowed{Err: fmt.Errorf("guest access is forbidden")} + } + } + + // If we should do a forced federated join then do that. + if forceFederatedJoin { + joinEvent, version, serverName, err := r.performFederatedMakeJoinByIDCryptoIDs(ctx, req) + return joinEvent, req.RoomIDOrAlias, version, serverName, err + } + + // Try to construct an actual join event from the template. + // If this succeeds then it is a sign that the room already exists + // locally on the homeserver. + // TODO: Check what happens if the room exists on the server + // but everyone has since left. I suspect it does the wrong thing. + + var buildRes rsAPI.QueryLatestEventsAndStateResponse + identity := r.Cfg.Matrix.SigningIdentity + + // at this point we know we have an existing room + if inRoomRes.RoomVersion == gomatrixserverlib.RoomVersionPseudoIDs { + var pseudoIDKey ed25519.PrivateKey + pseudoIDKey, err = r.RSAPI.GetOrCreateUserRoomPrivateKey(ctx, *userID, *roomID) + if err != nil { + util.GetLogger(ctx).WithError(err).Error("GetOrCreateUserRoomPrivateKey failed") + return nil, "", "", "", err + } + + mapping := &gomatrixserverlib.MXIDMapping{ + UserRoomKey: spec.SenderIDFromPseudoIDKey(pseudoIDKey), + UserID: userID.String(), + } + + // Sign the mapping with the server identity + if err = mapping.Sign(identity.ServerName, identity.KeyID, identity.PrivateKey); err != nil { + return nil, "", "", "", err + } + req.Content["mxid_mapping"] = mapping + + // sign the event with the pseudo ID key + identity = fclient.SigningIdentity{ + ServerName: spec.ServerName(spec.SenderIDFromPseudoIDKey(pseudoIDKey)), + KeyID: "ed25519:1", + PrivateKey: pseudoIDKey, + } + } + + senderIDString := string(senderID) + + // Prepare the template for the join event. + proto := gomatrixserverlib.ProtoEvent{ + Type: spec.MRoomMember, + SenderID: senderIDString, + StateKey: &senderIDString, + RoomID: req.RoomIDOrAlias, + Redacts: "", + } + if err = proto.SetUnsigned(struct{}{}); err != nil { + return nil, "", "", "", fmt.Errorf("eb.SetUnsigned: %w", err) + } + + // It is possible for the request to include some "content" for the + // event. We'll always overwrite the "membership" key, but the rest, + // like "display_name" or "avatar_url", will be kept if supplied. + if req.Content == nil { + req.Content = map[string]interface{}{} + } + req.Content["membership"] = spec.Join + if authorisedVia, aerr := r.populateAuthorisedViaUserForRestrictedJoin(ctx, req, senderID); aerr != nil { + return nil, "", "", "", aerr + } else if authorisedVia != "" { + req.Content["join_authorised_via_users_server"] = authorisedVia + } + if err = proto.SetContent(req.Content); err != nil { + return nil, "", "", "", fmt.Errorf("eb.SetContent: %w", err) + } + joinEvent, err := eventutil.QueryAndBuildEvent(ctx, &proto, &identity, time.Now(), r.RSAPI, &buildRes) + + switch err.(type) { + case nil: + // Do nothing + + case eventutil.ErrRoomNoExists: + // The room doesn't exist locally. If the room ID looks like it should + // be ours then this probably means that we've nuked our database at + // some point. + if r.Cfg.Matrix.IsLocalServerName(roomID.Domain()) { + // If there are no more server names to try then give up here. + // Otherwise we'll try a federated join as normal, since it's quite + // possible that the room still exists on other servers. + if len(req.ServerNames) == 0 { + return nil, "", "", "", eventutil.ErrRoomNoExists{} + } + } + + // Perform a federated room join. + joinEvent, version, serverName, err := r.performFederatedMakeJoinByIDCryptoIDs(ctx, req) + return joinEvent, req.RoomIDOrAlias, version, serverName, err + + default: + // Something else went wrong. + return nil, "", "", "", fmt.Errorf("error joining local room: %q", err) + } + + // By this point, if req.RoomIDOrAlias contained an alias, then + // it will have been overwritten with a room ID by performJoinRoomByAlias. + // We should now include this in the response so that the CS API can + // return the right room ID. + return joinEvent, req.RoomIDOrAlias, inRoomRes.RoomVersion, userID.Domain(), nil +} + +func (r *Joiner) performFederatedMakeJoinByIDCryptoIDs( + ctx context.Context, + req *rsAPI.PerformJoinRequest, +) (gomatrixserverlib.PDU, gomatrixserverlib.RoomVersion, spec.ServerName, error) { + // Try joining by all of the supplied server names. + fedReq := fsAPI.PerformJoinRequest{ + RoomID: req.RoomIDOrAlias, // the room ID to try and join + UserID: req.UserID, // the user ID joining the room + ServerNames: req.ServerNames, // the server to try joining with + Content: req.Content, // the membership event content + Unsigned: req.Unsigned, // the unsigned event content, if any + } + joinEvent, version, serverName, err := r.FSAPI.PerformMakeJoin(ctx, &fedReq) + if err != nil { + return nil, "", "", err + } + return joinEvent, version, serverName, nil +} + func (r *Joiner) performFederatedJoinRoomByID( ctx context.Context, req *rsAPI.PerformJoinRequest,