diff --git a/roomserver/storage/postgres/event_json_table.go b/roomserver/storage/postgres/event_json_table.go index 5f069ca10..6b493b566 100644 --- a/roomserver/storage/postgres/event_json_table.go +++ b/roomserver/storage/postgres/event_json_table.go @@ -54,9 +54,16 @@ const bulkSelectEventJSONSQL = "" + " WHERE event_nid = ANY($1)" + " ORDER BY event_nid ASC" +const purgeEventJSONSQL = ` + DELETE FROM roomserver_event_json WHERE event_nid = ANY( + SELECT event_nid FROM roomserver_events WHERE room_nid = $1 + ) +` + type eventJSONStatements struct { insertEventJSONStmt *sql.Stmt bulkSelectEventJSONStmt *sql.Stmt + purgeEventJSONStmt *sql.Stmt } func CreateEventJSONTable(db *sql.DB) error { @@ -70,6 +77,7 @@ func PrepareEventJSONTable(db *sql.DB) (tables.EventJSON, error) { return s, sqlutil.StatementList{ {&s.insertEventJSONStmt, insertEventJSONSQL}, {&s.bulkSelectEventJSONStmt, bulkSelectEventJSONSQL}, + {&s.purgeEventJSONStmt, purgeEventJSONSQL}, }.Prepare(db) } @@ -107,3 +115,10 @@ func (s *eventJSONStatements) BulkSelectEventJSON( } return results[:i], rows.Err() } + +func (s *eventJSONStatements) PurgeEventJSONs( + ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, +) error { + _, err := sqlutil.TxStmt(txn, s.purgeEventJSONStmt).ExecContext(ctx, roomNID) + return err +} diff --git a/roomserver/storage/postgres/events_table.go b/roomserver/storage/postgres/events_table.go index a310c3963..6fa95705f 100644 --- a/roomserver/storage/postgres/events_table.go +++ b/roomserver/storage/postgres/events_table.go @@ -147,6 +147,9 @@ const selectRoomNIDsForEventNIDsSQL = "" + const selectEventRejectedSQL = "" + "SELECT is_rejected FROM roomserver_events WHERE room_nid = $1 AND event_id = $2" +const purgeEventsSQL = "" + + "DELETE FROM roomserver_events WHERE room_nid = $1" + type eventStatements struct { insertEventStmt *sql.Stmt selectEventStmt *sql.Stmt @@ -166,6 +169,7 @@ type eventStatements struct { selectMaxEventDepthStmt *sql.Stmt selectRoomNIDsForEventNIDsStmt *sql.Stmt selectEventRejectedStmt *sql.Stmt + purgeEventsStmt *sql.Stmt } func CreateEventsTable(db *sql.DB) error { @@ -195,6 +199,7 @@ func PrepareEventsTable(db *sql.DB) (tables.Events, error) { {&s.selectMaxEventDepthStmt, selectMaxEventDepthSQL}, {&s.selectRoomNIDsForEventNIDsStmt, selectRoomNIDsForEventNIDsSQL}, {&s.selectEventRejectedStmt, selectEventRejectedSQL}, + {&s.purgeEventsStmt, purgeEventsSQL}, }.Prepare(db) } @@ -571,3 +576,10 @@ func (s *eventStatements) SelectEventRejected( err = stmt.QueryRowContext(ctx, roomNID, eventID).Scan(&rejected) return } + +func (s *eventStatements) PurgeEvents( + ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, +) error { + _, err := sqlutil.TxStmt(txn, s.purgeEventsStmt).ExecContext(ctx, roomNID) + return err +} diff --git a/roomserver/storage/postgres/previous_events_table.go b/roomserver/storage/postgres/previous_events_table.go index 26999a290..a4a8521a5 100644 --- a/roomserver/storage/postgres/previous_events_table.go +++ b/roomserver/storage/postgres/previous_events_table.go @@ -59,9 +59,16 @@ const selectPreviousEventExistsSQL = "" + "SELECT 1 FROM roomserver_previous_events" + " WHERE previous_event_id = $1 AND previous_reference_sha256 = $2" +const purgePreviousEventsSQL = ` + DELETE FROM roomserver_previous_events WHERE event_nids && ANY( + SELECT ARRAY_AGG(event_nid) FROM roomserver_events WHERE room_nid = 42 + ) +` + type previousEventStatements struct { insertPreviousEventStmt *sql.Stmt selectPreviousEventExistsStmt *sql.Stmt + purgePreviousEventsStmt *sql.Stmt } func CreatePrevEventsTable(db *sql.DB) error { @@ -75,6 +82,7 @@ func PreparePrevEventsTable(db *sql.DB) (tables.PreviousEvents, error) { return s, sqlutil.StatementList{ {&s.insertPreviousEventStmt, insertPreviousEventSQL}, {&s.selectPreviousEventExistsStmt, selectPreviousEventExistsSQL}, + {&s.purgePreviousEventsStmt, purgePreviousEventsSQL}, }.Prepare(db) } @@ -101,3 +109,10 @@ func (s *previousEventStatements) SelectPreviousEventExists( stmt := sqlutil.TxStmt(txn, s.selectPreviousEventExistsStmt) return stmt.QueryRowContext(ctx, eventID, eventReferenceSHA256).Scan(&ok) } + +func (s *previousEventStatements) PurgePreviousEvents( + ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, +) error { + _, err := sqlutil.TxStmt(txn, s.purgePreviousEventsStmt).ExecContext(ctx, roomNID) + return err +} diff --git a/roomserver/storage/postgres/redactions_table.go b/roomserver/storage/postgres/redactions_table.go index 6e2f6712d..53f130fae 100644 --- a/roomserver/storage/postgres/redactions_table.go +++ b/roomserver/storage/postgres/redactions_table.go @@ -20,6 +20,7 @@ import ( "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/roomserver/storage/tables" + "github.com/matrix-org/dendrite/roomserver/types" ) const redactionsSchema = ` @@ -52,11 +53,18 @@ const selectRedactionInfoByEventBeingRedactedSQL = "" + const markRedactionValidatedSQL = "" + " UPDATE roomserver_redactions SET validated = $2 WHERE redaction_event_id = $1" +const purgeRedactionsSQL = ` + DELETE FROM roomserver_redactions WHERE redaction_event_id = ANY( + SELECT event_id FROM roomserver_events WHERE room_nid = $1 + ) +` + type redactionStatements struct { insertRedactionStmt *sql.Stmt selectRedactionInfoByRedactionEventIDStmt *sql.Stmt selectRedactionInfoByEventBeingRedactedStmt *sql.Stmt markRedactionValidatedStmt *sql.Stmt + purgeRedactionStmt *sql.Stmt } func CreateRedactionsTable(db *sql.DB) error { @@ -72,6 +80,7 @@ func PrepareRedactionsTable(db *sql.DB) (tables.Redactions, error) { {&s.selectRedactionInfoByRedactionEventIDStmt, selectRedactionInfoByRedactionEventIDSQL}, {&s.selectRedactionInfoByEventBeingRedactedStmt, selectRedactionInfoByEventBeingRedactedSQL}, {&s.markRedactionValidatedStmt, markRedactionValidatedSQL}, + {&s.purgeRedactionStmt, purgeRedactionsSQL}, }.Prepare(db) } @@ -120,3 +129,10 @@ func (s *redactionStatements) MarkRedactionValidated( _, err := stmt.ExecContext(ctx, redactionEventID, validated) return err } + +func (s *redactionStatements) PurgeRedactions( + ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, +) error { + _, err := sqlutil.TxStmt(txn, s.purgeRedactionStmt).ExecContext(ctx, roomNID) + return err +} diff --git a/roomserver/storage/shared/storage.go b/roomserver/storage/shared/storage.go index 6accaf1d8..b94d4b80a 100644 --- a/roomserver/storage/shared/storage.go +++ b/roomserver/storage/shared/storage.go @@ -1373,18 +1373,23 @@ func (d *Database) PurgeRoom(ctx context.Context, roomID string) error { return fmt.Errorf("failed to purge memberships: %w", err) } if err := d.RoomAliasesTable.PurgeRoomAliases(ctx, txn, roomID); err != nil { - return fmt.Errorf("failed to purge memberships: %w", err) + return fmt.Errorf("failed to purge room aliases: %w", err) } if err := d.PublishedTable.PurgePublished(ctx, txn, roomID); err != nil { - return fmt.Errorf("failed to purge memberships: %w", err) + return fmt.Errorf("failed to purge published: %w", err) + } + if err := d.PrevEventsTable.PurgePreviousEvents(ctx, txn, roomNID); err != nil { + return fmt.Errorf("failed to purge previous events: %w", err) + } + if err := d.EventJSONTable.PurgeEventJSONs(ctx, txn, roomNID); err != nil { + return fmt.Errorf("failed to purge event JSONs: %w", err) + } + if err := d.RedactionsTable.PurgeRedactions(ctx, txn, roomNID); err != nil { + return fmt.Errorf("failed to purge redactions: %w", err) + } + if err := d.EventsTable.PurgeEvents(ctx, txn, roomNID); err != nil { + return fmt.Errorf("failed to purge events: %w", err) } - - // List: - // * events table - // * previous events table - // * event JSONs table - // * redactions table - return nil }) } diff --git a/roomserver/storage/tables/interface.go b/roomserver/storage/tables/interface.go index 71eea45d1..2712afb3b 100644 --- a/roomserver/storage/tables/interface.go +++ b/roomserver/storage/tables/interface.go @@ -22,6 +22,7 @@ type EventJSON interface { // Insert the event JSON. On conflict, replace the event JSON with the new value (for redactions). InsertEventJSON(ctx context.Context, tx *sql.Tx, eventNID types.EventNID, eventJSON []byte) error BulkSelectEventJSON(ctx context.Context, tx *sql.Tx, eventNIDs []types.EventNID) ([]EventJSONPair, error) + PurgeEventJSONs(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID) error } type EventTypes interface { @@ -67,6 +68,7 @@ type Events interface { SelectMaxEventDepth(ctx context.Context, txn *sql.Tx, eventNIDs []types.EventNID) (int64, error) SelectRoomNIDsForEventNIDs(ctx context.Context, txn *sql.Tx, eventNIDs []types.EventNID) (roomNIDs map[types.EventNID]types.RoomNID, err error) SelectEventRejected(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, eventID string) (rejected bool, err error) + PurgeEvents(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID) error } type Rooms interface { @@ -113,6 +115,7 @@ type PreviousEvents interface { // Check if the event reference exists // Returns sql.ErrNoRows if the event reference doesn't exist. SelectPreviousEventExists(ctx context.Context, txn *sql.Tx, eventID string, eventReferenceSHA256 []byte) error + PurgePreviousEvents(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID) error } type Invites interface { @@ -175,6 +178,7 @@ type Redactions interface { // Mark this redaction event as having been validated. This means we have both sides of the redaction and have // successfully redacted the event JSON. MarkRedactionValidated(ctx context.Context, txn *sql.Tx, redactionEventID string, validated bool) error + PurgeRedactions(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID) error } // StrippedEvent represents a stripped event for returning extracted content values.