Retire all pending invites for a given user and room

This commit is contained in:
Neil Alexander 2022-08-15 15:10:56 +01:00
parent bcdbd5c00a
commit b74022cd9c
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
10 changed files with 47 additions and 54 deletions

View file

@ -207,8 +207,8 @@ type OutputNewInviteEvent struct {
// active. An invite stops being active if the user joins the room or if the
// invite is rejected by the user.
type OutputRetireInviteEvent struct {
// The ID of the "m.room.member" invite event.
EventID string
// The room ID we are retiring the invite for.
RoomID string
// The target user ID of the "m.room.member" invite event that was retired.
TargetUserID string
// Optional event ID of the event that replaced the invite.

View file

@ -29,7 +29,7 @@ func UpdateToInviteMembership(
// reprocessing this event, or because the we received this invite from a
// remote server via the federation invite API. In those cases we don't need
// to send the event.
needsSending, retired, err := mu.Update(tables.MembershipStateInvite, add)
needsSending, _, err := mu.Update(tables.MembershipStateInvite, add)
if err != nil {
return nil, err
}
@ -47,17 +47,15 @@ func UpdateToInviteMembership(
},
})
}
for _, eventID := range retired {
updates = append(updates, api.OutputEvent{
Type: api.OutputTypeRetireInviteEvent,
RetireInviteEvent: &api.OutputRetireInviteEvent{
EventID: eventID,
Membership: gomatrixserverlib.Join,
RetiredByEventID: add.EventID(),
TargetUserID: *add.StateKey(),
},
})
}
updates = append(updates, api.OutputEvent{
Type: api.OutputTypeRetireInviteEvent,
RetireInviteEvent: &api.OutputRetireInviteEvent{
RoomID: add.RoomID(),
Membership: gomatrixserverlib.Join,
RetiredByEventID: add.EventID(),
TargetUserID: *add.StateKey(),
},
})
return updates, nil
}

View file

@ -150,21 +150,18 @@ func updateToJoinMembership(
// are active for that user. We notify the consumers that the invites have
// been retired using a special event, even though they could infer this
// by studying the state changes in the room event stream.
_, retired, err := mu.Update(tables.MembershipStateJoin, add)
if err != nil {
if _, _, err := mu.Update(tables.MembershipStateJoin, add); err != nil {
return nil, err
}
for _, eventID := range retired {
updates = append(updates, api.OutputEvent{
Type: api.OutputTypeRetireInviteEvent,
RetireInviteEvent: &api.OutputRetireInviteEvent{
EventID: eventID,
Membership: gomatrixserverlib.Join,
RetiredByEventID: add.EventID(),
TargetUserID: *add.StateKey(),
},
})
}
updates = append(updates, api.OutputEvent{
Type: api.OutputTypeRetireInviteEvent,
RetireInviteEvent: &api.OutputRetireInviteEvent{
RoomID: add.RoomID(),
Membership: gomatrixserverlib.Join,
RetiredByEventID: add.EventID(),
TargetUserID: *add.StateKey(),
},
})
return updates, nil
}
@ -176,21 +173,18 @@ func updateToLeaveMembership(
// are active for that user. We notify the consumers that the invites have
// been retired using a special event, even though they could infer this
// by studying the state changes in the room event stream.
_, retired, err := mu.Update(tables.MembershipStateLeaveOrBan, add)
if err != nil {
if _, _, err := mu.Update(tables.MembershipStateLeaveOrBan, add); err != nil {
return nil, err
}
for _, eventID := range retired {
updates = append(updates, api.OutputEvent{
Type: api.OutputTypeRetireInviteEvent,
RetireInviteEvent: &api.OutputRetireInviteEvent{
EventID: eventID,
Membership: newMembership,
RetiredByEventID: add.EventID(),
TargetUserID: *add.StateKey(),
},
})
}
updates = append(updates, api.OutputEvent{
Type: api.OutputTypeRetireInviteEvent,
RetireInviteEvent: &api.OutputRetireInviteEvent{
RoomID: add.RoomID(),
Membership: newMembership,
RetiredByEventID: add.EventID(),
TargetUserID: *add.StateKey(),
},
})
return updates, nil
}

View file

@ -248,7 +248,7 @@ func (r *Leaver) performFederatedRejectInvite(
{
Type: api.OutputTypeRetireInviteEvent,
RetireInviteEvent: &api.OutputRetireInviteEvent{
EventID: eventID,
RoomID: req.RoomID,
Membership: "leave",
TargetUserID: req.UserID,
},

View file

@ -376,15 +376,16 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent(
func (s *OutputRoomEventConsumer) onRetireInviteEvent(
ctx context.Context, msg api.OutputRetireInviteEvent,
) {
pduPos, err := s.db.RetireInviteEvent(ctx, msg.EventID)
pduPos, err := s.db.RetireInviteEvent(ctx, msg.TargetUserID, msg.RoomID)
// It's possible we just haven't heard of this invite yet, so
// we should not panic if we try to retire it.
if err != nil && err != sql.ErrNoRows {
sentry.CaptureException(err)
// panic rather than continue with an inconsistent database
log.WithFields(log.Fields{
"event_id": msg.EventID,
log.ErrorKey: err,
"target_user_id": msg.TargetUserID,
"room_id": msg.RoomID,
log.ErrorKey: err,
}).Errorf("roomserver output log: remove invite failure")
return
}

View file

@ -102,7 +102,7 @@ type Database interface {
AddInviteEvent(ctx context.Context, inviteEvent *gomatrixserverlib.HeaderedEvent) (types.StreamPosition, error)
// RetireInviteEvent removes an old invite event from the database. Returns the new position of the retired invite.
// Returns an error if there was a problem communicating with the database.
RetireInviteEvent(ctx context.Context, inviteEventID string) (types.StreamPosition, error)
RetireInviteEvent(ctx context.Context, targetUserID, roomID string) (types.StreamPosition, error)
// AddPeek adds a new peek to our DB for a given room by a given user's device.
// Returns an error if there was a problem communicating with the database.
AddPeek(ctx context.Context, RoomID, UserID, DeviceID string) (types.StreamPosition, error)

View file

@ -52,7 +52,7 @@ const insertInviteEventSQL = "" +
") VALUES ($1, $2, $3, $4, FALSE) RETURNING id"
const deleteInviteEventSQL = "" +
"UPDATE syncapi_invite_events SET deleted=TRUE, id=nextval('syncapi_stream_id') WHERE event_id = $1 AND deleted=FALSE RETURNING id"
"UPDATE syncapi_invite_events SET deleted=TRUE, id=nextval('syncapi_stream_id') WHERE target_user_id = $1 AND room_id = $2 AND deleted=FALSE RETURNING id"
const selectInviteEventsInRangeSQL = "" +
"SELECT room_id, headered_event_json, deleted FROM syncapi_invite_events" +
@ -110,10 +110,10 @@ func (s *inviteEventsStatements) InsertInviteEvent(
}
func (s *inviteEventsStatements) DeleteInviteEvent(
ctx context.Context, txn *sql.Tx, inviteEventID string,
ctx context.Context, txn *sql.Tx, targetUserID, roomID string,
) (sp types.StreamPosition, err error) {
stmt := sqlutil.TxStmt(txn, s.deleteInviteEventStmt)
err = stmt.QueryRowContext(ctx, inviteEventID).Scan(&sp)
err = stmt.QueryRowContext(ctx, targetUserID, roomID).Scan(&sp)
return
}

View file

@ -209,10 +209,10 @@ func (d *Database) AddInviteEvent(
// RetireInviteEvent removes an old invite event from the database.
// Returns an error if there was a problem communicating with the database.
func (d *Database) RetireInviteEvent(
ctx context.Context, inviteEventID string,
ctx context.Context, targetUserID, roomID string,
) (sp types.StreamPosition, err error) {
_ = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
sp, err = d.Invites.DeleteInviteEvent(ctx, txn, inviteEventID)
sp, err = d.Invites.DeleteInviteEvent(ctx, txn, targetUserID, roomID)
return err
})
return

View file

@ -47,7 +47,7 @@ const insertInviteEventSQL = "" +
" VALUES ($1, $2, $3, $4, $5, false)"
const deleteInviteEventSQL = "" +
"UPDATE syncapi_invite_events SET deleted=true, id=$1 WHERE event_id = $2 AND deleted=false"
"UPDATE syncapi_invite_events SET deleted=true, target_user_id = $1 WHERE room_id = $2 AND deleted=false"
const selectInviteEventsInRangeSQL = "" +
"SELECT room_id, headered_event_json, deleted FROM syncapi_invite_events" +
@ -117,14 +117,14 @@ func (s *inviteEventsStatements) InsertInviteEvent(
}
func (s *inviteEventsStatements) DeleteInviteEvent(
ctx context.Context, txn *sql.Tx, inviteEventID string,
ctx context.Context, txn *sql.Tx, targetUserID, roomID string,
) (types.StreamPosition, error) {
streamPos, err := s.streamIDStatements.nextInviteID(ctx, txn)
if err != nil {
return streamPos, err
}
stmt := sqlutil.TxStmt(txn, s.deleteInviteEventStmt)
_, err = stmt.ExecContext(ctx, streamPos, inviteEventID)
_, err = stmt.ExecContext(ctx, targetUserID, roomID)
return streamPos, err
}

View file

@ -34,7 +34,7 @@ type AccountData interface {
type Invites interface {
InsertInviteEvent(ctx context.Context, txn *sql.Tx, inviteEvent *gomatrixserverlib.HeaderedEvent) (streamPos types.StreamPosition, err error)
DeleteInviteEvent(ctx context.Context, txn *sql.Tx, inviteEventID string) (types.StreamPosition, error)
DeleteInviteEvent(ctx context.Context, txn *sql.Tx, targetUserID, roomID string) (types.StreamPosition, error)
// SelectInviteEventsInRange returns a map of room ID to invite events. If multiple invite/retired invites exist in the given range, return the latest value
// for the room.
SelectInviteEventsInRange(ctx context.Context, txn *sql.Tx, targetUserID string, r types.Range) (invites map[string]*gomatrixserverlib.HeaderedEvent, retired map[string]*gomatrixserverlib.HeaderedEvent, err error)