Make PerformJoin responsible for sending invite to RS input (#1277)
* Make PerformJoin send input membership event * Invite input room events in separate goroutine * Don't limit roomserver input events using request context * Synchronous input room events * Nope, that didn't work * oops send state key to GetMembership * Don't generate stripped state in client API more times than necessary, generate output events on receiving end of federated invite * Commit membership updater changes * Tweaks
This commit is contained in:
parent
e7d450adb8
commit
a5a85c6a11
|
@ -371,20 +371,10 @@ func createRoom(
|
||||||
}
|
}
|
||||||
|
|
||||||
// If this is a direct message then we should invite the participants.
|
// If this is a direct message then we should invite the participants.
|
||||||
for _, invitee := range r.Invite {
|
if len(r.Invite) > 0 {
|
||||||
// Build the invite event.
|
|
||||||
inviteEvent, err := buildMembershipEvent(
|
|
||||||
req.Context(), invitee, "", accountDB, device, gomatrixserverlib.Invite,
|
|
||||||
roomID, true, cfg, evTime, rsAPI, asAPI,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
util.GetLogger(req.Context()).WithError(err).Error("buildMembershipEvent failed")
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
// Build some stripped state for the invite.
|
// Build some stripped state for the invite.
|
||||||
candidates := append(gomatrixserverlib.UnwrapEventHeaders(builtEvents), inviteEvent.Event)
|
var globalStrippedState []gomatrixserverlib.InviteV2StrippedState
|
||||||
var strippedState []gomatrixserverlib.InviteV2StrippedState
|
for _, event := range builtEvents {
|
||||||
for _, event := range candidates {
|
|
||||||
switch event.Type() {
|
switch event.Type() {
|
||||||
case gomatrixserverlib.MRoomName:
|
case gomatrixserverlib.MRoomName:
|
||||||
fallthrough
|
fallthrough
|
||||||
|
@ -395,29 +385,48 @@ func createRoom(
|
||||||
case gomatrixserverlib.MRoomMember:
|
case gomatrixserverlib.MRoomMember:
|
||||||
fallthrough
|
fallthrough
|
||||||
case gomatrixserverlib.MRoomJoinRules:
|
case gomatrixserverlib.MRoomJoinRules:
|
||||||
strippedState = append(
|
ev := event.Event
|
||||||
strippedState,
|
globalStrippedState = append(
|
||||||
gomatrixserverlib.NewInviteV2StrippedState(&event),
|
globalStrippedState,
|
||||||
|
gomatrixserverlib.NewInviteV2StrippedState(&ev),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Send the invite event to the roomserver.
|
|
||||||
err = roomserverAPI.SendInvite(
|
// Process the invites.
|
||||||
req.Context(), rsAPI,
|
for _, invitee := range r.Invite {
|
||||||
inviteEvent.Headered(roomVersion),
|
// Build the invite event.
|
||||||
strippedState, // invite room state
|
inviteEvent, err := buildMembershipEvent(
|
||||||
cfg.Matrix.ServerName, // send as server
|
req.Context(), invitee, "", accountDB, device, gomatrixserverlib.Invite,
|
||||||
nil, // transaction ID
|
roomID, true, cfg, evTime, rsAPI, asAPI,
|
||||||
)
|
)
|
||||||
switch e := err.(type) {
|
if err != nil {
|
||||||
case *roomserverAPI.PerformError:
|
util.GetLogger(req.Context()).WithError(err).Error("buildMembershipEvent failed")
|
||||||
return e.JSONResponse()
|
continue
|
||||||
case nil:
|
}
|
||||||
default:
|
inviteStrippedState := append(
|
||||||
util.GetLogger(req.Context()).WithError(err).Error("roomserverAPI.SendInvite failed")
|
globalStrippedState,
|
||||||
return util.JSONResponse{
|
gomatrixserverlib.NewInviteV2StrippedState(&inviteEvent.Event),
|
||||||
Code: http.StatusInternalServerError,
|
)
|
||||||
JSON: jsonerror.InternalServerError(),
|
// Send the invite event to the roomserver.
|
||||||
|
err = roomserverAPI.SendInvite(
|
||||||
|
req.Context(),
|
||||||
|
rsAPI,
|
||||||
|
inviteEvent.Headered(roomVersion),
|
||||||
|
inviteStrippedState, // invite room state
|
||||||
|
cfg.Matrix.ServerName, // send as server
|
||||||
|
nil, // transaction ID
|
||||||
|
)
|
||||||
|
switch e := err.(type) {
|
||||||
|
case *roomserverAPI.PerformError:
|
||||||
|
return e.JSONResponse()
|
||||||
|
case nil:
|
||||||
|
default:
|
||||||
|
util.GetLogger(req.Context()).WithError(err).Error("roomserverAPI.SendInvite failed")
|
||||||
|
return util.JSONResponse{
|
||||||
|
Code: http.StatusInternalServerError,
|
||||||
|
JSON: jsonerror.InternalServerError(),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -118,23 +118,6 @@ func SendInvite(
|
||||||
return response.Error
|
return response.Error
|
||||||
}
|
}
|
||||||
|
|
||||||
// Now send the invite event into the roomserver. If the room is known
|
|
||||||
// locally then this will succeed, notifying existing users in the room
|
|
||||||
// about the new invite. If the room isn't known locally then this will
|
|
||||||
// fail - and that's also OK.
|
|
||||||
inputReq := &InputRoomEventsRequest{
|
|
||||||
InputRoomEvents: []InputRoomEvent{
|
|
||||||
{
|
|
||||||
Kind: KindNew,
|
|
||||||
Event: inviteEvent,
|
|
||||||
AuthEventIDs: inviteEvent.AuthEventIDs(),
|
|
||||||
SendAsServer: string(sendAsServer),
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
inputRes := &InputRoomEventsResponse{}
|
|
||||||
_ = rsAPI.InputRoomEvents(ctx, inputReq, inputRes)
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -5,7 +5,6 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
|
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/dendrite/roomserver/state"
|
"github.com/matrix-org/dendrite/roomserver/state"
|
||||||
"github.com/matrix-org/dendrite/roomserver/storage"
|
"github.com/matrix-org/dendrite/roomserver/storage"
|
||||||
|
@ -55,19 +54,15 @@ func (r *RoomserverInternalAPI) PerformInvite(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
updater, err := r.DB.MembershipUpdater(ctx, roomID, targetUserID, isTargetLocal, req.RoomVersion)
|
var isAlreadyJoined bool
|
||||||
if err != nil {
|
roomNID, err := r.DB.RoomNID(ctx, roomID)
|
||||||
return fmt.Errorf("r.DB.MembershipUpdater: %w", err)
|
if err == nil {
|
||||||
}
|
_, isAlreadyJoined, err = r.DB.GetMembership(ctx, roomNID, *event.StateKey())
|
||||||
succeeded := false
|
if err != nil {
|
||||||
defer func() {
|
return fmt.Errorf("r.DB.GetMembership: %w", err)
|
||||||
txerr := sqlutil.EndTransaction(updater, &succeeded)
|
|
||||||
if err == nil && txerr != nil {
|
|
||||||
err = txerr
|
|
||||||
}
|
}
|
||||||
}()
|
}
|
||||||
|
if isAlreadyJoined {
|
||||||
if updater.IsJoin() {
|
|
||||||
// If the user is joined to the room then that takes precedence over this
|
// If the user is joined to the room then that takes precedence over this
|
||||||
// invite event. It makes little sense to move a user that is already
|
// invite event. It makes little sense to move a user that is already
|
||||||
// joined to the room into the invite state.
|
// joined to the room into the invite state.
|
||||||
|
@ -103,9 +98,11 @@ func (r *RoomserverInternalAPI) PerformInvite(
|
||||||
}
|
}
|
||||||
|
|
||||||
if isOriginLocal {
|
if isOriginLocal {
|
||||||
// check that the user is allowed to do this. We can only do this check if it is
|
// The invite originated locally. Therefore we have a responsibility to
|
||||||
// a local invite as we have the auth events, else we have to take it on trust.
|
// try and see if the user is allowed to make this invite. We can't do
|
||||||
_, err = checkAuthEvents(ctx, r.DB, req.Event, req.Event.AuthEventIDs())
|
// this for invites coming in over federation - we have to take those on
|
||||||
|
// trust.
|
||||||
|
_, err = checkAuthEvents(ctx, r.DB, event, event.AuthEventIDs())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithError(err).WithField("event_id", event.EventID()).WithField("auth_event_ids", event.AuthEventIDs()).Error(
|
log.WithError(err).WithField("event_id", event.EventID()).WithField("auth_event_ids", event.AuthEventIDs()).Error(
|
||||||
"processInviteEvent.checkAuthEvents failed for event",
|
"processInviteEvent.checkAuthEvents failed for event",
|
||||||
|
@ -127,7 +124,7 @@ func (r *RoomserverInternalAPI) PerformInvite(
|
||||||
if req.SendAsServer != api.DoNotSendToOtherServers && !isTargetLocal {
|
if req.SendAsServer != api.DoNotSendToOtherServers && !isTargetLocal {
|
||||||
fsReq := &federationSenderAPI.PerformInviteRequest{
|
fsReq := &federationSenderAPI.PerformInviteRequest{
|
||||||
RoomVersion: req.RoomVersion,
|
RoomVersion: req.RoomVersion,
|
||||||
Event: req.Event,
|
Event: event,
|
||||||
InviteRoomState: inviteState,
|
InviteRoomState: inviteState,
|
||||||
}
|
}
|
||||||
fsRes := &federationSenderAPI.PerformInviteResponse{}
|
fsRes := &federationSenderAPI.PerformInviteResponse{}
|
||||||
|
@ -141,19 +138,49 @@ func (r *RoomserverInternalAPI) PerformInvite(
|
||||||
}
|
}
|
||||||
event = fsRes.Event
|
event = fsRes.Event
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Send the invite event to the roomserver input stream. This will
|
||||||
|
// notify existing users in the room about the invite, update the
|
||||||
|
// membership table and ensure that the event is ready and available
|
||||||
|
// to use as an auth event when accepting the invite.
|
||||||
|
inputReq := &api.InputRoomEventsRequest{
|
||||||
|
InputRoomEvents: []api.InputRoomEvent{
|
||||||
|
{
|
||||||
|
Kind: api.KindNew,
|
||||||
|
Event: event,
|
||||||
|
AuthEventIDs: event.AuthEventIDs(),
|
||||||
|
SendAsServer: req.SendAsServer,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
inputRes := &api.InputRoomEventsResponse{}
|
||||||
|
if err = r.InputRoomEvents(context.Background(), inputReq, inputRes); err != nil {
|
||||||
|
return fmt.Errorf("r.InputRoomEvents: %w", err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// The invite originated over federation. Process the membership
|
||||||
|
// update, which will notify the sync API etc about the incoming
|
||||||
|
// invite.
|
||||||
|
updater, err := r.DB.MembershipUpdater(ctx, roomID, targetUserID, isTargetLocal, req.RoomVersion)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("r.DB.MembershipUpdater: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
unwrapped := event.Unwrap()
|
||||||
|
outputUpdates, err := updateToInviteMembership(updater, &unwrapped, nil, req.Event.RoomVersion)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("updateToInviteMembership: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = updater.Commit(); err != nil {
|
||||||
|
return fmt.Errorf("updater.Commit: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = r.WriteOutputEvents(roomID, outputUpdates); err != nil {
|
||||||
|
return fmt.Errorf("r.WriteOutputEvents: %w", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
unwrapped := event.Unwrap()
|
|
||||||
outputUpdates, err := updateToInviteMembership(updater, &unwrapped, nil, req.Event.RoomVersion)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("updateToInviteMembership: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = r.WriteOutputEvents(roomID, outputUpdates); err != nil {
|
|
||||||
return fmt.Errorf("r.WriteOutputEvents: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
succeeded = true
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -455,3 +455,4 @@ Banned servers cannot backfill
|
||||||
Inbound /v1/send_leave rejects leaves from other servers
|
Inbound /v1/send_leave rejects leaves from other servers
|
||||||
Guest users can accept invites to private rooms over federation
|
Guest users can accept invites to private rooms over federation
|
||||||
AS user (not ghost) can join room without registering
|
AS user (not ghost) can join room without registering
|
||||||
|
If user leaves room, remote user changes device and rejoins we see update in /sync and /keys/changes
|
||||||
|
|
Loading…
Reference in a new issue