Refactor invites to be synchronous

This commit is contained in:
Neil Alexander 2020-08-14 12:28:42 +01:00
parent c1f4faf308
commit 96814d436a
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
18 changed files with 197 additions and 392 deletions

View file

@ -371,7 +371,10 @@ func createRoom(
}
// If this is a direct message then we should invite the participants.
fmt.Println("INVITEES:")
for _, invitee := range r.Invite {
fmt.Println("*", invitee)
// Build the invite event.
inviteEvent, err := buildMembershipEvent(
req.Context(), invitee, "", accountDB, device, gomatrixserverlib.Invite,
@ -386,8 +389,8 @@ func createRoom(
var strippedState []gomatrixserverlib.InviteV2StrippedState
for _, event := range candidates {
switch event.Type() {
// TODO: case gomatrixserverlib.MRoomEncryption:
// fallthrough
case "m.room.encryption": // TODO: move this to gmsl
fallthrough
case gomatrixserverlib.MRoomMember:
fallthrough
case gomatrixserverlib.MRoomJoinRules:
@ -398,15 +401,15 @@ func createRoom(
}
}
// Send the invite event to the roomserver.
if perr := roomserverAPI.SendInvite(
if err := roomserverAPI.SendInvite(
req.Context(), rsAPI,
inviteEvent.Headered(roomVersion),
strippedState, // invite room state
cfg.Matrix.ServerName, // send as server
nil, // transaction ID
); perr != nil {
util.GetLogger(req.Context()).WithError(perr).Error("SendInvite failed")
return perr.JSONResponse()
); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("SendInvite failed")
return jsonerror.InternalServerError()
}
}

View file

@ -26,6 +26,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/threepid"
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/roomserver/api"
@ -172,8 +173,9 @@ func SendInvite(
req *http.Request, accountDB accounts.Database, device *userapi.Device,
roomID string, cfg *config.ClientAPI,
rsAPI roomserverAPI.RoomserverInternalAPI, asAPI appserviceAPI.AppServiceQueryAPI,
fsAPI federationSenderAPI.FederationSenderInternalAPI,
) util.JSONResponse {
body, evTime, roomVer, reqErr := extractRequestData(req, roomID, rsAPI)
body, evTime, _, reqErr := extractRequestData(req, roomID, rsAPI)
if reqErr != nil {
return *reqErr
}
@ -214,16 +216,16 @@ func SendInvite(
return jsonerror.InternalServerError()
}
perr := roomserverAPI.SendInvite(
err = roomserverAPI.SendInvite(
req.Context(), rsAPI,
event.Event.Headered(roomVer),
*event,
nil, // ask the roomserver to draw up invite room state for us
cfg.Matrix.ServerName,
nil,
)
if perr != nil {
util.GetLogger(req.Context()).WithError(perr).Error("producer.SendInvite failed")
return perr.JSONResponse()
if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("roomserverAPI.SendInvite failed")
return jsonerror.InternalServerError()
}
return util.JSONResponse{
Code: http.StatusOK,

View file

@ -142,7 +142,7 @@ func Setup(
if err != nil {
return util.ErrorResponse(err)
}
return SendInvite(req, accountDB, device, vars["roomID"], cfg, rsAPI, asAPI)
return SendInvite(req, accountDB, device, vars["roomID"], cfg, rsAPI, asAPI, federationSender)
}),
).Methods(http.MethodPost, http.MethodOptions)
r0mux.Handle("/rooms/{roomID}/kick",

View file

@ -143,11 +143,11 @@ func processInvite(
)
// Add the invite event to the roomserver.
if perr := api.SendInvite(
if err := api.SendInvite(
ctx, rsAPI, signedEvent.Headered(roomVer), strippedState, event.Origin(), nil,
); perr != nil {
); err != nil {
util.GetLogger(ctx).WithError(err).Error("producer.SendInvite failed")
return perr.JSONResponse()
return jsonerror.InternalServerError()
}
// Return the signed event to the originating server, it should then tell

View file

@ -102,7 +102,8 @@ func (t *testRoomserverAPI) PerformInvite(
ctx context.Context,
req *api.PerformInviteRequest,
res *api.PerformInviteResponse,
) {
) error {
return nil
}
func (t *testRoomserverAPI) PerformJoin(

View file

@ -36,6 +36,12 @@ type FederationSenderInternalAPI interface {
request *PerformLeaveRequest,
response *PerformLeaveResponse,
) error
// Handle sending an invite to a remote server.
PerformInvite(
ctx context.Context,
request *PerformInviteRequest,
response *PerformInviteResponse,
) error
// Notifies the federation sender that these servers may be online and to retry sending messages.
PerformServersAlive(
ctx context.Context,
@ -81,6 +87,16 @@ type PerformLeaveRequest struct {
type PerformLeaveResponse struct {
}
type PerformInviteRequest struct {
RoomVersion gomatrixserverlib.RoomVersion `json:"room_version"`
Event gomatrixserverlib.HeaderedEvent `json:"event"`
InviteRoomState []gomatrixserverlib.InviteV2StrippedState `json:"invite_room_state"`
}
type PerformInviteResponse struct {
SignedEvent gomatrixserverlib.HeaderedEvent `json:"signed_event"`
}
type PerformServersAliveRequest struct {
Servers []gomatrixserverlib.ServerName
}

View file

@ -28,7 +28,6 @@ import (
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus"
"github.com/tidwall/gjson"
)
// OutputRoomEventConsumer consumes events that originated in the room server.
@ -97,22 +96,6 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
}).Panicf("roomserver output log: write room event failure")
return nil
}
case api.OutputTypeNewInviteEvent:
ev := &output.NewInviteEvent.Event
log.WithFields(log.Fields{
"event_id": ev.EventID(),
"room_id": ev.RoomID(),
"state_key": ev.StateKey(),
}).Info("received invite event from roomserver")
if err := s.processInvite(*output.NewInviteEvent); err != nil {
// panic rather than continue with an inconsistent database
log.WithFields(log.Fields{
"event": string(ev.JSON()),
log.ErrorKey: err,
}).Panicf("roomserver output log: write invite event failure")
return nil
}
default:
log.WithField("type", output.Type).Debug(
"roomserver output log: ignoring unknown output type",
@ -172,51 +155,6 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) err
)
}
// processInvite handles an invite event for sending over federation.
func (s *OutputRoomEventConsumer) processInvite(oie api.OutputNewInviteEvent) error {
// Don't try to reflect and resend invites that didn't originate from us.
if s.cfg.Matrix.ServerName != oie.Event.Origin() {
return nil
}
// Ignore invites that don't have state keys - they are invalid.
if oie.Event.StateKey() == nil {
return fmt.Errorf("event %q doesn't have state key", oie.Event.EventID())
}
// Don't try to handle events that are actually destined for us.
stateKey := *oie.Event.StateKey()
_, destination, err := gomatrixserverlib.SplitID('@', stateKey)
if err != nil {
log.WithFields(log.Fields{
"event_id": oie.Event.EventID(),
"state_key": stateKey,
}).Info("failed to split destination from state key")
return nil
}
if s.cfg.Matrix.ServerName == destination {
return nil
}
// Try to extract the room invite state. The roomserver will have stashed
// this for us in invite_room_state if it didn't already exist.
strippedState := []gomatrixserverlib.InviteV2StrippedState{}
if inviteRoomState := gjson.GetBytes(oie.Event.Unsigned(), "invite_room_state"); inviteRoomState.Exists() {
if err = json.Unmarshal([]byte(inviteRoomState.Raw), &strippedState); err != nil {
log.WithError(err).Warn("failed to extract invite_room_state from event unsigned")
}
}
// Build the invite request with the info we've got.
inviteReq, err := gomatrixserverlib.NewInviteV2Request(&oie.Event, strippedState)
if err != nil {
return fmt.Errorf("gomatrixserverlib.NewInviteV2Request: %w", err)
}
// Send the event.
return s.queues.SendInvite(&inviteReq)
}
// joinedHostsAtEvent works out a list of matrix servers that were joined to
// the room at the event.
// It is important to use the state at the event for sending messages because:

View file

@ -296,6 +296,48 @@ func (r *FederationSenderInternalAPI) PerformLeave(
)
}
// PerformLeaveRequest implements api.FederationSenderInternalAPI
func (r *FederationSenderInternalAPI) PerformInvite(
ctx context.Context,
request *api.PerformInviteRequest,
response *api.PerformInviteResponse,
) (err error) {
if request.Event.StateKey() == nil {
return errors.New("invite must be a state event")
}
_, destination, err := gomatrixserverlib.SplitID('@', *request.Event.StateKey())
if err != nil {
return fmt.Errorf("gomatrixserverlib.SplitID: %w", err)
}
logrus.WithFields(logrus.Fields{
"event_id": request.Event.EventID(),
"user_id": *request.Event.StateKey(),
"room_id": request.Event.RoomID(),
"room_version": request.RoomVersion,
"destination": destination,
}).Info("Sending invite")
inviteReq, err := gomatrixserverlib.NewInviteV2Request(&request.Event, request.InviteRoomState)
if err != nil {
return fmt.Errorf("gomatrixserverlib.NewInviteV2Request: %w", err)
}
inviteRes, err := r.federation.SendInviteV2(ctx, destination, inviteReq)
if err != nil {
return fmt.Errorf("r.federation.SendInviteV2: %w", err)
}
response.SignedEvent = inviteRes.Event.Sign(
string(r.cfg.Matrix.ServerName),
r.cfg.Matrix.KeyID,
r.cfg.Matrix.PrivateKey,
).Headered(request.RoomVersion)
return nil
}
// PerformServersAlive implements api.FederationSenderInternalAPI
func (r *FederationSenderInternalAPI) PerformServersAlive(
ctx context.Context,

View file

@ -18,6 +18,7 @@ const (
FederationSenderPerformDirectoryLookupRequestPath = "/federationsender/performDirectoryLookup"
FederationSenderPerformJoinRequestPath = "/federationsender/performJoinRequest"
FederationSenderPerformLeaveRequestPath = "/federationsender/performLeaveRequest"
FederationSenderPerformInviteRequestPath = "/federationsender/performInviteRequest"
FederationSenderPerformServersAlivePath = "/federationsender/performServersAlive"
FederationSenderPerformBroadcastEDUPath = "/federationsender/performBroadcastEDU"
)
@ -49,6 +50,19 @@ func (h *httpFederationSenderInternalAPI) PerformLeave(
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}
// Handle sending an invite to a remote server.
func (h *httpFederationSenderInternalAPI) PerformInvite(
ctx context.Context,
request *api.PerformInviteRequest,
response *api.PerformInviteResponse,
) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "PerformInviteRequest")
defer span.Finish()
apiURL := h.federationSenderURL + FederationSenderPerformInviteRequestPath
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}
func (h *httpFederationSenderInternalAPI) PerformServersAlive(
ctx context.Context,
request *api.PerformServersAliveRequest,

View file

@ -52,6 +52,20 @@ func AddRoutes(intAPI api.FederationSenderInternalAPI, internalAPIMux *mux.Route
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
internalAPIMux.Handle(
FederationSenderPerformInviteRequestPath,
httputil.MakeInternalAPI("PerformInviteRequest", func(req *http.Request) util.JSONResponse {
var request api.PerformInviteRequest
var response api.PerformInviteResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.MessageResponse(http.StatusBadRequest, err.Error())
}
if err := intAPI.PerformInvite(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
internalAPIMux.Handle(
FederationSenderPerformDirectoryLookupRequestPath,
httputil.MakeInternalAPI("PerformDirectoryLookupRequest", func(req *http.Request) util.JSONResponse {

View file

@ -52,11 +52,9 @@ type destinationQueue struct {
running atomic.Bool // is the queue worker running?
backingOff atomic.Bool // true if we're backing off
statistics *statistics.ServerStatistics // statistics about this remote server
incomingInvites chan *gomatrixserverlib.InviteV2Request // invites to send
transactionIDMutex sync.Mutex // protects transactionID
transactionID gomatrixserverlib.TransactionID // last transaction ID
transactionCount atomic.Int32 // how many events in this transaction so far
pendingInvites []*gomatrixserverlib.InviteV2Request // owned by backgroundSend
notifyPDUs chan bool // interrupts idle wait for PDUs
notifyEDUs chan bool // interrupts idle wait for EDUs
interruptBackoff chan bool // interrupts backoff
@ -138,18 +136,6 @@ func (oq *destinationQueue) sendEDU(receipt *shared.Receipt) {
}
}
// sendInvite adds the invite event to the pending queue for the
// destination. If the queue is empty then it starts a background
// goroutine to start sending events to that destination.
func (oq *destinationQueue) sendInvite(ev *gomatrixserverlib.InviteV2Request) {
if oq.statistics.Blacklisted() {
// If the destination is blacklisted then drop the event.
return
}
oq.wakeQueueIfNeeded()
oq.incomingInvites <- ev
}
// wakeQueueIfNeeded will wake up the destination queue if it is
// not already running. If it is running but it is backing off
// then we will interrupt the backoff, causing any federation
@ -234,23 +220,6 @@ func (oq *destinationQueue) backgroundSend() {
// We were woken up because there are new PDUs waiting in the
// database.
pendingEDUs = true
case invite := <-oq.incomingInvites:
// There's no strict ordering requirement for invites like
// there is for transactions, so we put the invite onto the
// front of the queue. This means that if an invite that is
// stuck failing already, that it won't block our new invite
// from being sent.
oq.pendingInvites = append(
[]*gomatrixserverlib.InviteV2Request{invite},
oq.pendingInvites...,
)
// If there are any more things waiting in the channel queue
// then read them. This is safe because we guarantee only
// having one goroutine per destination queue, so the channel
// isn't being consumed anywhere else.
for len(oq.incomingInvites) > 0 {
oq.pendingInvites = append(oq.pendingInvites, <-oq.incomingInvites)
}
case <-time.After(queueIdleTimeout):
// The worker is idle so stop the goroutine. It'll get
// restarted automatically the next time we have an event to
@ -266,7 +235,6 @@ func (oq *destinationQueue) backgroundSend() {
// It's been suggested that we should give up because the backoff
// has exceeded a maximum allowable value. Clean up the in-memory
// buffers at this point. The PDU clean-up is already on a defer.
oq.cleanPendingInvites()
log.Warnf("Blacklisting %q due to exceeding backoff threshold", oq.destination)
return
}
@ -284,33 +252,7 @@ func (oq *destinationQueue) backgroundSend() {
oq.statistics.Success()
}
}
// Try sending the next invite and see what happens.
if len(oq.pendingInvites) > 0 {
sent, ierr := oq.nextInvites(oq.pendingInvites)
if ierr != nil {
// We failed to send the transaction. Mark it as a failure.
oq.statistics.Failure()
} else if sent > 0 {
// If we successfully sent the invites then clear out
// the pending invites.
oq.statistics.Success()
// Reallocate so that the underlying array can be GC'd, as
// opposed to growing forever.
oq.cleanPendingInvites()
}
}
}
}
// cleanPendingInvites cleans out the pending invite buffer,
// removing all references so that the underlying objects can
// be GC'd.
func (oq *destinationQueue) cleanPendingInvites() {
for i := 0; i < len(oq.pendingInvites); i++ {
oq.pendingInvites[i] = nil
}
oq.pendingInvites = []*gomatrixserverlib.InviteV2Request{}
}
// nextTransaction creates a new transaction from the pending event
@ -427,66 +369,3 @@ func (oq *destinationQueue) nextTransaction() (bool, error) {
return false, err
}
}
// nextInvite takes pending invite events from the queue and sends
// them. Returns true if a transaction was sent or false otherwise.
func (oq *destinationQueue) nextInvites(
pendingInvites []*gomatrixserverlib.InviteV2Request,
) (int, error) {
done := 0
for _, inviteReq := range pendingInvites {
ev, roomVersion := inviteReq.Event(), inviteReq.RoomVersion()
log.WithFields(log.Fields{
"event_id": ev.EventID(),
"room_version": roomVersion,
"destination": oq.destination,
}).Info("sending invite")
inviteRes, err := oq.client.SendInviteV2(
context.TODO(),
oq.destination,
*inviteReq,
)
switch e := err.(type) {
case nil:
done++
case gomatrix.HTTPError:
log.WithFields(log.Fields{
"event_id": ev.EventID(),
"state_key": ev.StateKey(),
"destination": oq.destination,
"status_code": e.Code,
}).WithError(err).Error("failed to send invite due to HTTP error")
// Check whether we should do something about the error or
// just accept it as unavoidable.
if e.Code >= 400 && e.Code <= 499 {
// We tried but the remote side has sent back a client error.
// It's no use retrying because it will happen again.
done++
continue
}
return done, err
default:
log.WithFields(log.Fields{
"event_id": ev.EventID(),
"state_key": ev.StateKey(),
"destination": oq.destination,
}).WithError(err).Error("failed to send invite")
return done, err
}
invEv := inviteRes.Event.Sign(string(oq.signing.ServerName), oq.signing.KeyID, oq.signing.PrivateKey).Headered(roomVersion)
_, err = api.SendEvents(context.TODO(), oq.rsAPI, []gomatrixserverlib.HeaderedEvent{invEv}, oq.signing.ServerName, nil)
if err != nil {
log.WithFields(log.Fields{
"event_id": ev.EventID(),
"state_key": ev.StateKey(),
"destination": oq.destination,
}).WithError(err).Error("failed to return signed invite to roomserver")
return done, err
}
}
return done, nil
}

View file

@ -108,7 +108,6 @@ func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *d
destination: destination,
client: oqs.client,
statistics: oqs.statistics.ForServer(destination),
incomingInvites: make(chan *gomatrixserverlib.InviteV2Request, 128),
notifyPDUs: make(chan bool, 1),
notifyEDUs: make(chan bool, 1),
interruptBackoff: make(chan bool),
@ -178,51 +177,6 @@ func (oqs *OutgoingQueues) SendEvent(
return nil
}
// SendEvent sends an event to the destinations
func (oqs *OutgoingQueues) SendInvite(
inviteReq *gomatrixserverlib.InviteV2Request,
) error {
ev := inviteReq.Event()
stateKey := ev.StateKey()
if stateKey == nil {
log.WithFields(log.Fields{
"event_id": ev.EventID(),
}).Info("Invite had no state key, dropping")
return nil
}
_, destination, err := gomatrixserverlib.SplitID('@', *stateKey)
if err != nil {
log.WithFields(log.Fields{
"event_id": ev.EventID(),
"state_key": stateKey,
}).Info("Failed to split destination from state key")
return nil
}
if stateapi.IsServerBannedFromRoom(
context.TODO(),
oqs.stateAPI,
ev.RoomID(),
destination,
) {
log.WithFields(log.Fields{
"room_id": ev.RoomID(),
"destination": destination,
}).Info("Dropping invite to server which is prohibited by ACLs")
return nil
}
log.WithFields(log.Fields{
"event_id": ev.EventID(),
"server_name": destination,
}).Info("Sending invite")
oqs.getQueue(destination).sendInvite(inviteReq)
return nil
}
// SendEDU sends an EDU event to the destinations.
func (oqs *OutgoingQueues) SendEDU(
e *gomatrixserverlib.EDU, origin gomatrixserverlib.ServerName,

View file

@ -22,7 +22,7 @@ type RoomserverInternalAPI interface {
ctx context.Context,
req *PerformInviteRequest,
res *PerformInviteResponse,
)
) error
PerformJoin(
ctx context.Context,

View file

@ -33,9 +33,9 @@ func (t *RoomserverInternalAPITrace) PerformInvite(
ctx context.Context,
req *PerformInviteRequest,
res *PerformInviteResponse,
) {
t.Impl.PerformInvite(ctx, req, res)
) error {
util.GetLogger(ctx).Infof("PerformInvite req=%+v res=%+v", js(req), js(res))
return t.Impl.PerformInvite(ctx, req, res)
}
func (t *RoomserverInternalAPITrace) PerformJoin(

View file

@ -105,8 +105,6 @@ type PerformInviteRequest struct {
}
type PerformInviteResponse struct {
// If non-nil, the invite request failed. Contains more information why it failed.
Error *PerformError
}
// PerformBackfillRequest is a request to PerformBackfill.

View file

@ -99,22 +99,16 @@ func SendInvite(
rsAPI RoomserverInternalAPI, inviteEvent gomatrixserverlib.HeaderedEvent,
inviteRoomState []gomatrixserverlib.InviteV2StrippedState,
sendAsServer gomatrixserverlib.ServerName, txnID *TransactionID,
) *PerformError {
request := PerformInviteRequest{
) error {
request := &PerformInviteRequest{
Event: inviteEvent,
InviteRoomState: inviteRoomState,
RoomVersion: inviteEvent.RoomVersion,
SendAsServer: string(sendAsServer),
TransactionID: txnID,
}
var response PerformInviteResponse
rsAPI.PerformInvite(ctx, &request, &response)
// we need to do this because many places people will use `var err error` as the return
// arg and a nil interface != nil pointer to a concrete interface (in this case PerformError)
if response.Error != nil && response.Error.Msg != "" {
return response.Error
}
return nil
response := &PerformInviteResponse{}
return rsAPI.PerformInvite(ctx, request, response)
}
// GetEvent returns the event or nil, even on errors.

View file

@ -2,9 +2,9 @@ package internal
import (
"context"
"errors"
"fmt"
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/state"
@ -14,73 +14,50 @@ import (
log "github.com/sirupsen/logrus"
)
// PerformInvite handles inviting to matrix rooms, including over federation by talking to the federationsender.
// nolint:gocyclo
func (r *RoomserverInternalAPI) PerformInvite(
ctx context.Context,
req *api.PerformInviteRequest,
res *api.PerformInviteResponse,
) {
err := r.performInvite(ctx, req)
if err != nil {
perr, ok := err.(*api.PerformError)
if ok {
res.Error = perr
} else {
res.Error = &api.PerformError{
Msg: err.Error(),
}
}
}
}
func (r *RoomserverInternalAPI) performInvite(ctx context.Context,
req *api.PerformInviteRequest,
) error {
loopback, err := r.processInviteEvent(ctx, r, req)
if err != nil {
return err
}
// The processInviteEvent function can optionally return a
// loopback room event containing the invite, for local invites.
// If it does, we should process it with the room events below.
if loopback != nil {
var loopbackRes api.InputRoomEventsResponse
err := r.InputRoomEvents(ctx, &api.InputRoomEventsRequest{
InputRoomEvents: []api.InputRoomEvent{*loopback},
}, &loopbackRes)
if err != nil {
return err
}
}
return nil
}
// nolint:gocyclo
func (r *RoomserverInternalAPI) processInviteEvent(
ctx context.Context,
ow *RoomserverInternalAPI,
input *api.PerformInviteRequest,
) (*api.InputRoomEvent, error) {
if input.Event.StateKey() == nil {
return nil, fmt.Errorf("invite must be a state event")
event := req.Event
if event.StateKey() == nil {
return fmt.Errorf("invite must be a state event")
}
roomID := input.Event.RoomID()
targetUserID := *input.Event.StateKey()
roomID := event.RoomID()
targetUserID := *event.StateKey()
log.WithFields(log.Fields{
"event_id": input.Event.EventID(),
"event_id": event.EventID(),
"room_id": roomID,
"room_version": input.RoomVersion,
"room_version": req.RoomVersion,
"target_user_id": targetUserID,
}).Info("processing invite event")
_, domain, _ := gomatrixserverlib.SplitID('@', targetUserID)
isTargetLocalUser := domain == r.Cfg.Matrix.ServerName
isOriginLocalUser := event.Origin() == r.Cfg.Matrix.ServerName
updater, err := r.DB.MembershipUpdater(ctx, roomID, targetUserID, isTargetLocalUser, input.RoomVersion)
// If the invite originated from us and the target isn't local then we
// should try and send the invite over federation first. It might be
// that the remote user doesn't exist, in which case we can give up
// processing here.
if isOriginLocalUser && !isTargetLocalUser {
fsReq := &federationSenderAPI.PerformInviteRequest{
RoomVersion: req.RoomVersion,
Event: req.Event,
}
fsRes := &federationSenderAPI.PerformInviteResponse{}
if err := r.fsAPI.PerformInvite(ctx, fsReq, fsRes); err != nil {
return fmt.Errorf("r.fsAPI.PerformInvite: %w", err)
}
event = fsRes.SignedEvent
}
updater, err := r.DB.MembershipUpdater(ctx, roomID, targetUserID, isTargetLocalUser, req.RoomVersion)
if err != nil {
return nil, err
return err
}
succeeded := false
defer func() {
@ -118,109 +95,87 @@ func (r *RoomserverInternalAPI) processInviteEvent(
// For now we will implement option 2. Since in the abesence of a retry
// mechanism it will be equivalent to option 1, and we don't have a
// signalling mechanism to implement option 3.
return nil, &api.PerformError{
return &api.PerformError{
Code: api.PerformErrorNoOperation,
Msg: "user is already joined to room",
}
}
// Normally, with a federated invite, the federation sender would do
// the /v2/invite request (in which the remote server signs the invite)
// and then the signed event gets sent back to the roomserver as an input
// event. When the invite is local, we don't interact with the federation
// sender therefore we need to generate the loopback invite event for
// the room ourselves.
loopback, err := localInviteLoopback(ow, input)
if err != nil {
return nil, err
}
event := input.Event.Unwrap()
// check that the user is allowed to do this. We can only do this check if it is
// a local invite as we have the auth events, else we have to take it on trust.
if loopback != nil {
_, err = checkAuthEvents(ctx, r.DB, input.Event, input.Event.AuthEventIDs())
if isOriginLocalUser {
_, err = checkAuthEvents(ctx, r.DB, req.Event, req.Event.AuthEventIDs())
if err != nil {
log.WithError(err).WithField("event_id", event.EventID()).WithField("auth_event_ids", event.AuthEventIDs()).Error(
"processInviteEvent.checkAuthEvents failed for event",
)
if _, ok := err.(*gomatrixserverlib.NotAllowed); ok {
return nil, &api.PerformError{
return &api.PerformError{
Msg: err.Error(),
Code: api.PerformErrorNotAllowed,
}
}
return nil, err
return err
}
}
if len(input.InviteRoomState) > 0 {
if len(req.InviteRoomState) > 0 {
// If we were supplied with some invite room state already (which is
// most likely to be if the event came in over federation) then use
// that.
if err = event.SetUnsignedField("invite_room_state", input.InviteRoomState); err != nil {
return nil, err
if err = event.SetUnsignedField("invite_room_state", req.InviteRoomState); err != nil {
return err
}
} else {
// There's no invite room state, so let's have a go at building it
// up from local data (which is most likely to be if the event came
// from the CS API). If we know about the room then we can insert
// the invite room state, if we don't then we just fail quietly.
if irs, ierr := buildInviteStrippedState(ctx, r.DB, input); ierr == nil {
if irs, ierr := buildInviteStrippedState(ctx, r.DB, req); ierr == nil {
if err = event.SetUnsignedField("invite_room_state", irs); err != nil {
return nil, err
return err
}
} else {
log.WithError(ierr).Error("failed to build invite stripped state")
log.WithError(ierr).Warn("failed to build invite stripped state")
// still set the field else synapse deployments don't process the invite
if err = event.SetUnsignedField("invite_room_state", struct{}{}); err != nil {
return nil, err
return err
}
}
}
outputUpdates, err := updateToInviteMembership(updater, &event, nil, input.Event.RoomVersion)
// If we're the sender of the event then we should send the invite
// event into the room so that the room state is updated and other
// room participants will see the invite.
if isOriginLocalUser {
inputReq := &api.InputRoomEventsRequest{
InputRoomEvents: []api.InputRoomEvent{
{
Kind: api.KindNew,
Event: event,
AuthEventIDs: event.AuthEventIDs(),
SendAsServer: api.DoNotSendToOtherServers,
},
},
}
inputRes := &api.InputRoomEventsResponse{}
if err = r.InputRoomEvents(ctx, inputReq, inputRes); err != nil {
return fmt.Errorf("r.InputRoomEvents: %w", err)
}
}
unwrapped := event.Unwrap()
outputUpdates, err := updateToInviteMembership(updater, &unwrapped, nil, req.Event.RoomVersion)
if err != nil {
return nil, err
return err
}
if err = ow.WriteOutputEvents(roomID, outputUpdates); err != nil {
return nil, err
if err = r.WriteOutputEvents(roomID, outputUpdates); err != nil {
return err
}
succeeded = true
return loopback, nil
}
func localInviteLoopback(
ow *RoomserverInternalAPI,
input *api.PerformInviteRequest,
) (ire *api.InputRoomEvent, err error) {
if input.Event.StateKey() == nil {
return nil, errors.New("no state key on invite event")
}
ourServerName := string(ow.Cfg.Matrix.ServerName)
_, theirServerName, err := gomatrixserverlib.SplitID('@', *input.Event.StateKey())
if err != nil {
return nil, err
}
// Check if the invite originated locally and is destined locally.
if input.Event.Origin() == ow.Cfg.Matrix.ServerName && string(theirServerName) == ourServerName {
rsEvent := input.Event.Sign(
ourServerName,
ow.Cfg.Matrix.KeyID,
ow.Cfg.Matrix.PrivateKey,
).Headered(input.RoomVersion)
ire = &api.InputRoomEvent{
Kind: api.KindNew,
Event: rsEvent,
AuthEventIDs: rsEvent.AuthEventIDs(),
SendAsServer: ourServerName,
TransactionID: nil,
}
}
return ire, nil
return nil
}
func buildInviteStrippedState(

View file

@ -154,17 +154,12 @@ func (h *httpRoomserverInternalAPI) PerformInvite(
ctx context.Context,
request *api.PerformInviteRequest,
response *api.PerformInviteResponse,
) {
) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "PerformInvite")
defer span.Finish()
apiURL := h.roomserverURL + RoomserverPerformInvitePath
err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
if err != nil {
response.Error = &api.PerformError{
Msg: fmt.Sprintf("failed to communicate with roomserver: %s", err),
}
}
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}
func (h *httpRoomserverInternalAPI) PerformJoin(