More PostgreSQL purging
This commit is contained in:
parent
537f300edf
commit
d11c243599
|
@ -54,9 +54,16 @@ const bulkSelectEventJSONSQL = "" +
|
|||
" WHERE event_nid = ANY($1)" +
|
||||
" ORDER BY event_nid ASC"
|
||||
|
||||
const purgeEventJSONSQL = `
|
||||
DELETE FROM roomserver_event_json WHERE event_nid = ANY(
|
||||
SELECT event_nid FROM roomserver_events WHERE room_nid = $1
|
||||
)
|
||||
`
|
||||
|
||||
type eventJSONStatements struct {
|
||||
insertEventJSONStmt *sql.Stmt
|
||||
bulkSelectEventJSONStmt *sql.Stmt
|
||||
purgeEventJSONStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func CreateEventJSONTable(db *sql.DB) error {
|
||||
|
@ -70,6 +77,7 @@ func PrepareEventJSONTable(db *sql.DB) (tables.EventJSON, error) {
|
|||
return s, sqlutil.StatementList{
|
||||
{&s.insertEventJSONStmt, insertEventJSONSQL},
|
||||
{&s.bulkSelectEventJSONStmt, bulkSelectEventJSONSQL},
|
||||
{&s.purgeEventJSONStmt, purgeEventJSONSQL},
|
||||
}.Prepare(db)
|
||||
}
|
||||
|
||||
|
@ -107,3 +115,10 @@ func (s *eventJSONStatements) BulkSelectEventJSON(
|
|||
}
|
||||
return results[:i], rows.Err()
|
||||
}
|
||||
|
||||
func (s *eventJSONStatements) PurgeEventJSONs(
|
||||
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID,
|
||||
) error {
|
||||
_, err := sqlutil.TxStmt(txn, s.purgeEventJSONStmt).ExecContext(ctx, roomNID)
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -147,6 +147,9 @@ const selectRoomNIDsForEventNIDsSQL = "" +
|
|||
const selectEventRejectedSQL = "" +
|
||||
"SELECT is_rejected FROM roomserver_events WHERE room_nid = $1 AND event_id = $2"
|
||||
|
||||
const purgeEventsSQL = "" +
|
||||
"DELETE FROM roomserver_events WHERE room_nid = $1"
|
||||
|
||||
type eventStatements struct {
|
||||
insertEventStmt *sql.Stmt
|
||||
selectEventStmt *sql.Stmt
|
||||
|
@ -166,6 +169,7 @@ type eventStatements struct {
|
|||
selectMaxEventDepthStmt *sql.Stmt
|
||||
selectRoomNIDsForEventNIDsStmt *sql.Stmt
|
||||
selectEventRejectedStmt *sql.Stmt
|
||||
purgeEventsStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func CreateEventsTable(db *sql.DB) error {
|
||||
|
@ -195,6 +199,7 @@ func PrepareEventsTable(db *sql.DB) (tables.Events, error) {
|
|||
{&s.selectMaxEventDepthStmt, selectMaxEventDepthSQL},
|
||||
{&s.selectRoomNIDsForEventNIDsStmt, selectRoomNIDsForEventNIDsSQL},
|
||||
{&s.selectEventRejectedStmt, selectEventRejectedSQL},
|
||||
{&s.purgeEventsStmt, purgeEventsSQL},
|
||||
}.Prepare(db)
|
||||
}
|
||||
|
||||
|
@ -571,3 +576,10 @@ func (s *eventStatements) SelectEventRejected(
|
|||
err = stmt.QueryRowContext(ctx, roomNID, eventID).Scan(&rejected)
|
||||
return
|
||||
}
|
||||
|
||||
func (s *eventStatements) PurgeEvents(
|
||||
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID,
|
||||
) error {
|
||||
_, err := sqlutil.TxStmt(txn, s.purgeEventsStmt).ExecContext(ctx, roomNID)
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -59,9 +59,16 @@ const selectPreviousEventExistsSQL = "" +
|
|||
"SELECT 1 FROM roomserver_previous_events" +
|
||||
" WHERE previous_event_id = $1 AND previous_reference_sha256 = $2"
|
||||
|
||||
const purgePreviousEventsSQL = `
|
||||
DELETE FROM roomserver_previous_events WHERE event_nids && ANY(
|
||||
SELECT ARRAY_AGG(event_nid) FROM roomserver_events WHERE room_nid = 42
|
||||
)
|
||||
`
|
||||
|
||||
type previousEventStatements struct {
|
||||
insertPreviousEventStmt *sql.Stmt
|
||||
selectPreviousEventExistsStmt *sql.Stmt
|
||||
purgePreviousEventsStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func CreatePrevEventsTable(db *sql.DB) error {
|
||||
|
@ -75,6 +82,7 @@ func PreparePrevEventsTable(db *sql.DB) (tables.PreviousEvents, error) {
|
|||
return s, sqlutil.StatementList{
|
||||
{&s.insertPreviousEventStmt, insertPreviousEventSQL},
|
||||
{&s.selectPreviousEventExistsStmt, selectPreviousEventExistsSQL},
|
||||
{&s.purgePreviousEventsStmt, purgePreviousEventsSQL},
|
||||
}.Prepare(db)
|
||||
}
|
||||
|
||||
|
@ -101,3 +109,10 @@ func (s *previousEventStatements) SelectPreviousEventExists(
|
|||
stmt := sqlutil.TxStmt(txn, s.selectPreviousEventExistsStmt)
|
||||
return stmt.QueryRowContext(ctx, eventID, eventReferenceSHA256).Scan(&ok)
|
||||
}
|
||||
|
||||
func (s *previousEventStatements) PurgePreviousEvents(
|
||||
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID,
|
||||
) error {
|
||||
_, err := sqlutil.TxStmt(txn, s.purgePreviousEventsStmt).ExecContext(ctx, roomNID)
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@ import (
|
|||
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
"github.com/matrix-org/dendrite/roomserver/storage/tables"
|
||||
"github.com/matrix-org/dendrite/roomserver/types"
|
||||
)
|
||||
|
||||
const redactionsSchema = `
|
||||
|
@ -52,11 +53,18 @@ const selectRedactionInfoByEventBeingRedactedSQL = "" +
|
|||
const markRedactionValidatedSQL = "" +
|
||||
" UPDATE roomserver_redactions SET validated = $2 WHERE redaction_event_id = $1"
|
||||
|
||||
const purgeRedactionsSQL = `
|
||||
DELETE FROM roomserver_redactions WHERE redaction_event_id = ANY(
|
||||
SELECT event_id FROM roomserver_events WHERE room_nid = $1
|
||||
)
|
||||
`
|
||||
|
||||
type redactionStatements struct {
|
||||
insertRedactionStmt *sql.Stmt
|
||||
selectRedactionInfoByRedactionEventIDStmt *sql.Stmt
|
||||
selectRedactionInfoByEventBeingRedactedStmt *sql.Stmt
|
||||
markRedactionValidatedStmt *sql.Stmt
|
||||
purgeRedactionStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func CreateRedactionsTable(db *sql.DB) error {
|
||||
|
@ -72,6 +80,7 @@ func PrepareRedactionsTable(db *sql.DB) (tables.Redactions, error) {
|
|||
{&s.selectRedactionInfoByRedactionEventIDStmt, selectRedactionInfoByRedactionEventIDSQL},
|
||||
{&s.selectRedactionInfoByEventBeingRedactedStmt, selectRedactionInfoByEventBeingRedactedSQL},
|
||||
{&s.markRedactionValidatedStmt, markRedactionValidatedSQL},
|
||||
{&s.purgeRedactionStmt, purgeRedactionsSQL},
|
||||
}.Prepare(db)
|
||||
}
|
||||
|
||||
|
@ -120,3 +129,10 @@ func (s *redactionStatements) MarkRedactionValidated(
|
|||
_, err := stmt.ExecContext(ctx, redactionEventID, validated)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *redactionStatements) PurgeRedactions(
|
||||
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID,
|
||||
) error {
|
||||
_, err := sqlutil.TxStmt(txn, s.purgeRedactionStmt).ExecContext(ctx, roomNID)
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -1373,18 +1373,23 @@ func (d *Database) PurgeRoom(ctx context.Context, roomID string) error {
|
|||
return fmt.Errorf("failed to purge memberships: %w", err)
|
||||
}
|
||||
if err := d.RoomAliasesTable.PurgeRoomAliases(ctx, txn, roomID); err != nil {
|
||||
return fmt.Errorf("failed to purge memberships: %w", err)
|
||||
return fmt.Errorf("failed to purge room aliases: %w", err)
|
||||
}
|
||||
if err := d.PublishedTable.PurgePublished(ctx, txn, roomID); err != nil {
|
||||
return fmt.Errorf("failed to purge memberships: %w", err)
|
||||
return fmt.Errorf("failed to purge published: %w", err)
|
||||
}
|
||||
if err := d.PrevEventsTable.PurgePreviousEvents(ctx, txn, roomNID); err != nil {
|
||||
return fmt.Errorf("failed to purge previous events: %w", err)
|
||||
}
|
||||
if err := d.EventJSONTable.PurgeEventJSONs(ctx, txn, roomNID); err != nil {
|
||||
return fmt.Errorf("failed to purge event JSONs: %w", err)
|
||||
}
|
||||
if err := d.RedactionsTable.PurgeRedactions(ctx, txn, roomNID); err != nil {
|
||||
return fmt.Errorf("failed to purge redactions: %w", err)
|
||||
}
|
||||
if err := d.EventsTable.PurgeEvents(ctx, txn, roomNID); err != nil {
|
||||
return fmt.Errorf("failed to purge events: %w", err)
|
||||
}
|
||||
|
||||
// List:
|
||||
// * events table
|
||||
// * previous events table
|
||||
// * event JSONs table
|
||||
// * redactions table
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@ type EventJSON interface {
|
|||
// Insert the event JSON. On conflict, replace the event JSON with the new value (for redactions).
|
||||
InsertEventJSON(ctx context.Context, tx *sql.Tx, eventNID types.EventNID, eventJSON []byte) error
|
||||
BulkSelectEventJSON(ctx context.Context, tx *sql.Tx, eventNIDs []types.EventNID) ([]EventJSONPair, error)
|
||||
PurgeEventJSONs(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID) error
|
||||
}
|
||||
|
||||
type EventTypes interface {
|
||||
|
@ -67,6 +68,7 @@ type Events interface {
|
|||
SelectMaxEventDepth(ctx context.Context, txn *sql.Tx, eventNIDs []types.EventNID) (int64, error)
|
||||
SelectRoomNIDsForEventNIDs(ctx context.Context, txn *sql.Tx, eventNIDs []types.EventNID) (roomNIDs map[types.EventNID]types.RoomNID, err error)
|
||||
SelectEventRejected(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, eventID string) (rejected bool, err error)
|
||||
PurgeEvents(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID) error
|
||||
}
|
||||
|
||||
type Rooms interface {
|
||||
|
@ -113,6 +115,7 @@ type PreviousEvents interface {
|
|||
// Check if the event reference exists
|
||||
// Returns sql.ErrNoRows if the event reference doesn't exist.
|
||||
SelectPreviousEventExists(ctx context.Context, txn *sql.Tx, eventID string, eventReferenceSHA256 []byte) error
|
||||
PurgePreviousEvents(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID) error
|
||||
}
|
||||
|
||||
type Invites interface {
|
||||
|
@ -175,6 +178,7 @@ type Redactions interface {
|
|||
// Mark this redaction event as having been validated. This means we have both sides of the redaction and have
|
||||
// successfully redacted the event JSON.
|
||||
MarkRedactionValidated(ctx context.Context, txn *sql.Tx, redactionEventID string, validated bool) error
|
||||
PurgeRedactions(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID) error
|
||||
}
|
||||
|
||||
// StrippedEvent represents a stripped event for returning extracted content values.
|
||||
|
|
Loading…
Reference in a new issue