diff --git a/roomserver/api/output.go b/roomserver/api/output.go index 36d0625c7..449b103a9 100644 --- a/roomserver/api/output.go +++ b/roomserver/api/output.go @@ -207,8 +207,8 @@ type OutputNewInviteEvent struct { // active. An invite stops being active if the user joins the room or if the // invite is rejected by the user. type OutputRetireInviteEvent struct { - // The ID of the "m.room.member" invite event. - EventID string + // The room ID we are retiring the invite for. + RoomID string // The target user ID of the "m.room.member" invite event that was retired. TargetUserID string // Optional event ID of the event that replaced the invite. diff --git a/roomserver/internal/helpers/helpers.go b/roomserver/internal/helpers/helpers.go index 6091f8ec2..b83277c81 100644 --- a/roomserver/internal/helpers/helpers.go +++ b/roomserver/internal/helpers/helpers.go @@ -29,7 +29,7 @@ func UpdateToInviteMembership( // reprocessing this event, or because the we received this invite from a // remote server via the federation invite API. In those cases we don't need // to send the event. - needsSending, retired, err := mu.Update(tables.MembershipStateInvite, add) + needsSending, _, err := mu.Update(tables.MembershipStateInvite, add) if err != nil { return nil, err } @@ -47,17 +47,15 @@ func UpdateToInviteMembership( }, }) } - for _, eventID := range retired { - updates = append(updates, api.OutputEvent{ - Type: api.OutputTypeRetireInviteEvent, - RetireInviteEvent: &api.OutputRetireInviteEvent{ - EventID: eventID, - Membership: gomatrixserverlib.Join, - RetiredByEventID: add.EventID(), - TargetUserID: *add.StateKey(), - }, - }) - } + updates = append(updates, api.OutputEvent{ + Type: api.OutputTypeRetireInviteEvent, + RetireInviteEvent: &api.OutputRetireInviteEvent{ + RoomID: add.RoomID(), + Membership: gomatrixserverlib.Join, + RetiredByEventID: add.EventID(), + TargetUserID: *add.StateKey(), + }, + }) return updates, nil } diff --git a/roomserver/internal/input/input_membership.go b/roomserver/internal/input/input_membership.go index 28a54623b..0822c7c2e 100644 --- a/roomserver/internal/input/input_membership.go +++ b/roomserver/internal/input/input_membership.go @@ -150,21 +150,18 @@ func updateToJoinMembership( // are active for that user. We notify the consumers that the invites have // been retired using a special event, even though they could infer this // by studying the state changes in the room event stream. - _, retired, err := mu.Update(tables.MembershipStateJoin, add) - if err != nil { + if _, _, err := mu.Update(tables.MembershipStateJoin, add); err != nil { return nil, err } - for _, eventID := range retired { - updates = append(updates, api.OutputEvent{ - Type: api.OutputTypeRetireInviteEvent, - RetireInviteEvent: &api.OutputRetireInviteEvent{ - EventID: eventID, - Membership: gomatrixserverlib.Join, - RetiredByEventID: add.EventID(), - TargetUserID: *add.StateKey(), - }, - }) - } + updates = append(updates, api.OutputEvent{ + Type: api.OutputTypeRetireInviteEvent, + RetireInviteEvent: &api.OutputRetireInviteEvent{ + RoomID: add.RoomID(), + Membership: gomatrixserverlib.Join, + RetiredByEventID: add.EventID(), + TargetUserID: *add.StateKey(), + }, + }) return updates, nil } @@ -176,21 +173,18 @@ func updateToLeaveMembership( // are active for that user. We notify the consumers that the invites have // been retired using a special event, even though they could infer this // by studying the state changes in the room event stream. - _, retired, err := mu.Update(tables.MembershipStateLeaveOrBan, add) - if err != nil { + if _, _, err := mu.Update(tables.MembershipStateLeaveOrBan, add); err != nil { return nil, err } - for _, eventID := range retired { - updates = append(updates, api.OutputEvent{ - Type: api.OutputTypeRetireInviteEvent, - RetireInviteEvent: &api.OutputRetireInviteEvent{ - EventID: eventID, - Membership: newMembership, - RetiredByEventID: add.EventID(), - TargetUserID: *add.StateKey(), - }, - }) - } + updates = append(updates, api.OutputEvent{ + Type: api.OutputTypeRetireInviteEvent, + RetireInviteEvent: &api.OutputRetireInviteEvent{ + RoomID: add.RoomID(), + Membership: newMembership, + RetiredByEventID: add.EventID(), + TargetUserID: *add.StateKey(), + }, + }) return updates, nil } diff --git a/roomserver/internal/perform/perform_leave.go b/roomserver/internal/perform/perform_leave.go index 036404cd2..be5238c0d 100644 --- a/roomserver/internal/perform/perform_leave.go +++ b/roomserver/internal/perform/perform_leave.go @@ -248,7 +248,7 @@ func (r *Leaver) performFederatedRejectInvite( { Type: api.OutputTypeRetireInviteEvent, RetireInviteEvent: &api.OutputRetireInviteEvent{ - EventID: eventID, + RoomID: req.RoomID, Membership: "leave", TargetUserID: req.UserID, }, diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index f77b1673b..7f3b6e148 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -376,15 +376,16 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent( func (s *OutputRoomEventConsumer) onRetireInviteEvent( ctx context.Context, msg api.OutputRetireInviteEvent, ) { - pduPos, err := s.db.RetireInviteEvent(ctx, msg.EventID) + pduPos, err := s.db.RetireInviteEvent(ctx, msg.TargetUserID, msg.RoomID) // It's possible we just haven't heard of this invite yet, so // we should not panic if we try to retire it. if err != nil && err != sql.ErrNoRows { sentry.CaptureException(err) // panic rather than continue with an inconsistent database log.WithFields(log.Fields{ - "event_id": msg.EventID, - log.ErrorKey: err, + "target_user_id": msg.TargetUserID, + "room_id": msg.RoomID, + log.ErrorKey: err, }).Errorf("roomserver output log: remove invite failure") return } diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index 43a75da95..7e74378da 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -102,7 +102,7 @@ type Database interface { AddInviteEvent(ctx context.Context, inviteEvent *gomatrixserverlib.HeaderedEvent) (types.StreamPosition, error) // RetireInviteEvent removes an old invite event from the database. Returns the new position of the retired invite. // Returns an error if there was a problem communicating with the database. - RetireInviteEvent(ctx context.Context, inviteEventID string) (types.StreamPosition, error) + RetireInviteEvent(ctx context.Context, targetUserID, roomID string) (types.StreamPosition, error) // AddPeek adds a new peek to our DB for a given room by a given user's device. // Returns an error if there was a problem communicating with the database. AddPeek(ctx context.Context, RoomID, UserID, DeviceID string) (types.StreamPosition, error) diff --git a/syncapi/storage/postgres/invites_table.go b/syncapi/storage/postgres/invites_table.go index 97001ae2c..aee79f37d 100644 --- a/syncapi/storage/postgres/invites_table.go +++ b/syncapi/storage/postgres/invites_table.go @@ -52,7 +52,7 @@ const insertInviteEventSQL = "" + ") VALUES ($1, $2, $3, $4, FALSE) RETURNING id" const deleteInviteEventSQL = "" + - "UPDATE syncapi_invite_events SET deleted=TRUE, id=nextval('syncapi_stream_id') WHERE event_id = $1 AND deleted=FALSE RETURNING id" + "UPDATE syncapi_invite_events SET deleted=TRUE, id=nextval('syncapi_stream_id') WHERE target_user_id = $1 AND room_id = $2 AND deleted=FALSE RETURNING id" const selectInviteEventsInRangeSQL = "" + "SELECT room_id, headered_event_json, deleted FROM syncapi_invite_events" + @@ -110,10 +110,10 @@ func (s *inviteEventsStatements) InsertInviteEvent( } func (s *inviteEventsStatements) DeleteInviteEvent( - ctx context.Context, txn *sql.Tx, inviteEventID string, + ctx context.Context, txn *sql.Tx, targetUserID, roomID string, ) (sp types.StreamPosition, err error) { stmt := sqlutil.TxStmt(txn, s.deleteInviteEventStmt) - err = stmt.QueryRowContext(ctx, inviteEventID).Scan(&sp) + err = stmt.QueryRowContext(ctx, targetUserID, roomID).Scan(&sp) return } diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index a46e55256..e8b4bb515 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -209,10 +209,10 @@ func (d *Database) AddInviteEvent( // RetireInviteEvent removes an old invite event from the database. // Returns an error if there was a problem communicating with the database. func (d *Database) RetireInviteEvent( - ctx context.Context, inviteEventID string, + ctx context.Context, targetUserID, roomID string, ) (sp types.StreamPosition, err error) { _ = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { - sp, err = d.Invites.DeleteInviteEvent(ctx, txn, inviteEventID) + sp, err = d.Invites.DeleteInviteEvent(ctx, txn, targetUserID, roomID) return err }) return diff --git a/syncapi/storage/sqlite3/invites_table.go b/syncapi/storage/sqlite3/invites_table.go index 58ab8461e..47944089e 100644 --- a/syncapi/storage/sqlite3/invites_table.go +++ b/syncapi/storage/sqlite3/invites_table.go @@ -47,7 +47,7 @@ const insertInviteEventSQL = "" + " VALUES ($1, $2, $3, $4, $5, false)" const deleteInviteEventSQL = "" + - "UPDATE syncapi_invite_events SET deleted=true, id=$1 WHERE event_id = $2 AND deleted=false" + "UPDATE syncapi_invite_events SET deleted=true, target_user_id = $1 WHERE room_id = $2 AND deleted=false" const selectInviteEventsInRangeSQL = "" + "SELECT room_id, headered_event_json, deleted FROM syncapi_invite_events" + @@ -117,14 +117,14 @@ func (s *inviteEventsStatements) InsertInviteEvent( } func (s *inviteEventsStatements) DeleteInviteEvent( - ctx context.Context, txn *sql.Tx, inviteEventID string, + ctx context.Context, txn *sql.Tx, targetUserID, roomID string, ) (types.StreamPosition, error) { streamPos, err := s.streamIDStatements.nextInviteID(ctx, txn) if err != nil { return streamPos, err } stmt := sqlutil.TxStmt(txn, s.deleteInviteEventStmt) - _, err = stmt.ExecContext(ctx, streamPos, inviteEventID) + _, err = stmt.ExecContext(ctx, targetUserID, roomID) return streamPos, err } diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go index 468d26aca..0a0bdc9b2 100644 --- a/syncapi/storage/tables/interface.go +++ b/syncapi/storage/tables/interface.go @@ -34,7 +34,7 @@ type AccountData interface { type Invites interface { InsertInviteEvent(ctx context.Context, txn *sql.Tx, inviteEvent *gomatrixserverlib.HeaderedEvent) (streamPos types.StreamPosition, err error) - DeleteInviteEvent(ctx context.Context, txn *sql.Tx, inviteEventID string) (types.StreamPosition, error) + DeleteInviteEvent(ctx context.Context, txn *sql.Tx, targetUserID, roomID string) (types.StreamPosition, error) // SelectInviteEventsInRange returns a map of room ID to invite events. If multiple invite/retired invites exist in the given range, return the latest value // for the room. SelectInviteEventsInRange(ctx context.Context, txn *sql.Tx, targetUserID string, r types.Range) (invites map[string]*gomatrixserverlib.HeaderedEvent, retired map[string]*gomatrixserverlib.HeaderedEvent, err error)