From 537f300edf14e2dc20eb5f8adf9aa99f9377db76 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 22 Aug 2022 11:26:18 +0100 Subject: [PATCH] Some PostgreSQL purging --- roomserver/storage/postgres/invite_table.go | 12 ++++++ .../storage/postgres/membership_table.go | 12 ++++++ .../storage/postgres/published_table.go | 12 ++++++ .../storage/postgres/room_aliases_table.go | 12 ++++++ .../storage/postgres/state_block_table.go | 15 ++++++++ .../storage/postgres/state_snapshot_table.go | 14 +++++++ roomserver/storage/shared/storage.go | 37 +++++++++++++++++++ roomserver/storage/tables/interface.go | 6 +++ 8 files changed, 120 insertions(+) diff --git a/roomserver/storage/postgres/invite_table.go b/roomserver/storage/postgres/invite_table.go index 4cddfe2e9..d59136788 100644 --- a/roomserver/storage/postgres/invite_table.go +++ b/roomserver/storage/postgres/invite_table.go @@ -75,10 +75,14 @@ const updateInviteRetiredSQL = "" + " WHERE room_nid = $1 AND target_nid = $2 AND NOT retired" + " RETURNING invite_event_id" +const purgeInvitesSQL = "" + + "DELETE FROM roomserver_invites WHERE room_nid = $1" + type inviteStatements struct { insertInviteEventStmt *sql.Stmt selectInviteActiveForUserInRoomStmt *sql.Stmt updateInviteRetiredStmt *sql.Stmt + purgeInvitesStmt *sql.Stmt } func CreateInvitesTable(db *sql.DB) error { @@ -93,6 +97,7 @@ func PrepareInvitesTable(db *sql.DB) (tables.Invites, error) { {&s.insertInviteEventStmt, insertInviteEventSQL}, {&s.selectInviteActiveForUserInRoomStmt, selectInviteActiveForUserInRoomSQL}, {&s.updateInviteRetiredStmt, updateInviteRetiredSQL}, + {&s.purgeInvitesStmt, purgeInvitesSQL}, }.Prepare(db) } @@ -163,3 +168,10 @@ func (s *inviteStatements) SelectInviteActiveForUserInRoom( } return result, eventIDs, rows.Err() } + +func (s *inviteStatements) PurgeInvites( + ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, +) error { + _, err := sqlutil.TxStmt(txn, s.purgeInvitesStmt).ExecContext(ctx, roomNID) + return err +} diff --git a/roomserver/storage/postgres/membership_table.go b/roomserver/storage/postgres/membership_table.go index bd3fd5592..855d27ba7 100644 --- a/roomserver/storage/postgres/membership_table.go +++ b/roomserver/storage/postgres/membership_table.go @@ -153,6 +153,9 @@ const selectServerInRoomSQL = "" + " JOIN roomserver_event_state_keys ON roomserver_membership.target_nid = roomserver_event_state_keys.event_state_key_nid" + " WHERE membership_nid = $1 AND room_nid = $2 AND event_state_key LIKE '%:' || $3 LIMIT 1" +const purgeMembershipsSQL = "" + + "DELETE FROM roomserver_memberships WHERE room_nid = $1" + type membershipStatements struct { insertMembershipStmt *sql.Stmt selectMembershipForUpdateStmt *sql.Stmt @@ -170,6 +173,7 @@ type membershipStatements struct { selectLocalServerInRoomStmt *sql.Stmt selectServerInRoomStmt *sql.Stmt deleteMembershipStmt *sql.Stmt + purgeMembershipsStmt *sql.Stmt } func CreateMembershipTable(db *sql.DB) error { @@ -205,6 +209,7 @@ func PrepareMembershipTable(db *sql.DB) (tables.Membership, error) { {&s.selectLocalServerInRoomStmt, selectLocalServerInRoomSQL}, {&s.selectServerInRoomStmt, selectServerInRoomSQL}, {&s.deleteMembershipStmt, deleteMembershipSQL}, + {&s.purgeMembershipsStmt, purgeMembershipsSQL}, }.Prepare(db) } @@ -436,3 +441,10 @@ func (s *membershipStatements) DeleteMembership( ) return err } + +func (s *membershipStatements) PurgeMemberships( + ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, +) error { + _, err := sqlutil.TxStmt(txn, s.purgeMembershipsStmt).ExecContext(ctx, roomNID) + return err +} diff --git a/roomserver/storage/postgres/published_table.go b/roomserver/storage/postgres/published_table.go index 56fa02f7b..0fab5670d 100644 --- a/roomserver/storage/postgres/published_table.go +++ b/roomserver/storage/postgres/published_table.go @@ -43,10 +43,14 @@ const selectAllPublishedSQL = "" + const selectPublishedSQL = "" + "SELECT published FROM roomserver_published WHERE room_id = $1" +const purgePublishedSQL = "" + + "DELETE FROM roomserver_published WHERE room_id = $1" + type publishedStatements struct { upsertPublishedStmt *sql.Stmt selectAllPublishedStmt *sql.Stmt selectPublishedStmt *sql.Stmt + purgePublishedStmt *sql.Stmt } func CreatePublishedTable(db *sql.DB) error { @@ -61,6 +65,7 @@ func PreparePublishedTable(db *sql.DB) (tables.Published, error) { {&s.upsertPublishedStmt, upsertPublishedSQL}, {&s.selectAllPublishedStmt, selectAllPublishedSQL}, {&s.selectPublishedStmt, selectPublishedSQL}, + {&s.purgePublishedStmt, purgePublishedSQL}, }.Prepare(db) } @@ -104,3 +109,10 @@ func (s *publishedStatements) SelectAllPublishedRooms( } return roomIDs, rows.Err() } + +func (s *publishedStatements) PurgePublished( + ctx context.Context, txn *sql.Tx, roomID string, +) error { + _, err := sqlutil.TxStmt(txn, s.purgePublishedStmt).ExecContext(ctx, roomID) + return err +} diff --git a/roomserver/storage/postgres/room_aliases_table.go b/roomserver/storage/postgres/room_aliases_table.go index a84929f61..5e95e8452 100644 --- a/roomserver/storage/postgres/room_aliases_table.go +++ b/roomserver/storage/postgres/room_aliases_table.go @@ -53,12 +53,16 @@ const selectCreatorIDFromAliasSQL = "" + const deleteRoomAliasSQL = "" + "DELETE FROM roomserver_room_aliases WHERE alias = $1" +const purgeRoomAliasesSQL = "" + + "DELETE FROM roomserver_room_aliases WHERE room_id = $1" + type roomAliasesStatements struct { insertRoomAliasStmt *sql.Stmt selectRoomIDFromAliasStmt *sql.Stmt selectAliasesFromRoomIDStmt *sql.Stmt selectCreatorIDFromAliasStmt *sql.Stmt deleteRoomAliasStmt *sql.Stmt + purgeRoomAliasesStmt *sql.Stmt } func CreateRoomAliasesTable(db *sql.DB) error { @@ -75,6 +79,7 @@ func PrepareRoomAliasesTable(db *sql.DB) (tables.RoomAliases, error) { {&s.selectAliasesFromRoomIDStmt, selectAliasesFromRoomIDSQL}, {&s.selectCreatorIDFromAliasStmt, selectCreatorIDFromAliasSQL}, {&s.deleteRoomAliasStmt, deleteRoomAliasSQL}, + {&s.purgeRoomAliasesStmt, purgeRoomAliasesSQL}, }.Prepare(db) } @@ -137,3 +142,10 @@ func (s *roomAliasesStatements) DeleteRoomAlias( _, err = stmt.ExecContext(ctx, alias) return } + +func (s *roomAliasesStatements) PurgeRoomAliases( + ctx context.Context, txn *sql.Tx, roomID string, +) error { + _, err := sqlutil.TxStmt(txn, s.purgeRoomAliasesStmt).ExecContext(ctx, roomID) + return err +} diff --git a/roomserver/storage/postgres/state_block_table.go b/roomserver/storage/postgres/state_block_table.go index 5af48f031..42672fe94 100644 --- a/roomserver/storage/postgres/state_block_table.go +++ b/roomserver/storage/postgres/state_block_table.go @@ -65,9 +65,16 @@ const bulkSelectStateBlockEntriesSQL = "" + "SELECT state_block_nid, event_nids" + " FROM roomserver_state_block WHERE state_block_nid = ANY($1) ORDER BY state_block_nid ASC" +const purgeStateBlockEntriesSQL = ` + DELETE FROM roomserver_state_block WHERE state_block_nid = ANY( + SELECT DISTINCT UNNEST(state_block_nids) FROM roomserver_state_snapshots WHERE room_nid = $1 + ) +` + type stateBlockStatements struct { insertStateDataStmt *sql.Stmt bulkSelectStateBlockEntriesStmt *sql.Stmt + purgeStateBlockEntriesStmt *sql.Stmt } func CreateStateBlockTable(db *sql.DB) error { @@ -81,6 +88,7 @@ func PrepareStateBlockTable(db *sql.DB) (tables.StateBlock, error) { return s, sqlutil.StatementList{ {&s.insertStateDataStmt, insertStateDataSQL}, {&s.bulkSelectStateBlockEntriesStmt, bulkSelectStateBlockEntriesSQL}, + {&s.purgeStateBlockEntriesStmt, purgeStateBlockEntriesSQL}, }.Prepare(db) } @@ -133,6 +141,13 @@ func (s *stateBlockStatements) BulkSelectStateBlockEntries( return results, err } +func (s *stateBlockStatements) PurgeStateBlocks( + ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, +) error { + _, err := sqlutil.TxStmt(txn, s.purgeStateBlockEntriesStmt).ExecContext(ctx, roomNID) + return err +} + func stateBlockNIDsAsArray(stateBlockNIDs []types.StateBlockNID) pq.Int64Array { nids := make([]int64, len(stateBlockNIDs)) for i := range stateBlockNIDs { diff --git a/roomserver/storage/postgres/state_snapshot_table.go b/roomserver/storage/postgres/state_snapshot_table.go index 99c76befe..4d2fc5a88 100644 --- a/roomserver/storage/postgres/state_snapshot_table.go +++ b/roomserver/storage/postgres/state_snapshot_table.go @@ -72,6 +72,11 @@ const bulkSelectStateBlockNIDsSQL = "" + "SELECT state_snapshot_nid, state_block_nids FROM roomserver_state_snapshots" + " WHERE state_snapshot_nid = ANY($1) ORDER BY state_snapshot_nid ASC" +// Look up state snapshot NIDs for the given room. +const purgeStateSnapshotEntriesSQL = ` + DELETE FROM roomserver_state_snapshots WHERE room_nid = $1 +` + // Looks up both the history visibility event and relevant membership events from // a given domain name from a given state snapshot. This is used to optimise the // helpers.CheckServerAllowedToSeeEvent function. @@ -101,6 +106,7 @@ type stateSnapshotStatements struct { insertStateStmt *sql.Stmt bulkSelectStateBlockNIDsStmt *sql.Stmt bulkSelectStateForHistoryVisibilityStmt *sql.Stmt + purgeStateSnapshotEntriesStmt *sql.Stmt } func CreateStateSnapshotTable(db *sql.DB) error { @@ -115,6 +121,7 @@ func PrepareStateSnapshotTable(db *sql.DB) (tables.StateSnapshot, error) { {&s.insertStateStmt, insertStateSQL}, {&s.bulkSelectStateBlockNIDsStmt, bulkSelectStateBlockNIDsSQL}, {&s.bulkSelectStateForHistoryVisibilityStmt, bulkSelectStateForHistoryVisibilitySQL}, + {&s.purgeStateSnapshotEntriesStmt, purgeStateSnapshotEntriesSQL}, }.Prepare(db) } @@ -183,3 +190,10 @@ func (s *stateSnapshotStatements) BulkSelectStateForHistoryVisibility( } return results, rows.Err() } + +func (s *stateSnapshotStatements) PurgeStateSnapshots( + ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, +) error { + _, err := sqlutil.TxStmt(txn, s.purgeStateSnapshotEntriesStmt).ExecContext(ctx, roomNID) + return err +} diff --git a/roomserver/storage/shared/storage.go b/roomserver/storage/shared/storage.go index f35592a76..6accaf1d8 100644 --- a/roomserver/storage/shared/storage.go +++ b/roomserver/storage/shared/storage.go @@ -1352,6 +1352,43 @@ func (d *Database) ForgetRoom(ctx context.Context, userID, roomID string, forget }) } +// PurgeRoom removes all information about a given room from the roomserver. +// For large rooms this operation may take a considerable amount of time. +func (d *Database) PurgeRoom(ctx context.Context, roomID string) error { + return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { + roomNID, err := d.RoomsTable.SelectRoomNID(ctx, txn, roomID) + if err != nil { + return fmt.Errorf("failed to find room NID: %w", err) + } + if err := d.StateBlockTable.PurgeStateBlocks(ctx, txn, roomNID); err != nil { + return fmt.Errorf("failed to purge state blocks: %w", err) + } + if err := d.StateSnapshotTable.PurgeStateSnapshots(ctx, txn, roomNID); err != nil { + return fmt.Errorf("failed to purge state blocks: %w", err) + } + if err := d.InvitesTable.PurgeInvites(ctx, txn, roomNID); err != nil { + return fmt.Errorf("failed to purge invites: %w", err) + } + if err := d.MembershipTable.PurgeMemberships(ctx, txn, roomNID); err != nil { + return fmt.Errorf("failed to purge memberships: %w", err) + } + if err := d.RoomAliasesTable.PurgeRoomAliases(ctx, txn, roomID); err != nil { + return fmt.Errorf("failed to purge memberships: %w", err) + } + if err := d.PublishedTable.PurgePublished(ctx, txn, roomID); err != nil { + return fmt.Errorf("failed to purge memberships: %w", err) + } + + // List: + // * events table + // * previous events table + // * event JSONs table + // * redactions table + + return nil + }) +} + // FIXME TODO: Remove all this - horrible dupe with roomserver/state. Can't use the original impl because of circular loops // it should live in this package! diff --git a/roomserver/storage/tables/interface.go b/roomserver/storage/tables/interface.go index 68d30f994..71eea45d1 100644 --- a/roomserver/storage/tables/interface.go +++ b/roomserver/storage/tables/interface.go @@ -89,12 +89,14 @@ type StateSnapshot interface { // which users are in a room faster than having to load the entire room state. In the // case of SQLite, this will return tables.OptimisationNotSupportedError. BulkSelectStateForHistoryVisibility(ctx context.Context, txn *sql.Tx, stateSnapshotNID types.StateSnapshotNID, domain string) ([]types.EventNID, error) + PurgeStateSnapshots(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID) error } type StateBlock interface { BulkInsertStateData(ctx context.Context, txn *sql.Tx, entries types.StateEntries) (types.StateBlockNID, error) BulkSelectStateBlockEntries(ctx context.Context, txn *sql.Tx, stateBlockNIDs types.StateBlockNIDs) ([][]types.EventNID, error) //BulkSelectFilteredStateBlockEntries(ctx context.Context, stateBlockNIDs []types.StateBlockNID, stateKeyTuples []types.StateKeyTuple) ([]types.StateEntryList, error) + PurgeStateBlocks(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID) error } type RoomAliases interface { @@ -103,6 +105,7 @@ type RoomAliases interface { SelectAliasesFromRoomID(ctx context.Context, txn *sql.Tx, roomID string) ([]string, error) SelectCreatorIDFromAlias(ctx context.Context, txn *sql.Tx, alias string) (creatorID string, err error) DeleteRoomAlias(ctx context.Context, txn *sql.Tx, alias string) (err error) + PurgeRoomAliases(ctx context.Context, txn *sql.Tx, roomID string) error } type PreviousEvents interface { @@ -117,6 +120,7 @@ type Invites interface { UpdateInviteRetired(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID) ([]string, error) // SelectInviteActiveForUserInRoom returns a list of sender state key NIDs and invite event IDs matching those nids. SelectInviteActiveForUserInRoom(ctx context.Context, txn *sql.Tx, targetUserNID types.EventStateKeyNID, roomNID types.RoomNID) ([]types.EventStateKeyNID, []string, error) + PurgeInvites(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID) error } type MembershipState int64 @@ -143,12 +147,14 @@ type Membership interface { SelectLocalServerInRoom(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID) (bool, error) SelectServerInRoom(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, serverName gomatrixserverlib.ServerName) (bool, error) DeleteMembership(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID) error + PurgeMemberships(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID) error } type Published interface { UpsertRoomPublished(ctx context.Context, txn *sql.Tx, roomID string, published bool) (err error) SelectPublishedFromRoomID(ctx context.Context, txn *sql.Tx, roomID string) (published bool, err error) SelectAllPublishedRooms(ctx context.Context, txn *sql.Tx, published bool) ([]string, error) + PurgePublished(ctx context.Context, txn *sql.Tx, roomID string) error } type RedactionInfo struct {