dendrite/clientapi/routing/send_pdus.go
2023-11-02 10:56:25 -06:00

240 lines
9.1 KiB
Go

// Copyright 2023 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package routing
import (
"encoding/json"
"net/http"
"sync"
"time"
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
"github.com/matrix-org/dendrite/clientapi/httputil"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/gomatrixserverlib/spec"
"github.com/matrix-org/util"
"github.com/prometheus/client_golang/prometheus"
)
type sendPDUsRequest struct {
Version string `json:"room_version"`
ViaServer string `json:"via_server,omitempty"`
TxnID string `json:"txn_id,omitempty"`
PDUs []json.RawMessage `json:"pdus"`
}
// SendPDUs implements /sendPDUs
func SendPDUs(
req *http.Request, device *api.Device,
cfg *config.ClientAPI,
profileAPI api.ClientUserAPI, rsAPI roomserverAPI.ClientRoomserverAPI,
asAPI appserviceAPI.AppServiceInternalAPI,
) util.JSONResponse {
// TODO: cryptoIDs - should this include an "eventType"?
// if it's a bulk send endpoint, I don't think that makes any sense since there are multiple event types
// In that case, how do I know how to treat the events?
// I could sort them all by roomID?
// Then filter them down based on event type? (how do I collect groups of events such as for room creation?)
// Possibly based on event hash tracking that I know were sent to the client?
// For createRoom, I know what the possible list of events are, so try to find those and collect them to send to room creation.
// Could also sort by depth... but that seems dangerous and depth may not be a field forever
// Does it matter at all?
// Can't I just forward all the events to the roomserver?
// Do I need to do any specific processing on them?
var pdus sendPDUsRequest
resErr := httputil.UnmarshalJSONRequest(req, &pdus)
if resErr != nil {
return *resErr
}
userID, err := spec.NewUserID(device.UserID, true)
if err != nil {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: spec.InvalidParam(err.Error()),
}
}
// create a mutex for the specific user in the specific room
// this avoids a situation where events that are received in quick succession are sent to the roomserver in a jumbled order
// TODO: cryptoIDs - where to get roomID from?
mutex, _ := userRoomSendMutexes.LoadOrStore("roomID"+userID.String(), &sync.Mutex{})
mutex.(*sync.Mutex).Lock()
defer mutex.(*sync.Mutex).Unlock()
var txnID *roomserverAPI.TransactionID
if pdus.TxnID != "" {
txnID.TransactionID = pdus.TxnID
txnID.SessionID = device.SessionID
}
inputs := make([]roomserverAPI.InputRoomEvent, 0, len(pdus.PDUs))
for _, event := range pdus.PDUs {
// TODO: cryptoIDs - event hash check?
verImpl, err := gomatrixserverlib.GetRoomVersion(gomatrixserverlib.RoomVersion(pdus.Version))
if err != nil {
return util.JSONResponse{
Code: http.StatusInternalServerError,
JSON: spec.InternalServerError{Err: err.Error()},
}
}
//eventJSON, err := json.Marshal(event)
//if err != nil {
// return util.JSONResponse{
// Code: http.StatusInternalServerError,
// JSON: spec.InternalServerError{Err: err.Error()},
// }
//}
// TODO: cryptoIDs - how should we be converting to a PDU here?
// if the hash matches an event we sent to the client, then the JSON should be good.
// But how do we know how to fill out if the event is redacted if we use the trustedJSON function?
// Also - untrusted JSON seems better - except it strips off the unsigned field?
// Also - gmsl events don't store the `hashes` field... problem?
pdu, err := verImpl.NewEventFromUntrustedJSON(event)
if err != nil {
return util.JSONResponse{
Code: http.StatusInternalServerError,
JSON: spec.InternalServerError{Err: err.Error()},
}
}
util.GetLogger(req.Context()).Infof("Processing %s event (%s): %s", pdu.Type(), pdu.EventID(), pdu.JSON())
// Check that the event is signed by the server sending the request.
redacted, err := verImpl.RedactEventJSON(pdu.JSON())
if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("RedactEventJSON failed")
continue
}
verifier := gomatrixserverlib.JSONVerifierSelf{}
verifyRequests := []gomatrixserverlib.VerifyJSONRequest{{
ServerName: spec.ServerName(pdu.SenderID()),
Message: redacted,
AtTS: pdu.OriginServerTS(),
ValidityCheckingFunc: gomatrixserverlib.StrictValiditySignatureCheck,
}}
verifyResults, err := verifier.VerifyJSONs(req.Context(), verifyRequests)
if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("keys.VerifyJSONs failed")
continue
}
if verifyResults[0].Error != nil {
util.GetLogger(req.Context()).WithError(verifyResults[0].Error).Error("Signature check failed: ")
continue
}
switch pdu.Type() {
case spec.MRoomCreate:
case spec.MRoomMember:
var membership gomatrixserverlib.MemberContent
err = json.Unmarshal(pdu.Content(), &membership)
switch {
case err != nil:
util.GetLogger(req.Context()).Errorf("m.room.member event (%s) content invalid: %v", pdu.EventID(), pdu.Content())
continue
case membership.Membership == spec.Join:
deviceUserID, err := spec.NewUserID(device.UserID, true)
if err != nil {
return util.JSONResponse{
Code: http.StatusForbidden,
JSON: spec.Forbidden("userID doesn't have power level to change visibility"),
}
}
if !cfg.Matrix.IsLocalServerName(pdu.RoomID().Domain()) {
queryReq := roomserverAPI.QueryMembershipForUserRequest{
RoomID: pdu.RoomID().String(),
UserID: *deviceUserID,
}
var queryRes roomserverAPI.QueryMembershipForUserResponse
if err := rsAPI.QueryMembershipForUser(req.Context(), &queryReq, &queryRes); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("rsAPI.QueryMembershipsForRoom failed")
return util.JSONResponse{
Code: http.StatusInternalServerError,
JSON: spec.InternalServerError{},
}
}
if !queryRes.IsInRoom {
// This is a join event to a remote room
// TODO: cryptoIDs - figure out how to obtain unsigned contents for outstanding federated invites
joinReq := roomserverAPI.PerformJoinRequestCryptoIDs{
RoomID: pdu.RoomID().String(),
UserID: device.UserID,
IsGuest: device.AccountType == api.AccountTypeGuest,
ServerNames: []spec.ServerName{spec.ServerName(pdus.ViaServer)},
JoinEvent: pdu,
}
err := rsAPI.PerformSendJoinCryptoIDs(req.Context(), &joinReq)
if err != nil {
util.GetLogger(req.Context()).Errorf("Failed processing %s event (%s): %v", pdu.Type(), pdu.EventID(), err)
}
continue // NOTE: don't send this event on to the roomserver
}
}
}
}
// TODO: cryptoIDs - does it matter which order these are added?
// yes - if the events are for room creation.
// Could make this a client requirement? ie. events are processed based on the order they appear
// We need to check event validity after processing each event.
// ie. what if the client changes power levels that disallow further events they sent?
// We should be doing this already as part of `SendInputRoomEvents`, but how should we pass this
// failure back to the client?
inputs = append(inputs, roomserverAPI.InputRoomEvent{
Kind: roomserverAPI.KindNew,
Event: &types.HeaderedEvent{PDU: pdu},
Origin: userID.Domain(),
// TODO: cryptoIDs - what to do with this field?
// should probably generate this based on the event type being sent?
//SendAsServer: roomserverAPI.DoNotSendToOtherServers,
TransactionID: txnID,
})
}
startedSubmittingEvents := time.Now()
// send the events to the roomserver
if err := roomserverAPI.SendInputRoomEvents(req.Context(), rsAPI, userID.Domain(), inputs, false); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("roomserverAPI.SendInputRoomEvents failed")
return util.JSONResponse{
Code: http.StatusInternalServerError,
JSON: spec.InternalServerError{Err: err.Error()},
}
}
timeToSubmitEvents := time.Since(startedSubmittingEvents)
sendEventDuration.With(prometheus.Labels{"action": "submit"}).Observe(float64(timeToSubmitEvents.Milliseconds()))
// Add response to transactionsCache
if txnID != nil {
// TODO : cryptoIDs - fix this
//res := util.JSONResponse{
// Code: http.StatusOK,
// JSON: sendEventResponse{e.EventID()},
//}
//txnCache.AddTransaction(device.AccessToken, *txnID, req.URL, &res)
}
return util.JSONResponse{
Code: http.StatusOK,
}
}