Purge the federation API
This commit is contained in:
parent
cd81635cf4
commit
b452d774e4
|
@ -21,6 +21,7 @@ import (
|
|||
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/sirupsen/logrus"
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/matrix-org/dendrite/federationapi/queue"
|
||||
|
@ -109,6 +110,14 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msg *nats.Msg)
|
|||
return false
|
||||
}
|
||||
|
||||
case api.OutputTypePurgeRoom:
|
||||
log.WithField("room_id", output.PurgeRoom.RoomID).Warn("Purging room from federation API")
|
||||
if err := s.db.PurgeRoom(ctx, output.PurgeRoom.RoomID); err != nil {
|
||||
logrus.WithField("room_id", output.PurgeRoom.RoomID).WithError(err).Error("Failed to purge room from federation API")
|
||||
} else {
|
||||
logrus.WithField("room_id", output.PurgeRoom.RoomID).Warn("Room purged from federation API")
|
||||
}
|
||||
|
||||
default:
|
||||
log.WithField("type", output.Type).Debug(
|
||||
"roomserver output log: ignoring unknown output type",
|
||||
|
|
|
@ -73,4 +73,6 @@ type Database interface {
|
|||
GetNotaryKeys(ctx context.Context, serverName gomatrixserverlib.ServerName, optKeyIDs []gomatrixserverlib.KeyID) ([]gomatrixserverlib.ServerKeys, error)
|
||||
// DeleteExpiredEDUs cleans up expired EDUs
|
||||
DeleteExpiredEDUs(ctx context.Context) error
|
||||
|
||||
PurgeRoom(ctx context.Context, roomID string) error
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@ package postgres
|
|||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
|
||||
"github.com/lib/pq"
|
||||
"github.com/matrix-org/dendrite/federationapi/types"
|
||||
|
@ -66,6 +67,9 @@ const selectAllJoinedHostsSQL = "" +
|
|||
const selectJoinedHostsForRoomsSQL = "" +
|
||||
"SELECT DISTINCT server_name FROM federationsender_joined_hosts WHERE room_id = ANY($1)"
|
||||
|
||||
const purgeJoinedHostsSQL = "" +
|
||||
"DELETE FROM federationsender_joined_hosts WHERE room_id = $1"
|
||||
|
||||
type joinedHostsStatements struct {
|
||||
db *sql.DB
|
||||
insertJoinedHostsStmt *sql.Stmt
|
||||
|
@ -74,6 +78,7 @@ type joinedHostsStatements struct {
|
|||
selectJoinedHostsStmt *sql.Stmt
|
||||
selectAllJoinedHostsStmt *sql.Stmt
|
||||
selectJoinedHostsForRoomsStmt *sql.Stmt
|
||||
purgeJoinedHostsStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func NewPostgresJoinedHostsTable(db *sql.DB) (s *joinedHostsStatements, err error) {
|
||||
|
@ -102,6 +107,9 @@ func NewPostgresJoinedHostsTable(db *sql.DB) (s *joinedHostsStatements, err erro
|
|||
if s.selectJoinedHostsForRoomsStmt, err = s.db.Prepare(selectJoinedHostsForRoomsSQL); err != nil {
|
||||
return
|
||||
}
|
||||
if s.purgeJoinedHostsStmt, err = s.db.Prepare(purgeJoinedHostsSQL); err != nil {
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -210,3 +218,9 @@ func joinedHostsFromStmt(
|
|||
|
||||
return result, rows.Err()
|
||||
}
|
||||
|
||||
func (s *joinedHostsStatements) PurgeJoinedHosts(
|
||||
ctx context.Context, txn *sql.Tx, roomID string,
|
||||
) error {
|
||||
return fmt.Errorf("not implemented on SQLite")
|
||||
}
|
||||
|
|
|
@ -253,3 +253,18 @@ func (d *Database) GetNotaryKeys(
|
|||
})
|
||||
return sks, err
|
||||
}
|
||||
|
||||
func (d *Database) PurgeRoom(ctx context.Context, roomID string) error {
|
||||
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||
if err := d.FederationJoinedHosts.PurgeJoinedHosts(ctx, txn, roomID); err != nil {
|
||||
return fmt.Errorf("failed to purge joined hosts: %w", err)
|
||||
}
|
||||
if err := d.FederationInboundPeeks.DeleteInboundPeeks(ctx, txn, roomID); err != nil {
|
||||
return fmt.Errorf("failed to purge inbound peeks: %w", err)
|
||||
}
|
||||
if err := d.FederationOutboundPeeks.DeleteOutboundPeeks(ctx, txn, roomID); err != nil {
|
||||
return fmt.Errorf("failed to purge outbound peeks: %w", err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@ package sqlite3
|
|||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/matrix-org/dendrite/federationapi/types"
|
||||
|
@ -217,3 +218,9 @@ func joinedHostsFromStmt(
|
|||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (s *joinedHostsStatements) PurgeJoinedHosts(
|
||||
ctx context.Context, txn *sql.Tx, roomID string,
|
||||
) error {
|
||||
return fmt.Errorf("not implemented on SQLite")
|
||||
}
|
||||
|
|
|
@ -59,6 +59,7 @@ type FederationJoinedHosts interface {
|
|||
SelectJoinedHosts(ctx context.Context, roomID string) ([]types.JoinedHost, error)
|
||||
SelectAllJoinedHosts(ctx context.Context) ([]gomatrixserverlib.ServerName, error)
|
||||
SelectJoinedHostsForRooms(ctx context.Context, roomIDs []string) ([]gomatrixserverlib.ServerName, error)
|
||||
PurgeJoinedHosts(ctx context.Context, txn *sql.Tx, roomID string) error
|
||||
}
|
||||
|
||||
type FederationBlacklist interface {
|
||||
|
|
Loading…
Reference in a new issue