CryptoID joins

This commit is contained in:
Devon Hudson 2023-10-12 17:41:45 -06:00
parent a5ba533cfb
commit f17de49c6b
No known key found for this signature in database
GPG key ID: CD06B18E77F6A628
9 changed files with 910 additions and 57 deletions

View file

@ -259,53 +259,6 @@ func ToProtoEvents(ctx context.Context, events []gomatrixserverlib.PDU, rsAPI ro
result := make([]json.RawMessage, len(events))
for i, event := range events {
result[i] = json.RawMessage(event.JSON())
//fmt.Printf("\nProcessing %s event (%s)\n", events[i].Type(), events[i].EventID())
//var rawJson interface{}
//json.Unmarshal(events[i].JSON(), &rawJson)
//fmt.Printf("JSON: %+v\n", rawJson)
//result[i] = gomatrixserverlib.ProtoEvent{
// SenderID: string(events[i].SenderID()),
// RoomID: events[i].RoomID().String(),
// Type: events[i].Type(),
// StateKey: events[i].StateKey(),
// PrevEvents: events[i].PrevEventIDs(),
// AuthEvents: events[i].AuthEventIDs(),
// Redacts: events[i].Redacts(),
// Depth: events[i].Depth(),
// Content: events[i].Content(),
// Unsigned: events[i].Unsigned(),
// Hashes: events[i].Hashes(),
// OriginServerTimestamp: events[i].OriginServerTS(),
//}
//roomVersion, _ := rsAPI.QueryRoomVersionForRoom(ctx, events[i].RoomID().String())
//verImpl, _ := gomatrixserverlib.GetRoomVersion(roomVersion)
//eventJSON, err := json.Marshal(result[i])
//if err != nil {
// util.GetLogger(ctx).WithError(err).Error("failed marshalling event")
// continue
//}
//pdu, err := verImpl.NewEventFromUntrustedJSON(eventJSON)
//if err != nil {
// util.GetLogger(ctx).WithError(err).Error("failed making event from json")
// continue
//}
//fmt.Printf("\nProcessing %s event (%s) - PDU\n", result[i].Type, pdu.EventID())
//fmt.Printf(" EventID: %v - %v\n", events[i].EventID(), pdu.EventID())
//fmt.Printf(" SenderID: %s - %s\n", events[i].SenderID(), pdu.SenderID())
//fmt.Printf(" RoomID: %s - %s\n", events[i].RoomID().String(), pdu.RoomID().String())
//fmt.Printf(" Type: %s - %s\n", events[i].Type(), pdu.Type())
//fmt.Printf(" StateKey: %s - %s\n", *events[i].StateKey(), *pdu.StateKey())
//fmt.Printf(" PrevEvents: %v - %v\n", events[i].PrevEventIDs(), pdu.PrevEventIDs())
//fmt.Printf(" AuthEvents: %v - %v\n", events[i].AuthEventIDs(), pdu.AuthEventIDs())
//fmt.Printf(" Redacts: %s - %s\n", events[i].Redacts(), pdu.Redacts())
//fmt.Printf(" Depth: %d - %d\n", events[i].Depth(), pdu.Depth())
//fmt.Printf(" Content: %v - %v\n", events[i].Content(), pdu.Content())
//fmt.Printf(" Unsigned: %v - %v\n", events[i].Unsigned(), pdu.Unsigned())
//fmt.Printf(" Hashes: %v - %v\n", events[i].Hashes(), pdu.Hashes())
//fmt.Printf(" OriginServerTS: %d - %d\n", events[i].OriginServerTS(), pdu.OriginServerTS())
//json.Unmarshal(eventJSON, &rawJson)
//fmt.Printf("JSON: %+v\n", rawJson)
}
return result
}

View file

@ -144,3 +144,128 @@ func JoinRoomByIDOrAlias(
return result
}
}
type joinRoomCryptoIDsResponse struct {
RoomID string `json:"room_id"`
Version string `json:"room_version"`
ViaServer string `json:"via_server"`
PDU json.RawMessage `json:"pdu"`
}
func JoinRoomByIDOrAliasCryptoIDs(
req *http.Request,
device *api.Device,
rsAPI roomserverAPI.ClientRoomserverAPI,
profileAPI api.ClientUserAPI,
roomIDOrAlias string,
) util.JSONResponse {
// Prepare to ask the roomserver to perform the room join.
joinReq := roomserverAPI.PerformJoinRequest{
RoomIDOrAlias: roomIDOrAlias,
UserID: device.UserID,
IsGuest: device.AccountType == api.AccountTypeGuest,
Content: map[string]interface{}{},
}
// Check to see if any ?server_name= query parameters were
// given in the request.
if serverNames, ok := req.URL.Query()["server_name"]; ok {
for _, serverName := range serverNames {
joinReq.ServerNames = append(
joinReq.ServerNames,
spec.ServerName(serverName),
)
}
}
// If content was provided in the request then include that
// in the request. It'll get used as a part of the membership
// event content.
_ = httputil.UnmarshalJSONRequest(req, &joinReq.Content)
// Work out our localpart for the client profile request.
// Request our profile content to populate the request content with.
profile, err := profileAPI.QueryProfile(req.Context(), device.UserID)
switch err {
case nil:
joinReq.Content["displayname"] = profile.DisplayName
joinReq.Content["avatar_url"] = profile.AvatarURL
case appserviceAPI.ErrProfileNotExists:
util.GetLogger(req.Context()).Error("Unable to query user profile, no profile found.")
return util.JSONResponse{
Code: http.StatusInternalServerError,
JSON: spec.Unknown("Unable to query user profile, no profile found."),
}
default:
}
// Ask the roomserver to perform the join.
done := make(chan util.JSONResponse, 1)
go func() {
defer close(done)
joinEvent, roomID, version, serverName, err := rsAPI.PerformJoinCryptoIDs(req.Context(), &joinReq)
var response util.JSONResponse
switch e := err.(type) {
case nil: // success case
response = util.JSONResponse{
Code: http.StatusOK,
JSON: joinRoomCryptoIDsResponse{
RoomID: roomID,
Version: string(version),
ViaServer: string(serverName),
PDU: json.RawMessage(joinEvent.JSON()),
},
}
case roomserverAPI.ErrInvalidID:
response = util.JSONResponse{
Code: http.StatusBadRequest,
JSON: spec.Unknown(e.Error()),
}
case roomserverAPI.ErrNotAllowed:
jsonErr := spec.Forbidden(e.Error())
if device.AccountType == api.AccountTypeGuest {
jsonErr = spec.GuestAccessForbidden(e.Error())
}
response = util.JSONResponse{
Code: http.StatusForbidden,
JSON: jsonErr,
}
case *gomatrix.HTTPError: // this ensures we proxy responses over federation to the client
response = util.JSONResponse{
Code: e.Code,
JSON: json.RawMessage(e.Message),
}
case eventutil.ErrRoomNoExists:
response = util.JSONResponse{
Code: http.StatusNotFound,
JSON: spec.NotFound(e.Error()),
}
default:
response = util.JSONResponse{
Code: http.StatusInternalServerError,
JSON: spec.InternalServerError{},
}
}
done <- response
}()
// Wait either for the join to finish, or for us to hit a reasonable
// timeout, at which point we'll just return a 200 to placate clients.
timer := time.NewTimer(time.Second * 20)
select {
case <-timer.C:
return util.JSONResponse{
Code: http.StatusRequestTimeout,
JSON: spec.Unknown("Failed creating join event with the remote server."),
}
case result := <-done:
// Stop and drain the timer
if !timer.Stop() {
<-timer.C
}
return result
}
}

View file

@ -329,7 +329,7 @@ func Setup(
return SendPDUs(req, device, cfg, userAPI, rsAPI, asAPI)
}),
).Methods(http.MethodPost, http.MethodOptions)
// TODO: update for cryptoIDs
v3mux.Handle("/join/{roomIDOrAlias}",
httputil.MakeAuthAPI(spec.Join, userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.Limit(req, device); r != nil {
@ -352,6 +352,28 @@ func Setup(
return resp.(util.JSONResponse)
}, httputil.WithAllowGuests()),
).Methods(http.MethodPost, http.MethodOptions)
unstableMux.Handle("/org.matrix.msc_cryptoids/join/{roomIDOrAlias}",
httputil.MakeAuthAPI(spec.Join, userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.Limit(req, device); r != nil {
return *r
}
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
if err != nil {
return util.ErrorResponse(err)
}
// Only execute a join for roomIDOrAlias and UserID once. If there is a join in progress
// it waits for it to complete and returns that result for subsequent requests.
resp, _, _ := sf.Do(vars["roomIDOrAlias"]+device.UserID, func() (any, error) {
return JoinRoomByIDOrAliasCryptoIDs(
req, device, rsAPI, userAPI, vars["roomIDOrAlias"],
), nil
})
// once all joins are processed, drop them from the cache. Further requests
// will be processed as usual.
sf.Forget(vars["roomIDOrAlias"] + device.UserID)
return resp.(util.JSONResponse)
}, httputil.WithAllowGuests()),
).Methods(http.MethodPost, http.MethodOptions)
if mscCfg.Enabled("msc2753") {
// TODO: update for cryptoIDs
@ -620,7 +642,6 @@ func Setup(
}),
).Methods(http.MethodGet, http.MethodOptions)
// TODO: update for cryptoIDs
v3mux.Handle("/directory/list/room/{roomID}",
httputil.MakeAuthAPI("directory_list", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
@ -630,7 +651,6 @@ func Setup(
return SetVisibility(req, rsAPI, device, vars["roomID"])
}),
).Methods(http.MethodPut, http.MethodOptions)
// TODO: update for cryptoIDs
v3mux.Handle("/directory/list/appservice/{networkID}/{roomID}",
httputil.MakeAuthAPI("directory_list", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
@ -642,7 +662,6 @@ func Setup(
).Methods(http.MethodPut, http.MethodOptions)
// Undocumented endpoint
// TODO: update for cryptoIDs
v3mux.Handle("/directory/list/appservice/{networkID}/{roomID}",
httputil.MakeAuthAPI("directory_list", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
@ -671,6 +690,7 @@ func Setup(
}),
).Methods(http.MethodPost, http.MethodOptions)
// TODO: update for cryptoIDs
v3mux.Handle("/rooms/{roomID}/typing/{userID}",
httputil.MakeAuthAPI("rooms_typing", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.Limit(req, device); r != nil {
@ -1156,6 +1176,7 @@ func Setup(
}),
).Methods(http.MethodPost, http.MethodOptions)
// TODO: update for cryptoIDs
v3mux.Handle("/rooms/{roomID}/read_markers",
httputil.MakeAuthAPI("rooms_read_markers", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.Limit(req, device); r != nil {
@ -1534,6 +1555,7 @@ func Setup(
return SetReceipt(req, userAPI, syncProducer, device, vars["roomId"], vars["receiptType"], vars["eventId"])
}),
).Methods(http.MethodPost, http.MethodOptions)
// TODO: update for cryptoIDs
v3mux.Handle("/presence/{userId}/status",
httputil.MakeAuthAPI("set_presence", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))

View file

@ -30,8 +30,9 @@ import (
)
type sendPDUsRequest struct {
Version string `json:"room_version"`
PDUs []json.RawMessage `json:"pdus"`
Version string `json:"room_version"`
ViaServer string `json:"via_server,omitempty"`
PDUs []json.RawMessage `json:"pdus"`
}
// SendPDUs implements /sendPDUs
@ -106,8 +107,53 @@ func SendPDUs(
}
pdu = pdu.Sign(string(pdu.SenderID()), "ed25519:1", key)
util.GetLogger(req.Context()).Infof("Processing %s event (%s)", pdu.Type(), pdu.EventID())
switch pdu.Type() {
case spec.MRoomCreate:
case spec.MRoomMember:
var membership gomatrixserverlib.MemberContent
err = json.Unmarshal(pdu.Content(), &membership)
switch {
case err != nil:
util.GetLogger(req.Context()).Errorf("m.room.member event content invalid", pdu.Content(), pdu.EventID())
continue
case membership.Membership == spec.Join:
deviceUserID, err := spec.NewUserID(device.UserID, true)
if err != nil {
return util.JSONResponse{
Code: http.StatusForbidden,
JSON: spec.Forbidden("userID doesn't have power level to change visibility"),
}
}
queryReq := roomserverAPI.QueryMembershipForUserRequest{
RoomID: pdu.RoomID().String(),
UserID: *deviceUserID,
}
var queryRes roomserverAPI.QueryMembershipForUserResponse
if err := rsAPI.QueryMembershipForUser(req.Context(), &queryReq, &queryRes); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("rsAPI.QueryMembershipsForRoom failed")
return util.JSONResponse{
Code: http.StatusInternalServerError,
JSON: spec.InternalServerError{},
}
}
if !queryRes.IsInRoom {
// This is a join event to a remote room
// TODO: cryptoIDs - figure out how to obtain unsigned contents for outstanding federated invites
joinReq := roomserverAPI.PerformJoinRequestCryptoIDs{
RoomID: pdu.RoomID().String(),
UserID: device.UserID,
IsGuest: device.AccountType == api.AccountTypeGuest,
ServerNames: []spec.ServerName{spec.ServerName(pdus.ViaServer)},
JoinEvent: pdu,
}
err := rsAPI.PerformSendJoinCryptoIDs(req.Context(), &joinReq)
if err != nil {
util.GetLogger(req.Context()).Errorf("Failed processing %s event (%s): %v", pdu.Type(), pdu.EventID(), err)
}
continue // NOTE: don't send this event on to the roomserver
}
}
}
// TODO: cryptoIDs - does it matter which order these are added?
@ -118,10 +164,11 @@ func SendPDUs(
// We should be doing this already as part of `SendInputRoomEvents`, but how should we pass this
// failure back to the client?
inputs = append(inputs, roomserverAPI.InputRoomEvent{
Kind: roomserverAPI.KindNew,
Event: &types.HeaderedEvent{PDU: pdu},
Origin: userID.Domain(),
SendAsServer: roomserverAPI.DoNotSendToOtherServers,
Kind: roomserverAPI.KindNew,
Event: &types.HeaderedEvent{PDU: pdu},
Origin: userID.Domain(),
// TODO: cryptoIDs - what to do with this field?
//SendAsServer: roomserverAPI.DoNotSendToOtherServers,
})
}

View file

@ -58,6 +58,8 @@ type RoomserverFederationAPI interface {
PerformDirectoryLookup(ctx context.Context, request *PerformDirectoryLookupRequest, response *PerformDirectoryLookupResponse) error
// Handle an instruction to make_join & send_join with a remote server.
PerformJoin(ctx context.Context, request *PerformJoinRequest, response *PerformJoinResponse)
PerformMakeJoin(ctx context.Context, request *PerformJoinRequest) (gomatrixserverlib.PDU, gomatrixserverlib.RoomVersion, spec.ServerName, error)
PerformSendJoin(ctx context.Context, request *PerformSendJoinRequestCryptoIDs, response *PerformJoinResponse)
// Handle an instruction to make_leave & send_leave with a remote server.
PerformLeave(ctx context.Context, request *PerformLeaveRequest, response *PerformLeaveResponse) error
// Handle sending an invite to a remote server.
@ -168,6 +170,15 @@ type PerformJoinRequest struct {
Unsigned map[string]interface{} `json:"unsigned"`
}
type PerformSendJoinRequestCryptoIDs struct {
RoomID string `json:"room_id"`
UserID string `json:"user_id"`
// The sorted list of servers to try. Servers will be tried sequentially, after de-duplication.
ServerNames types.ServerNames `json:"server_names"`
Unsigned map[string]interface{} `json:"unsigned"`
Event gomatrixserverlib.PDU
}
type PerformJoinResponse struct {
JoinedVia spec.ServerName
LastError *gomatrix.HTTPError

View file

@ -239,6 +239,320 @@ func (r *FederationInternalAPI) performJoinUsingServer(
return nil
}
// PerformMakeJoin implements api.FederationInternalAPI
func (r *FederationInternalAPI) PerformMakeJoin(
ctx context.Context,
request *api.PerformJoinRequest,
) (gomatrixserverlib.PDU, gomatrixserverlib.RoomVersion, spec.ServerName, error) {
// Check that a join isn't already in progress for this user/room.
j := federatedJoin{request.UserID, request.RoomID}
if _, found := r.joins.Load(j); found {
return nil, "", "", &gomatrix.HTTPError{
Code: 429,
Message: `{
"errcode": "M_LIMIT_EXCEEDED",
"error": "There is already a federated join to this room in progress. Please wait for it to finish."
}`, // TODO: Why do none of our error types play nicely with each other?
}
}
r.joins.Store(j, nil)
defer r.joins.Delete(j)
// Deduplicate the server names we were provided but keep the ordering
// as this encodes useful information about which servers are most likely
// to respond.
seenSet := make(map[spec.ServerName]bool)
var uniqueList []spec.ServerName
for _, srv := range request.ServerNames {
if seenSet[srv] || r.cfg.Matrix.IsLocalServerName(srv) {
continue
}
seenSet[srv] = true
uniqueList = append(uniqueList, srv)
}
request.ServerNames = uniqueList
// Try each server that we were provided until we land on one that
// successfully completes the make-join send-join dance.
var lastErr error
for _, serverName := range request.ServerNames {
var joinEvent gomatrixserverlib.PDU
var roomVersion gomatrixserverlib.RoomVersion
var err error
if joinEvent, roomVersion, _, err = r.performMakeJoinUsingServer(
ctx,
request.RoomID,
request.UserID,
request.Content,
serverName,
); err != nil {
logrus.WithError(err).WithFields(logrus.Fields{
"server_name": serverName,
"room_id": request.RoomID,
}).Warnf("Failed to join room through server")
lastErr = err
continue
}
// We're all good.
return joinEvent, roomVersion, serverName, err
}
// If we reach here then we didn't complete a join for some reason.
var httpErr gomatrix.HTTPError
var lastError *gomatrix.HTTPError
if ok := errors.As(lastErr, &httpErr); ok {
httpErr.Message = string(httpErr.Contents)
lastError = &httpErr
} else {
lastError = &gomatrix.HTTPError{
Code: 0,
WrappedError: nil,
Message: "Unknown HTTP error",
}
if lastError != nil {
lastError.Message = lastErr.Error()
}
}
logrus.Errorf(
"failed to join user %q to room %q through %d server(s): last error %s",
request.UserID, request.RoomID, len(request.ServerNames), lastError,
)
return nil, "", "", lastError
}
func (r *FederationInternalAPI) performMakeJoinUsingServer(
ctx context.Context,
roomID, userID string,
content map[string]interface{},
serverName spec.ServerName,
) (gomatrixserverlib.PDU, gomatrixserverlib.RoomVersion, spec.SenderID, error) {
if !r.shouldAttemptDirectFederation(serverName) {
return nil, "", "", fmt.Errorf("relay servers have no meaningful response for join.")
}
user, err := spec.NewUserID(userID, true)
if err != nil {
return nil, "", "", err
}
room, err := spec.NewRoomID(roomID)
if err != nil {
return nil, "", "", err
}
joinInput := gomatrixserverlib.PerformMakeJoinInput{
UserID: user,
RoomID: room,
ServerName: serverName,
Content: content,
PrivateKey: r.cfg.Matrix.PrivateKey,
KeyID: r.cfg.Matrix.KeyID,
KeyRing: r.keyRing,
GetOrCreateSenderID: func(ctx context.Context, userID spec.UserID, roomID spec.RoomID, roomVersion string) (spec.SenderID, ed25519.PrivateKey, error) {
// assign a roomNID, otherwise we can't create a private key for the user
_, nidErr := r.rsAPI.AssignRoomNID(ctx, roomID, gomatrixserverlib.RoomVersion(roomVersion))
if nidErr != nil {
return "", nil, nidErr
}
key, keyErr := r.rsAPI.GetOrCreateUserRoomPrivateKey(ctx, userID, roomID)
if keyErr != nil {
return "", nil, keyErr
}
return spec.SenderIDFromPseudoIDKey(key), key, nil
},
}
joinEvent, version, senderID, joinErr := gomatrixserverlib.PerformMakeJoin(ctx, r, joinInput)
if joinErr != nil {
if !joinErr.Reachable {
r.statistics.ForServer(joinErr.ServerName).Failure()
} else {
r.statistics.ForServer(joinErr.ServerName).Success(statistics.SendDirect)
}
return nil, "", "", joinErr.Err
}
r.statistics.ForServer(serverName).Success(statistics.SendDirect)
if joinEvent == nil {
return nil, "", "", fmt.Errorf("Received nil joinEvent response from gomatrixserverlib.PerformJoin")
}
return joinEvent, version, senderID, nil
}
// PerformSendJoin implements api.FederationInternalAPI
func (r *FederationInternalAPI) PerformSendJoin(
ctx context.Context,
request *api.PerformSendJoinRequestCryptoIDs,
response *api.PerformJoinResponse,
) {
// Check that a join isn't already in progress for this user/room.
j := federatedJoin{request.UserID, request.RoomID}
if _, found := r.joins.Load(j); found {
response.LastError = &gomatrix.HTTPError{
Code: 429,
Message: `{
"errcode": "M_LIMIT_EXCEEDED",
"error": "There is already a federated join to this room in progress. Please wait for it to finish."
}`,
}
return
}
r.joins.Store(j, nil)
defer r.joins.Delete(j)
// Deduplicate the server names we were provided but keep the ordering
// as this encodes useful information about which servers are most likely
// to respond.
seenSet := make(map[spec.ServerName]bool)
var uniqueList []spec.ServerName
for _, srv := range request.ServerNames {
if seenSet[srv] || r.cfg.Matrix.IsLocalServerName(srv) {
continue
}
seenSet[srv] = true
uniqueList = append(uniqueList, srv)
}
request.ServerNames = uniqueList
// Try each server that we were provided until we land on one that
// successfully completes the make-join send-join dance.
var lastErr error
for _, serverName := range request.ServerNames {
if err := r.performSendJoinUsingServer(
ctx,
request.RoomID,
request.UserID,
request.Unsigned,
request.Event,
serverName,
); err != nil {
logrus.WithError(err).WithFields(logrus.Fields{
"server_name": serverName,
"room_id": request.RoomID,
}).Warnf("Failed to join room through server")
lastErr = err
continue
}
// We're all good.
return
}
// If we reach here then we didn't complete a join for some reason.
var httpErr gomatrix.HTTPError
var lastError *gomatrix.HTTPError
if ok := errors.As(lastErr, &httpErr); ok {
httpErr.Message = string(httpErr.Contents)
lastError = &httpErr
} else {
lastError = &gomatrix.HTTPError{
Code: 0,
WrappedError: nil,
Message: "Unknown HTTP error",
}
if lastError != nil {
lastError.Message = lastErr.Error()
}
}
logrus.Errorf(
"failed to join user %q to room %q through %d server(s): last error %s",
request.UserID, request.RoomID, len(request.ServerNames), lastError,
)
return
}
func (r *FederationInternalAPI) performSendJoinUsingServer(
ctx context.Context,
roomID, userID string,
unsigned map[string]interface{},
event gomatrixserverlib.PDU,
serverName spec.ServerName,
) error {
user, err := spec.NewUserID(userID, true)
if err != nil {
return err
}
room, err := spec.NewRoomID(roomID)
if err != nil {
return err
}
senderID, err := r.rsAPI.QuerySenderIDForUser(ctx, *room, *user)
if err != nil {
return err
}
joinInput := gomatrixserverlib.PerformSendJoinInput{
RoomID: room,
ServerName: serverName,
Unsigned: unsigned,
Origin: user.Domain(),
SenderID: *senderID,
KeyRing: r.keyRing,
Event: event,
RoomVersion: event.Version(),
EventProvider: federatedEventProvider(ctx, r.federation, r.keyRing, user.Domain(), serverName, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
return r.rsAPI.QueryUserIDForSender(ctx, roomID, senderID)
}),
UserIDQuerier: func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
return r.rsAPI.QueryUserIDForSender(ctx, roomID, senderID)
},
StoreSenderIDFromPublicID: func(ctx context.Context, senderID spec.SenderID, userIDRaw string, roomID spec.RoomID) error {
storeUserID, userErr := spec.NewUserID(userIDRaw, true)
if userErr != nil {
return userErr
}
return r.rsAPI.StoreUserRoomPublicKey(ctx, senderID, *storeUserID, roomID)
},
}
response, joinErr := gomatrixserverlib.PerformSendJoin(ctx, r, joinInput)
if joinErr != nil {
if !joinErr.Reachable {
r.statistics.ForServer(joinErr.ServerName).Failure()
} else {
r.statistics.ForServer(joinErr.ServerName).Success(statistics.SendDirect)
}
return joinErr.Err
}
r.statistics.ForServer(serverName).Success(statistics.SendDirect)
if response == nil {
return fmt.Errorf("Received nil response from gomatrixserverlib.PerformSendJoin")
}
// We need to immediately update our list of joined hosts for this room now as we are technically
// joined. We must do this synchronously: we cannot rely on the roomserver output events as they
// will happen asyncly. If we don't update this table, you can end up with bad failure modes like
// joining a room, waiting for 200 OK then changing device keys and have those keys not be sent
// to other servers (this was a cause of a flakey sytest "Local device key changes get to remote servers")
// The events are trusted now as we performed auth checks above.
joinedHosts, err := consumers.JoinedHostsFromEvents(ctx, response.StateSnapshot.GetStateEvents().TrustedEvents(response.JoinEvent.Version(), false), r.rsAPI)
if err != nil {
return fmt.Errorf("JoinedHostsFromEvents: failed to get joined hosts: %s", err)
}
logrus.WithField("room", roomID).Infof("Joined federated room with %d hosts", len(joinedHosts))
if _, err = r.db.UpdateRoom(context.Background(), roomID, joinedHosts, nil, true); err != nil {
return fmt.Errorf("UpdatedRoom: failed to update room with joined hosts: %s", err)
}
// TODO: Can I change this to not take respState but instead just take an opaque list of events?
if err = roomserverAPI.SendEventWithState(
context.Background(),
r.rsAPI,
user.Domain(),
roomserverAPI.KindNew,
response.StateSnapshot,
&types.HeaderedEvent{PDU: response.JoinEvent},
serverName,
nil,
false,
); err != nil {
return fmt.Errorf("roomserverAPI.SendEventWithState: %w", err)
}
return nil
}
// PerformOutboundPeekRequest implements api.FederationInternalAPI
func (r *FederationInternalAPI) PerformOutboundPeek(
ctx context.Context,

View file

@ -222,6 +222,7 @@ type ClientRoomserverAPI interface {
DefaultRoomVersionAPI
QueryMembershipForUser(ctx context.Context, req *QueryMembershipForUserRequest, res *QueryMembershipForUserResponse) error
QueryMembershipsForRoom(ctx context.Context, req *QueryMembershipsForRoomRequest, res *QueryMembershipsForRoomResponse) error
InvitePending(ctx context.Context, roomID spec.RoomID, senderID spec.SenderID) (bool, error)
QueryRoomsForUser(ctx context.Context, userID spec.UserID, desiredMembership string) ([]spec.RoomID, error)
QueryStateAfterEvents(ctx context.Context, req *QueryStateAfterEventsRequest, res *QueryStateAfterEventsResponse) error
// QueryKnownUsers returns a list of users that we know about from our joined rooms.
@ -244,6 +245,8 @@ type ClientRoomserverAPI interface {
PerformUnpeek(ctx context.Context, roomID, userID, deviceID string) error
PerformInvite(ctx context.Context, req *PerformInviteRequest) error
PerformJoin(ctx context.Context, req *PerformJoinRequest) (roomID string, joinedVia spec.ServerName, err error)
PerformSendJoinCryptoIDs(ctx context.Context, req *PerformJoinRequestCryptoIDs) error
PerformJoinCryptoIDs(ctx context.Context, req *PerformJoinRequest) (join gomatrixserverlib.PDU, roomID string, version gomatrixserverlib.RoomVersion, serverName spec.ServerName, err error)
PerformLeave(ctx context.Context, req *PerformLeaveRequest, res *PerformLeaveResponse) error
PerformPublish(ctx context.Context, req *PerformPublishRequest) error
// PerformForget forgets a rooms history for a specific user

View file

@ -42,6 +42,16 @@ type PerformJoinRequest struct {
Unsigned map[string]interface{} `json:"unsigned"`
}
type PerformJoinRequestCryptoIDs struct {
RoomID string `json:"room_id"`
UserID string `json:"user_id"`
IsGuest bool `json:"is_guest"`
Content map[string]interface{} `json:"content"`
ServerNames []spec.ServerName `json:"server_names"`
Unsigned map[string]interface{} `json:"unsigned"`
JoinEvent gomatrixserverlib.PDU
}
type PerformLeaveRequest struct {
RoomID string
Leaver spec.UserID

View file

@ -406,6 +406,374 @@ func (r *Joiner) performJoinRoomByID(
return req.RoomIDOrAlias, userDomain, nil
}
func (r *Joiner) PerformSendJoinCryptoIDs(
ctx context.Context,
req *rsAPI.PerformJoinRequestCryptoIDs,
) error {
logger := logrus.WithContext(ctx).WithFields(logrus.Fields{
"room_id": req.RoomID,
"user_id": req.UserID,
"servers": req.ServerNames,
})
logger.Info("performing send join")
res := fsAPI.PerformJoinResponse{}
r.FSAPI.PerformSendJoin(ctx, &fsAPI.PerformSendJoinRequestCryptoIDs{
RoomID: req.RoomID,
UserID: req.UserID,
ServerNames: req.ServerNames,
Unsigned: req.Unsigned,
Event: req.JoinEvent,
}, &res)
if res.LastError != nil {
return res.LastError
}
return nil
}
// PerformJoin handles joining matrix rooms, including over federation by talking to the federationapi.
func (r *Joiner) PerformJoinCryptoIDs(
ctx context.Context,
req *rsAPI.PerformJoinRequest,
) (joinEvent gomatrixserverlib.PDU, roomID string, version gomatrixserverlib.RoomVersion, serverName spec.ServerName, err error) {
logger := logrus.WithContext(ctx).WithFields(logrus.Fields{
"room_id": req.RoomIDOrAlias,
"user_id": req.UserID,
"servers": req.ServerNames,
})
logger.Info("User requested to room join")
join, roomID, version, serverName, err := r.makeJoinEvent(context.Background(), req)
if err != nil {
logger.WithError(err).Error("Failed to make join room event")
sentry.CaptureException(err)
return nil, "", "", "", err
}
return join, roomID, version, serverName, nil
}
func (r *Joiner) makeJoinEvent(
ctx context.Context,
req *rsAPI.PerformJoinRequest,
) (gomatrixserverlib.PDU, string, gomatrixserverlib.RoomVersion, spec.ServerName, error) {
_, domain, err := gomatrixserverlib.SplitID('@', req.UserID)
if err != nil {
return nil, "", "", "", rsAPI.ErrInvalidID{Err: fmt.Errorf("supplied user ID %q in incorrect format", req.UserID)}
}
if !r.Cfg.Matrix.IsLocalServerName(domain) {
return nil, "", "", "", rsAPI.ErrInvalidID{Err: fmt.Errorf("user %q does not belong to this homeserver", req.UserID)}
}
if strings.HasPrefix(req.RoomIDOrAlias, "!") {
return r.performJoinRoomByIDCryptoIDs(ctx, req)
}
if strings.HasPrefix(req.RoomIDOrAlias, "#") {
return r.performJoinRoomByAliasCryptoIDs(ctx, req)
}
return nil, "", "", "", rsAPI.ErrInvalidID{Err: fmt.Errorf("room ID or alias %q is invalid", req.RoomIDOrAlias)}
}
func (r *Joiner) performJoinRoomByAliasCryptoIDs(
ctx context.Context,
req *rsAPI.PerformJoinRequest,
) (gomatrixserverlib.PDU, string, gomatrixserverlib.RoomVersion, spec.ServerName, error) {
// Get the domain part of the room alias.
_, domain, err := gomatrixserverlib.SplitID('#', req.RoomIDOrAlias)
if err != nil {
return nil, "", "", "", fmt.Errorf("alias %q is not in the correct format", req.RoomIDOrAlias)
}
req.ServerNames = append(req.ServerNames, domain)
// Check if this alias matches our own server configuration. If it
// doesn't then we'll need to try a federated join.
var roomID string
if !r.Cfg.Matrix.IsLocalServerName(domain) {
// The alias isn't owned by us, so we will need to try joining using
// a remote server.
dirReq := fsAPI.PerformDirectoryLookupRequest{
RoomAlias: req.RoomIDOrAlias, // the room alias to lookup
ServerName: domain, // the server to ask
}
dirRes := fsAPI.PerformDirectoryLookupResponse{}
err = r.FSAPI.PerformDirectoryLookup(ctx, &dirReq, &dirRes)
if err != nil {
logrus.WithError(err).Errorf("error looking up alias %q", req.RoomIDOrAlias)
return nil, "", "", "", fmt.Errorf("looking up alias %q over federation failed: %w", req.RoomIDOrAlias, err)
}
roomID = dirRes.RoomID
req.ServerNames = append(req.ServerNames, dirRes.ServerNames...)
} else {
var getRoomReq = rsAPI.GetRoomIDForAliasRequest{
Alias: req.RoomIDOrAlias,
IncludeAppservices: true,
}
var getRoomRes = rsAPI.GetRoomIDForAliasResponse{}
// Otherwise, look up if we know this room alias locally.
err = r.RSAPI.GetRoomIDForAlias(ctx, &getRoomReq, &getRoomRes)
if err != nil {
return nil, "", "", "", fmt.Errorf("lookup room alias %q failed: %w", req.RoomIDOrAlias, err)
}
roomID = getRoomRes.RoomID
}
// If the room ID is empty then we failed to look up the alias.
if roomID == "" {
return nil, "", "", "", fmt.Errorf("alias %q not found", req.RoomIDOrAlias)
}
// If we do, then pluck out the room ID and continue the join.
req.RoomIDOrAlias = roomID
return r.performJoinRoomByIDCryptoIDs(ctx, req)
}
// TODO: Break this function up a bit & move to GMSL
// nolint:gocyclo
func (r *Joiner) performJoinRoomByIDCryptoIDs(
ctx context.Context,
req *rsAPI.PerformJoinRequest,
) (gomatrixserverlib.PDU, string, gomatrixserverlib.RoomVersion, spec.ServerName, error) {
// The original client request ?server_name=... may include this HS so filter that out so we
// don't attempt to make_join with ourselves
for i := 0; i < len(req.ServerNames); i++ {
if r.Cfg.Matrix.IsLocalServerName(req.ServerNames[i]) {
// delete this entry
req.ServerNames = append(req.ServerNames[:i], req.ServerNames[i+1:]...)
i--
}
}
// Get the domain part of the room ID.
roomID, err := spec.NewRoomID(req.RoomIDOrAlias)
if err != nil {
return nil, "", "", "", rsAPI.ErrInvalidID{Err: fmt.Errorf("room ID %q is invalid: %w", req.RoomIDOrAlias, err)}
}
// If the server name in the room ID isn't ours then it's a
// possible candidate for finding the room via federation. Add
// it to the list of servers to try.
if !r.Cfg.Matrix.IsLocalServerName(roomID.Domain()) {
req.ServerNames = append(req.ServerNames, roomID.Domain())
}
// Force a federated join if we aren't in the room and we've been
// given some server names to try joining by.
inRoomReq := &rsAPI.QueryServerJoinedToRoomRequest{
RoomID: req.RoomIDOrAlias,
}
inRoomRes := &rsAPI.QueryServerJoinedToRoomResponse{}
if err = r.Queryer.QueryServerJoinedToRoom(ctx, inRoomReq, inRoomRes); err != nil {
return nil, "", "", "", fmt.Errorf("r.Queryer.QueryServerJoinedToRoom: %w", err)
}
serverInRoom := inRoomRes.IsInRoom
forceFederatedJoin := len(req.ServerNames) > 0 && !serverInRoom
userID, err := spec.NewUserID(req.UserID, true)
if err != nil {
return nil, "", "", "", rsAPI.ErrInvalidID{Err: fmt.Errorf("user ID %q is invalid: %w", req.UserID, err)}
}
// Look up the room NID for the supplied room ID.
var senderID spec.SenderID
checkInvitePending := false
info, err := r.DB.RoomInfo(ctx, req.RoomIDOrAlias)
if err == nil && info != nil {
switch info.RoomVersion {
case gomatrixserverlib.RoomVersionPseudoIDs:
senderIDPtr, queryErr := r.Queryer.QuerySenderIDForUser(ctx, *roomID, *userID)
if queryErr == nil {
checkInvitePending = true
}
if senderIDPtr == nil {
// create user room key if needed
key, keyErr := r.RSAPI.GetOrCreateUserRoomPrivateKey(ctx, *userID, *roomID)
if keyErr != nil {
util.GetLogger(ctx).WithError(keyErr).Error("GetOrCreateUserRoomPrivateKey failed")
return nil, "", "", "", fmt.Errorf("GetOrCreateUserRoomPrivateKey failed: %w", keyErr)
}
senderID = spec.SenderIDFromPseudoIDKey(key)
} else {
senderID = *senderIDPtr
}
default:
checkInvitePending = true
senderID = spec.SenderID(userID.String())
}
}
// Force a federated join if we're dealing with a pending invite
// and we aren't in the room.
if checkInvitePending {
isInvitePending, inviteSender, _, inviteEvent, inviteErr := helpers.IsInvitePending(ctx, r.DB, req.RoomIDOrAlias, senderID)
if inviteErr == nil && !serverInRoom && isInvitePending {
inviter, queryErr := r.RSAPI.QueryUserIDForSender(ctx, *roomID, inviteSender)
if queryErr != nil {
return nil, "", "", "", fmt.Errorf("r.RSAPI.QueryUserIDForSender: %w", queryErr)
}
// If we were invited by someone from another server then we can
// assume they are in the room so we can join via them.
if inviter != nil && !r.Cfg.Matrix.IsLocalServerName(inviter.Domain()) {
req.ServerNames = append(req.ServerNames, inviter.Domain())
forceFederatedJoin = true
memberEvent := gjson.Parse(string(inviteEvent.JSON()))
// only set unsigned if we've got a content.membership, which we _should_
if memberEvent.Get("content.membership").Exists() {
req.Unsigned = map[string]interface{}{
"prev_sender": memberEvent.Get("sender").Str,
"prev_content": map[string]interface{}{
"is_direct": memberEvent.Get("content.is_direct").Bool(),
"membership": memberEvent.Get("content.membership").Str,
},
}
}
}
}
}
// If a guest is trying to join a room, check that the room has a m.room.guest_access event
if req.IsGuest {
var guestAccessEvent *types.HeaderedEvent
guestAccess := "forbidden"
guestAccessEvent, err = r.DB.GetStateEvent(ctx, req.RoomIDOrAlias, spec.MRoomGuestAccess, "")
if (err != nil && !errors.Is(err, sql.ErrNoRows)) || guestAccessEvent == nil {
logrus.WithError(err).Warn("unable to get m.room.guest_access event, defaulting to 'forbidden'")
}
if guestAccessEvent != nil {
guestAccess = gjson.GetBytes(guestAccessEvent.Content(), "guest_access").String()
}
// Servers MUST only allow guest users to join rooms if the m.room.guest_access state event
// is present on the room and has the guest_access value can_join.
if guestAccess != "can_join" {
return nil, "", "", "", rsAPI.ErrNotAllowed{Err: fmt.Errorf("guest access is forbidden")}
}
}
// If we should do a forced federated join then do that.
if forceFederatedJoin {
joinEvent, version, serverName, err := r.performFederatedMakeJoinByIDCryptoIDs(ctx, req)
return joinEvent, req.RoomIDOrAlias, version, serverName, err
}
// Try to construct an actual join event from the template.
// If this succeeds then it is a sign that the room already exists
// locally on the homeserver.
// TODO: Check what happens if the room exists on the server
// but everyone has since left. I suspect it does the wrong thing.
var buildRes rsAPI.QueryLatestEventsAndStateResponse
identity := r.Cfg.Matrix.SigningIdentity
// at this point we know we have an existing room
if inRoomRes.RoomVersion == gomatrixserverlib.RoomVersionPseudoIDs {
var pseudoIDKey ed25519.PrivateKey
pseudoIDKey, err = r.RSAPI.GetOrCreateUserRoomPrivateKey(ctx, *userID, *roomID)
if err != nil {
util.GetLogger(ctx).WithError(err).Error("GetOrCreateUserRoomPrivateKey failed")
return nil, "", "", "", err
}
mapping := &gomatrixserverlib.MXIDMapping{
UserRoomKey: spec.SenderIDFromPseudoIDKey(pseudoIDKey),
UserID: userID.String(),
}
// Sign the mapping with the server identity
if err = mapping.Sign(identity.ServerName, identity.KeyID, identity.PrivateKey); err != nil {
return nil, "", "", "", err
}
req.Content["mxid_mapping"] = mapping
// sign the event with the pseudo ID key
identity = fclient.SigningIdentity{
ServerName: spec.ServerName(spec.SenderIDFromPseudoIDKey(pseudoIDKey)),
KeyID: "ed25519:1",
PrivateKey: pseudoIDKey,
}
}
senderIDString := string(senderID)
// Prepare the template for the join event.
proto := gomatrixserverlib.ProtoEvent{
Type: spec.MRoomMember,
SenderID: senderIDString,
StateKey: &senderIDString,
RoomID: req.RoomIDOrAlias,
Redacts: "",
}
if err = proto.SetUnsigned(struct{}{}); err != nil {
return nil, "", "", "", fmt.Errorf("eb.SetUnsigned: %w", err)
}
// It is possible for the request to include some "content" for the
// event. We'll always overwrite the "membership" key, but the rest,
// like "display_name" or "avatar_url", will be kept if supplied.
if req.Content == nil {
req.Content = map[string]interface{}{}
}
req.Content["membership"] = spec.Join
if authorisedVia, aerr := r.populateAuthorisedViaUserForRestrictedJoin(ctx, req, senderID); aerr != nil {
return nil, "", "", "", aerr
} else if authorisedVia != "" {
req.Content["join_authorised_via_users_server"] = authorisedVia
}
if err = proto.SetContent(req.Content); err != nil {
return nil, "", "", "", fmt.Errorf("eb.SetContent: %w", err)
}
joinEvent, err := eventutil.QueryAndBuildEvent(ctx, &proto, &identity, time.Now(), r.RSAPI, &buildRes)
switch err.(type) {
case nil:
// Do nothing
case eventutil.ErrRoomNoExists:
// The room doesn't exist locally. If the room ID looks like it should
// be ours then this probably means that we've nuked our database at
// some point.
if r.Cfg.Matrix.IsLocalServerName(roomID.Domain()) {
// If there are no more server names to try then give up here.
// Otherwise we'll try a federated join as normal, since it's quite
// possible that the room still exists on other servers.
if len(req.ServerNames) == 0 {
return nil, "", "", "", eventutil.ErrRoomNoExists{}
}
}
// Perform a federated room join.
joinEvent, version, serverName, err := r.performFederatedMakeJoinByIDCryptoIDs(ctx, req)
return joinEvent, req.RoomIDOrAlias, version, serverName, err
default:
// Something else went wrong.
return nil, "", "", "", fmt.Errorf("error joining local room: %q", err)
}
// By this point, if req.RoomIDOrAlias contained an alias, then
// it will have been overwritten with a room ID by performJoinRoomByAlias.
// We should now include this in the response so that the CS API can
// return the right room ID.
return joinEvent, req.RoomIDOrAlias, inRoomRes.RoomVersion, userID.Domain(), nil
}
func (r *Joiner) performFederatedMakeJoinByIDCryptoIDs(
ctx context.Context,
req *rsAPI.PerformJoinRequest,
) (gomatrixserverlib.PDU, gomatrixserverlib.RoomVersion, spec.ServerName, error) {
// Try joining by all of the supplied server names.
fedReq := fsAPI.PerformJoinRequest{
RoomID: req.RoomIDOrAlias, // the room ID to try and join
UserID: req.UserID, // the user ID joining the room
ServerNames: req.ServerNames, // the server to try joining with
Content: req.Content, // the membership event content
Unsigned: req.Unsigned, // the unsigned event content, if any
}
joinEvent, version, serverName, err := r.FSAPI.PerformMakeJoin(ctx, &fedReq)
if err != nil {
return nil, "", "", err
}
return joinEvent, version, serverName, nil
}
func (r *Joiner) performFederatedJoinRoomByID(
ctx context.Context,
req *rsAPI.PerformJoinRequest,