Add /send & /state support for cryptoids

This commit is contained in:
Devon Hudson 2023-10-13 17:01:18 -06:00
parent 4bfcf27106
commit 29cd14baf5
No known key found for this signature in database
GPG key ID: CD06B18E77F6A628
4 changed files with 236 additions and 5 deletions

View file

@ -354,6 +354,7 @@ func Setup(
).Methods(http.MethodPost, http.MethodOptions) ).Methods(http.MethodPost, http.MethodOptions)
unstableMux.Handle("/org.matrix.msc_cryptoids/join/{roomIDOrAlias}", unstableMux.Handle("/org.matrix.msc_cryptoids/join/{roomIDOrAlias}",
httputil.MakeAuthAPI(spec.Join, userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse { httputil.MakeAuthAPI(spec.Join, userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
logrus.Info("Processing request to /org.matrix.msc_cryptoids/join/{roomIDOrAlias}")
if r := rateLimits.Limit(req, device); r != nil { if r := rateLimits.Limit(req, device); r != nil {
return *r return *r
} }
@ -421,6 +422,7 @@ func Setup(
).Methods(http.MethodPost, http.MethodOptions) ).Methods(http.MethodPost, http.MethodOptions)
unstableMux.Handle("/org.matrix.msc_cryptoids/rooms/{roomID}/join", unstableMux.Handle("/org.matrix.msc_cryptoids/rooms/{roomID}/join",
httputil.MakeAuthAPI(spec.Join, userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse { httputil.MakeAuthAPI(spec.Join, userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
logrus.Info("Processing request to /org.matrix.msc_cryptoids/rooms/{roomID}/join")
if r := rateLimits.Limit(req, device); r != nil { if r := rateLimits.Limit(req, device); r != nil {
return *r return *r
} }
@ -511,7 +513,6 @@ func Setup(
return SendUnban(req, userAPI, device, vars["roomID"], cfg, rsAPI, asAPI) return SendUnban(req, userAPI, device, vars["roomID"], cfg, rsAPI, asAPI)
}), }),
).Methods(http.MethodPost, http.MethodOptions) ).Methods(http.MethodPost, http.MethodOptions)
// TODO: update for cryptoIDs
v3mux.Handle("/rooms/{roomID}/send/{eventType}", v3mux.Handle("/rooms/{roomID}/send/{eventType}",
httputil.MakeAuthAPI("send_message", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse { httputil.MakeAuthAPI("send_message", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
vars, err := httputil.URLDecodeMapValues(mux.Vars(req)) vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
@ -521,7 +522,16 @@ func Setup(
return SendEvent(req, device, vars["roomID"], vars["eventType"], nil, nil, cfg, rsAPI, nil) return SendEvent(req, device, vars["roomID"], vars["eventType"], nil, nil, cfg, rsAPI, nil)
}, httputil.WithAllowGuests()), }, httputil.WithAllowGuests()),
).Methods(http.MethodPost, http.MethodOptions) ).Methods(http.MethodPost, http.MethodOptions)
// TODO: update for cryptoIDs unstableMux.Handle("/org.matrix.msc_cryptoids/rooms/{roomID}/send/{eventType}",
httputil.MakeAuthAPI("send_message", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
logrus.Info("Processing request to /org.matrix.msc_cryptoids/rooms/{roomID}/send/{eventType}")
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
if err != nil {
return util.ErrorResponse(err)
}
return SendEventCryptoIDs(req, device, vars["roomID"], vars["eventType"], nil, nil, cfg, rsAPI, nil)
}, httputil.WithAllowGuests()),
).Methods(http.MethodPost, http.MethodOptions)
v3mux.Handle("/rooms/{roomID}/send/{eventType}/{txnID}", v3mux.Handle("/rooms/{roomID}/send/{eventType}/{txnID}",
httputil.MakeAuthAPI("send_message", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse { httputil.MakeAuthAPI("send_message", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
vars, err := httputil.URLDecodeMapValues(mux.Vars(req)) vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
@ -533,6 +543,18 @@ func Setup(
nil, cfg, rsAPI, transactionsCache) nil, cfg, rsAPI, transactionsCache)
}, httputil.WithAllowGuests()), }, httputil.WithAllowGuests()),
).Methods(http.MethodPut, http.MethodOptions) ).Methods(http.MethodPut, http.MethodOptions)
unstableMux.Handle("/org.matrix.msc_cryptoids/rooms/{roomID}/send/{eventType}/{txnID}",
httputil.MakeAuthAPI("send_message", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
logrus.Info("Processing request to /org.matrix.msc_cryptoids/rooms/{roomID}/send/{eventType}/{txnID}")
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
if err != nil {
return util.ErrorResponse(err)
}
txnID := vars["txnID"]
return SendEventCryptoIDs(req, device, vars["roomID"], vars["eventType"], &txnID,
nil, cfg, rsAPI, transactionsCache)
}, httputil.WithAllowGuests()),
).Methods(http.MethodPut, http.MethodOptions)
v3mux.Handle("/rooms/{roomID}/state", httputil.MakeAuthAPI("room_state", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse { v3mux.Handle("/rooms/{roomID}/state", httputil.MakeAuthAPI("room_state", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
vars, err := httputil.URLDecodeMapValues(mux.Vars(req)) vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
@ -570,7 +592,6 @@ func Setup(
return OnIncomingStateTypeRequest(req.Context(), device, rsAPI, vars["roomID"], vars["type"], vars["stateKey"], eventFormat) return OnIncomingStateTypeRequest(req.Context(), device, rsAPI, vars["roomID"], vars["type"], vars["stateKey"], eventFormat)
}, httputil.WithAllowGuests())).Methods(http.MethodGet, http.MethodOptions) }, httputil.WithAllowGuests())).Methods(http.MethodGet, http.MethodOptions)
// TODO: update for cryptoIDs
v3mux.Handle("/rooms/{roomID}/state/{eventType:[^/]+/?}", v3mux.Handle("/rooms/{roomID}/state/{eventType:[^/]+/?}",
httputil.MakeAuthAPI("send_message", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse { httputil.MakeAuthAPI("send_message", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
vars, err := httputil.URLDecodeMapValues(mux.Vars(req)) vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
@ -582,8 +603,19 @@ func Setup(
return SendEvent(req, device, vars["roomID"], eventType, nil, &emptyString, cfg, rsAPI, nil) return SendEvent(req, device, vars["roomID"], eventType, nil, &emptyString, cfg, rsAPI, nil)
}, httputil.WithAllowGuests()), }, httputil.WithAllowGuests()),
).Methods(http.MethodPut, http.MethodOptions) ).Methods(http.MethodPut, http.MethodOptions)
unstableMux.Handle("/org.matrix.msc_cryptoids/rooms/{roomID}/state/{eventType:[^/]+/?}",
httputil.MakeAuthAPI("send_message", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
logrus.Info("Processing request to /org.matrix.msc_cryptoids/rooms/{roomID}/state/{eventType:[^/]+/?}")
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
if err != nil {
return util.ErrorResponse(err)
}
emptyString := ""
eventType := strings.TrimSuffix(vars["eventType"], "/")
return SendEventCryptoIDs(req, device, vars["roomID"], eventType, nil, &emptyString, cfg, rsAPI, nil)
}, httputil.WithAllowGuests()),
).Methods(http.MethodPut, http.MethodOptions)
// TODO: update for cryptoIDs
v3mux.Handle("/rooms/{roomID}/state/{eventType}/{stateKey}", v3mux.Handle("/rooms/{roomID}/state/{eventType}/{stateKey}",
httputil.MakeAuthAPI("send_message", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse { httputil.MakeAuthAPI("send_message", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
vars, err := httputil.URLDecodeMapValues(mux.Vars(req)) vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
@ -594,6 +626,17 @@ func Setup(
return SendEvent(req, device, vars["roomID"], vars["eventType"], nil, &stateKey, cfg, rsAPI, nil) return SendEvent(req, device, vars["roomID"], vars["eventType"], nil, &stateKey, cfg, rsAPI, nil)
}, httputil.WithAllowGuests()), }, httputil.WithAllowGuests()),
).Methods(http.MethodPut, http.MethodOptions) ).Methods(http.MethodPut, http.MethodOptions)
unstableMux.Handle("/org.matrix.msc_cryptoids/rooms/{roomID}/state/{eventType}/{stateKey}",
httputil.MakeAuthAPI("send_message", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
logrus.Info("Processing request to /org.matrix.msc_cryptoids/rooms/{roomID}/state/{eventType}/{stateKey}")
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
if err != nil {
return util.ErrorResponse(err)
}
stateKey := vars["stateKey"]
return SendEventCryptoIDs(req, device, vars["roomID"], vars["eventType"], nil, &stateKey, cfg, rsAPI, nil)
}, httputil.WithAllowGuests()),
).Methods(http.MethodPut, http.MethodOptions)
// Defined outside of handler to persist between calls // Defined outside of handler to persist between calls
// TODO: clear based on some criteria // TODO: clear based on some criteria

View file

@ -17,6 +17,8 @@ package routing
import ( import (
"encoding/json" "encoding/json"
"net/http" "net/http"
"sync"
"time"
appserviceAPI "github.com/matrix-org/dendrite/appservice/api" appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
"github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/httputil"
@ -27,11 +29,13 @@ import (
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/gomatrixserverlib/spec" "github.com/matrix-org/gomatrixserverlib/spec"
"github.com/matrix-org/util" "github.com/matrix-org/util"
"github.com/prometheus/client_golang/prometheus"
) )
type sendPDUsRequest struct { type sendPDUsRequest struct {
Version string `json:"room_version"` Version string `json:"room_version"`
ViaServer string `json:"via_server,omitempty"` ViaServer string `json:"via_server,omitempty"`
TxnID string `json:"txn_id,omitempty"`
PDUs []json.RawMessage `json:"pdus"` PDUs []json.RawMessage `json:"pdus"`
} }
@ -68,6 +72,19 @@ func SendPDUs(
} }
} }
// create a mutex for the specific user in the specific room
// this avoids a situation where events that are received in quick succession are sent to the roomserver in a jumbled order
// TODO: cryptoIDs - where to get roomID from?
mutex, _ := userRoomSendMutexes.LoadOrStore("roomID"+userID.String(), &sync.Mutex{})
mutex.(*sync.Mutex).Lock()
defer mutex.(*sync.Mutex).Unlock()
var txnID *roomserverAPI.TransactionID
if pdus.TxnID != "" {
txnID.TransactionID = pdus.TxnID
txnID.SessionID = device.SessionID
}
inputs := make([]roomserverAPI.InputRoomEvent, 0, len(pdus.PDUs)) inputs := make([]roomserverAPI.InputRoomEvent, 0, len(pdus.PDUs))
for _, event := range pdus.PDUs { for _, event := range pdus.PDUs {
// TODO: cryptoIDs - event hash check? // TODO: cryptoIDs - event hash check?
@ -165,15 +182,19 @@ func SendPDUs(
// ie. what if the client changes power levels that disallow further events they sent? // ie. what if the client changes power levels that disallow further events they sent?
// We should be doing this already as part of `SendInputRoomEvents`, but how should we pass this // We should be doing this already as part of `SendInputRoomEvents`, but how should we pass this
// failure back to the client? // failure back to the client?
inputs = append(inputs, roomserverAPI.InputRoomEvent{ inputs = append(inputs, roomserverAPI.InputRoomEvent{
Kind: roomserverAPI.KindNew, Kind: roomserverAPI.KindNew,
Event: &types.HeaderedEvent{PDU: pdu}, Event: &types.HeaderedEvent{PDU: pdu},
Origin: userID.Domain(), Origin: userID.Domain(),
// TODO: cryptoIDs - what to do with this field? // TODO: cryptoIDs - what to do with this field?
// should probably generate this based on the event type being sent?
//SendAsServer: roomserverAPI.DoNotSendToOtherServers, //SendAsServer: roomserverAPI.DoNotSendToOtherServers,
TransactionID: txnID,
}) })
} }
startedSubmittingEvents := time.Now()
// send the events to the roomserver // send the events to the roomserver
if err := roomserverAPI.SendInputRoomEvents(req.Context(), rsAPI, userID.Domain(), inputs, false); err != nil { if err := roomserverAPI.SendInputRoomEvents(req.Context(), rsAPI, userID.Domain(), inputs, false); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("roomserverAPI.SendInputRoomEvents failed") util.GetLogger(req.Context()).WithError(err).Error("roomserverAPI.SendInputRoomEvents failed")
@ -182,6 +203,18 @@ func SendPDUs(
JSON: spec.InternalServerError{Err: err.Error()}, JSON: spec.InternalServerError{Err: err.Error()},
} }
} }
timeToSubmitEvents := time.Since(startedSubmittingEvents)
sendEventDuration.With(prometheus.Labels{"action": "submit"}).Observe(float64(timeToSubmitEvents.Milliseconds()))
// Add response to transactionsCache
if txnID != nil {
// TODO : cryptoIDs - fix this
//res := util.JSONResponse{
// Code: http.StatusOK,
// JSON: sendEventResponse{e.EventID()},
//}
//txnCache.AddTransaction(device.AccessToken, *txnID, req.URL, &res)
}
return util.JSONResponse{ return util.JSONResponse{
Code: http.StatusOK, Code: http.StatusOK,

View file

@ -44,6 +44,11 @@ type sendEventResponse struct {
EventID string `json:"event_id"` EventID string `json:"event_id"`
} }
type sendEventResponseCryptoIDs struct {
EventID string `json:"event_id"`
PDU json.RawMessage `json:"pdu"`
}
var ( var (
userRoomSendMutexes sync.Map // (roomID+userID) -> mutex. mutexes to ensure correct ordering of sendEvents userRoomSendMutexes sync.Map // (roomID+userID) -> mutex. mutexes to ensure correct ordering of sendEvents
) )
@ -262,6 +267,156 @@ func SendEvent(
return res return res
} }
// SendEventCryptoIDs implements:
//
// /rooms/{roomID}/send/{eventType}
// /rooms/{roomID}/send/{eventType}/{txnID}
// /rooms/{roomID}/state/{eventType}/{stateKey}
//
// nolint: gocyclo
func SendEventCryptoIDs(
req *http.Request,
device *userapi.Device,
roomID, eventType string, txnID, stateKey *string,
cfg *config.ClientAPI,
rsAPI api.ClientRoomserverAPI,
txnCache *transactions.Cache,
) util.JSONResponse {
roomVersion, err := rsAPI.QueryRoomVersionForRoom(req.Context(), roomID)
if err != nil {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: spec.UnsupportedRoomVersion(err.Error()),
}
}
if txnID != nil {
// Try to fetch response from transactionsCache
if res, ok := txnCache.FetchTransaction(device.AccessToken, *txnID, req.URL); ok {
return *res
}
}
// Translate user ID state keys to room keys in pseudo ID rooms
if roomVersion == gomatrixserverlib.RoomVersionPseudoIDs && stateKey != nil {
parsedRoomID, innerErr := spec.NewRoomID(roomID)
if innerErr != nil {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: spec.InvalidParam("invalid room ID"),
}
}
newStateKey, innerErr := synctypes.FromClientStateKey(*parsedRoomID, *stateKey, func(roomID spec.RoomID, userID spec.UserID) (*spec.SenderID, error) {
return rsAPI.QuerySenderIDForUser(req.Context(), roomID, userID)
})
if innerErr != nil {
// TODO: work out better logic for failure cases (e.g. sender ID not found)
util.GetLogger(req.Context()).WithError(innerErr).Error("synctypes.FromClientStateKey failed")
return util.JSONResponse{
Code: http.StatusInternalServerError,
JSON: spec.Unknown("internal server error"),
}
}
stateKey = newStateKey
}
var r map[string]interface{} // must be a JSON object
resErr := httputil.UnmarshalJSONRequest(req, &r)
if resErr != nil {
return *resErr
}
if stateKey != nil {
// If the existing/new state content are equal, return the existing event_id, making the request idempotent.
if resp := stateEqual(req.Context(), rsAPI, eventType, *stateKey, roomID, r); resp != nil {
return *resp
}
}
startedGeneratingEvent := time.Now()
// If we're sending a membership update, make sure to strip the authorised
// via key if it is present, otherwise other servers won't be able to auth
// the event if the room is set to the "restricted" join rule.
if eventType == spec.MRoomMember {
delete(r, "join_authorised_via_users_server")
}
// for power level events we need to replace the userID with the pseudoID
if roomVersion == gomatrixserverlib.RoomVersionPseudoIDs && eventType == spec.MRoomPowerLevels {
err = updatePowerLevels(req, r, roomID, rsAPI)
if err != nil {
return util.JSONResponse{
Code: http.StatusInternalServerError,
JSON: spec.InternalServerError{Err: err.Error()},
}
}
}
evTime, err := httputil.ParseTSParam(req)
if err != nil {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: spec.InvalidParam(err.Error()),
}
}
e, resErr := generateSendEvent(req.Context(), r, device, roomID, eventType, stateKey, rsAPI, evTime)
if resErr != nil {
return *resErr
}
timeToGenerateEvent := time.Since(startedGeneratingEvent)
// validate that the aliases exists
if eventType == spec.MRoomCanonicalAlias && stateKey != nil && *stateKey == "" {
aliasReq := api.AliasEvent{}
if err = json.Unmarshal(e.Content(), &aliasReq); err != nil {
return util.ErrorResponse(fmt.Errorf("unable to parse alias event: %w", err))
}
if !aliasReq.Valid() {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: spec.InvalidParam("Request contains invalid aliases."),
}
}
aliasRes := &api.GetAliasesForRoomIDResponse{}
if err = rsAPI.GetAliasesForRoomID(req.Context(), &api.GetAliasesForRoomIDRequest{RoomID: roomID}, aliasRes); err != nil {
return util.JSONResponse{
Code: http.StatusInternalServerError,
JSON: spec.InternalServerError{},
}
}
var found int
requestAliases := append(aliasReq.AltAliases, aliasReq.Alias)
for _, alias := range aliasRes.Aliases {
for _, altAlias := range requestAliases {
if altAlias == alias {
found++
}
}
}
// check that we found at least the same amount of existing aliases as are in the request
if aliasReq.Alias != "" && found < len(requestAliases) {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: spec.BadAlias("No matching alias found."),
}
}
}
sendEventDuration.With(prometheus.Labels{"action": "build"}).Observe(float64(timeToGenerateEvent.Milliseconds()))
res := util.JSONResponse{
Code: http.StatusOK,
JSON: sendEventResponseCryptoIDs{
EventID: e.EventID(),
PDU: json.RawMessage(e.JSON()),
},
}
return res
}
func updatePowerLevels(req *http.Request, r map[string]interface{}, roomID string, rsAPI api.ClientRoomserverAPI) error { func updatePowerLevels(req *http.Request, r map[string]interface{}, roomID string, rsAPI api.ClientRoomserverAPI) error {
users, ok := r["users"] users, ok := r["users"]
if !ok { if !ok {

View file

@ -647,7 +647,7 @@ func (s *OutputRoomEventConsumer) notifyLocal(ctx context.Context, event *rstype
func (s *OutputRoomEventConsumer) evaluatePushRules(ctx context.Context, event *rstypes.HeaderedEvent, mem *localMembership, roomSize int) ([]*pushrules.Action, error) { func (s *OutputRoomEventConsumer) evaluatePushRules(ctx context.Context, event *rstypes.HeaderedEvent, mem *localMembership, roomSize int) ([]*pushrules.Action, error) {
user := "" user := ""
sender, err := s.rsAPI.QueryUserIDForSender(ctx, event.RoomID(), event.SenderID()) sender, err := s.rsAPI.QueryUserIDForSender(ctx, event.RoomID(), event.SenderID())
if err == nil { if err == nil && sender != nil {
user = sender.String() user = sender.String()
} }
if user == mem.UserID { if user == mem.UserID {