diff --git a/clientapi/routing/routing.go b/clientapi/routing/routing.go index 913b18a90..8f5d69567 100644 --- a/clientapi/routing/routing.go +++ b/clientapi/routing/routing.go @@ -354,6 +354,7 @@ func Setup( ).Methods(http.MethodPost, http.MethodOptions) unstableMux.Handle("/org.matrix.msc_cryptoids/join/{roomIDOrAlias}", httputil.MakeAuthAPI(spec.Join, userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse { + logrus.Info("Processing request to /org.matrix.msc_cryptoids/join/{roomIDOrAlias}") if r := rateLimits.Limit(req, device); r != nil { return *r } @@ -421,6 +422,7 @@ func Setup( ).Methods(http.MethodPost, http.MethodOptions) unstableMux.Handle("/org.matrix.msc_cryptoids/rooms/{roomID}/join", httputil.MakeAuthAPI(spec.Join, userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse { + logrus.Info("Processing request to /org.matrix.msc_cryptoids/rooms/{roomID}/join") if r := rateLimits.Limit(req, device); r != nil { return *r } @@ -511,7 +513,6 @@ func Setup( return SendUnban(req, userAPI, device, vars["roomID"], cfg, rsAPI, asAPI) }), ).Methods(http.MethodPost, http.MethodOptions) - // TODO: update for cryptoIDs v3mux.Handle("/rooms/{roomID}/send/{eventType}", httputil.MakeAuthAPI("send_message", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse { vars, err := httputil.URLDecodeMapValues(mux.Vars(req)) @@ -521,7 +522,16 @@ func Setup( return SendEvent(req, device, vars["roomID"], vars["eventType"], nil, nil, cfg, rsAPI, nil) }, httputil.WithAllowGuests()), ).Methods(http.MethodPost, http.MethodOptions) - // TODO: update for cryptoIDs + unstableMux.Handle("/org.matrix.msc_cryptoids/rooms/{roomID}/send/{eventType}", + httputil.MakeAuthAPI("send_message", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse { + logrus.Info("Processing request to /org.matrix.msc_cryptoids/rooms/{roomID}/send/{eventType}") + vars, err := httputil.URLDecodeMapValues(mux.Vars(req)) + if err != nil { + return util.ErrorResponse(err) + } + return SendEventCryptoIDs(req, device, vars["roomID"], vars["eventType"], nil, nil, cfg, rsAPI, nil) + }, httputil.WithAllowGuests()), + ).Methods(http.MethodPost, http.MethodOptions) v3mux.Handle("/rooms/{roomID}/send/{eventType}/{txnID}", httputil.MakeAuthAPI("send_message", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse { vars, err := httputil.URLDecodeMapValues(mux.Vars(req)) @@ -533,6 +543,18 @@ func Setup( nil, cfg, rsAPI, transactionsCache) }, httputil.WithAllowGuests()), ).Methods(http.MethodPut, http.MethodOptions) + unstableMux.Handle("/org.matrix.msc_cryptoids/rooms/{roomID}/send/{eventType}/{txnID}", + httputil.MakeAuthAPI("send_message", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse { + logrus.Info("Processing request to /org.matrix.msc_cryptoids/rooms/{roomID}/send/{eventType}/{txnID}") + vars, err := httputil.URLDecodeMapValues(mux.Vars(req)) + if err != nil { + return util.ErrorResponse(err) + } + txnID := vars["txnID"] + return SendEventCryptoIDs(req, device, vars["roomID"], vars["eventType"], &txnID, + nil, cfg, rsAPI, transactionsCache) + }, httputil.WithAllowGuests()), + ).Methods(http.MethodPut, http.MethodOptions) v3mux.Handle("/rooms/{roomID}/state", httputil.MakeAuthAPI("room_state", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse { vars, err := httputil.URLDecodeMapValues(mux.Vars(req)) @@ -570,7 +592,6 @@ func Setup( return OnIncomingStateTypeRequest(req.Context(), device, rsAPI, vars["roomID"], vars["type"], vars["stateKey"], eventFormat) }, httputil.WithAllowGuests())).Methods(http.MethodGet, http.MethodOptions) - // TODO: update for cryptoIDs v3mux.Handle("/rooms/{roomID}/state/{eventType:[^/]+/?}", httputil.MakeAuthAPI("send_message", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse { vars, err := httputil.URLDecodeMapValues(mux.Vars(req)) @@ -582,8 +603,19 @@ func Setup( return SendEvent(req, device, vars["roomID"], eventType, nil, &emptyString, cfg, rsAPI, nil) }, httputil.WithAllowGuests()), ).Methods(http.MethodPut, http.MethodOptions) + unstableMux.Handle("/org.matrix.msc_cryptoids/rooms/{roomID}/state/{eventType:[^/]+/?}", + httputil.MakeAuthAPI("send_message", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse { + logrus.Info("Processing request to /org.matrix.msc_cryptoids/rooms/{roomID}/state/{eventType:[^/]+/?}") + vars, err := httputil.URLDecodeMapValues(mux.Vars(req)) + if err != nil { + return util.ErrorResponse(err) + } + emptyString := "" + eventType := strings.TrimSuffix(vars["eventType"], "/") + return SendEventCryptoIDs(req, device, vars["roomID"], eventType, nil, &emptyString, cfg, rsAPI, nil) + }, httputil.WithAllowGuests()), + ).Methods(http.MethodPut, http.MethodOptions) - // TODO: update for cryptoIDs v3mux.Handle("/rooms/{roomID}/state/{eventType}/{stateKey}", httputil.MakeAuthAPI("send_message", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse { vars, err := httputil.URLDecodeMapValues(mux.Vars(req)) @@ -594,6 +626,17 @@ func Setup( return SendEvent(req, device, vars["roomID"], vars["eventType"], nil, &stateKey, cfg, rsAPI, nil) }, httputil.WithAllowGuests()), ).Methods(http.MethodPut, http.MethodOptions) + unstableMux.Handle("/org.matrix.msc_cryptoids/rooms/{roomID}/state/{eventType}/{stateKey}", + httputil.MakeAuthAPI("send_message", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse { + logrus.Info("Processing request to /org.matrix.msc_cryptoids/rooms/{roomID}/state/{eventType}/{stateKey}") + vars, err := httputil.URLDecodeMapValues(mux.Vars(req)) + if err != nil { + return util.ErrorResponse(err) + } + stateKey := vars["stateKey"] + return SendEventCryptoIDs(req, device, vars["roomID"], vars["eventType"], nil, &stateKey, cfg, rsAPI, nil) + }, httputil.WithAllowGuests()), + ).Methods(http.MethodPut, http.MethodOptions) // Defined outside of handler to persist between calls // TODO: clear based on some criteria diff --git a/clientapi/routing/send_pdus.go b/clientapi/routing/send_pdus.go index 9fe240475..d4cd4439d 100644 --- a/clientapi/routing/send_pdus.go +++ b/clientapi/routing/send_pdus.go @@ -17,6 +17,8 @@ package routing import ( "encoding/json" "net/http" + "sync" + "time" appserviceAPI "github.com/matrix-org/dendrite/appservice/api" "github.com/matrix-org/dendrite/clientapi/httputil" @@ -27,11 +29,13 @@ import ( "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib/spec" "github.com/matrix-org/util" + "github.com/prometheus/client_golang/prometheus" ) type sendPDUsRequest struct { Version string `json:"room_version"` ViaServer string `json:"via_server,omitempty"` + TxnID string `json:"txn_id,omitempty"` PDUs []json.RawMessage `json:"pdus"` } @@ -68,6 +72,19 @@ func SendPDUs( } } + // create a mutex for the specific user in the specific room + // this avoids a situation where events that are received in quick succession are sent to the roomserver in a jumbled order + // TODO: cryptoIDs - where to get roomID from? + mutex, _ := userRoomSendMutexes.LoadOrStore("roomID"+userID.String(), &sync.Mutex{}) + mutex.(*sync.Mutex).Lock() + defer mutex.(*sync.Mutex).Unlock() + + var txnID *roomserverAPI.TransactionID + if pdus.TxnID != "" { + txnID.TransactionID = pdus.TxnID + txnID.SessionID = device.SessionID + } + inputs := make([]roomserverAPI.InputRoomEvent, 0, len(pdus.PDUs)) for _, event := range pdus.PDUs { // TODO: cryptoIDs - event hash check? @@ -165,15 +182,19 @@ func SendPDUs( // ie. what if the client changes power levels that disallow further events they sent? // We should be doing this already as part of `SendInputRoomEvents`, but how should we pass this // failure back to the client? + inputs = append(inputs, roomserverAPI.InputRoomEvent{ Kind: roomserverAPI.KindNew, Event: &types.HeaderedEvent{PDU: pdu}, Origin: userID.Domain(), // TODO: cryptoIDs - what to do with this field? + // should probably generate this based on the event type being sent? //SendAsServer: roomserverAPI.DoNotSendToOtherServers, + TransactionID: txnID, }) } + startedSubmittingEvents := time.Now() // send the events to the roomserver if err := roomserverAPI.SendInputRoomEvents(req.Context(), rsAPI, userID.Domain(), inputs, false); err != nil { util.GetLogger(req.Context()).WithError(err).Error("roomserverAPI.SendInputRoomEvents failed") @@ -182,6 +203,18 @@ func SendPDUs( JSON: spec.InternalServerError{Err: err.Error()}, } } + timeToSubmitEvents := time.Since(startedSubmittingEvents) + sendEventDuration.With(prometheus.Labels{"action": "submit"}).Observe(float64(timeToSubmitEvents.Milliseconds())) + + // Add response to transactionsCache + if txnID != nil { + // TODO : cryptoIDs - fix this + //res := util.JSONResponse{ + // Code: http.StatusOK, + // JSON: sendEventResponse{e.EventID()}, + //} + //txnCache.AddTransaction(device.AccessToken, *txnID, req.URL, &res) + } return util.JSONResponse{ Code: http.StatusOK, diff --git a/clientapi/routing/sendevent.go b/clientapi/routing/sendevent.go index 69131966b..3a0f620ac 100644 --- a/clientapi/routing/sendevent.go +++ b/clientapi/routing/sendevent.go @@ -44,6 +44,11 @@ type sendEventResponse struct { EventID string `json:"event_id"` } +type sendEventResponseCryptoIDs struct { + EventID string `json:"event_id"` + PDU json.RawMessage `json:"pdu"` +} + var ( userRoomSendMutexes sync.Map // (roomID+userID) -> mutex. mutexes to ensure correct ordering of sendEvents ) @@ -262,6 +267,156 @@ func SendEvent( return res } +// SendEventCryptoIDs implements: +// +// /rooms/{roomID}/send/{eventType} +// /rooms/{roomID}/send/{eventType}/{txnID} +// /rooms/{roomID}/state/{eventType}/{stateKey} +// +// nolint: gocyclo +func SendEventCryptoIDs( + req *http.Request, + device *userapi.Device, + roomID, eventType string, txnID, stateKey *string, + cfg *config.ClientAPI, + rsAPI api.ClientRoomserverAPI, + txnCache *transactions.Cache, +) util.JSONResponse { + roomVersion, err := rsAPI.QueryRoomVersionForRoom(req.Context(), roomID) + if err != nil { + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: spec.UnsupportedRoomVersion(err.Error()), + } + } + + if txnID != nil { + // Try to fetch response from transactionsCache + if res, ok := txnCache.FetchTransaction(device.AccessToken, *txnID, req.URL); ok { + return *res + } + } + + // Translate user ID state keys to room keys in pseudo ID rooms + if roomVersion == gomatrixserverlib.RoomVersionPseudoIDs && stateKey != nil { + parsedRoomID, innerErr := spec.NewRoomID(roomID) + if innerErr != nil { + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: spec.InvalidParam("invalid room ID"), + } + } + + newStateKey, innerErr := synctypes.FromClientStateKey(*parsedRoomID, *stateKey, func(roomID spec.RoomID, userID spec.UserID) (*spec.SenderID, error) { + return rsAPI.QuerySenderIDForUser(req.Context(), roomID, userID) + }) + if innerErr != nil { + // TODO: work out better logic for failure cases (e.g. sender ID not found) + util.GetLogger(req.Context()).WithError(innerErr).Error("synctypes.FromClientStateKey failed") + return util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: spec.Unknown("internal server error"), + } + } + stateKey = newStateKey + } + + var r map[string]interface{} // must be a JSON object + resErr := httputil.UnmarshalJSONRequest(req, &r) + if resErr != nil { + return *resErr + } + + if stateKey != nil { + // If the existing/new state content are equal, return the existing event_id, making the request idempotent. + if resp := stateEqual(req.Context(), rsAPI, eventType, *stateKey, roomID, r); resp != nil { + return *resp + } + } + + startedGeneratingEvent := time.Now() + + // If we're sending a membership update, make sure to strip the authorised + // via key if it is present, otherwise other servers won't be able to auth + // the event if the room is set to the "restricted" join rule. + if eventType == spec.MRoomMember { + delete(r, "join_authorised_via_users_server") + } + + // for power level events we need to replace the userID with the pseudoID + if roomVersion == gomatrixserverlib.RoomVersionPseudoIDs && eventType == spec.MRoomPowerLevels { + err = updatePowerLevels(req, r, roomID, rsAPI) + if err != nil { + return util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: spec.InternalServerError{Err: err.Error()}, + } + } + } + + evTime, err := httputil.ParseTSParam(req) + if err != nil { + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: spec.InvalidParam(err.Error()), + } + } + + e, resErr := generateSendEvent(req.Context(), r, device, roomID, eventType, stateKey, rsAPI, evTime) + if resErr != nil { + return *resErr + } + timeToGenerateEvent := time.Since(startedGeneratingEvent) + + // validate that the aliases exists + if eventType == spec.MRoomCanonicalAlias && stateKey != nil && *stateKey == "" { + aliasReq := api.AliasEvent{} + if err = json.Unmarshal(e.Content(), &aliasReq); err != nil { + return util.ErrorResponse(fmt.Errorf("unable to parse alias event: %w", err)) + } + if !aliasReq.Valid() { + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: spec.InvalidParam("Request contains invalid aliases."), + } + } + aliasRes := &api.GetAliasesForRoomIDResponse{} + if err = rsAPI.GetAliasesForRoomID(req.Context(), &api.GetAliasesForRoomIDRequest{RoomID: roomID}, aliasRes); err != nil { + return util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: spec.InternalServerError{}, + } + } + var found int + requestAliases := append(aliasReq.AltAliases, aliasReq.Alias) + for _, alias := range aliasRes.Aliases { + for _, altAlias := range requestAliases { + if altAlias == alias { + found++ + } + } + } + // check that we found at least the same amount of existing aliases as are in the request + if aliasReq.Alias != "" && found < len(requestAliases) { + return util.JSONResponse{ + Code: http.StatusBadRequest, + JSON: spec.BadAlias("No matching alias found."), + } + } + } + sendEventDuration.With(prometheus.Labels{"action": "build"}).Observe(float64(timeToGenerateEvent.Milliseconds())) + + res := util.JSONResponse{ + Code: http.StatusOK, + JSON: sendEventResponseCryptoIDs{ + EventID: e.EventID(), + PDU: json.RawMessage(e.JSON()), + }, + } + + return res +} + func updatePowerLevels(req *http.Request, r map[string]interface{}, roomID string, rsAPI api.ClientRoomserverAPI) error { users, ok := r["users"] if !ok { diff --git a/userapi/consumers/roomserver.go b/userapi/consumers/roomserver.go index 6da41f8a1..45fc677f6 100644 --- a/userapi/consumers/roomserver.go +++ b/userapi/consumers/roomserver.go @@ -647,7 +647,7 @@ func (s *OutputRoomEventConsumer) notifyLocal(ctx context.Context, event *rstype func (s *OutputRoomEventConsumer) evaluatePushRules(ctx context.Context, event *rstypes.HeaderedEvent, mem *localMembership, roomSize int) ([]*pushrules.Action, error) { user := "" sender, err := s.rsAPI.QueryUserIDForSender(ctx, event.RoomID(), event.SenderID()) - if err == nil { + if err == nil && sender != nil { user = sender.String() } if user == mem.UserID {