Some PostgreSQL purging
This commit is contained in:
parent
4ef780c076
commit
537f300edf
|
@ -75,10 +75,14 @@ const updateInviteRetiredSQL = "" +
|
|||
" WHERE room_nid = $1 AND target_nid = $2 AND NOT retired" +
|
||||
" RETURNING invite_event_id"
|
||||
|
||||
const purgeInvitesSQL = "" +
|
||||
"DELETE FROM roomserver_invites WHERE room_nid = $1"
|
||||
|
||||
type inviteStatements struct {
|
||||
insertInviteEventStmt *sql.Stmt
|
||||
selectInviteActiveForUserInRoomStmt *sql.Stmt
|
||||
updateInviteRetiredStmt *sql.Stmt
|
||||
purgeInvitesStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func CreateInvitesTable(db *sql.DB) error {
|
||||
|
@ -93,6 +97,7 @@ func PrepareInvitesTable(db *sql.DB) (tables.Invites, error) {
|
|||
{&s.insertInviteEventStmt, insertInviteEventSQL},
|
||||
{&s.selectInviteActiveForUserInRoomStmt, selectInviteActiveForUserInRoomSQL},
|
||||
{&s.updateInviteRetiredStmt, updateInviteRetiredSQL},
|
||||
{&s.purgeInvitesStmt, purgeInvitesSQL},
|
||||
}.Prepare(db)
|
||||
}
|
||||
|
||||
|
@ -163,3 +168,10 @@ func (s *inviteStatements) SelectInviteActiveForUserInRoom(
|
|||
}
|
||||
return result, eventIDs, rows.Err()
|
||||
}
|
||||
|
||||
func (s *inviteStatements) PurgeInvites(
|
||||
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID,
|
||||
) error {
|
||||
_, err := sqlutil.TxStmt(txn, s.purgeInvitesStmt).ExecContext(ctx, roomNID)
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -153,6 +153,9 @@ const selectServerInRoomSQL = "" +
|
|||
" JOIN roomserver_event_state_keys ON roomserver_membership.target_nid = roomserver_event_state_keys.event_state_key_nid" +
|
||||
" WHERE membership_nid = $1 AND room_nid = $2 AND event_state_key LIKE '%:' || $3 LIMIT 1"
|
||||
|
||||
const purgeMembershipsSQL = "" +
|
||||
"DELETE FROM roomserver_memberships WHERE room_nid = $1"
|
||||
|
||||
type membershipStatements struct {
|
||||
insertMembershipStmt *sql.Stmt
|
||||
selectMembershipForUpdateStmt *sql.Stmt
|
||||
|
@ -170,6 +173,7 @@ type membershipStatements struct {
|
|||
selectLocalServerInRoomStmt *sql.Stmt
|
||||
selectServerInRoomStmt *sql.Stmt
|
||||
deleteMembershipStmt *sql.Stmt
|
||||
purgeMembershipsStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func CreateMembershipTable(db *sql.DB) error {
|
||||
|
@ -205,6 +209,7 @@ func PrepareMembershipTable(db *sql.DB) (tables.Membership, error) {
|
|||
{&s.selectLocalServerInRoomStmt, selectLocalServerInRoomSQL},
|
||||
{&s.selectServerInRoomStmt, selectServerInRoomSQL},
|
||||
{&s.deleteMembershipStmt, deleteMembershipSQL},
|
||||
{&s.purgeMembershipsStmt, purgeMembershipsSQL},
|
||||
}.Prepare(db)
|
||||
}
|
||||
|
||||
|
@ -436,3 +441,10 @@ func (s *membershipStatements) DeleteMembership(
|
|||
)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *membershipStatements) PurgeMemberships(
|
||||
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID,
|
||||
) error {
|
||||
_, err := sqlutil.TxStmt(txn, s.purgeMembershipsStmt).ExecContext(ctx, roomNID)
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -43,10 +43,14 @@ const selectAllPublishedSQL = "" +
|
|||
const selectPublishedSQL = "" +
|
||||
"SELECT published FROM roomserver_published WHERE room_id = $1"
|
||||
|
||||
const purgePublishedSQL = "" +
|
||||
"DELETE FROM roomserver_published WHERE room_id = $1"
|
||||
|
||||
type publishedStatements struct {
|
||||
upsertPublishedStmt *sql.Stmt
|
||||
selectAllPublishedStmt *sql.Stmt
|
||||
selectPublishedStmt *sql.Stmt
|
||||
purgePublishedStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func CreatePublishedTable(db *sql.DB) error {
|
||||
|
@ -61,6 +65,7 @@ func PreparePublishedTable(db *sql.DB) (tables.Published, error) {
|
|||
{&s.upsertPublishedStmt, upsertPublishedSQL},
|
||||
{&s.selectAllPublishedStmt, selectAllPublishedSQL},
|
||||
{&s.selectPublishedStmt, selectPublishedSQL},
|
||||
{&s.purgePublishedStmt, purgePublishedSQL},
|
||||
}.Prepare(db)
|
||||
}
|
||||
|
||||
|
@ -104,3 +109,10 @@ func (s *publishedStatements) SelectAllPublishedRooms(
|
|||
}
|
||||
return roomIDs, rows.Err()
|
||||
}
|
||||
|
||||
func (s *publishedStatements) PurgePublished(
|
||||
ctx context.Context, txn *sql.Tx, roomID string,
|
||||
) error {
|
||||
_, err := sqlutil.TxStmt(txn, s.purgePublishedStmt).ExecContext(ctx, roomID)
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -53,12 +53,16 @@ const selectCreatorIDFromAliasSQL = "" +
|
|||
const deleteRoomAliasSQL = "" +
|
||||
"DELETE FROM roomserver_room_aliases WHERE alias = $1"
|
||||
|
||||
const purgeRoomAliasesSQL = "" +
|
||||
"DELETE FROM roomserver_room_aliases WHERE room_id = $1"
|
||||
|
||||
type roomAliasesStatements struct {
|
||||
insertRoomAliasStmt *sql.Stmt
|
||||
selectRoomIDFromAliasStmt *sql.Stmt
|
||||
selectAliasesFromRoomIDStmt *sql.Stmt
|
||||
selectCreatorIDFromAliasStmt *sql.Stmt
|
||||
deleteRoomAliasStmt *sql.Stmt
|
||||
purgeRoomAliasesStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func CreateRoomAliasesTable(db *sql.DB) error {
|
||||
|
@ -75,6 +79,7 @@ func PrepareRoomAliasesTable(db *sql.DB) (tables.RoomAliases, error) {
|
|||
{&s.selectAliasesFromRoomIDStmt, selectAliasesFromRoomIDSQL},
|
||||
{&s.selectCreatorIDFromAliasStmt, selectCreatorIDFromAliasSQL},
|
||||
{&s.deleteRoomAliasStmt, deleteRoomAliasSQL},
|
||||
{&s.purgeRoomAliasesStmt, purgeRoomAliasesSQL},
|
||||
}.Prepare(db)
|
||||
}
|
||||
|
||||
|
@ -137,3 +142,10 @@ func (s *roomAliasesStatements) DeleteRoomAlias(
|
|||
_, err = stmt.ExecContext(ctx, alias)
|
||||
return
|
||||
}
|
||||
|
||||
func (s *roomAliasesStatements) PurgeRoomAliases(
|
||||
ctx context.Context, txn *sql.Tx, roomID string,
|
||||
) error {
|
||||
_, err := sqlutil.TxStmt(txn, s.purgeRoomAliasesStmt).ExecContext(ctx, roomID)
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -65,9 +65,16 @@ const bulkSelectStateBlockEntriesSQL = "" +
|
|||
"SELECT state_block_nid, event_nids" +
|
||||
" FROM roomserver_state_block WHERE state_block_nid = ANY($1) ORDER BY state_block_nid ASC"
|
||||
|
||||
const purgeStateBlockEntriesSQL = `
|
||||
DELETE FROM roomserver_state_block WHERE state_block_nid = ANY(
|
||||
SELECT DISTINCT UNNEST(state_block_nids) FROM roomserver_state_snapshots WHERE room_nid = $1
|
||||
)
|
||||
`
|
||||
|
||||
type stateBlockStatements struct {
|
||||
insertStateDataStmt *sql.Stmt
|
||||
bulkSelectStateBlockEntriesStmt *sql.Stmt
|
||||
purgeStateBlockEntriesStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func CreateStateBlockTable(db *sql.DB) error {
|
||||
|
@ -81,6 +88,7 @@ func PrepareStateBlockTable(db *sql.DB) (tables.StateBlock, error) {
|
|||
return s, sqlutil.StatementList{
|
||||
{&s.insertStateDataStmt, insertStateDataSQL},
|
||||
{&s.bulkSelectStateBlockEntriesStmt, bulkSelectStateBlockEntriesSQL},
|
||||
{&s.purgeStateBlockEntriesStmt, purgeStateBlockEntriesSQL},
|
||||
}.Prepare(db)
|
||||
}
|
||||
|
||||
|
@ -133,6 +141,13 @@ func (s *stateBlockStatements) BulkSelectStateBlockEntries(
|
|||
return results, err
|
||||
}
|
||||
|
||||
func (s *stateBlockStatements) PurgeStateBlocks(
|
||||
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID,
|
||||
) error {
|
||||
_, err := sqlutil.TxStmt(txn, s.purgeStateBlockEntriesStmt).ExecContext(ctx, roomNID)
|
||||
return err
|
||||
}
|
||||
|
||||
func stateBlockNIDsAsArray(stateBlockNIDs []types.StateBlockNID) pq.Int64Array {
|
||||
nids := make([]int64, len(stateBlockNIDs))
|
||||
for i := range stateBlockNIDs {
|
||||
|
|
|
@ -72,6 +72,11 @@ const bulkSelectStateBlockNIDsSQL = "" +
|
|||
"SELECT state_snapshot_nid, state_block_nids FROM roomserver_state_snapshots" +
|
||||
" WHERE state_snapshot_nid = ANY($1) ORDER BY state_snapshot_nid ASC"
|
||||
|
||||
// Look up state snapshot NIDs for the given room.
|
||||
const purgeStateSnapshotEntriesSQL = `
|
||||
DELETE FROM roomserver_state_snapshots WHERE room_nid = $1
|
||||
`
|
||||
|
||||
// Looks up both the history visibility event and relevant membership events from
|
||||
// a given domain name from a given state snapshot. This is used to optimise the
|
||||
// helpers.CheckServerAllowedToSeeEvent function.
|
||||
|
@ -101,6 +106,7 @@ type stateSnapshotStatements struct {
|
|||
insertStateStmt *sql.Stmt
|
||||
bulkSelectStateBlockNIDsStmt *sql.Stmt
|
||||
bulkSelectStateForHistoryVisibilityStmt *sql.Stmt
|
||||
purgeStateSnapshotEntriesStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func CreateStateSnapshotTable(db *sql.DB) error {
|
||||
|
@ -115,6 +121,7 @@ func PrepareStateSnapshotTable(db *sql.DB) (tables.StateSnapshot, error) {
|
|||
{&s.insertStateStmt, insertStateSQL},
|
||||
{&s.bulkSelectStateBlockNIDsStmt, bulkSelectStateBlockNIDsSQL},
|
||||
{&s.bulkSelectStateForHistoryVisibilityStmt, bulkSelectStateForHistoryVisibilitySQL},
|
||||
{&s.purgeStateSnapshotEntriesStmt, purgeStateSnapshotEntriesSQL},
|
||||
}.Prepare(db)
|
||||
}
|
||||
|
||||
|
@ -183,3 +190,10 @@ func (s *stateSnapshotStatements) BulkSelectStateForHistoryVisibility(
|
|||
}
|
||||
return results, rows.Err()
|
||||
}
|
||||
|
||||
func (s *stateSnapshotStatements) PurgeStateSnapshots(
|
||||
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID,
|
||||
) error {
|
||||
_, err := sqlutil.TxStmt(txn, s.purgeStateSnapshotEntriesStmt).ExecContext(ctx, roomNID)
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -1352,6 +1352,43 @@ func (d *Database) ForgetRoom(ctx context.Context, userID, roomID string, forget
|
|||
})
|
||||
}
|
||||
|
||||
// PurgeRoom removes all information about a given room from the roomserver.
|
||||
// For large rooms this operation may take a considerable amount of time.
|
||||
func (d *Database) PurgeRoom(ctx context.Context, roomID string) error {
|
||||
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||
roomNID, err := d.RoomsTable.SelectRoomNID(ctx, txn, roomID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to find room NID: %w", err)
|
||||
}
|
||||
if err := d.StateBlockTable.PurgeStateBlocks(ctx, txn, roomNID); err != nil {
|
||||
return fmt.Errorf("failed to purge state blocks: %w", err)
|
||||
}
|
||||
if err := d.StateSnapshotTable.PurgeStateSnapshots(ctx, txn, roomNID); err != nil {
|
||||
return fmt.Errorf("failed to purge state blocks: %w", err)
|
||||
}
|
||||
if err := d.InvitesTable.PurgeInvites(ctx, txn, roomNID); err != nil {
|
||||
return fmt.Errorf("failed to purge invites: %w", err)
|
||||
}
|
||||
if err := d.MembershipTable.PurgeMemberships(ctx, txn, roomNID); err != nil {
|
||||
return fmt.Errorf("failed to purge memberships: %w", err)
|
||||
}
|
||||
if err := d.RoomAliasesTable.PurgeRoomAliases(ctx, txn, roomID); err != nil {
|
||||
return fmt.Errorf("failed to purge memberships: %w", err)
|
||||
}
|
||||
if err := d.PublishedTable.PurgePublished(ctx, txn, roomID); err != nil {
|
||||
return fmt.Errorf("failed to purge memberships: %w", err)
|
||||
}
|
||||
|
||||
// List:
|
||||
// * events table
|
||||
// * previous events table
|
||||
// * event JSONs table
|
||||
// * redactions table
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// FIXME TODO: Remove all this - horrible dupe with roomserver/state. Can't use the original impl because of circular loops
|
||||
// it should live in this package!
|
||||
|
||||
|
|
|
@ -89,12 +89,14 @@ type StateSnapshot interface {
|
|||
// which users are in a room faster than having to load the entire room state. In the
|
||||
// case of SQLite, this will return tables.OptimisationNotSupportedError.
|
||||
BulkSelectStateForHistoryVisibility(ctx context.Context, txn *sql.Tx, stateSnapshotNID types.StateSnapshotNID, domain string) ([]types.EventNID, error)
|
||||
PurgeStateSnapshots(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID) error
|
||||
}
|
||||
|
||||
type StateBlock interface {
|
||||
BulkInsertStateData(ctx context.Context, txn *sql.Tx, entries types.StateEntries) (types.StateBlockNID, error)
|
||||
BulkSelectStateBlockEntries(ctx context.Context, txn *sql.Tx, stateBlockNIDs types.StateBlockNIDs) ([][]types.EventNID, error)
|
||||
//BulkSelectFilteredStateBlockEntries(ctx context.Context, stateBlockNIDs []types.StateBlockNID, stateKeyTuples []types.StateKeyTuple) ([]types.StateEntryList, error)
|
||||
PurgeStateBlocks(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID) error
|
||||
}
|
||||
|
||||
type RoomAliases interface {
|
||||
|
@ -103,6 +105,7 @@ type RoomAliases interface {
|
|||
SelectAliasesFromRoomID(ctx context.Context, txn *sql.Tx, roomID string) ([]string, error)
|
||||
SelectCreatorIDFromAlias(ctx context.Context, txn *sql.Tx, alias string) (creatorID string, err error)
|
||||
DeleteRoomAlias(ctx context.Context, txn *sql.Tx, alias string) (err error)
|
||||
PurgeRoomAliases(ctx context.Context, txn *sql.Tx, roomID string) error
|
||||
}
|
||||
|
||||
type PreviousEvents interface {
|
||||
|
@ -117,6 +120,7 @@ type Invites interface {
|
|||
UpdateInviteRetired(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID) ([]string, error)
|
||||
// SelectInviteActiveForUserInRoom returns a list of sender state key NIDs and invite event IDs matching those nids.
|
||||
SelectInviteActiveForUserInRoom(ctx context.Context, txn *sql.Tx, targetUserNID types.EventStateKeyNID, roomNID types.RoomNID) ([]types.EventStateKeyNID, []string, error)
|
||||
PurgeInvites(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID) error
|
||||
}
|
||||
|
||||
type MembershipState int64
|
||||
|
@ -143,12 +147,14 @@ type Membership interface {
|
|||
SelectLocalServerInRoom(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID) (bool, error)
|
||||
SelectServerInRoom(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, serverName gomatrixserverlib.ServerName) (bool, error)
|
||||
DeleteMembership(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID) error
|
||||
PurgeMemberships(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID) error
|
||||
}
|
||||
|
||||
type Published interface {
|
||||
UpsertRoomPublished(ctx context.Context, txn *sql.Tx, roomID string, published bool) (err error)
|
||||
SelectPublishedFromRoomID(ctx context.Context, txn *sql.Tx, roomID string) (published bool, err error)
|
||||
SelectAllPublishedRooms(ctx context.Context, txn *sql.Tx, published bool) ([]string, error)
|
||||
PurgePublished(ctx context.Context, txn *sql.Tx, roomID string) error
|
||||
}
|
||||
|
||||
type RedactionInfo struct {
|
||||
|
|
Loading…
Reference in a new issue