From b452d774e48cf322573513924ed1be006a0ca956 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 22 Aug 2022 13:13:58 +0100 Subject: [PATCH] Purge the federation API --- federationapi/consumers/roomserver.go | 9 +++++++++ federationapi/storage/interface.go | 2 ++ .../storage/postgres/joined_hosts_table.go | 14 ++++++++++++++ federationapi/storage/shared/storage.go | 15 +++++++++++++++ .../storage/sqlite3/joined_hosts_table.go | 7 +++++++ federationapi/storage/tables/interface.go | 1 + 6 files changed, 48 insertions(+) diff --git a/federationapi/consumers/roomserver.go b/federationapi/consumers/roomserver.go index 2622ecb3f..047444313 100644 --- a/federationapi/consumers/roomserver.go +++ b/federationapi/consumers/roomserver.go @@ -21,6 +21,7 @@ import ( "github.com/matrix-org/gomatrixserverlib" "github.com/nats-io/nats.go" + "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus" "github.com/matrix-org/dendrite/federationapi/queue" @@ -109,6 +110,14 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) return false } + case api.OutputTypePurgeRoom: + log.WithField("room_id", output.PurgeRoom.RoomID).Warn("Purging room from federation API") + if err := s.db.PurgeRoom(ctx, output.PurgeRoom.RoomID); err != nil { + logrus.WithField("room_id", output.PurgeRoom.RoomID).WithError(err).Error("Failed to purge room from federation API") + } else { + logrus.WithField("room_id", output.PurgeRoom.RoomID).Warn("Room purged from federation API") + } + default: log.WithField("type", output.Type).Debug( "roomserver output log: ignoring unknown output type", diff --git a/federationapi/storage/interface.go b/federationapi/storage/interface.go index b8109b432..8c28031e4 100644 --- a/federationapi/storage/interface.go +++ b/federationapi/storage/interface.go @@ -73,4 +73,6 @@ type Database interface { GetNotaryKeys(ctx context.Context, serverName gomatrixserverlib.ServerName, optKeyIDs []gomatrixserverlib.KeyID) ([]gomatrixserverlib.ServerKeys, error) // DeleteExpiredEDUs cleans up expired EDUs DeleteExpiredEDUs(ctx context.Context) error + + PurgeRoom(ctx context.Context, roomID string) error } diff --git a/federationapi/storage/postgres/joined_hosts_table.go b/federationapi/storage/postgres/joined_hosts_table.go index 5c95b72a8..b3560b6ef 100644 --- a/federationapi/storage/postgres/joined_hosts_table.go +++ b/federationapi/storage/postgres/joined_hosts_table.go @@ -18,6 +18,7 @@ package postgres import ( "context" "database/sql" + "fmt" "github.com/lib/pq" "github.com/matrix-org/dendrite/federationapi/types" @@ -66,6 +67,9 @@ const selectAllJoinedHostsSQL = "" + const selectJoinedHostsForRoomsSQL = "" + "SELECT DISTINCT server_name FROM federationsender_joined_hosts WHERE room_id = ANY($1)" +const purgeJoinedHostsSQL = "" + + "DELETE FROM federationsender_joined_hosts WHERE room_id = $1" + type joinedHostsStatements struct { db *sql.DB insertJoinedHostsStmt *sql.Stmt @@ -74,6 +78,7 @@ type joinedHostsStatements struct { selectJoinedHostsStmt *sql.Stmt selectAllJoinedHostsStmt *sql.Stmt selectJoinedHostsForRoomsStmt *sql.Stmt + purgeJoinedHostsStmt *sql.Stmt } func NewPostgresJoinedHostsTable(db *sql.DB) (s *joinedHostsStatements, err error) { @@ -102,6 +107,9 @@ func NewPostgresJoinedHostsTable(db *sql.DB) (s *joinedHostsStatements, err erro if s.selectJoinedHostsForRoomsStmt, err = s.db.Prepare(selectJoinedHostsForRoomsSQL); err != nil { return } + if s.purgeJoinedHostsStmt, err = s.db.Prepare(purgeJoinedHostsSQL); err != nil { + return + } return } @@ -210,3 +218,9 @@ func joinedHostsFromStmt( return result, rows.Err() } + +func (s *joinedHostsStatements) PurgeJoinedHosts( + ctx context.Context, txn *sql.Tx, roomID string, +) error { + return fmt.Errorf("not implemented on SQLite") +} diff --git a/federationapi/storage/shared/storage.go b/federationapi/storage/shared/storage.go index a00d782f1..42b843091 100644 --- a/federationapi/storage/shared/storage.go +++ b/federationapi/storage/shared/storage.go @@ -253,3 +253,18 @@ func (d *Database) GetNotaryKeys( }) return sks, err } + +func (d *Database) PurgeRoom(ctx context.Context, roomID string) error { + return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { + if err := d.FederationJoinedHosts.PurgeJoinedHosts(ctx, txn, roomID); err != nil { + return fmt.Errorf("failed to purge joined hosts: %w", err) + } + if err := d.FederationInboundPeeks.DeleteInboundPeeks(ctx, txn, roomID); err != nil { + return fmt.Errorf("failed to purge inbound peeks: %w", err) + } + if err := d.FederationOutboundPeeks.DeleteOutboundPeeks(ctx, txn, roomID); err != nil { + return fmt.Errorf("failed to purge outbound peeks: %w", err) + } + return nil + }) +} diff --git a/federationapi/storage/sqlite3/joined_hosts_table.go b/federationapi/storage/sqlite3/joined_hosts_table.go index e0e0f2873..c056bc7ab 100644 --- a/federationapi/storage/sqlite3/joined_hosts_table.go +++ b/federationapi/storage/sqlite3/joined_hosts_table.go @@ -18,6 +18,7 @@ package sqlite3 import ( "context" "database/sql" + "fmt" "strings" "github.com/matrix-org/dendrite/federationapi/types" @@ -217,3 +218,9 @@ func joinedHostsFromStmt( return result, nil } + +func (s *joinedHostsStatements) PurgeJoinedHosts( + ctx context.Context, txn *sql.Tx, roomID string, +) error { + return fmt.Errorf("not implemented on SQLite") +} diff --git a/federationapi/storage/tables/interface.go b/federationapi/storage/tables/interface.go index 3c116a1d0..e7aa19e7c 100644 --- a/federationapi/storage/tables/interface.go +++ b/federationapi/storage/tables/interface.go @@ -59,6 +59,7 @@ type FederationJoinedHosts interface { SelectJoinedHosts(ctx context.Context, roomID string) ([]types.JoinedHost, error) SelectAllJoinedHosts(ctx context.Context) ([]gomatrixserverlib.ServerName, error) SelectJoinedHostsForRooms(ctx context.Context, roomIDs []string) ([]gomatrixserverlib.ServerName, error) + PurgeJoinedHosts(ctx context.Context, txn *sql.Tx, roomID string) error } type FederationBlacklist interface {