mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-01-18 09:54:27 -06:00
Membership updater refactoring (#2541)
* Membership updater refactoring * Pass in membership state * Use membership check rather than referring to state directly * Delete irrelevant membership states * We don't need the leave event after all * Tweaks * Put a log entry in that I might stand a chance of finding * Be less panicky * Tweak invite handling * Don't freak if we can't find the event NID * Use event NID from `types.Event` * Clean up * Better invite handling * Placate the almighty linter * Blacklist a Sytest which is otherwise fine under Complement for reasons I don't understand * Fix the sytest after all (thanks @S7evinK for the spot)
This commit is contained in:
parent
a201b4400d
commit
f0c8a03649
|
@ -26,7 +26,6 @@ import (
|
|||
"github.com/matrix-org/dendrite/setup/config"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/matrix-org/util"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// InviteV2 implements /_matrix/federation/v2/invite/{roomID}/{eventID}
|
||||
|
@ -144,7 +143,6 @@ func processInvite(
|
|||
// Check that the event is signed by the server sending the request.
|
||||
redacted, err := gomatrixserverlib.RedactEventJSON(event.JSON(), event.Version())
|
||||
if err != nil {
|
||||
logrus.WithError(err).Errorf("XXX: invite.go")
|
||||
return util.JSONResponse{
|
||||
Code: http.StatusBadRequest,
|
||||
JSON: jsonerror.BadJSON("The event JSON could not be redacted"),
|
||||
|
|
|
@ -12,6 +12,7 @@ import (
|
|||
"github.com/matrix-org/dendrite/roomserver/state"
|
||||
"github.com/matrix-org/dendrite/roomserver/storage"
|
||||
"github.com/matrix-org/dendrite/roomserver/storage/shared"
|
||||
"github.com/matrix-org/dendrite/roomserver/storage/tables"
|
||||
"github.com/matrix-org/dendrite/roomserver/types"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/matrix-org/util"
|
||||
|
@ -21,14 +22,14 @@ import (
|
|||
// Move these to a more sensible place.
|
||||
|
||||
func UpdateToInviteMembership(
|
||||
mu *shared.MembershipUpdater, add *gomatrixserverlib.Event, updates []api.OutputEvent,
|
||||
mu *shared.MembershipUpdater, add *types.Event, updates []api.OutputEvent,
|
||||
roomVersion gomatrixserverlib.RoomVersion,
|
||||
) ([]api.OutputEvent, error) {
|
||||
// We may have already sent the invite to the user, either because we are
|
||||
// reprocessing this event, or because the we received this invite from a
|
||||
// remote server via the federation invite API. In those cases we don't need
|
||||
// to send the event.
|
||||
needsSending, err := mu.SetToInvite(add)
|
||||
needsSending, retired, err := mu.Update(tables.MembershipStateInvite, add)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -38,13 +39,23 @@ func UpdateToInviteMembership(
|
|||
// room event stream. This ensures that the consumers only have to
|
||||
// consider a single stream of events when determining whether a user
|
||||
// is invited, rather than having to combine multiple streams themselves.
|
||||
onie := api.OutputNewInviteEvent{
|
||||
Event: add.Headered(roomVersion),
|
||||
RoomVersion: roomVersion,
|
||||
}
|
||||
updates = append(updates, api.OutputEvent{
|
||||
Type: api.OutputTypeNewInviteEvent,
|
||||
NewInviteEvent: &onie,
|
||||
Type: api.OutputTypeNewInviteEvent,
|
||||
NewInviteEvent: &api.OutputNewInviteEvent{
|
||||
Event: add.Headered(roomVersion),
|
||||
RoomVersion: roomVersion,
|
||||
},
|
||||
})
|
||||
}
|
||||
for _, eventID := range retired {
|
||||
updates = append(updates, api.OutputEvent{
|
||||
Type: api.OutputTypeRetireInviteEvent,
|
||||
RetireInviteEvent: &api.OutputRetireInviteEvent{
|
||||
EventID: eventID,
|
||||
Membership: gomatrixserverlib.Join,
|
||||
RetiredByEventID: add.EventID(),
|
||||
TargetUserID: *add.StateKey(),
|
||||
},
|
||||
})
|
||||
}
|
||||
return updates, nil
|
||||
|
|
|
@ -21,6 +21,7 @@ import (
|
|||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/dendrite/roomserver/internal/helpers"
|
||||
"github.com/matrix-org/dendrite/roomserver/storage/shared"
|
||||
"github.com/matrix-org/dendrite/roomserver/storage/tables"
|
||||
"github.com/matrix-org/dendrite/roomserver/types"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/opentracing/opentracing-go"
|
||||
|
@ -60,20 +61,14 @@ func (r *Inputer) updateMemberships(
|
|||
var updates []api.OutputEvent
|
||||
|
||||
for _, change := range changes {
|
||||
var ae *gomatrixserverlib.Event
|
||||
var re *gomatrixserverlib.Event
|
||||
var ae *types.Event
|
||||
var re *types.Event
|
||||
targetUserNID := change.EventStateKeyNID
|
||||
if change.removedEventNID != 0 {
|
||||
ev, _ := helpers.EventMap(events).Lookup(change.removedEventNID)
|
||||
if ev != nil {
|
||||
re = ev.Event
|
||||
}
|
||||
re, _ = helpers.EventMap(events).Lookup(change.removedEventNID)
|
||||
}
|
||||
if change.addedEventNID != 0 {
|
||||
ev, _ := helpers.EventMap(events).Lookup(change.addedEventNID)
|
||||
if ev != nil {
|
||||
ae = ev.Event
|
||||
}
|
||||
ae, _ = helpers.EventMap(events).Lookup(change.addedEventNID)
|
||||
}
|
||||
if updates, err = r.updateMembership(updater, targetUserNID, re, ae, updates); err != nil {
|
||||
return nil, err
|
||||
|
@ -85,30 +80,27 @@ func (r *Inputer) updateMemberships(
|
|||
func (r *Inputer) updateMembership(
|
||||
updater *shared.RoomUpdater,
|
||||
targetUserNID types.EventStateKeyNID,
|
||||
remove, add *gomatrixserverlib.Event,
|
||||
remove, add *types.Event,
|
||||
updates []api.OutputEvent,
|
||||
) ([]api.OutputEvent, error) {
|
||||
var err error
|
||||
// Default the membership to Leave if no event was added or removed.
|
||||
oldMembership := gomatrixserverlib.Leave
|
||||
newMembership := gomatrixserverlib.Leave
|
||||
|
||||
if remove != nil {
|
||||
oldMembership, err = remove.Membership()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if add != nil {
|
||||
newMembership, err = add.Membership()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if oldMembership == newMembership && newMembership != gomatrixserverlib.Join {
|
||||
// If the membership is the same then nothing changed and we can return
|
||||
// immediately, unless it's a Join update (e.g. profile update).
|
||||
return updates, nil
|
||||
|
||||
var targetLocal bool
|
||||
if add != nil {
|
||||
targetLocal = r.isLocalTarget(add)
|
||||
}
|
||||
|
||||
mu, err := updater.MembershipUpdater(targetUserNID, targetLocal)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// In an ideal world, we shouldn't ever have "add" be nil and "remove" be
|
||||
|
@ -120,17 +112,10 @@ func (r *Inputer) updateMembership(
|
|||
// after a state reset, often thinking that the user was still joined to
|
||||
// the room even though the room state said otherwise, and this would prevent
|
||||
// the user from being able to attempt to rejoin the room without modifying
|
||||
// the database. So instead what we'll do is we'll just update the membership
|
||||
// table to say that the user is "leave" and we'll use the old event to
|
||||
// avoid nil pointer exceptions on the code path that follows.
|
||||
if add == nil {
|
||||
add = remove
|
||||
newMembership = gomatrixserverlib.Leave
|
||||
}
|
||||
|
||||
mu, err := updater.MembershipUpdater(targetUserNID, r.isLocalTarget(add))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
// the database. So instead we're going to remove the membership from the
|
||||
// database altogether, so that it doesn't create future problems.
|
||||
if add == nil && remove != nil {
|
||||
return nil, mu.Delete()
|
||||
}
|
||||
|
||||
switch newMembership {
|
||||
|
@ -149,7 +134,7 @@ func (r *Inputer) updateMembership(
|
|||
}
|
||||
}
|
||||
|
||||
func (r *Inputer) isLocalTarget(event *gomatrixserverlib.Event) bool {
|
||||
func (r *Inputer) isLocalTarget(event *types.Event) bool {
|
||||
isTargetLocalUser := false
|
||||
if statekey := event.StateKey(); statekey != nil {
|
||||
_, domain, _ := gomatrixserverlib.SplitID('@', *statekey)
|
||||
|
@ -159,81 +144,61 @@ func (r *Inputer) isLocalTarget(event *gomatrixserverlib.Event) bool {
|
|||
}
|
||||
|
||||
func updateToJoinMembership(
|
||||
mu *shared.MembershipUpdater, add *gomatrixserverlib.Event, updates []api.OutputEvent,
|
||||
mu *shared.MembershipUpdater, add *types.Event, updates []api.OutputEvent,
|
||||
) ([]api.OutputEvent, error) {
|
||||
// If the user is already marked as being joined, we call SetToJoin to update
|
||||
// the event ID then we can return immediately. Retired is ignored as there
|
||||
// is no invite event to retire.
|
||||
if mu.IsJoin() {
|
||||
_, err := mu.SetToJoin(add.Sender(), add.EventID(), true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return updates, nil
|
||||
}
|
||||
// When we mark a user as being joined we will invalidate any invites that
|
||||
// are active for that user. We notify the consumers that the invites have
|
||||
// been retired using a special event, even though they could infer this
|
||||
// by studying the state changes in the room event stream.
|
||||
retired, err := mu.SetToJoin(add.Sender(), add.EventID(), false)
|
||||
_, retired, err := mu.Update(tables.MembershipStateJoin, add)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, eventID := range retired {
|
||||
orie := api.OutputRetireInviteEvent{
|
||||
EventID: eventID,
|
||||
Membership: gomatrixserverlib.Join,
|
||||
RetiredByEventID: add.EventID(),
|
||||
TargetUserID: *add.StateKey(),
|
||||
}
|
||||
updates = append(updates, api.OutputEvent{
|
||||
Type: api.OutputTypeRetireInviteEvent,
|
||||
RetireInviteEvent: &orie,
|
||||
Type: api.OutputTypeRetireInviteEvent,
|
||||
RetireInviteEvent: &api.OutputRetireInviteEvent{
|
||||
EventID: eventID,
|
||||
Membership: gomatrixserverlib.Join,
|
||||
RetiredByEventID: add.EventID(),
|
||||
TargetUserID: *add.StateKey(),
|
||||
},
|
||||
})
|
||||
}
|
||||
return updates, nil
|
||||
}
|
||||
|
||||
func updateToLeaveMembership(
|
||||
mu *shared.MembershipUpdater, add *gomatrixserverlib.Event,
|
||||
mu *shared.MembershipUpdater, add *types.Event,
|
||||
newMembership string, updates []api.OutputEvent,
|
||||
) ([]api.OutputEvent, error) {
|
||||
// If the user is already neither joined, nor invited to the room then we
|
||||
// can return immediately.
|
||||
if mu.IsLeave() {
|
||||
return updates, nil
|
||||
}
|
||||
// When we mark a user as having left we will invalidate any invites that
|
||||
// are active for that user. We notify the consumers that the invites have
|
||||
// been retired using a special event, even though they could infer this
|
||||
// by studying the state changes in the room event stream.
|
||||
retired, err := mu.SetToLeave(add.Sender(), add.EventID())
|
||||
_, retired, err := mu.Update(tables.MembershipStateLeaveOrBan, add)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, eventID := range retired {
|
||||
orie := api.OutputRetireInviteEvent{
|
||||
EventID: eventID,
|
||||
Membership: newMembership,
|
||||
RetiredByEventID: add.EventID(),
|
||||
TargetUserID: *add.StateKey(),
|
||||
}
|
||||
updates = append(updates, api.OutputEvent{
|
||||
Type: api.OutputTypeRetireInviteEvent,
|
||||
RetireInviteEvent: &orie,
|
||||
Type: api.OutputTypeRetireInviteEvent,
|
||||
RetireInviteEvent: &api.OutputRetireInviteEvent{
|
||||
EventID: eventID,
|
||||
Membership: newMembership,
|
||||
RetiredByEventID: add.EventID(),
|
||||
TargetUserID: *add.StateKey(),
|
||||
},
|
||||
})
|
||||
}
|
||||
return updates, nil
|
||||
}
|
||||
|
||||
func updateToKnockMembership(
|
||||
mu *shared.MembershipUpdater, add *gomatrixserverlib.Event, updates []api.OutputEvent,
|
||||
mu *shared.MembershipUpdater, add *types.Event, updates []api.OutputEvent,
|
||||
) ([]api.OutputEvent, error) {
|
||||
if mu.IsLeave() {
|
||||
_, err := mu.SetToKnock(add)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if _, _, err := mu.Update(tables.MembershipStateKnock, add); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return updates, nil
|
||||
}
|
||||
|
|
|
@ -39,11 +39,13 @@ type Inviter struct {
|
|||
Inputer *input.Inputer
|
||||
}
|
||||
|
||||
// nolint:gocyclo
|
||||
func (r *Inviter) PerformInvite(
|
||||
ctx context.Context,
|
||||
req *api.PerformInviteRequest,
|
||||
res *api.PerformInviteResponse,
|
||||
) ([]api.OutputEvent, error) {
|
||||
var outputUpdates []api.OutputEvent
|
||||
event := req.Event
|
||||
if event.StateKey() == nil {
|
||||
return nil, fmt.Errorf("invite must be a state event")
|
||||
|
@ -66,6 +68,13 @@ func (r *Inviter) PerformInvite(
|
|||
}
|
||||
isTargetLocal := domain == r.Cfg.Matrix.ServerName
|
||||
isOriginLocal := event.Origin() == r.Cfg.Matrix.ServerName
|
||||
if !isOriginLocal && !isTargetLocal {
|
||||
res.Error = &api.PerformError{
|
||||
Code: api.PerformErrorBadRequest,
|
||||
Msg: "The invite must be either from or to a local user",
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
logger := util.GetLogger(ctx).WithFields(map[string]interface{}{
|
||||
"inviter": event.Sender(),
|
||||
|
@ -97,6 +106,34 @@ func (r *Inviter) PerformInvite(
|
|||
}
|
||||
}
|
||||
|
||||
updateMembershipTableManually := func() ([]api.OutputEvent, error) {
|
||||
var updater *shared.MembershipUpdater
|
||||
if updater, err = r.DB.MembershipUpdater(ctx, roomID, targetUserID, isTargetLocal, req.RoomVersion); err != nil {
|
||||
return nil, fmt.Errorf("r.DB.MembershipUpdater: %w", err)
|
||||
}
|
||||
outputUpdates, err = helpers.UpdateToInviteMembership(updater, &types.Event{
|
||||
EventNID: 0,
|
||||
Event: event.Unwrap(),
|
||||
}, outputUpdates, req.Event.RoomVersion)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("updateToInviteMembership: %w", err)
|
||||
}
|
||||
if err = updater.Commit(); err != nil {
|
||||
return nil, fmt.Errorf("updater.Commit: %w", err)
|
||||
}
|
||||
logger.Debugf("updated membership to invite and sending invite OutputEvent")
|
||||
return outputUpdates, nil
|
||||
}
|
||||
|
||||
if (info == nil || info.IsStub) && !isOriginLocal && isTargetLocal {
|
||||
// The invite came in over federation for a room that we don't know about
|
||||
// yet. We need to handle this a bit differently to most invites because
|
||||
// we don't know the room state, therefore the roomserver can't process
|
||||
// an input event. Instead we will update the membership table with the
|
||||
// new invite and generate an output event.
|
||||
return updateMembershipTableManually()
|
||||
}
|
||||
|
||||
var isAlreadyJoined bool
|
||||
if info != nil {
|
||||
_, isAlreadyJoined, _, err = r.DB.GetMembership(ctx, info.RoomNID, *event.StateKey())
|
||||
|
@ -140,31 +177,13 @@ func (r *Inviter) PerformInvite(
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
// If the invite originated remotely then we can't send an
|
||||
// InputRoomEvent for the invite as it will never pass auth checks
|
||||
// due to lacking room state, but we still need to tell the client
|
||||
// about the invite so we can accept it, hence we return an output
|
||||
// event to send to the Sync API.
|
||||
if !isOriginLocal {
|
||||
// The invite originated over federation. Process the membership
|
||||
// update, which will notify the sync API etc about the incoming
|
||||
// invite. We do NOT send an InputRoomEvent for the invite as it
|
||||
// will never pass auth checks due to lacking room state, but we
|
||||
// still need to tell the client about the invite so we can accept
|
||||
// it, hence we return an output event to send to the sync api.
|
||||
var updater *shared.MembershipUpdater
|
||||
updater, err = r.DB.MembershipUpdater(ctx, roomID, targetUserID, isTargetLocal, req.RoomVersion)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("r.DB.MembershipUpdater: %w", err)
|
||||
}
|
||||
|
||||
unwrapped := event.Unwrap()
|
||||
var outputUpdates []api.OutputEvent
|
||||
outputUpdates, err = helpers.UpdateToInviteMembership(updater, unwrapped, nil, req.Event.RoomVersion)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("updateToInviteMembership: %w", err)
|
||||
}
|
||||
|
||||
if err = updater.Commit(); err != nil {
|
||||
return nil, fmt.Errorf("updater.Commit: %w", err)
|
||||
}
|
||||
logger.Debugf("updated membership to invite and sending invite OutputEvent")
|
||||
return outputUpdates, nil
|
||||
return updateMembershipTableManually()
|
||||
}
|
||||
|
||||
// The invite originated locally. Therefore we have a responsibility to
|
||||
|
@ -229,12 +248,11 @@ func (r *Inviter) PerformInvite(
|
|||
Code: api.PerformErrorNotAllowed,
|
||||
}
|
||||
logger.WithError(err).WithField("event_id", event.EventID()).Error("r.InputRoomEvents failed")
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Don't notify the sync api of this event in the same way as a federated invite so the invitee
|
||||
// gets the invite, as the roomserver will do this when it processes the m.room.member invite.
|
||||
return nil, nil
|
||||
return outputUpdates, nil
|
||||
}
|
||||
|
||||
func buildInviteStrippedState(
|
||||
|
|
|
@ -268,21 +268,19 @@ func (r *Joiner) performJoinRoomByID(
|
|||
case nil:
|
||||
// The room join is local. Send the new join event into the
|
||||
// roomserver. First of all check that the user isn't already
|
||||
// a member of the room.
|
||||
alreadyJoined := false
|
||||
for _, se := range buildRes.StateEvents {
|
||||
if !se.StateKeyEquals(userID) {
|
||||
continue
|
||||
}
|
||||
if membership, merr := se.Membership(); merr == nil {
|
||||
alreadyJoined = (membership == gomatrixserverlib.Join)
|
||||
break
|
||||
}
|
||||
// a member of the room. This is best-effort (as in we won't
|
||||
// fail if we can't find the existing membership) because there
|
||||
// is really no harm in just sending another membership event.
|
||||
membershipReq := &api.QueryMembershipForUserRequest{
|
||||
RoomID: req.RoomIDOrAlias,
|
||||
UserID: userID,
|
||||
}
|
||||
membershipRes := &api.QueryMembershipForUserResponse{}
|
||||
_ = r.Queryer.QueryMembershipForUser(ctx, membershipReq, membershipRes)
|
||||
|
||||
// If we haven't already joined the room then send an event
|
||||
// into the room changing our membership status.
|
||||
if !alreadyJoined {
|
||||
if !membershipRes.RoomExists || !membershipRes.IsInRoom {
|
||||
inputReq := rsAPI.InputRoomEventsRequest{
|
||||
InputRoomEvents: []rsAPI.InputRoomEvent{
|
||||
{
|
||||
|
|
|
@ -228,14 +228,14 @@ func (r *Leaver) performFederatedRejectInvite(
|
|||
util.GetLogger(ctx).WithError(err).Errorf("failed to get MembershipUpdater, still retiring invite event")
|
||||
}
|
||||
if updater != nil {
|
||||
if _, err = updater.SetToLeave(req.UserID, eventID); err != nil {
|
||||
util.GetLogger(ctx).WithError(err).Errorf("failed to set membership to leave, still retiring invite event")
|
||||
if err = updater.Delete(); err != nil {
|
||||
util.GetLogger(ctx).WithError(err).Errorf("failed to delete membership, still retiring invite event")
|
||||
if err = updater.Rollback(); err != nil {
|
||||
util.GetLogger(ctx).WithError(err).Errorf("failed to rollback membership leave, still retiring invite event")
|
||||
util.GetLogger(ctx).WithError(err).Errorf("failed to rollback deleting membership, still retiring invite event")
|
||||
}
|
||||
} else {
|
||||
if err = updater.Commit(); err != nil {
|
||||
util.GetLogger(ctx).WithError(err).Errorf("failed to commit membership update, still retiring invite event")
|
||||
util.GetLogger(ctx).WithError(err).Errorf("failed to commit deleting membership, still retiring invite event")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -118,6 +118,9 @@ const updateMembershipForgetRoom = "" +
|
|||
"UPDATE roomserver_membership SET forgotten = $3" +
|
||||
" WHERE room_nid = $1 AND target_nid = $2"
|
||||
|
||||
const deleteMembershipSQL = "" +
|
||||
"DELETE FROM roomserver_membership WHERE room_nid = $1 AND target_nid = $2"
|
||||
|
||||
const selectRoomsWithMembershipSQL = "" +
|
||||
"SELECT room_nid FROM roomserver_membership WHERE membership_nid = $1 AND target_nid = $2 and forgotten = false"
|
||||
|
||||
|
@ -165,6 +168,7 @@ type membershipStatements struct {
|
|||
updateMembershipForgetRoomStmt *sql.Stmt
|
||||
selectLocalServerInRoomStmt *sql.Stmt
|
||||
selectServerInRoomStmt *sql.Stmt
|
||||
deleteMembershipStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func CreateMembershipTable(db *sql.DB) error {
|
||||
|
@ -191,6 +195,7 @@ func PrepareMembershipTable(db *sql.DB) (tables.Membership, error) {
|
|||
{&s.updateMembershipForgetRoomStmt, updateMembershipForgetRoom},
|
||||
{&s.selectLocalServerInRoomStmt, selectLocalServerInRoomSQL},
|
||||
{&s.selectServerInRoomStmt, selectServerInRoomSQL},
|
||||
{&s.deleteMembershipStmt, deleteMembershipSQL},
|
||||
}.Prepare(db)
|
||||
}
|
||||
|
||||
|
@ -412,3 +417,13 @@ func (s *membershipStatements) SelectServerInRoom(
|
|||
}
|
||||
return roomNID == nid, nil
|
||||
}
|
||||
|
||||
func (s *membershipStatements) DeleteMembership(
|
||||
ctx context.Context, txn *sql.Tx,
|
||||
roomNID types.RoomNID, targetUserNID types.EventStateKeyNID,
|
||||
) error {
|
||||
_, err := sqlutil.TxStmt(txn, s.deleteMembershipStmt).ExecContext(
|
||||
ctx, roomNID, targetUserNID,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -15,7 +15,7 @@ type MembershipUpdater struct {
|
|||
d *Database
|
||||
roomNID types.RoomNID
|
||||
targetUserNID types.EventStateKeyNID
|
||||
membership tables.MembershipState
|
||||
oldMembership tables.MembershipState
|
||||
}
|
||||
|
||||
func NewMembershipUpdater(
|
||||
|
@ -30,7 +30,6 @@ func NewMembershipUpdater(
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
targetUserNID, err = d.assignStateKeyNID(ctx, targetUserID)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -73,146 +72,62 @@ func (d *Database) membershipUpdaterTxn(
|
|||
|
||||
// IsInvite implements types.MembershipUpdater
|
||||
func (u *MembershipUpdater) IsInvite() bool {
|
||||
return u.membership == tables.MembershipStateInvite
|
||||
return u.oldMembership == tables.MembershipStateInvite
|
||||
}
|
||||
|
||||
// IsJoin implements types.MembershipUpdater
|
||||
func (u *MembershipUpdater) IsJoin() bool {
|
||||
return u.membership == tables.MembershipStateJoin
|
||||
return u.oldMembership == tables.MembershipStateJoin
|
||||
}
|
||||
|
||||
// IsLeave implements types.MembershipUpdater
|
||||
func (u *MembershipUpdater) IsLeave() bool {
|
||||
return u.membership == tables.MembershipStateLeaveOrBan
|
||||
return u.oldMembership == tables.MembershipStateLeaveOrBan
|
||||
}
|
||||
|
||||
// IsKnock implements types.MembershipUpdater
|
||||
func (u *MembershipUpdater) IsKnock() bool {
|
||||
return u.membership == tables.MembershipStateKnock
|
||||
return u.oldMembership == tables.MembershipStateKnock
|
||||
}
|
||||
|
||||
// SetToInvite implements types.MembershipUpdater
|
||||
func (u *MembershipUpdater) SetToInvite(event *gomatrixserverlib.Event) (bool, error) {
|
||||
var inserted bool
|
||||
err := u.d.Writer.Do(u.d.DB, u.txn, func(txn *sql.Tx) error {
|
||||
func (u *MembershipUpdater) Delete() error {
|
||||
if _, err := u.d.InvitesTable.UpdateInviteRetired(u.ctx, u.txn, u.roomNID, u.targetUserNID); err != nil {
|
||||
return err
|
||||
}
|
||||
return u.d.MembershipTable.DeleteMembership(u.ctx, u.txn, u.roomNID, u.targetUserNID)
|
||||
}
|
||||
|
||||
func (u *MembershipUpdater) Update(newMembership tables.MembershipState, event *types.Event) (bool, []string, error) {
|
||||
var inserted bool // Did the query result in a membership change?
|
||||
var retired []string // Did we retire any updates in the process?
|
||||
return inserted, retired, u.d.Writer.Do(u.d.DB, u.txn, func(txn *sql.Tx) error {
|
||||
senderUserNID, err := u.d.assignStateKeyNID(u.ctx, event.Sender())
|
||||
if err != nil {
|
||||
return fmt.Errorf("u.d.AssignStateKeyNID: %w", err)
|
||||
}
|
||||
inserted, err = u.d.InvitesTable.InsertInviteEvent(
|
||||
u.ctx, u.txn, event.EventID(), u.roomNID, u.targetUserNID, senderUserNID, event.JSON(),
|
||||
)
|
||||
inserted, err = u.d.MembershipTable.UpdateMembership(u.ctx, u.txn, u.roomNID, u.targetUserNID, senderUserNID, newMembership, event.EventNID, false)
|
||||
if err != nil {
|
||||
return fmt.Errorf("u.d.InvitesTable.InsertInviteEvent: %w", err)
|
||||
return fmt.Errorf("u.d.MembershipTable.UpdateMembership: %w", err)
|
||||
}
|
||||
|
||||
// Look up the NID of the invite event
|
||||
nIDs, err := u.d.eventNIDs(u.ctx, u.txn, []string{event.EventID()}, false)
|
||||
if err != nil {
|
||||
return fmt.Errorf("u.d.EventNIDs: %w", err)
|
||||
if !inserted {
|
||||
return nil
|
||||
}
|
||||
|
||||
if u.membership != tables.MembershipStateInvite {
|
||||
if inserted, err = u.d.MembershipTable.UpdateMembership(u.ctx, u.txn, u.roomNID, u.targetUserNID, senderUserNID, tables.MembershipStateInvite, nIDs[event.EventID()], false); err != nil {
|
||||
return fmt.Errorf("u.d.MembershipTable.UpdateMembership: %w", err)
|
||||
switch {
|
||||
case u.oldMembership != tables.MembershipStateInvite && newMembership == tables.MembershipStateInvite:
|
||||
inserted, err = u.d.InvitesTable.InsertInviteEvent(
|
||||
u.ctx, u.txn, event.EventID(), u.roomNID, u.targetUserNID, senderUserNID, event.JSON(),
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("u.d.InvitesTable.InsertInviteEvent: %w", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return inserted, err
|
||||
}
|
||||
|
||||
// SetToJoin implements types.MembershipUpdater
|
||||
func (u *MembershipUpdater) SetToJoin(senderUserID string, eventID string, isUpdate bool) ([]string, error) {
|
||||
var inviteEventIDs []string
|
||||
|
||||
err := u.d.Writer.Do(u.d.DB, u.txn, func(txn *sql.Tx) error {
|
||||
senderUserNID, err := u.d.assignStateKeyNID(u.ctx, senderUserID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("u.d.AssignStateKeyNID: %w", err)
|
||||
}
|
||||
|
||||
// If this is a join event update, there is no invite to update
|
||||
if !isUpdate {
|
||||
inviteEventIDs, err = u.d.InvitesTable.UpdateInviteRetired(
|
||||
case u.oldMembership == tables.MembershipStateInvite && newMembership != tables.MembershipStateInvite:
|
||||
retired, err = u.d.InvitesTable.UpdateInviteRetired(
|
||||
u.ctx, u.txn, u.roomNID, u.targetUserNID,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("u.d.InvitesTables.UpdateInviteRetired: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Look up the NID of the new join event
|
||||
nIDs, err := u.d.eventNIDs(u.ctx, u.txn, []string{eventID}, false)
|
||||
if err != nil {
|
||||
return fmt.Errorf("u.d.EventNIDs: %w", err)
|
||||
}
|
||||
|
||||
if u.membership != tables.MembershipStateJoin || isUpdate {
|
||||
if _, err = u.d.MembershipTable.UpdateMembership(u.ctx, u.txn, u.roomNID, u.targetUserNID, senderUserNID, tables.MembershipStateJoin, nIDs[eventID], false); err != nil {
|
||||
return fmt.Errorf("u.d.MembershipTable.UpdateMembership: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
return inviteEventIDs, err
|
||||
}
|
||||
|
||||
// SetToLeave implements types.MembershipUpdater
|
||||
func (u *MembershipUpdater) SetToLeave(senderUserID string, eventID string) ([]string, error) {
|
||||
var inviteEventIDs []string
|
||||
|
||||
err := u.d.Writer.Do(u.d.DB, u.txn, func(txn *sql.Tx) error {
|
||||
senderUserNID, err := u.d.assignStateKeyNID(u.ctx, senderUserID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("u.d.AssignStateKeyNID: %w", err)
|
||||
}
|
||||
inviteEventIDs, err = u.d.InvitesTable.UpdateInviteRetired(
|
||||
u.ctx, u.txn, u.roomNID, u.targetUserNID,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("u.d.InvitesTable.updateInviteRetired: %w", err)
|
||||
}
|
||||
|
||||
// Look up the NID of the new leave event
|
||||
nIDs, err := u.d.eventNIDs(u.ctx, u.txn, []string{eventID}, false)
|
||||
if err != nil {
|
||||
return fmt.Errorf("u.d.EventNIDs: %w", err)
|
||||
}
|
||||
|
||||
if u.membership != tables.MembershipStateLeaveOrBan {
|
||||
if _, err = u.d.MembershipTable.UpdateMembership(u.ctx, u.txn, u.roomNID, u.targetUserNID, senderUserNID, tables.MembershipStateLeaveOrBan, nIDs[eventID], false); err != nil {
|
||||
return fmt.Errorf("u.d.MembershipTable.UpdateMembership: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
return inviteEventIDs, err
|
||||
}
|
||||
|
||||
// SetToKnock implements types.MembershipUpdater
|
||||
func (u *MembershipUpdater) SetToKnock(event *gomatrixserverlib.Event) (bool, error) {
|
||||
var inserted bool
|
||||
err := u.d.Writer.Do(u.d.DB, u.txn, func(txn *sql.Tx) error {
|
||||
senderUserNID, err := u.d.assignStateKeyNID(u.ctx, event.Sender())
|
||||
if err != nil {
|
||||
return fmt.Errorf("u.d.AssignStateKeyNID: %w", err)
|
||||
}
|
||||
if u.membership != tables.MembershipStateKnock {
|
||||
// Look up the NID of the new knock event
|
||||
nIDs, err := u.d.eventNIDs(u.ctx, u.txn, []string{event.EventID()}, false)
|
||||
if err != nil {
|
||||
return fmt.Errorf("u.d.EventNIDs: %w", err)
|
||||
}
|
||||
|
||||
if inserted, err = u.d.MembershipTable.UpdateMembership(u.ctx, u.txn, u.roomNID, u.targetUserNID, senderUserNID, tables.MembershipStateKnock, nIDs[event.EventID()], false); err != nil {
|
||||
return fmt.Errorf("u.d.MembershipTable.UpdateMembership: %w", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return inserted, err
|
||||
}
|
||||
|
|
|
@ -125,6 +125,9 @@ const selectServerInRoomSQL = "" +
|
|||
" JOIN roomserver_event_state_keys ON roomserver_membership.target_nid = roomserver_event_state_keys.event_state_key_nid" +
|
||||
" WHERE membership_nid = $1 AND room_nid = $2 AND event_state_key LIKE '%:' || $3 LIMIT 1"
|
||||
|
||||
const deleteMembershipSQL = "" +
|
||||
"DELETE FROM roomserver_membership WHERE room_nid = $1 AND target_nid = $2"
|
||||
|
||||
type membershipStatements struct {
|
||||
db *sql.DB
|
||||
insertMembershipStmt *sql.Stmt
|
||||
|
@ -140,6 +143,7 @@ type membershipStatements struct {
|
|||
updateMembershipForgetRoomStmt *sql.Stmt
|
||||
selectLocalServerInRoomStmt *sql.Stmt
|
||||
selectServerInRoomStmt *sql.Stmt
|
||||
deleteMembershipStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func CreateMembershipTable(db *sql.DB) error {
|
||||
|
@ -166,6 +170,7 @@ func PrepareMembershipTable(db *sql.DB) (tables.Membership, error) {
|
|||
{&s.updateMembershipForgetRoomStmt, updateMembershipForgetRoom},
|
||||
{&s.selectLocalServerInRoomStmt, selectLocalServerInRoomSQL},
|
||||
{&s.selectServerInRoomStmt, selectServerInRoomSQL},
|
||||
{&s.deleteMembershipStmt, deleteMembershipSQL},
|
||||
}.Prepare(db)
|
||||
}
|
||||
|
||||
|
@ -383,3 +388,13 @@ func (s *membershipStatements) SelectServerInRoom(ctx context.Context, txn *sql.
|
|||
}
|
||||
return roomNID == nid, nil
|
||||
}
|
||||
|
||||
func (s *membershipStatements) DeleteMembership(
|
||||
ctx context.Context, txn *sql.Tx,
|
||||
roomNID types.RoomNID, targetUserNID types.EventStateKeyNID,
|
||||
) error {
|
||||
_, err := sqlutil.TxStmt(txn, s.deleteMembershipStmt).ExecContext(
|
||||
ctx, roomNID, targetUserNID,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -133,6 +133,7 @@ type Membership interface {
|
|||
UpdateForgetMembership(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID, forget bool) error
|
||||
SelectLocalServerInRoom(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID) (bool, error)
|
||||
SelectServerInRoom(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, serverName gomatrixserverlib.ServerName) (bool, error)
|
||||
DeleteMembership(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID) error
|
||||
}
|
||||
|
||||
type Published interface {
|
||||
|
|
|
@ -365,7 +365,7 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent(
|
|||
"event": string(msg.Event.JSON()),
|
||||
"pdupos": pduPos,
|
||||
log.ErrorKey: err,
|
||||
}).Panicf("roomserver output log: write invite failure")
|
||||
}).Errorf("roomserver output log: write invite failure")
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -385,7 +385,7 @@ func (s *OutputRoomEventConsumer) onRetireInviteEvent(
|
|||
log.WithFields(log.Fields{
|
||||
"event_id": msg.EventID,
|
||||
log.ErrorKey: err,
|
||||
}).Panicf("roomserver output log: remove invite failure")
|
||||
}).Errorf("roomserver output log: remove invite failure")
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -403,7 +403,7 @@ func (s *OutputRoomEventConsumer) onNewPeek(
|
|||
// panic rather than continue with an inconsistent database
|
||||
log.WithFields(log.Fields{
|
||||
log.ErrorKey: err,
|
||||
}).Panicf("roomserver output log: write peek failure")
|
||||
}).Errorf("roomserver output log: write peek failure")
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -422,7 +422,7 @@ func (s *OutputRoomEventConsumer) onRetirePeek(
|
|||
// panic rather than continue with an inconsistent database
|
||||
log.WithFields(log.Fields{
|
||||
log.ErrorKey: err,
|
||||
}).Panicf("roomserver output log: write peek failure")
|
||||
}).Errorf("roomserver output log: write peek failure")
|
||||
return
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue