Compare commits
47 commits
main
...
neilalexan
Author | SHA1 | Date | |
---|---|---|---|
3b24f1908c | |||
1055206194 | |||
181cc34af8 | |||
1464403215 | |||
ec49e0919c | |||
e87f576303 | |||
07df8fdd42 | |||
19e0ce855d | |||
d4cdab0a44 | |||
1728a95239 | |||
1ca58ec9d1 | |||
7928138034 | |||
c279229f6f | |||
6df7b43378 | |||
31b71df1cd | |||
47be521dd9 | |||
03e72e9832 | |||
049ebda94c | |||
9c5df79799 | |||
4c3810a76b | |||
9a4ff4f128 | |||
e1eccbbfb7 | |||
08a0dd5932 | |||
ef0dcf6bee | |||
1622df2499 | |||
11856a502e | |||
b7258dcc00 | |||
c1d6e18152 | |||
295ba38937 | |||
965f532bb0 | |||
e31469682d | |||
4e4fc400a2 | |||
468d4b5bbe | |||
b452d774e4 | |||
cd81635cf4 | |||
db9ca401b0 | |||
53e218914d | |||
c45bf118a8 | |||
4c4de0dfc6 | |||
b56011514b | |||
83c7984dba | |||
6442aea104 | |||
856fe9d62b | |||
c1dcd5b218 | |||
d11c243599 | |||
537f300edf | |||
4ef780c076 |
|
@ -1,6 +1,7 @@
|
||||||
package routing
|
package routing
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
|
@ -90,6 +91,37 @@ func AdminEvacuateUser(req *http.Request, cfg *config.ClientAPI, device *userapi
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func AdminPurgeRoom(req *http.Request, cfg *config.ClientAPI, device *userapi.Device, rsAPI roomserverAPI.ClientRoomserverAPI) util.JSONResponse {
|
||||||
|
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
|
||||||
|
if err != nil {
|
||||||
|
return util.ErrorResponse(err)
|
||||||
|
}
|
||||||
|
roomID, ok := vars["roomID"]
|
||||||
|
if !ok {
|
||||||
|
return util.JSONResponse{
|
||||||
|
Code: http.StatusBadRequest,
|
||||||
|
JSON: jsonerror.MissingArgument("Expecting room ID."),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
res := &roomserverAPI.PerformAdminPurgeRoomResponse{}
|
||||||
|
if err := rsAPI.PerformAdminPurgeRoom(
|
||||||
|
context.Background(),
|
||||||
|
&roomserverAPI.PerformAdminPurgeRoomRequest{
|
||||||
|
RoomID: roomID,
|
||||||
|
},
|
||||||
|
res,
|
||||||
|
); err != nil {
|
||||||
|
return util.ErrorResponse(err)
|
||||||
|
}
|
||||||
|
if err := res.Error; err != nil {
|
||||||
|
return err.JSONResponse()
|
||||||
|
}
|
||||||
|
return util.JSONResponse{
|
||||||
|
Code: 200,
|
||||||
|
JSON: res,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func AdminResetPassword(req *http.Request, cfg *config.ClientAPI, device *userapi.Device, userAPI userapi.ClientUserAPI) util.JSONResponse {
|
func AdminResetPassword(req *http.Request, cfg *config.ClientAPI, device *userapi.Device, userAPI userapi.ClientUserAPI) util.JSONResponse {
|
||||||
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
|
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -138,3 +170,43 @@ func AdminResetPassword(req *http.Request, cfg *config.ClientAPI, device *userap
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func AdminDownloadState(req *http.Request, cfg *config.ClientAPI, device *userapi.Device, rsAPI roomserverAPI.ClientRoomserverAPI) util.JSONResponse {
|
||||||
|
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
|
||||||
|
if err != nil {
|
||||||
|
return util.ErrorResponse(err)
|
||||||
|
}
|
||||||
|
roomID, ok := vars["roomID"]
|
||||||
|
if !ok {
|
||||||
|
return util.JSONResponse{
|
||||||
|
Code: http.StatusBadRequest,
|
||||||
|
JSON: jsonerror.MissingArgument("Expecting room ID."),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
serverName, ok := vars["serverName"]
|
||||||
|
if !ok {
|
||||||
|
return util.JSONResponse{
|
||||||
|
Code: http.StatusBadRequest,
|
||||||
|
JSON: jsonerror.MissingArgument("Expecting remote server name."),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
res := &roomserverAPI.PerformAdminDownloadStateResponse{}
|
||||||
|
if err := rsAPI.PerformAdminDownloadState(
|
||||||
|
req.Context(),
|
||||||
|
&roomserverAPI.PerformAdminDownloadStateRequest{
|
||||||
|
UserID: device.UserID,
|
||||||
|
RoomID: roomID,
|
||||||
|
ServerName: gomatrixserverlib.ServerName(serverName),
|
||||||
|
},
|
||||||
|
res,
|
||||||
|
); err != nil {
|
||||||
|
return jsonerror.InternalAPIError(req.Context(), err)
|
||||||
|
}
|
||||||
|
if err := res.Error; err != nil {
|
||||||
|
return err.JSONResponse()
|
||||||
|
}
|
||||||
|
return util.JSONResponse{
|
||||||
|
Code: 200,
|
||||||
|
JSON: map[string]interface{}{},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -155,12 +155,24 @@ func Setup(
|
||||||
}),
|
}),
|
||||||
).Methods(http.MethodGet, http.MethodOptions)
|
).Methods(http.MethodGet, http.MethodOptions)
|
||||||
|
|
||||||
|
dendriteAdminRouter.Handle("/admin/purgeRoom/{roomID}",
|
||||||
|
httputil.MakeAdminAPI("admin_purge_room", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
|
||||||
|
return AdminPurgeRoom(req, cfg, device, rsAPI)
|
||||||
|
}),
|
||||||
|
).Methods(http.MethodGet, http.MethodOptions)
|
||||||
|
|
||||||
dendriteAdminRouter.Handle("/admin/resetPassword/{localpart}",
|
dendriteAdminRouter.Handle("/admin/resetPassword/{localpart}",
|
||||||
httputil.MakeAdminAPI("admin_reset_password", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
|
httputil.MakeAdminAPI("admin_reset_password", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
|
||||||
return AdminResetPassword(req, cfg, device, userAPI)
|
return AdminResetPassword(req, cfg, device, userAPI)
|
||||||
}),
|
}),
|
||||||
).Methods(http.MethodPost, http.MethodOptions)
|
).Methods(http.MethodPost, http.MethodOptions)
|
||||||
|
|
||||||
|
dendriteAdminRouter.Handle("/admin/downloadState/{serverName}/{roomID}",
|
||||||
|
httputil.MakeAdminAPI("admin_download_state", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
|
||||||
|
return AdminDownloadState(req, cfg, device, rsAPI)
|
||||||
|
}),
|
||||||
|
).Methods(http.MethodGet, http.MethodOptions)
|
||||||
|
|
||||||
// server notifications
|
// server notifications
|
||||||
if cfg.Matrix.ServerNotices.Enabled {
|
if cfg.Matrix.ServerNotices.Enabled {
|
||||||
logrus.Info("Enabling server notices at /_synapse/admin/v1/send_server_notice")
|
logrus.Info("Enabling server notices at /_synapse/admin/v1/send_server_notice")
|
||||||
|
|
|
@ -21,6 +21,7 @@ import (
|
||||||
|
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/nats-io/nats.go"
|
"github.com/nats-io/nats.go"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/federationapi/queue"
|
"github.com/matrix-org/dendrite/federationapi/queue"
|
||||||
|
@ -110,6 +111,14 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Ms
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
case api.OutputTypePurgeRoom:
|
||||||
|
log.WithField("room_id", output.PurgeRoom.RoomID).Warn("Purging room from federation API")
|
||||||
|
if err := s.db.PurgeRoom(ctx, output.PurgeRoom.RoomID); err != nil {
|
||||||
|
logrus.WithField("room_id", output.PurgeRoom.RoomID).WithError(err).Error("Failed to purge room from federation API")
|
||||||
|
} else {
|
||||||
|
logrus.WithField("room_id", output.PurgeRoom.RoomID).Warn("Room purged from federation API")
|
||||||
|
}
|
||||||
|
|
||||||
default:
|
default:
|
||||||
log.WithField("type", output.Type).Debug(
|
log.WithField("type", output.Type).Debug(
|
||||||
"roomserver output log: ignoring unknown output type",
|
"roomserver output log: ignoring unknown output type",
|
||||||
|
|
|
@ -73,4 +73,6 @@ type Database interface {
|
||||||
GetNotaryKeys(ctx context.Context, serverName gomatrixserverlib.ServerName, optKeyIDs []gomatrixserverlib.KeyID) ([]gomatrixserverlib.ServerKeys, error)
|
GetNotaryKeys(ctx context.Context, serverName gomatrixserverlib.ServerName, optKeyIDs []gomatrixserverlib.KeyID) ([]gomatrixserverlib.ServerKeys, error)
|
||||||
// DeleteExpiredEDUs cleans up expired EDUs
|
// DeleteExpiredEDUs cleans up expired EDUs
|
||||||
DeleteExpiredEDUs(ctx context.Context) error
|
DeleteExpiredEDUs(ctx context.Context) error
|
||||||
|
|
||||||
|
PurgeRoom(ctx context.Context, roomID string) error
|
||||||
}
|
}
|
||||||
|
|
|
@ -66,6 +66,9 @@ const selectAllJoinedHostsSQL = "" +
|
||||||
const selectJoinedHostsForRoomsSQL = "" +
|
const selectJoinedHostsForRoomsSQL = "" +
|
||||||
"SELECT DISTINCT server_name FROM federationsender_joined_hosts WHERE room_id = ANY($1)"
|
"SELECT DISTINCT server_name FROM federationsender_joined_hosts WHERE room_id = ANY($1)"
|
||||||
|
|
||||||
|
const purgeJoinedHostsSQL = "" +
|
||||||
|
"DELETE FROM federationsender_joined_hosts WHERE room_id = $1"
|
||||||
|
|
||||||
type joinedHostsStatements struct {
|
type joinedHostsStatements struct {
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
insertJoinedHostsStmt *sql.Stmt
|
insertJoinedHostsStmt *sql.Stmt
|
||||||
|
@ -74,6 +77,7 @@ type joinedHostsStatements struct {
|
||||||
selectJoinedHostsStmt *sql.Stmt
|
selectJoinedHostsStmt *sql.Stmt
|
||||||
selectAllJoinedHostsStmt *sql.Stmt
|
selectAllJoinedHostsStmt *sql.Stmt
|
||||||
selectJoinedHostsForRoomsStmt *sql.Stmt
|
selectJoinedHostsForRoomsStmt *sql.Stmt
|
||||||
|
purgeJoinedHostsStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPostgresJoinedHostsTable(db *sql.DB) (s *joinedHostsStatements, err error) {
|
func NewPostgresJoinedHostsTable(db *sql.DB) (s *joinedHostsStatements, err error) {
|
||||||
|
@ -102,6 +106,9 @@ func NewPostgresJoinedHostsTable(db *sql.DB) (s *joinedHostsStatements, err erro
|
||||||
if s.selectJoinedHostsForRoomsStmt, err = s.db.Prepare(selectJoinedHostsForRoomsSQL); err != nil {
|
if s.selectJoinedHostsForRoomsStmt, err = s.db.Prepare(selectJoinedHostsForRoomsSQL); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if s.purgeJoinedHostsStmt, err = s.db.Prepare(purgeJoinedHostsSQL); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -210,3 +217,10 @@ func joinedHostsFromStmt(
|
||||||
|
|
||||||
return result, rows.Err()
|
return result, rows.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *joinedHostsStatements) PurgeJoinedHosts(
|
||||||
|
ctx context.Context, txn *sql.Tx, roomID string,
|
||||||
|
) error {
|
||||||
|
_, err := sqlutil.TxStmt(txn, s.purgeJoinedHostsStmt).ExecContext(ctx, roomID)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
|
@ -253,3 +253,18 @@ func (d *Database) GetNotaryKeys(
|
||||||
})
|
})
|
||||||
return sks, err
|
return sks, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (d *Database) PurgeRoom(ctx context.Context, roomID string) error {
|
||||||
|
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||||
|
if err := d.FederationJoinedHosts.PurgeJoinedHosts(ctx, txn, roomID); err != nil {
|
||||||
|
return fmt.Errorf("failed to purge joined hosts: %w", err)
|
||||||
|
}
|
||||||
|
if err := d.FederationInboundPeeks.DeleteInboundPeeks(ctx, txn, roomID); err != nil {
|
||||||
|
return fmt.Errorf("failed to purge inbound peeks: %w", err)
|
||||||
|
}
|
||||||
|
if err := d.FederationOutboundPeeks.DeleteOutboundPeeks(ctx, txn, roomID); err != nil {
|
||||||
|
return fmt.Errorf("failed to purge outbound peeks: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
|
@ -18,6 +18,7 @@ package sqlite3
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/federationapi/types"
|
"github.com/matrix-org/dendrite/federationapi/types"
|
||||||
|
@ -217,3 +218,9 @@ func joinedHostsFromStmt(
|
||||||
|
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *joinedHostsStatements) PurgeJoinedHosts(
|
||||||
|
ctx context.Context, txn *sql.Tx, roomID string,
|
||||||
|
) error {
|
||||||
|
return fmt.Errorf("not implemented on SQLite")
|
||||||
|
}
|
||||||
|
|
|
@ -59,6 +59,7 @@ type FederationJoinedHosts interface {
|
||||||
SelectJoinedHosts(ctx context.Context, roomID string) ([]types.JoinedHost, error)
|
SelectJoinedHosts(ctx context.Context, roomID string) ([]types.JoinedHost, error)
|
||||||
SelectAllJoinedHosts(ctx context.Context) ([]gomatrixserverlib.ServerName, error)
|
SelectAllJoinedHosts(ctx context.Context) ([]gomatrixserverlib.ServerName, error)
|
||||||
SelectJoinedHostsForRooms(ctx context.Context, roomIDs []string) ([]gomatrixserverlib.ServerName, error)
|
SelectJoinedHostsForRooms(ctx context.Context, roomIDs []string) ([]gomatrixserverlib.ServerName, error)
|
||||||
|
PurgeJoinedHosts(ctx context.Context, txn *sql.Tx, roomID string) error
|
||||||
}
|
}
|
||||||
|
|
||||||
type FederationBlacklist interface {
|
type FederationBlacklist interface {
|
||||||
|
|
|
@ -150,6 +150,8 @@ type ClientRoomserverAPI interface {
|
||||||
PerformRoomUpgrade(ctx context.Context, req *PerformRoomUpgradeRequest, resp *PerformRoomUpgradeResponse) error
|
PerformRoomUpgrade(ctx context.Context, req *PerformRoomUpgradeRequest, resp *PerformRoomUpgradeResponse) error
|
||||||
PerformAdminEvacuateRoom(ctx context.Context, req *PerformAdminEvacuateRoomRequest, res *PerformAdminEvacuateRoomResponse) error
|
PerformAdminEvacuateRoom(ctx context.Context, req *PerformAdminEvacuateRoomRequest, res *PerformAdminEvacuateRoomResponse) error
|
||||||
PerformAdminEvacuateUser(ctx context.Context, req *PerformAdminEvacuateUserRequest, res *PerformAdminEvacuateUserResponse) error
|
PerformAdminEvacuateUser(ctx context.Context, req *PerformAdminEvacuateUserRequest, res *PerformAdminEvacuateUserResponse) error
|
||||||
|
PerformAdminPurgeRoom(ctx context.Context, req *PerformAdminPurgeRoomRequest, res *PerformAdminPurgeRoomResponse) error
|
||||||
|
PerformAdminDownloadState(ctx context.Context, req *PerformAdminDownloadStateRequest, res *PerformAdminDownloadStateResponse) error
|
||||||
PerformPeek(ctx context.Context, req *PerformPeekRequest, res *PerformPeekResponse) error
|
PerformPeek(ctx context.Context, req *PerformPeekRequest, res *PerformPeekResponse) error
|
||||||
PerformUnpeek(ctx context.Context, req *PerformUnpeekRequest, res *PerformUnpeekResponse) error
|
PerformUnpeek(ctx context.Context, req *PerformUnpeekRequest, res *PerformUnpeekResponse) error
|
||||||
PerformInvite(ctx context.Context, req *PerformInviteRequest, res *PerformInviteResponse) error
|
PerformInvite(ctx context.Context, req *PerformInviteRequest, res *PerformInviteResponse) error
|
||||||
|
|
|
@ -131,6 +131,26 @@ func (t *RoomserverInternalAPITrace) PerformAdminEvacuateUser(
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *RoomserverInternalAPITrace) PerformAdminPurgeRoom(
|
||||||
|
ctx context.Context,
|
||||||
|
req *PerformAdminPurgeRoomRequest,
|
||||||
|
res *PerformAdminPurgeRoomResponse,
|
||||||
|
) error {
|
||||||
|
err := t.Impl.PerformAdminPurgeRoom(ctx, req, res)
|
||||||
|
util.GetLogger(ctx).WithError(err).Infof("PerformAdminPurgeRoom req=%+v res=%+v", js(req), js(res))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *RoomserverInternalAPITrace) PerformAdminDownloadState(
|
||||||
|
ctx context.Context,
|
||||||
|
req *PerformAdminDownloadStateRequest,
|
||||||
|
res *PerformAdminDownloadStateResponse,
|
||||||
|
) error {
|
||||||
|
err := t.Impl.PerformAdminDownloadState(ctx, req, res)
|
||||||
|
util.GetLogger(ctx).WithError(err).Infof("PerformAdminDownloadState req=%+v res=%+v", js(req), js(res))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
func (t *RoomserverInternalAPITrace) PerformInboundPeek(
|
func (t *RoomserverInternalAPITrace) PerformInboundPeek(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
req *PerformInboundPeekRequest,
|
req *PerformInboundPeekRequest,
|
||||||
|
|
|
@ -55,6 +55,8 @@ const (
|
||||||
OutputTypeNewInboundPeek OutputType = "new_inbound_peek"
|
OutputTypeNewInboundPeek OutputType = "new_inbound_peek"
|
||||||
// OutputTypeRetirePeek indicates that the kafka event is an OutputRetirePeek
|
// OutputTypeRetirePeek indicates that the kafka event is an OutputRetirePeek
|
||||||
OutputTypeRetirePeek OutputType = "retire_peek"
|
OutputTypeRetirePeek OutputType = "retire_peek"
|
||||||
|
// OutputTypePurgeRoom indicates the event is an OutputPurgeRoom
|
||||||
|
OutputTypePurgeRoom OutputType = "purge_room"
|
||||||
)
|
)
|
||||||
|
|
||||||
// An OutputEvent is an entry in the roomserver output kafka log.
|
// An OutputEvent is an entry in the roomserver output kafka log.
|
||||||
|
@ -78,6 +80,8 @@ type OutputEvent struct {
|
||||||
NewInboundPeek *OutputNewInboundPeek `json:"new_inbound_peek,omitempty"`
|
NewInboundPeek *OutputNewInboundPeek `json:"new_inbound_peek,omitempty"`
|
||||||
// The content of event with type OutputTypeRetirePeek
|
// The content of event with type OutputTypeRetirePeek
|
||||||
RetirePeek *OutputRetirePeek `json:"retire_peek,omitempty"`
|
RetirePeek *OutputRetirePeek `json:"retire_peek,omitempty"`
|
||||||
|
// The content of the event with type OutputPurgeRoom
|
||||||
|
PurgeRoom *OutputPurgeRoom `json:"purge_room,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// Type of the OutputNewRoomEvent.
|
// Type of the OutputNewRoomEvent.
|
||||||
|
@ -257,3 +261,7 @@ type OutputRetirePeek struct {
|
||||||
UserID string
|
UserID string
|
||||||
DeviceID string
|
DeviceID string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type OutputPurgeRoom struct {
|
||||||
|
RoomID string
|
||||||
|
}
|
||||||
|
|
|
@ -234,3 +234,21 @@ type PerformAdminEvacuateUserResponse struct {
|
||||||
Affected []string `json:"affected"`
|
Affected []string `json:"affected"`
|
||||||
Error *PerformError
|
Error *PerformError
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type PerformAdminPurgeRoomRequest struct {
|
||||||
|
RoomID string `json:"room_id"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type PerformAdminPurgeRoomResponse struct {
|
||||||
|
Error *PerformError `json:"error,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type PerformAdminDownloadStateRequest struct {
|
||||||
|
RoomID string `json:"room_id"`
|
||||||
|
UserID string `json:"user_id"`
|
||||||
|
ServerName gomatrixserverlib.ServerName `json:"server_name"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type PerformAdminDownloadStateResponse struct {
|
||||||
|
Error *PerformError `json:"error,omitempty"`
|
||||||
|
}
|
||||||
|
|
|
@ -28,6 +28,7 @@ import (
|
||||||
"github.com/matrix-org/dendrite/roomserver/storage"
|
"github.com/matrix-org/dendrite/roomserver/storage"
|
||||||
"github.com/matrix-org/dendrite/setup/config"
|
"github.com/matrix-org/dendrite/setup/config"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Admin struct {
|
type Admin struct {
|
||||||
|
@ -231,3 +232,181 @@ func (r *Admin) PerformAdminEvacuateUser(
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *Admin) PerformAdminPurgeRoom(
|
||||||
|
ctx context.Context,
|
||||||
|
req *api.PerformAdminPurgeRoomRequest,
|
||||||
|
res *api.PerformAdminPurgeRoomResponse,
|
||||||
|
) error {
|
||||||
|
if _, _, err := gomatrixserverlib.SplitID('!', req.RoomID); err != nil {
|
||||||
|
res.Error = &api.PerformError{
|
||||||
|
Code: api.PerformErrorBadRequest,
|
||||||
|
Msg: fmt.Sprintf("Malformed room ID: %s", err),
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
logrus.WithField("room_id", req.RoomID).Warn("Purging room from roomserver")
|
||||||
|
if err := r.DB.PurgeRoom(ctx, req.RoomID); err != nil {
|
||||||
|
logrus.WithField("room_id", req.RoomID).WithError(err).Warn("Failed to purge room from roomserver")
|
||||||
|
res.Error = &api.PerformError{
|
||||||
|
Code: api.PerformErrorBadRequest,
|
||||||
|
Msg: err.Error(),
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
} else {
|
||||||
|
logrus.WithField("room_id", req.RoomID).Warn("Room purged from roomserver")
|
||||||
|
}
|
||||||
|
|
||||||
|
return r.Inputer.OutputProducer.ProduceRoomEvents(req.RoomID, []api.OutputEvent{
|
||||||
|
{
|
||||||
|
Type: api.OutputTypePurgeRoom,
|
||||||
|
PurgeRoom: &api.OutputPurgeRoom{
|
||||||
|
RoomID: req.RoomID,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Admin) PerformAdminDownloadState(
|
||||||
|
ctx context.Context,
|
||||||
|
req *api.PerformAdminDownloadStateRequest,
|
||||||
|
res *api.PerformAdminDownloadStateResponse,
|
||||||
|
) error {
|
||||||
|
roomInfo, err := r.DB.RoomInfo(ctx, req.RoomID)
|
||||||
|
if err != nil {
|
||||||
|
res.Error = &api.PerformError{
|
||||||
|
Code: api.PerformErrorBadRequest,
|
||||||
|
Msg: fmt.Sprintf("r.DB.RoomInfo: %s", err),
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if roomInfo == nil || roomInfo.IsStub() {
|
||||||
|
res.Error = &api.PerformError{
|
||||||
|
Code: api.PerformErrorBadRequest,
|
||||||
|
Msg: fmt.Sprintf("room %q not found", req.RoomID),
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
fwdExtremities, _, depth, err := r.DB.LatestEventIDs(ctx, roomInfo.RoomNID)
|
||||||
|
if err != nil {
|
||||||
|
res.Error = &api.PerformError{
|
||||||
|
Code: api.PerformErrorBadRequest,
|
||||||
|
Msg: fmt.Sprintf("r.DB.LatestEventIDs: %s", err),
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
authEventMap := map[string]*gomatrixserverlib.Event{}
|
||||||
|
stateEventMap := map[string]*gomatrixserverlib.Event{}
|
||||||
|
|
||||||
|
for _, fwdExtremity := range fwdExtremities {
|
||||||
|
var state gomatrixserverlib.RespState
|
||||||
|
state, err = r.Inputer.FSAPI.LookupState(ctx, req.ServerName, req.RoomID, fwdExtremity.EventID, roomInfo.RoomVersion)
|
||||||
|
if err != nil {
|
||||||
|
res.Error = &api.PerformError{
|
||||||
|
Code: api.PerformErrorBadRequest,
|
||||||
|
Msg: fmt.Sprintf("r.Inputer.FSAPI.LookupState (%q): %s", fwdExtremity.EventID, err),
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
for _, authEvent := range state.AuthEvents.UntrustedEvents(roomInfo.RoomVersion) {
|
||||||
|
if err = authEvent.VerifyEventSignatures(ctx, r.Inputer.KeyRing); err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
authEventMap[authEvent.EventID()] = authEvent
|
||||||
|
}
|
||||||
|
for _, stateEvent := range state.StateEvents.UntrustedEvents(roomInfo.RoomVersion) {
|
||||||
|
if err = stateEvent.VerifyEventSignatures(ctx, r.Inputer.KeyRing); err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
stateEventMap[stateEvent.EventID()] = stateEvent
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
authEvents := make([]*gomatrixserverlib.HeaderedEvent, 0, len(authEventMap))
|
||||||
|
stateEvents := make([]*gomatrixserverlib.HeaderedEvent, 0, len(stateEventMap))
|
||||||
|
stateIDs := make([]string, 0, len(stateEventMap))
|
||||||
|
|
||||||
|
for _, authEvent := range authEventMap {
|
||||||
|
authEvents = append(authEvents, authEvent.Headered(roomInfo.RoomVersion))
|
||||||
|
}
|
||||||
|
for _, stateEvent := range stateEventMap {
|
||||||
|
stateEvents = append(stateEvents, stateEvent.Headered(roomInfo.RoomVersion))
|
||||||
|
stateIDs = append(stateIDs, stateEvent.EventID())
|
||||||
|
}
|
||||||
|
|
||||||
|
builder := &gomatrixserverlib.EventBuilder{
|
||||||
|
Type: "org.matrix.dendrite.state_download",
|
||||||
|
Sender: req.UserID,
|
||||||
|
RoomID: req.RoomID,
|
||||||
|
Content: gomatrixserverlib.RawJSON("{}"),
|
||||||
|
}
|
||||||
|
|
||||||
|
eventsNeeded, err := gomatrixserverlib.StateNeededForEventBuilder(builder)
|
||||||
|
if err != nil {
|
||||||
|
res.Error = &api.PerformError{
|
||||||
|
Code: api.PerformErrorBadRequest,
|
||||||
|
Msg: fmt.Sprintf("gomatrixserverlib.StateNeededForEventBuilder: %s", err),
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
queryRes := &api.QueryLatestEventsAndStateResponse{
|
||||||
|
RoomExists: true,
|
||||||
|
RoomVersion: roomInfo.RoomVersion,
|
||||||
|
LatestEvents: fwdExtremities,
|
||||||
|
StateEvents: stateEvents,
|
||||||
|
Depth: depth,
|
||||||
|
}
|
||||||
|
|
||||||
|
ev, err := eventutil.BuildEvent(ctx, builder, r.Cfg.Matrix, time.Now(), &eventsNeeded, queryRes)
|
||||||
|
if err != nil {
|
||||||
|
res.Error = &api.PerformError{
|
||||||
|
Code: api.PerformErrorBadRequest,
|
||||||
|
Msg: fmt.Sprintf("eventutil.BuildEvent: %s", err),
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
inputReq := &api.InputRoomEventsRequest{
|
||||||
|
Asynchronous: false,
|
||||||
|
}
|
||||||
|
inputRes := &api.InputRoomEventsResponse{}
|
||||||
|
|
||||||
|
for _, authEvent := range append(authEvents, stateEvents...) {
|
||||||
|
inputReq.InputRoomEvents = append(inputReq.InputRoomEvents, api.InputRoomEvent{
|
||||||
|
Kind: api.KindOutlier,
|
||||||
|
Event: authEvent,
|
||||||
|
Origin: authEvent.Origin(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
inputReq.InputRoomEvents = append(inputReq.InputRoomEvents, api.InputRoomEvent{
|
||||||
|
Kind: api.KindNew,
|
||||||
|
Event: ev,
|
||||||
|
Origin: r.Cfg.Matrix.ServerName,
|
||||||
|
HasState: true,
|
||||||
|
StateEventIDs: stateIDs,
|
||||||
|
SendAsServer: string(r.Cfg.Matrix.ServerName),
|
||||||
|
})
|
||||||
|
|
||||||
|
if err := r.Inputer.InputRoomEvents(ctx, inputReq, inputRes); err != nil {
|
||||||
|
res.Error = &api.PerformError{
|
||||||
|
Code: api.PerformErrorBadRequest,
|
||||||
|
Msg: fmt.Sprintf("r.Inputer.InputRoomEvents: %s", err),
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if inputRes.ErrMsg != "" {
|
||||||
|
res.Error = &api.PerformError{
|
||||||
|
Code: api.PerformErrorBadRequest,
|
||||||
|
Msg: inputRes.ErrMsg,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
|
@ -27,18 +27,20 @@ const (
|
||||||
RoomserverInputRoomEventsPath = "/roomserver/inputRoomEvents"
|
RoomserverInputRoomEventsPath = "/roomserver/inputRoomEvents"
|
||||||
|
|
||||||
// Perform operations
|
// Perform operations
|
||||||
RoomserverPerformInvitePath = "/roomserver/performInvite"
|
RoomserverPerformInvitePath = "/roomserver/performInvite"
|
||||||
RoomserverPerformPeekPath = "/roomserver/performPeek"
|
RoomserverPerformPeekPath = "/roomserver/performPeek"
|
||||||
RoomserverPerformUnpeekPath = "/roomserver/performUnpeek"
|
RoomserverPerformUnpeekPath = "/roomserver/performUnpeek"
|
||||||
RoomserverPerformRoomUpgradePath = "/roomserver/performRoomUpgrade"
|
RoomserverPerformRoomUpgradePath = "/roomserver/performRoomUpgrade"
|
||||||
RoomserverPerformJoinPath = "/roomserver/performJoin"
|
RoomserverPerformJoinPath = "/roomserver/performJoin"
|
||||||
RoomserverPerformLeavePath = "/roomserver/performLeave"
|
RoomserverPerformLeavePath = "/roomserver/performLeave"
|
||||||
RoomserverPerformBackfillPath = "/roomserver/performBackfill"
|
RoomserverPerformBackfillPath = "/roomserver/performBackfill"
|
||||||
RoomserverPerformPublishPath = "/roomserver/performPublish"
|
RoomserverPerformPublishPath = "/roomserver/performPublish"
|
||||||
RoomserverPerformInboundPeekPath = "/roomserver/performInboundPeek"
|
RoomserverPerformInboundPeekPath = "/roomserver/performInboundPeek"
|
||||||
RoomserverPerformForgetPath = "/roomserver/performForget"
|
RoomserverPerformForgetPath = "/roomserver/performForget"
|
||||||
RoomserverPerformAdminEvacuateRoomPath = "/roomserver/performAdminEvacuateRoom"
|
RoomserverPerformAdminEvacuateRoomPath = "/roomserver/performAdminEvacuateRoom"
|
||||||
RoomserverPerformAdminEvacuateUserPath = "/roomserver/performAdminEvacuateUser"
|
RoomserverPerformAdminEvacuateUserPath = "/roomserver/performAdminEvacuateUser"
|
||||||
|
RoomserverPerformAdminPurgeRoomPath = "/roomserver/performAdminPurgeRoom"
|
||||||
|
RoomserverPerformAdminDownloadStatePath = "/roomserver/performAdminDownloadState"
|
||||||
|
|
||||||
// Query operations
|
// Query operations
|
||||||
RoomserverQueryLatestEventsAndStatePath = "/roomserver/queryLatestEventsAndState"
|
RoomserverQueryLatestEventsAndStatePath = "/roomserver/queryLatestEventsAndState"
|
||||||
|
@ -261,6 +263,17 @@ func (h *httpRoomserverInternalAPI) PerformAdminEvacuateRoom(
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (h *httpRoomserverInternalAPI) PerformAdminDownloadState(
|
||||||
|
ctx context.Context,
|
||||||
|
request *api.PerformAdminDownloadStateRequest,
|
||||||
|
response *api.PerformAdminDownloadStateResponse,
|
||||||
|
) error {
|
||||||
|
return httputil.CallInternalRPCAPI(
|
||||||
|
"PerformAdminDownloadState", h.roomserverURL+RoomserverPerformAdminDownloadStatePath,
|
||||||
|
h.httpClient, ctx, request, response,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
func (h *httpRoomserverInternalAPI) PerformAdminEvacuateUser(
|
func (h *httpRoomserverInternalAPI) PerformAdminEvacuateUser(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
request *api.PerformAdminEvacuateUserRequest,
|
request *api.PerformAdminEvacuateUserRequest,
|
||||||
|
@ -272,6 +285,17 @@ func (h *httpRoomserverInternalAPI) PerformAdminEvacuateUser(
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (h *httpRoomserverInternalAPI) PerformAdminPurgeRoom(
|
||||||
|
ctx context.Context,
|
||||||
|
request *api.PerformAdminPurgeRoomRequest,
|
||||||
|
response *api.PerformAdminPurgeRoomResponse,
|
||||||
|
) error {
|
||||||
|
return httputil.CallInternalRPCAPI(
|
||||||
|
"PerformAdminPurgeRoom", h.roomserverURL+RoomserverPerformAdminPurgeRoomPath,
|
||||||
|
h.httpClient, ctx, request, response,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
// QueryLatestEventsAndState implements RoomserverQueryAPI
|
// QueryLatestEventsAndState implements RoomserverQueryAPI
|
||||||
func (h *httpRoomserverInternalAPI) QueryLatestEventsAndState(
|
func (h *httpRoomserverInternalAPI) QueryLatestEventsAndState(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
|
|
|
@ -65,6 +65,16 @@ func AddRoutes(r api.RoomserverInternalAPI, internalAPIMux *mux.Router) {
|
||||||
httputil.MakeInternalRPCAPI("RoomserverPerformAdminEvacuateUser", r.PerformAdminEvacuateUser),
|
httputil.MakeInternalRPCAPI("RoomserverPerformAdminEvacuateUser", r.PerformAdminEvacuateUser),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
internalAPIMux.Handle(
|
||||||
|
RoomserverPerformAdminPurgeRoomPath,
|
||||||
|
httputil.MakeInternalRPCAPI("RoomserverPerformAdminPurgeRoom", r.PerformAdminPurgeRoom),
|
||||||
|
)
|
||||||
|
|
||||||
|
internalAPIMux.Handle(
|
||||||
|
RoomserverPerformAdminDownloadStatePath,
|
||||||
|
httputil.MakeInternalRPCAPI("RoomserverPerformAdminDownloadState", r.PerformAdminDownloadState),
|
||||||
|
)
|
||||||
|
|
||||||
internalAPIMux.Handle(
|
internalAPIMux.Handle(
|
||||||
RoomserverQueryPublishedRoomsPath,
|
RoomserverQueryPublishedRoomsPath,
|
||||||
httputil.MakeInternalRPCAPI("RoomserverQueryPublishedRooms", r.QueryPublishedRooms),
|
httputil.MakeInternalRPCAPI("RoomserverQueryPublishedRooms", r.QueryPublishedRooms),
|
||||||
|
|
|
@ -170,4 +170,5 @@ type Database interface {
|
||||||
ForgetRoom(ctx context.Context, userID, roomID string, forget bool) error
|
ForgetRoom(ctx context.Context, userID, roomID string, forget bool) error
|
||||||
|
|
||||||
GetHistoryVisibilityState(ctx context.Context, roomInfo *types.RoomInfo, eventID string, domain string) ([]*gomatrixserverlib.Event, error)
|
GetHistoryVisibilityState(ctx context.Context, roomInfo *types.RoomInfo, eventID string, domain string) ([]*gomatrixserverlib.Event, error)
|
||||||
|
PurgeRoom(ctx context.Context, roomID string) error
|
||||||
}
|
}
|
||||||
|
|
159
roomserver/storage/postgres/purge_statements.go
Normal file
159
roomserver/storage/postgres/purge_statements.go
Normal file
|
@ -0,0 +1,159 @@
|
||||||
|
package postgres
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
|
"github.com/matrix-org/dendrite/roomserver/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
const purgeEventJSONSQL = "" +
|
||||||
|
"DELETE FROM roomserver_event_json WHERE event_nid = ANY(" +
|
||||||
|
" SELECT event_nid FROM roomserver_events WHERE room_nid = $1" +
|
||||||
|
")"
|
||||||
|
|
||||||
|
const purgeEventsSQL = "" +
|
||||||
|
"DELETE FROM roomserver_events WHERE room_nid = $1"
|
||||||
|
|
||||||
|
const purgeInvitesSQL = "" +
|
||||||
|
"DELETE FROM roomserver_invites WHERE room_nid = $1"
|
||||||
|
|
||||||
|
const purgeMembershipsSQL = "" +
|
||||||
|
"DELETE FROM roomserver_membership WHERE room_nid = $1"
|
||||||
|
|
||||||
|
const purgePreviousEventsSQL = "" +
|
||||||
|
"DELETE FROM roomserver_previous_events WHERE event_nids && ANY(" +
|
||||||
|
" SELECT ARRAY_AGG(event_nid) FROM roomserver_events WHERE room_nid = $1" +
|
||||||
|
")"
|
||||||
|
|
||||||
|
const purgePublishedSQL = "" +
|
||||||
|
"DELETE FROM roomserver_published WHERE room_id = $1"
|
||||||
|
|
||||||
|
const purgeRedactionsSQL = "" +
|
||||||
|
"DELETE FROM roomserver_redactions WHERE redaction_event_id = ANY(" +
|
||||||
|
" SELECT event_id FROM roomserver_events WHERE room_nid = $1" +
|
||||||
|
")"
|
||||||
|
|
||||||
|
const purgeRoomAliasesSQL = "" +
|
||||||
|
"DELETE FROM roomserver_room_aliases WHERE room_id = $1"
|
||||||
|
|
||||||
|
const purgeRoomSQL = "" +
|
||||||
|
"DELETE FROM roomserver_rooms WHERE room_nid = $1"
|
||||||
|
|
||||||
|
const purgeStateBlockEntriesSQL = "" +
|
||||||
|
"DELETE FROM roomserver_state_block WHERE state_block_nid = ANY(" +
|
||||||
|
" SELECT DISTINCT UNNEST(state_block_nids) FROM roomserver_state_snapshots WHERE room_nid = $1" +
|
||||||
|
")"
|
||||||
|
|
||||||
|
const purgeStateSnapshotEntriesSQL = "" +
|
||||||
|
"DELETE FROM roomserver_state_snapshots WHERE room_nid = $1"
|
||||||
|
|
||||||
|
type purgeStatements struct {
|
||||||
|
purgeEventJSONStmt *sql.Stmt
|
||||||
|
purgeEventsStmt *sql.Stmt
|
||||||
|
purgeInvitesStmt *sql.Stmt
|
||||||
|
purgeMembershipsStmt *sql.Stmt
|
||||||
|
purgePreviousEventsStmt *sql.Stmt
|
||||||
|
purgePublishedStmt *sql.Stmt
|
||||||
|
purgeRedactionStmt *sql.Stmt
|
||||||
|
purgeRoomAliasesStmt *sql.Stmt
|
||||||
|
purgeRoomStmt *sql.Stmt
|
||||||
|
purgeStateBlockEntriesStmt *sql.Stmt
|
||||||
|
purgeStateSnapshotEntriesStmt *sql.Stmt
|
||||||
|
}
|
||||||
|
|
||||||
|
func PreparePurgeStatements(db *sql.DB) (*purgeStatements, error) {
|
||||||
|
s := &purgeStatements{}
|
||||||
|
|
||||||
|
return s, sqlutil.StatementList{
|
||||||
|
{&s.purgeEventJSONStmt, purgeEventJSONSQL},
|
||||||
|
{&s.purgeEventsStmt, purgeEventsSQL},
|
||||||
|
{&s.purgeInvitesStmt, purgeInvitesSQL},
|
||||||
|
{&s.purgeMembershipsStmt, purgeMembershipsSQL},
|
||||||
|
{&s.purgePublishedStmt, purgePublishedSQL},
|
||||||
|
{&s.purgePreviousEventsStmt, purgePreviousEventsSQL},
|
||||||
|
{&s.purgeRedactionStmt, purgeRedactionsSQL},
|
||||||
|
{&s.purgeRoomAliasesStmt, purgeRoomAliasesSQL},
|
||||||
|
{&s.purgeRoomStmt, purgeRoomSQL},
|
||||||
|
{&s.purgeStateBlockEntriesStmt, purgeStateBlockEntriesSQL},
|
||||||
|
{&s.purgeStateSnapshotEntriesStmt, purgeStateSnapshotEntriesSQL},
|
||||||
|
}.Prepare(db)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *purgeStatements) PurgeEventJSONs(
|
||||||
|
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID,
|
||||||
|
) error {
|
||||||
|
_, err := sqlutil.TxStmt(txn, s.purgeEventJSONStmt).ExecContext(ctx, roomNID)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *purgeStatements) PurgeEvents(
|
||||||
|
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID,
|
||||||
|
) error {
|
||||||
|
_, err := sqlutil.TxStmt(txn, s.purgeEventsStmt).ExecContext(ctx, roomNID)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *purgeStatements) PurgeInvites(
|
||||||
|
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID,
|
||||||
|
) error {
|
||||||
|
_, err := sqlutil.TxStmt(txn, s.purgeInvitesStmt).ExecContext(ctx, roomNID)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *purgeStatements) PurgeMemberships(
|
||||||
|
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID,
|
||||||
|
) error {
|
||||||
|
_, err := sqlutil.TxStmt(txn, s.purgeMembershipsStmt).ExecContext(ctx, roomNID)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *purgeStatements) PurgePreviousEvents(
|
||||||
|
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID,
|
||||||
|
) error {
|
||||||
|
_, err := sqlutil.TxStmt(txn, s.purgePreviousEventsStmt).ExecContext(ctx, roomNID)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *purgeStatements) PurgePublished(
|
||||||
|
ctx context.Context, txn *sql.Tx, roomID string,
|
||||||
|
) error {
|
||||||
|
_, err := sqlutil.TxStmt(txn, s.purgePublishedStmt).ExecContext(ctx, roomID)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *purgeStatements) PurgeRedactions(
|
||||||
|
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID,
|
||||||
|
) error {
|
||||||
|
_, err := sqlutil.TxStmt(txn, s.purgeRedactionStmt).ExecContext(ctx, roomNID)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *purgeStatements) PurgeRoomAliases(
|
||||||
|
ctx context.Context, txn *sql.Tx, roomID string,
|
||||||
|
) error {
|
||||||
|
_, err := sqlutil.TxStmt(txn, s.purgeRoomAliasesStmt).ExecContext(ctx, roomID)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *purgeStatements) PurgeRoom(
|
||||||
|
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID,
|
||||||
|
) error {
|
||||||
|
_, err := sqlutil.TxStmt(txn, s.purgeRoomStmt).ExecContext(ctx, roomNID)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *purgeStatements) PurgeStateBlocks(
|
||||||
|
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID,
|
||||||
|
) error {
|
||||||
|
_, err := sqlutil.TxStmt(txn, s.purgeStateBlockEntriesStmt).ExecContext(ctx, roomNID)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *purgeStatements) PurgeStateSnapshots(
|
||||||
|
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID,
|
||||||
|
) error {
|
||||||
|
_, err := sqlutil.TxStmt(txn, s.purgeStateSnapshotEntriesStmt).ExecContext(ctx, roomNID)
|
||||||
|
return err
|
||||||
|
}
|
|
@ -58,6 +58,9 @@ const insertRoomNIDSQL = "" +
|
||||||
const selectRoomNIDSQL = "" +
|
const selectRoomNIDSQL = "" +
|
||||||
"SELECT room_nid FROM roomserver_rooms WHERE room_id = $1"
|
"SELECT room_nid FROM roomserver_rooms WHERE room_id = $1"
|
||||||
|
|
||||||
|
const selectRoomNIDForUpdateSQL = "" +
|
||||||
|
"SELECT room_nid FROM roomserver_rooms WHERE room_id = $1 FOR UPDATE"
|
||||||
|
|
||||||
const selectLatestEventNIDsSQL = "" +
|
const selectLatestEventNIDsSQL = "" +
|
||||||
"SELECT latest_event_nids, state_snapshot_nid FROM roomserver_rooms WHERE room_nid = $1"
|
"SELECT latest_event_nids, state_snapshot_nid FROM roomserver_rooms WHERE room_nid = $1"
|
||||||
|
|
||||||
|
@ -85,6 +88,7 @@ const bulkSelectRoomNIDsSQL = "" +
|
||||||
type roomStatements struct {
|
type roomStatements struct {
|
||||||
insertRoomNIDStmt *sql.Stmt
|
insertRoomNIDStmt *sql.Stmt
|
||||||
selectRoomNIDStmt *sql.Stmt
|
selectRoomNIDStmt *sql.Stmt
|
||||||
|
selectRoomNIDForUpdateStmt *sql.Stmt
|
||||||
selectLatestEventNIDsStmt *sql.Stmt
|
selectLatestEventNIDsStmt *sql.Stmt
|
||||||
selectLatestEventNIDsForUpdateStmt *sql.Stmt
|
selectLatestEventNIDsForUpdateStmt *sql.Stmt
|
||||||
updateLatestEventNIDsStmt *sql.Stmt
|
updateLatestEventNIDsStmt *sql.Stmt
|
||||||
|
@ -106,6 +110,7 @@ func PrepareRoomsTable(db *sql.DB) (tables.Rooms, error) {
|
||||||
return s, sqlutil.StatementList{
|
return s, sqlutil.StatementList{
|
||||||
{&s.insertRoomNIDStmt, insertRoomNIDSQL},
|
{&s.insertRoomNIDStmt, insertRoomNIDSQL},
|
||||||
{&s.selectRoomNIDStmt, selectRoomNIDSQL},
|
{&s.selectRoomNIDStmt, selectRoomNIDSQL},
|
||||||
|
{&s.selectRoomNIDForUpdateStmt, selectRoomNIDForUpdateSQL},
|
||||||
{&s.selectLatestEventNIDsStmt, selectLatestEventNIDsSQL},
|
{&s.selectLatestEventNIDsStmt, selectLatestEventNIDsSQL},
|
||||||
{&s.selectLatestEventNIDsForUpdateStmt, selectLatestEventNIDsForUpdateSQL},
|
{&s.selectLatestEventNIDsForUpdateStmt, selectLatestEventNIDsForUpdateSQL},
|
||||||
{&s.updateLatestEventNIDsStmt, updateLatestEventNIDsSQL},
|
{&s.updateLatestEventNIDsStmt, updateLatestEventNIDsSQL},
|
||||||
|
@ -169,6 +174,15 @@ func (s *roomStatements) SelectRoomNID(
|
||||||
return types.RoomNID(roomNID), err
|
return types.RoomNID(roomNID), err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *roomStatements) SelectRoomNIDForUpdate(
|
||||||
|
ctx context.Context, txn *sql.Tx, roomID string,
|
||||||
|
) (types.RoomNID, error) {
|
||||||
|
var roomNID int64
|
||||||
|
stmt := sqlutil.TxStmt(txn, s.selectRoomNIDForUpdateStmt)
|
||||||
|
err := stmt.QueryRowContext(ctx, roomID).Scan(&roomNID)
|
||||||
|
return types.RoomNID(roomNID), err
|
||||||
|
}
|
||||||
|
|
||||||
func (s *roomStatements) SelectLatestEventNIDs(
|
func (s *roomStatements) SelectLatestEventNIDs(
|
||||||
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID,
|
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID,
|
||||||
) ([]types.EventNID, types.StateSnapshotNID, error) {
|
) ([]types.EventNID, types.StateSnapshotNID, error) {
|
||||||
|
|
|
@ -189,6 +189,10 @@ func (d *Database) prepare(db *sql.DB, writer sqlutil.Writer, cache caching.Room
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
purge, err := PreparePurgeStatements(db)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
d.Database = shared.Database{
|
d.Database = shared.Database{
|
||||||
DB: db,
|
DB: db,
|
||||||
Cache: cache,
|
Cache: cache,
|
||||||
|
@ -206,6 +210,7 @@ func (d *Database) prepare(db *sql.DB, writer sqlutil.Writer, cache caching.Room
|
||||||
MembershipTable: membership,
|
MembershipTable: membership,
|
||||||
PublishedTable: published,
|
PublishedTable: published,
|
||||||
RedactionsTable: redactions,
|
RedactionsTable: redactions,
|
||||||
|
Purge: purge,
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -42,6 +42,7 @@ type Database struct {
|
||||||
MembershipTable tables.Membership
|
MembershipTable tables.Membership
|
||||||
PublishedTable tables.Published
|
PublishedTable tables.Published
|
||||||
RedactionsTable tables.Redactions
|
RedactionsTable tables.Redactions
|
||||||
|
Purge tables.Purge
|
||||||
GetRoomUpdaterFn func(ctx context.Context, roomInfo *types.RoomInfo) (*RoomUpdater, error)
|
GetRoomUpdaterFn func(ctx context.Context, roomInfo *types.RoomInfo) (*RoomUpdater, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1362,6 +1363,57 @@ func (d *Database) ForgetRoom(ctx context.Context, userID, roomID string, forget
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PurgeRoom removes all information about a given room from the roomserver.
|
||||||
|
// For large rooms this operation may take a considerable amount of time.
|
||||||
|
func (d *Database) PurgeRoom(ctx context.Context, roomID string) error {
|
||||||
|
if d.Purge == nil {
|
||||||
|
return fmt.Errorf("not supported on this database engine")
|
||||||
|
}
|
||||||
|
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||||
|
roomNID, err := d.RoomsTable.SelectRoomNIDForUpdate(ctx, txn, roomID)
|
||||||
|
if err != nil {
|
||||||
|
if err == sql.ErrNoRows {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return fmt.Errorf("failed to lock the room: %w", err)
|
||||||
|
}
|
||||||
|
if err := d.Purge.PurgeStateBlocks(ctx, txn, roomNID); err != nil {
|
||||||
|
return fmt.Errorf("failed to purge state blocks: %w", err)
|
||||||
|
}
|
||||||
|
if err := d.Purge.PurgeStateSnapshots(ctx, txn, roomNID); err != nil {
|
||||||
|
return fmt.Errorf("failed to purge state blocks: %w", err)
|
||||||
|
}
|
||||||
|
if err := d.Purge.PurgeInvites(ctx, txn, roomNID); err != nil {
|
||||||
|
return fmt.Errorf("failed to purge invites: %w", err)
|
||||||
|
}
|
||||||
|
if err := d.Purge.PurgeMemberships(ctx, txn, roomNID); err != nil {
|
||||||
|
return fmt.Errorf("failed to purge memberships: %w", err)
|
||||||
|
}
|
||||||
|
if err := d.Purge.PurgeRoomAliases(ctx, txn, roomID); err != nil {
|
||||||
|
return fmt.Errorf("failed to purge room aliases: %w", err)
|
||||||
|
}
|
||||||
|
if err := d.Purge.PurgePublished(ctx, txn, roomID); err != nil {
|
||||||
|
return fmt.Errorf("failed to purge published: %w", err)
|
||||||
|
}
|
||||||
|
if err := d.Purge.PurgePreviousEvents(ctx, txn, roomNID); err != nil {
|
||||||
|
return fmt.Errorf("failed to purge previous events: %w", err)
|
||||||
|
}
|
||||||
|
if err := d.Purge.PurgeEventJSONs(ctx, txn, roomNID); err != nil {
|
||||||
|
return fmt.Errorf("failed to purge event JSONs: %w", err)
|
||||||
|
}
|
||||||
|
if err := d.Purge.PurgeRedactions(ctx, txn, roomNID); err != nil {
|
||||||
|
return fmt.Errorf("failed to purge redactions: %w", err)
|
||||||
|
}
|
||||||
|
if err := d.Purge.PurgeEvents(ctx, txn, roomNID); err != nil {
|
||||||
|
return fmt.Errorf("failed to purge events: %w", err)
|
||||||
|
}
|
||||||
|
if err := d.Purge.PurgeRoom(ctx, txn, roomNID); err != nil {
|
||||||
|
return fmt.Errorf("failed to purge room: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
// FIXME TODO: Remove all this - horrible dupe with roomserver/state. Can't use the original impl because of circular loops
|
// FIXME TODO: Remove all this - horrible dupe with roomserver/state. Can't use the original impl because of circular loops
|
||||||
// it should live in this package!
|
// it should live in this package!
|
||||||
|
|
||||||
|
|
|
@ -169,6 +169,12 @@ func (s *roomStatements) SelectRoomNID(
|
||||||
return types.RoomNID(roomNID), err
|
return types.RoomNID(roomNID), err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *roomStatements) SelectRoomNIDForUpdate(
|
||||||
|
ctx context.Context, txn *sql.Tx, roomID string,
|
||||||
|
) (types.RoomNID, error) {
|
||||||
|
return 0, fmt.Errorf("not supported on SQLite")
|
||||||
|
}
|
||||||
|
|
||||||
func (s *roomStatements) SelectLatestEventNIDs(
|
func (s *roomStatements) SelectLatestEventNIDs(
|
||||||
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID,
|
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID,
|
||||||
) ([]types.EventNID, types.StateSnapshotNID, error) {
|
) ([]types.EventNID, types.StateSnapshotNID, error) {
|
||||||
|
|
|
@ -72,6 +72,7 @@ type Events interface {
|
||||||
type Rooms interface {
|
type Rooms interface {
|
||||||
InsertRoomNID(ctx context.Context, txn *sql.Tx, roomID string, roomVersion gomatrixserverlib.RoomVersion) (types.RoomNID, error)
|
InsertRoomNID(ctx context.Context, txn *sql.Tx, roomID string, roomVersion gomatrixserverlib.RoomVersion) (types.RoomNID, error)
|
||||||
SelectRoomNID(ctx context.Context, txn *sql.Tx, roomID string) (types.RoomNID, error)
|
SelectRoomNID(ctx context.Context, txn *sql.Tx, roomID string) (types.RoomNID, error)
|
||||||
|
SelectRoomNIDForUpdate(ctx context.Context, txn *sql.Tx, roomID string) (types.RoomNID, error)
|
||||||
SelectLatestEventNIDs(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID) ([]types.EventNID, types.StateSnapshotNID, error)
|
SelectLatestEventNIDs(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID) ([]types.EventNID, types.StateSnapshotNID, error)
|
||||||
SelectLatestEventsNIDsForUpdate(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID) ([]types.EventNID, types.EventNID, types.StateSnapshotNID, error)
|
SelectLatestEventsNIDsForUpdate(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID) ([]types.EventNID, types.EventNID, types.StateSnapshotNID, error)
|
||||||
UpdateLatestEventNIDs(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, eventNIDs []types.EventNID, lastEventSentNID types.EventNID, stateSnapshotNID types.StateSnapshotNID) error
|
UpdateLatestEventNIDs(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, eventNIDs []types.EventNID, lastEventSentNID types.EventNID, stateSnapshotNID types.StateSnapshotNID) error
|
||||||
|
@ -171,6 +172,20 @@ type Redactions interface {
|
||||||
MarkRedactionValidated(ctx context.Context, txn *sql.Tx, redactionEventID string, validated bool) error
|
MarkRedactionValidated(ctx context.Context, txn *sql.Tx, redactionEventID string, validated bool) error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type Purge interface {
|
||||||
|
PurgeEventJSONs(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID) error
|
||||||
|
PurgeEvents(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID) error
|
||||||
|
PurgeRoom(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID) error
|
||||||
|
PurgeStateSnapshots(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID) error
|
||||||
|
PurgeStateBlocks(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID) error
|
||||||
|
PurgePreviousEvents(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID) error
|
||||||
|
PurgeInvites(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID) error
|
||||||
|
PurgeMemberships(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID) error
|
||||||
|
PurgePublished(ctx context.Context, txn *sql.Tx, roomID string) error
|
||||||
|
PurgeRedactions(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID) error
|
||||||
|
PurgeRoomAliases(ctx context.Context, txn *sql.Tx, roomID string) error
|
||||||
|
}
|
||||||
|
|
||||||
// StrippedEvent represents a stripped event for returning extracted content values.
|
// StrippedEvent represents a stripped event for returning extracted content values.
|
||||||
type StrippedEvent struct {
|
type StrippedEvent struct {
|
||||||
RoomID string
|
RoomID string
|
||||||
|
|
|
@ -31,6 +31,7 @@ import (
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/nats-io/nats.go"
|
"github.com/nats-io/nats.go"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -123,6 +124,8 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Ms
|
||||||
s.onRetirePeek(s.ctx, *output.RetirePeek)
|
s.onRetirePeek(s.ctx, *output.RetirePeek)
|
||||||
case api.OutputTypeRedactedEvent:
|
case api.OutputTypeRedactedEvent:
|
||||||
err = s.onRedactEvent(s.ctx, *output.RedactedEvent)
|
err = s.onRedactEvent(s.ctx, *output.RedactedEvent)
|
||||||
|
case api.OutputTypePurgeRoom:
|
||||||
|
err = s.onPurgeRoom(s.ctx, *output.PurgeRoom)
|
||||||
default:
|
default:
|
||||||
log.WithField("type", output.Type).Debug(
|
log.WithField("type", output.Type).Debug(
|
||||||
"roomserver output log: ignoring unknown output type",
|
"roomserver output log: ignoring unknown output type",
|
||||||
|
@ -434,6 +437,20 @@ func (s *OutputRoomEventConsumer) onRetirePeek(
|
||||||
s.notifier.OnRetirePeek(msg.RoomID, msg.UserID, msg.DeviceID, types.StreamingToken{PDUPosition: sp})
|
s.notifier.OnRetirePeek(msg.RoomID, msg.UserID, msg.DeviceID, types.StreamingToken{PDUPosition: sp})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *OutputRoomEventConsumer) onPurgeRoom(
|
||||||
|
ctx context.Context, req api.OutputPurgeRoom,
|
||||||
|
) error {
|
||||||
|
logrus.WithField("room_id", req.RoomID).Warn("Purging room from sync API")
|
||||||
|
|
||||||
|
if err := s.db.PurgeRoom(ctx, req.RoomID); err != nil {
|
||||||
|
logrus.WithField("room_id", req.RoomID).WithError(err).Error("Failed to purge room from sync API")
|
||||||
|
return err
|
||||||
|
} else {
|
||||||
|
logrus.WithField("room_id", req.RoomID).Warn("Room purged from sync API")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (s *OutputRoomEventConsumer) updateStateEvent(event *gomatrixserverlib.HeaderedEvent) (*gomatrixserverlib.HeaderedEvent, error) {
|
func (s *OutputRoomEventConsumer) updateStateEvent(event *gomatrixserverlib.HeaderedEvent) (*gomatrixserverlib.HeaderedEvent, error) {
|
||||||
if event.StateKey() == nil {
|
if event.StateKey() == nil {
|
||||||
return event, nil
|
return event, nil
|
||||||
|
|
|
@ -76,6 +76,7 @@ type Database interface {
|
||||||
// PurgeRoomState completely purges room state from the sync API. This is done when
|
// PurgeRoomState completely purges room state from the sync API. This is done when
|
||||||
// receiving an output event that completely resets the state.
|
// receiving an output event that completely resets the state.
|
||||||
PurgeRoomState(ctx context.Context, roomID string) error
|
PurgeRoomState(ctx context.Context, roomID string) error
|
||||||
|
PurgeRoom(ctx context.Context, roomID string) error
|
||||||
// GetStateEvent returns the Matrix state event of a given type for a given room with a given state key
|
// GetStateEvent returns the Matrix state event of a given type for a given room with a given state key
|
||||||
// If no event could be found, returns nil
|
// If no event could be found, returns nil
|
||||||
// If there was an issue during the retrieval, returns an error
|
// If there was an issue during the retrieval, returns an error
|
||||||
|
|
|
@ -47,10 +47,14 @@ const selectBackwardExtremitiesForRoomSQL = "" +
|
||||||
const deleteBackwardExtremitySQL = "" +
|
const deleteBackwardExtremitySQL = "" +
|
||||||
"DELETE FROM syncapi_backward_extremities WHERE room_id = $1 AND prev_event_id = $2"
|
"DELETE FROM syncapi_backward_extremities WHERE room_id = $1 AND prev_event_id = $2"
|
||||||
|
|
||||||
|
const purgeBackwardExtremitiesSQL = "" +
|
||||||
|
"DELETE FROM syncapi_backward_extremities WHERE room_id = $1"
|
||||||
|
|
||||||
type backwardExtremitiesStatements struct {
|
type backwardExtremitiesStatements struct {
|
||||||
insertBackwardExtremityStmt *sql.Stmt
|
insertBackwardExtremityStmt *sql.Stmt
|
||||||
selectBackwardExtremitiesForRoomStmt *sql.Stmt
|
selectBackwardExtremitiesForRoomStmt *sql.Stmt
|
||||||
deleteBackwardExtremityStmt *sql.Stmt
|
deleteBackwardExtremityStmt *sql.Stmt
|
||||||
|
purgeBackwardExtremitiesStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPostgresBackwardsExtremitiesTable(db *sql.DB) (tables.BackwardsExtremities, error) {
|
func NewPostgresBackwardsExtremitiesTable(db *sql.DB) (tables.BackwardsExtremities, error) {
|
||||||
|
@ -68,6 +72,9 @@ func NewPostgresBackwardsExtremitiesTable(db *sql.DB) (tables.BackwardsExtremiti
|
||||||
if s.deleteBackwardExtremityStmt, err = db.Prepare(deleteBackwardExtremitySQL); err != nil {
|
if s.deleteBackwardExtremityStmt, err = db.Prepare(deleteBackwardExtremitySQL); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if s.purgeBackwardExtremitiesStmt, err = db.Prepare(purgeBackwardExtremitiesSQL); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
return s, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -106,3 +113,10 @@ func (s *backwardExtremitiesStatements) DeleteBackwardExtremity(
|
||||||
_, err = sqlutil.TxStmt(txn, s.deleteBackwardExtremityStmt).ExecContext(ctx, roomID, knownEventID)
|
_, err = sqlutil.TxStmt(txn, s.deleteBackwardExtremityStmt).ExecContext(ctx, roomID, knownEventID)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *backwardExtremitiesStatements) PurgeBackwardExtremities(
|
||||||
|
ctx context.Context, txn *sql.Tx, roomID string,
|
||||||
|
) error {
|
||||||
|
_, err := sqlutil.TxStmt(txn, s.purgeBackwardExtremitiesStmt).ExecContext(ctx, roomID)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
|
@ -62,11 +62,15 @@ const selectInviteEventsInRangeSQL = "" +
|
||||||
const selectMaxInviteIDSQL = "" +
|
const selectMaxInviteIDSQL = "" +
|
||||||
"SELECT MAX(id) FROM syncapi_invite_events"
|
"SELECT MAX(id) FROM syncapi_invite_events"
|
||||||
|
|
||||||
|
const purgeInvitesSQL = "" +
|
||||||
|
"DELETE FROM syncapi_invite_events WHERE room_id = $1"
|
||||||
|
|
||||||
type inviteEventsStatements struct {
|
type inviteEventsStatements struct {
|
||||||
insertInviteEventStmt *sql.Stmt
|
insertInviteEventStmt *sql.Stmt
|
||||||
selectInviteEventsInRangeStmt *sql.Stmt
|
selectInviteEventsInRangeStmt *sql.Stmt
|
||||||
deleteInviteEventStmt *sql.Stmt
|
deleteInviteEventStmt *sql.Stmt
|
||||||
selectMaxInviteIDStmt *sql.Stmt
|
selectMaxInviteIDStmt *sql.Stmt
|
||||||
|
purgeInvitesStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPostgresInvitesTable(db *sql.DB) (tables.Invites, error) {
|
func NewPostgresInvitesTable(db *sql.DB) (tables.Invites, error) {
|
||||||
|
@ -87,6 +91,9 @@ func NewPostgresInvitesTable(db *sql.DB) (tables.Invites, error) {
|
||||||
if s.selectMaxInviteIDStmt, err = db.Prepare(selectMaxInviteIDSQL); err != nil {
|
if s.selectMaxInviteIDStmt, err = db.Prepare(selectMaxInviteIDSQL); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if s.purgeInvitesStmt, err = db.Prepare(purgeInvitesSQL); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
return s, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -173,3 +180,10 @@ func (s *inviteEventsStatements) SelectMaxInviteID(
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *inviteEventsStatements) PurgeInvites(
|
||||||
|
ctx context.Context, txn *sql.Tx, roomID string,
|
||||||
|
) error {
|
||||||
|
_, err := sqlutil.TxStmt(txn, s.purgeInvitesStmt).ExecContext(ctx, roomID)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
|
@ -69,11 +69,15 @@ const selectHeroesSQL = "" +
|
||||||
const selectMembershipBeforeSQL = "" +
|
const selectMembershipBeforeSQL = "" +
|
||||||
"SELECT membership, topological_pos FROM syncapi_memberships WHERE room_id = $1 and user_id = $2 AND topological_pos <= $3 ORDER BY topological_pos DESC LIMIT 1"
|
"SELECT membership, topological_pos FROM syncapi_memberships WHERE room_id = $1 and user_id = $2 AND topological_pos <= $3 ORDER BY topological_pos DESC LIMIT 1"
|
||||||
|
|
||||||
|
const purgeMembershipsSQL = "" +
|
||||||
|
"DELETE FROM syncapi_memberships WHERE room_id = $1"
|
||||||
|
|
||||||
type membershipsStatements struct {
|
type membershipsStatements struct {
|
||||||
upsertMembershipStmt *sql.Stmt
|
upsertMembershipStmt *sql.Stmt
|
||||||
selectMembershipCountStmt *sql.Stmt
|
selectMembershipCountStmt *sql.Stmt
|
||||||
selectHeroesStmt *sql.Stmt
|
selectHeroesStmt *sql.Stmt
|
||||||
selectMembershipForUserStmt *sql.Stmt
|
selectMembershipForUserStmt *sql.Stmt
|
||||||
|
purgeMembershipsStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPostgresMembershipsTable(db *sql.DB) (tables.Memberships, error) {
|
func NewPostgresMembershipsTable(db *sql.DB) (tables.Memberships, error) {
|
||||||
|
@ -87,6 +91,7 @@ func NewPostgresMembershipsTable(db *sql.DB) (tables.Memberships, error) {
|
||||||
{&s.selectMembershipCountStmt, selectMembershipCountSQL},
|
{&s.selectMembershipCountStmt, selectMembershipCountSQL},
|
||||||
{&s.selectHeroesStmt, selectHeroesSQL},
|
{&s.selectHeroesStmt, selectHeroesSQL},
|
||||||
{&s.selectMembershipForUserStmt, selectMembershipBeforeSQL},
|
{&s.selectMembershipForUserStmt, selectMembershipBeforeSQL},
|
||||||
|
{&s.purgeMembershipsStmt, purgeMembershipsSQL},
|
||||||
}.Prepare(db)
|
}.Prepare(db)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -154,3 +159,10 @@ func (s *membershipsStatements) SelectMembershipForUser(
|
||||||
}
|
}
|
||||||
return membership, topologyPos, nil
|
return membership, topologyPos, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *membershipsStatements) PurgeMemberships(
|
||||||
|
ctx context.Context, txn *sql.Tx, roomID string,
|
||||||
|
) error {
|
||||||
|
_, err := sqlutil.TxStmt(txn, s.purgeMembershipsStmt).ExecContext(ctx, roomID)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
|
@ -35,6 +35,7 @@ func NewPostgresNotificationDataTable(db *sql.DB) (tables.NotificationData, erro
|
||||||
{&r.upsertRoomUnreadCounts, upsertRoomUnreadNotificationCountsSQL},
|
{&r.upsertRoomUnreadCounts, upsertRoomUnreadNotificationCountsSQL},
|
||||||
{&r.selectUserUnreadCounts, selectUserUnreadNotificationCountsSQL},
|
{&r.selectUserUnreadCounts, selectUserUnreadNotificationCountsSQL},
|
||||||
{&r.selectMaxID, selectMaxNotificationIDSQL},
|
{&r.selectMaxID, selectMaxNotificationIDSQL},
|
||||||
|
{&r.purgeNotificationData, purgeNotificationDataSQL},
|
||||||
}.Prepare(db)
|
}.Prepare(db)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -42,6 +43,7 @@ type notificationDataStatements struct {
|
||||||
upsertRoomUnreadCounts *sql.Stmt
|
upsertRoomUnreadCounts *sql.Stmt
|
||||||
selectUserUnreadCounts *sql.Stmt
|
selectUserUnreadCounts *sql.Stmt
|
||||||
selectMaxID *sql.Stmt
|
selectMaxID *sql.Stmt
|
||||||
|
purgeNotificationData *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
const notificationDataSchema = `
|
const notificationDataSchema = `
|
||||||
|
@ -70,6 +72,9 @@ const selectUserUnreadNotificationCountsSQL = `SELECT
|
||||||
|
|
||||||
const selectMaxNotificationIDSQL = `SELECT CASE COUNT(*) WHEN 0 THEN 0 ELSE MAX(id) END FROM syncapi_notification_data`
|
const selectMaxNotificationIDSQL = `SELECT CASE COUNT(*) WHEN 0 THEN 0 ELSE MAX(id) END FROM syncapi_notification_data`
|
||||||
|
|
||||||
|
const purgeNotificationDataSQL = "" +
|
||||||
|
"DELETE FROM syncapi_notification_data WHERE room_id = $1"
|
||||||
|
|
||||||
func (r *notificationDataStatements) UpsertRoomUnreadCounts(ctx context.Context, txn *sql.Tx, userID, roomID string, notificationCount, highlightCount int) (pos types.StreamPosition, err error) {
|
func (r *notificationDataStatements) UpsertRoomUnreadCounts(ctx context.Context, txn *sql.Tx, userID, roomID string, notificationCount, highlightCount int) (pos types.StreamPosition, err error) {
|
||||||
err = sqlutil.TxStmt(txn, r.upsertRoomUnreadCounts).QueryRowContext(ctx, userID, roomID, notificationCount, highlightCount).Scan(&pos)
|
err = sqlutil.TxStmt(txn, r.upsertRoomUnreadCounts).QueryRowContext(ctx, userID, roomID, notificationCount, highlightCount).Scan(&pos)
|
||||||
return
|
return
|
||||||
|
@ -106,3 +111,10 @@ func (r *notificationDataStatements) SelectMaxID(ctx context.Context, txn *sql.T
|
||||||
err := sqlutil.TxStmt(txn, r.selectMaxID).QueryRowContext(ctx).Scan(&id)
|
err := sqlutil.TxStmt(txn, r.selectMaxID).QueryRowContext(ctx).Scan(&id)
|
||||||
return id, err
|
return id, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *notificationDataStatements) PurgeNotificationData(
|
||||||
|
ctx context.Context, txn *sql.Tx, roomID string,
|
||||||
|
) error {
|
||||||
|
_, err := sqlutil.TxStmt(txn, s.purgeNotificationData).ExecContext(ctx, roomID)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
|
@ -166,6 +166,9 @@ const selectContextAfterEventSQL = "" +
|
||||||
" AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" +
|
" AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" +
|
||||||
" ORDER BY id ASC LIMIT $3"
|
" ORDER BY id ASC LIMIT $3"
|
||||||
|
|
||||||
|
const purgeEventsSQL = "" +
|
||||||
|
"DELETE FROM syncapi_output_room_events WHERE room_id = $1"
|
||||||
|
|
||||||
type outputRoomEventsStatements struct {
|
type outputRoomEventsStatements struct {
|
||||||
insertEventStmt *sql.Stmt
|
insertEventStmt *sql.Stmt
|
||||||
selectEventsStmt *sql.Stmt
|
selectEventsStmt *sql.Stmt
|
||||||
|
@ -180,6 +183,7 @@ type outputRoomEventsStatements struct {
|
||||||
selectContextEventStmt *sql.Stmt
|
selectContextEventStmt *sql.Stmt
|
||||||
selectContextBeforeEventStmt *sql.Stmt
|
selectContextBeforeEventStmt *sql.Stmt
|
||||||
selectContextAfterEventStmt *sql.Stmt
|
selectContextAfterEventStmt *sql.Stmt
|
||||||
|
purgeEventsStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
|
func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
|
||||||
|
@ -215,6 +219,7 @@ func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
|
||||||
{&s.selectContextEventStmt, selectContextEventSQL},
|
{&s.selectContextEventStmt, selectContextEventSQL},
|
||||||
{&s.selectContextBeforeEventStmt, selectContextBeforeEventSQL},
|
{&s.selectContextBeforeEventStmt, selectContextBeforeEventSQL},
|
||||||
{&s.selectContextAfterEventStmt, selectContextAfterEventSQL},
|
{&s.selectContextAfterEventStmt, selectContextAfterEventSQL},
|
||||||
|
{&s.purgeEventsStmt, purgeEventsSQL},
|
||||||
}.Prepare(db)
|
}.Prepare(db)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -632,3 +637,10 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
|
||||||
}
|
}
|
||||||
return result, rows.Err()
|
return result, rows.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *outputRoomEventsStatements) PurgeEvents(
|
||||||
|
ctx context.Context, txn *sql.Tx, roomID string,
|
||||||
|
) error {
|
||||||
|
_, err := sqlutil.TxStmt(txn, s.purgeEventsStmt).ExecContext(ctx, roomID)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
|
@ -79,6 +79,9 @@ const selectStreamToTopologicalPositionAscSQL = "" +
|
||||||
const selectStreamToTopologicalPositionDescSQL = "" +
|
const selectStreamToTopologicalPositionDescSQL = "" +
|
||||||
"SELECT topological_position FROM syncapi_output_room_events_topology WHERE room_id = $1 AND stream_position <= $2 ORDER BY topological_position DESC LIMIT 1;"
|
"SELECT topological_position FROM syncapi_output_room_events_topology WHERE room_id = $1 AND stream_position <= $2 ORDER BY topological_position DESC LIMIT 1;"
|
||||||
|
|
||||||
|
const purgeEventsTopologySQL = "" +
|
||||||
|
"DELETE FROM syncapi_output_room_events_topology WHERE room_id = $1"
|
||||||
|
|
||||||
type outputRoomEventsTopologyStatements struct {
|
type outputRoomEventsTopologyStatements struct {
|
||||||
insertEventInTopologyStmt *sql.Stmt
|
insertEventInTopologyStmt *sql.Stmt
|
||||||
selectEventIDsInRangeASCStmt *sql.Stmt
|
selectEventIDsInRangeASCStmt *sql.Stmt
|
||||||
|
@ -87,6 +90,7 @@ type outputRoomEventsTopologyStatements struct {
|
||||||
selectMaxPositionInTopologyStmt *sql.Stmt
|
selectMaxPositionInTopologyStmt *sql.Stmt
|
||||||
selectStreamToTopologicalPositionAscStmt *sql.Stmt
|
selectStreamToTopologicalPositionAscStmt *sql.Stmt
|
||||||
selectStreamToTopologicalPositionDescStmt *sql.Stmt
|
selectStreamToTopologicalPositionDescStmt *sql.Stmt
|
||||||
|
purgeEventsTopologyStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPostgresTopologyTable(db *sql.DB) (tables.Topology, error) {
|
func NewPostgresTopologyTable(db *sql.DB) (tables.Topology, error) {
|
||||||
|
@ -116,6 +120,9 @@ func NewPostgresTopologyTable(db *sql.DB) (tables.Topology, error) {
|
||||||
if s.selectStreamToTopologicalPositionDescStmt, err = db.Prepare(selectStreamToTopologicalPositionDescSQL); err != nil {
|
if s.selectStreamToTopologicalPositionDescStmt, err = db.Prepare(selectStreamToTopologicalPositionDescSQL); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if s.purgeEventsTopologyStmt, err = db.Prepare(purgeEventsTopologySQL); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
return s, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -196,3 +203,10 @@ func (s *outputRoomEventsTopologyStatements) SelectMaxPositionInTopology(
|
||||||
err = s.selectMaxPositionInTopologyStmt.QueryRowContext(ctx, roomID).Scan(&pos, &spos)
|
err = s.selectMaxPositionInTopologyStmt.QueryRowContext(ctx, roomID).Scan(&pos, &spos)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *outputRoomEventsTopologyStatements) PurgeEventsTopology(
|
||||||
|
ctx context.Context, txn *sql.Tx, roomID string,
|
||||||
|
) error {
|
||||||
|
_, err := sqlutil.TxStmt(txn, s.purgeEventsTopologyStmt).ExecContext(ctx, roomID)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
|
@ -65,6 +65,9 @@ const selectPeekingDevicesSQL = "" +
|
||||||
const selectMaxPeekIDSQL = "" +
|
const selectMaxPeekIDSQL = "" +
|
||||||
"SELECT MAX(id) FROM syncapi_peeks"
|
"SELECT MAX(id) FROM syncapi_peeks"
|
||||||
|
|
||||||
|
const purgePeeksSQL = "" +
|
||||||
|
"DELETE FROM syncapi_peeks WHERE room_id = $1"
|
||||||
|
|
||||||
type peekStatements struct {
|
type peekStatements struct {
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
insertPeekStmt *sql.Stmt
|
insertPeekStmt *sql.Stmt
|
||||||
|
@ -73,6 +76,7 @@ type peekStatements struct {
|
||||||
selectPeeksInRangeStmt *sql.Stmt
|
selectPeeksInRangeStmt *sql.Stmt
|
||||||
selectPeekingDevicesStmt *sql.Stmt
|
selectPeekingDevicesStmt *sql.Stmt
|
||||||
selectMaxPeekIDStmt *sql.Stmt
|
selectMaxPeekIDStmt *sql.Stmt
|
||||||
|
purgePeeksStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPostgresPeeksTable(db *sql.DB) (tables.Peeks, error) {
|
func NewPostgresPeeksTable(db *sql.DB) (tables.Peeks, error) {
|
||||||
|
@ -101,6 +105,9 @@ func NewPostgresPeeksTable(db *sql.DB) (tables.Peeks, error) {
|
||||||
if s.selectMaxPeekIDStmt, err = db.Prepare(selectMaxPeekIDSQL); err != nil {
|
if s.selectMaxPeekIDStmt, err = db.Prepare(selectMaxPeekIDSQL); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if s.purgePeeksStmt, err = db.Prepare(purgePeeksSQL); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
return s, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -184,3 +191,10 @@ func (s *peekStatements) SelectMaxPeekID(
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *peekStatements) PurgePeeks(
|
||||||
|
ctx context.Context, txn *sql.Tx, roomID string,
|
||||||
|
) error {
|
||||||
|
_, err := sqlutil.TxStmt(txn, s.purgePeeksStmt).ExecContext(ctx, roomID)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
|
@ -62,11 +62,15 @@ const selectRoomReceipts = "" +
|
||||||
const selectMaxReceiptIDSQL = "" +
|
const selectMaxReceiptIDSQL = "" +
|
||||||
"SELECT MAX(id) FROM syncapi_receipts"
|
"SELECT MAX(id) FROM syncapi_receipts"
|
||||||
|
|
||||||
|
const purgeReceiptsSQL = "" +
|
||||||
|
"DELETE FROM syncapi_receipts WHERE room_id = $1"
|
||||||
|
|
||||||
type receiptStatements struct {
|
type receiptStatements struct {
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
upsertReceipt *sql.Stmt
|
upsertReceipt *sql.Stmt
|
||||||
selectRoomReceipts *sql.Stmt
|
selectRoomReceipts *sql.Stmt
|
||||||
selectMaxReceiptID *sql.Stmt
|
selectMaxReceiptID *sql.Stmt
|
||||||
|
purgeReceiptsStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPostgresReceiptsTable(db *sql.DB) (tables.Receipts, error) {
|
func NewPostgresReceiptsTable(db *sql.DB) (tables.Receipts, error) {
|
||||||
|
@ -95,6 +99,9 @@ func NewPostgresReceiptsTable(db *sql.DB) (tables.Receipts, error) {
|
||||||
if r.selectMaxReceiptID, err = db.Prepare(selectMaxReceiptIDSQL); err != nil {
|
if r.selectMaxReceiptID, err = db.Prepare(selectMaxReceiptIDSQL); err != nil {
|
||||||
return nil, fmt.Errorf("unable to prepare selectRoomReceipts statement: %w", err)
|
return nil, fmt.Errorf("unable to prepare selectRoomReceipts statement: %w", err)
|
||||||
}
|
}
|
||||||
|
if r.purgeReceiptsStmt, err = db.Prepare(purgeReceiptsSQL); err != nil {
|
||||||
|
return nil, fmt.Errorf("unable to prepare purgeReceiptsStmt statement: %w", err)
|
||||||
|
}
|
||||||
return r, nil
|
return r, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -138,3 +145,10 @@ func (s *receiptStatements) SelectMaxReceiptID(
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *receiptStatements) PurgeReceipts(
|
||||||
|
ctx context.Context, txn *sql.Tx, roomID string,
|
||||||
|
) error {
|
||||||
|
_, err := sqlutil.TxStmt(txn, s.purgeReceiptsStmt).ExecContext(ctx, roomID)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
|
@ -1086,3 +1086,36 @@ func (d *Database) MaxStreamPositionForPresence(ctx context.Context) (types.Stre
|
||||||
func (d *Database) SelectMembershipForUser(ctx context.Context, roomID, userID string, pos int64) (membership string, topologicalPos int, err error) {
|
func (d *Database) SelectMembershipForUser(ctx context.Context, roomID, userID string, pos int64) (membership string, topologicalPos int, err error) {
|
||||||
return d.Memberships.SelectMembershipForUser(ctx, nil, roomID, userID, pos)
|
return d.Memberships.SelectMembershipForUser(ctx, nil, roomID, userID, pos)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (d *Database) PurgeRoom(ctx context.Context, roomID string) error {
|
||||||
|
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||||
|
if err := d.BackwardExtremities.PurgeBackwardExtremities(ctx, txn, roomID); err != nil {
|
||||||
|
return fmt.Errorf("failed to purge backward extremities: %w", err)
|
||||||
|
}
|
||||||
|
if err := d.CurrentRoomState.DeleteRoomStateForRoom(ctx, txn, roomID); err != nil {
|
||||||
|
return fmt.Errorf("failed to purge current room state: %w", err)
|
||||||
|
}
|
||||||
|
if err := d.Invites.PurgeInvites(ctx, txn, roomID); err != nil {
|
||||||
|
return fmt.Errorf("failed to purge invites: %w", err)
|
||||||
|
}
|
||||||
|
if err := d.Memberships.PurgeMemberships(ctx, txn, roomID); err != nil {
|
||||||
|
return fmt.Errorf("failed to purge memberships: %w", err)
|
||||||
|
}
|
||||||
|
if err := d.NotificationData.PurgeNotificationData(ctx, txn, roomID); err != nil {
|
||||||
|
return fmt.Errorf("failed to purge notification data: %w", err)
|
||||||
|
}
|
||||||
|
if err := d.OutputEvents.PurgeEvents(ctx, txn, roomID); err != nil {
|
||||||
|
return fmt.Errorf("failed to purge events: %w", err)
|
||||||
|
}
|
||||||
|
if err := d.Topology.PurgeEventsTopology(ctx, txn, roomID); err != nil {
|
||||||
|
return fmt.Errorf("failed to purge events topology: %w", err)
|
||||||
|
}
|
||||||
|
if err := d.Peeks.PurgePeeks(ctx, txn, roomID); err != nil {
|
||||||
|
return fmt.Errorf("failed to purge peeks: %w", err)
|
||||||
|
}
|
||||||
|
if err := d.Receipts.PurgeReceipts(ctx, txn, roomID); err != nil {
|
||||||
|
return fmt.Errorf("failed to purge receipts: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
|
@ -17,6 +17,7 @@ package sqlite3
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/internal"
|
"github.com/matrix-org/dendrite/internal"
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
|
@ -109,3 +110,9 @@ func (s *backwardExtremitiesStatements) DeleteBackwardExtremity(
|
||||||
_, err = sqlutil.TxStmt(txn, s.deleteBackwardExtremityStmt).ExecContext(ctx, roomID, knownEventID)
|
_, err = sqlutil.TxStmt(txn, s.deleteBackwardExtremityStmt).ExecContext(ctx, roomID, knownEventID)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *backwardExtremitiesStatements) PurgeBackwardExtremities(
|
||||||
|
ctx context.Context, txn *sql.Tx, roomID string,
|
||||||
|
) error {
|
||||||
|
return fmt.Errorf("not implemented on SQLite")
|
||||||
|
}
|
||||||
|
|
|
@ -19,6 +19,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/internal"
|
"github.com/matrix-org/dendrite/internal"
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
|
@ -183,3 +184,9 @@ func (s *inviteEventsStatements) SelectMaxInviteID(
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *inviteEventsStatements) PurgeInvites(
|
||||||
|
ctx context.Context, txn *sql.Tx, roomID string,
|
||||||
|
) error {
|
||||||
|
return fmt.Errorf("not implemented on SQLite")
|
||||||
|
}
|
||||||
|
|
|
@ -170,3 +170,9 @@ func (s *membershipsStatements) SelectMembershipForUser(
|
||||||
}
|
}
|
||||||
return membership, topologyPos, nil
|
return membership, topologyPos, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *membershipsStatements) PurgeMemberships(
|
||||||
|
ctx context.Context, txn *sql.Tx, roomID string,
|
||||||
|
) error {
|
||||||
|
return fmt.Errorf("not implemented on SQLite")
|
||||||
|
}
|
||||||
|
|
|
@ -17,6 +17,7 @@ package sqlite3
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/internal"
|
"github.com/matrix-org/dendrite/internal"
|
||||||
"github.com/matrix-org/dendrite/internal/eventutil"
|
"github.com/matrix-org/dendrite/internal/eventutil"
|
||||||
|
@ -112,3 +113,9 @@ func (r *notificationDataStatements) SelectMaxID(ctx context.Context, txn *sql.T
|
||||||
err := sqlutil.TxStmt(txn, r.selectMaxID).QueryRowContext(ctx).Scan(&id)
|
err := sqlutil.TxStmt(txn, r.selectMaxID).QueryRowContext(ctx).Scan(&id)
|
||||||
return id, err
|
return id, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *notificationDataStatements) PurgeNotificationData(
|
||||||
|
ctx context.Context, txn *sql.Tx, roomID string,
|
||||||
|
) error {
|
||||||
|
return fmt.Errorf("not implemented on SQLite")
|
||||||
|
}
|
||||||
|
|
|
@ -628,3 +628,9 @@ func unmarshalStateIDs(addIDsJSON, delIDsJSON string) (addIDs []string, delIDs [
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *outputRoomEventsStatements) PurgeEvents(
|
||||||
|
ctx context.Context, txn *sql.Tx, roomID string,
|
||||||
|
) error {
|
||||||
|
return fmt.Errorf("not implemented on SQLite")
|
||||||
|
}
|
||||||
|
|
|
@ -17,6 +17,7 @@ package sqlite3
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
"github.com/matrix-org/dendrite/syncapi/storage/tables"
|
"github.com/matrix-org/dendrite/syncapi/storage/tables"
|
||||||
|
@ -190,3 +191,9 @@ func (s *outputRoomEventsTopologyStatements) SelectMaxPositionInTopology(
|
||||||
err = stmt.QueryRowContext(ctx, roomID).Scan(&pos, &spos)
|
err = stmt.QueryRowContext(ctx, roomID).Scan(&pos, &spos)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *outputRoomEventsTopologyStatements) PurgeEventsTopology(
|
||||||
|
ctx context.Context, txn *sql.Tx, roomID string,
|
||||||
|
) error {
|
||||||
|
return fmt.Errorf("not implemented on SQLite")
|
||||||
|
}
|
||||||
|
|
|
@ -17,6 +17,7 @@ package sqlite3
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/internal"
|
"github.com/matrix-org/dendrite/internal"
|
||||||
|
@ -204,3 +205,9 @@ func (s *peekStatements) SelectMaxPeekID(
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *peekStatements) PurgePeeks(
|
||||||
|
ctx context.Context, txn *sql.Tx, roomID string,
|
||||||
|
) error {
|
||||||
|
return fmt.Errorf("not implemented on SQLite")
|
||||||
|
}
|
||||||
|
|
|
@ -148,3 +148,9 @@ func (s *receiptStatements) SelectMaxReceiptID(
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *receiptStatements) PurgeReceipts(
|
||||||
|
ctx context.Context, txn *sql.Tx, roomID string,
|
||||||
|
) error {
|
||||||
|
return fmt.Errorf("not implemented on SQLite")
|
||||||
|
}
|
||||||
|
|
|
@ -39,6 +39,7 @@ type Invites interface {
|
||||||
// for the room.
|
// for the room.
|
||||||
SelectInviteEventsInRange(ctx context.Context, txn *sql.Tx, targetUserID string, r types.Range) (invites map[string]*gomatrixserverlib.HeaderedEvent, retired map[string]*gomatrixserverlib.HeaderedEvent, err error)
|
SelectInviteEventsInRange(ctx context.Context, txn *sql.Tx, targetUserID string, r types.Range) (invites map[string]*gomatrixserverlib.HeaderedEvent, retired map[string]*gomatrixserverlib.HeaderedEvent, err error)
|
||||||
SelectMaxInviteID(ctx context.Context, txn *sql.Tx) (id int64, err error)
|
SelectMaxInviteID(ctx context.Context, txn *sql.Tx) (id int64, err error)
|
||||||
|
PurgeInvites(ctx context.Context, txn *sql.Tx, roomID string) error
|
||||||
}
|
}
|
||||||
|
|
||||||
type Peeks interface {
|
type Peeks interface {
|
||||||
|
@ -48,6 +49,7 @@ type Peeks interface {
|
||||||
SelectPeeksInRange(ctxt context.Context, txn *sql.Tx, userID, deviceID string, r types.Range) (peeks []types.Peek, err error)
|
SelectPeeksInRange(ctxt context.Context, txn *sql.Tx, userID, deviceID string, r types.Range) (peeks []types.Peek, err error)
|
||||||
SelectPeekingDevices(ctxt context.Context) (peekingDevices map[string][]types.PeekingDevice, err error)
|
SelectPeekingDevices(ctxt context.Context) (peekingDevices map[string][]types.PeekingDevice, err error)
|
||||||
SelectMaxPeekID(ctx context.Context, txn *sql.Tx) (id int64, err error)
|
SelectMaxPeekID(ctx context.Context, txn *sql.Tx) (id int64, err error)
|
||||||
|
PurgePeeks(ctx context.Context, txn *sql.Tx, roomID string) error
|
||||||
}
|
}
|
||||||
|
|
||||||
type Events interface {
|
type Events interface {
|
||||||
|
@ -75,6 +77,8 @@ type Events interface {
|
||||||
SelectContextEvent(ctx context.Context, txn *sql.Tx, roomID, eventID string) (int, gomatrixserverlib.HeaderedEvent, error)
|
SelectContextEvent(ctx context.Context, txn *sql.Tx, roomID, eventID string) (int, gomatrixserverlib.HeaderedEvent, error)
|
||||||
SelectContextBeforeEvent(ctx context.Context, txn *sql.Tx, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) ([]*gomatrixserverlib.HeaderedEvent, error)
|
SelectContextBeforeEvent(ctx context.Context, txn *sql.Tx, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) ([]*gomatrixserverlib.HeaderedEvent, error)
|
||||||
SelectContextAfterEvent(ctx context.Context, txn *sql.Tx, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error)
|
SelectContextAfterEvent(ctx context.Context, txn *sql.Tx, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error)
|
||||||
|
|
||||||
|
PurgeEvents(ctx context.Context, txn *sql.Tx, roomID string) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// Topology keeps track of the depths and stream positions for all events.
|
// Topology keeps track of the depths and stream positions for all events.
|
||||||
|
@ -94,6 +98,7 @@ type Topology interface {
|
||||||
SelectMaxPositionInTopology(ctx context.Context, txn *sql.Tx, roomID string) (depth types.StreamPosition, spos types.StreamPosition, err error)
|
SelectMaxPositionInTopology(ctx context.Context, txn *sql.Tx, roomID string) (depth types.StreamPosition, spos types.StreamPosition, err error)
|
||||||
// SelectStreamToTopologicalPosition converts a stream position to a topological position by finding the nearest topological position in the room.
|
// SelectStreamToTopologicalPosition converts a stream position to a topological position by finding the nearest topological position in the room.
|
||||||
SelectStreamToTopologicalPosition(ctx context.Context, txn *sql.Tx, roomID string, streamPos types.StreamPosition, forward bool) (topoPos types.StreamPosition, err error)
|
SelectStreamToTopologicalPosition(ctx context.Context, txn *sql.Tx, roomID string, streamPos types.StreamPosition, forward bool) (topoPos types.StreamPosition, err error)
|
||||||
|
PurgeEventsTopology(ctx context.Context, txn *sql.Tx, roomID string) error
|
||||||
}
|
}
|
||||||
|
|
||||||
type CurrentRoomState interface {
|
type CurrentRoomState interface {
|
||||||
|
@ -144,6 +149,7 @@ type BackwardsExtremities interface {
|
||||||
SelectBackwardExtremitiesForRoom(ctx context.Context, roomID string) (bwExtrems map[string][]string, err error)
|
SelectBackwardExtremitiesForRoom(ctx context.Context, roomID string) (bwExtrems map[string][]string, err error)
|
||||||
// DeleteBackwardExtremity removes a backwards extremity for a room, if one existed.
|
// DeleteBackwardExtremity removes a backwards extremity for a room, if one existed.
|
||||||
DeleteBackwardExtremity(ctx context.Context, txn *sql.Tx, roomID, knownEventID string) (err error)
|
DeleteBackwardExtremity(ctx context.Context, txn *sql.Tx, roomID, knownEventID string) (err error)
|
||||||
|
PurgeBackwardExtremities(ctx context.Context, txn *sql.Tx, roomID string) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// SendToDevice tracks send-to-device messages which are sent to individual
|
// SendToDevice tracks send-to-device messages which are sent to individual
|
||||||
|
@ -179,6 +185,7 @@ type Receipts interface {
|
||||||
UpsertReceipt(ctx context.Context, txn *sql.Tx, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error)
|
UpsertReceipt(ctx context.Context, txn *sql.Tx, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error)
|
||||||
SelectRoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []types.OutputReceiptEvent, error)
|
SelectRoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []types.OutputReceiptEvent, error)
|
||||||
SelectMaxReceiptID(ctx context.Context, txn *sql.Tx) (id int64, err error)
|
SelectMaxReceiptID(ctx context.Context, txn *sql.Tx) (id int64, err error)
|
||||||
|
PurgeReceipts(ctx context.Context, txn *sql.Tx, roomID string) error
|
||||||
}
|
}
|
||||||
|
|
||||||
type Memberships interface {
|
type Memberships interface {
|
||||||
|
@ -186,12 +193,14 @@ type Memberships interface {
|
||||||
SelectMembershipCount(ctx context.Context, txn *sql.Tx, roomID, membership string, pos types.StreamPosition) (count int, err error)
|
SelectMembershipCount(ctx context.Context, txn *sql.Tx, roomID, membership string, pos types.StreamPosition) (count int, err error)
|
||||||
SelectHeroes(ctx context.Context, txn *sql.Tx, roomID, userID string, memberships []string) (heroes []string, err error)
|
SelectHeroes(ctx context.Context, txn *sql.Tx, roomID, userID string, memberships []string) (heroes []string, err error)
|
||||||
SelectMembershipForUser(ctx context.Context, txn *sql.Tx, roomID, userID string, pos int64) (membership string, topologicalPos int, err error)
|
SelectMembershipForUser(ctx context.Context, txn *sql.Tx, roomID, userID string, pos int64) (membership string, topologicalPos int, err error)
|
||||||
|
PurgeMemberships(ctx context.Context, txn *sql.Tx, roomID string) error
|
||||||
}
|
}
|
||||||
|
|
||||||
type NotificationData interface {
|
type NotificationData interface {
|
||||||
UpsertRoomUnreadCounts(ctx context.Context, txn *sql.Tx, userID, roomID string, notificationCount, highlightCount int) (types.StreamPosition, error)
|
UpsertRoomUnreadCounts(ctx context.Context, txn *sql.Tx, userID, roomID string, notificationCount, highlightCount int) (types.StreamPosition, error)
|
||||||
SelectUserUnreadCounts(ctx context.Context, txn *sql.Tx, userID string, fromExcl, toIncl types.StreamPosition) (map[string]*eventutil.NotificationData, error)
|
SelectUserUnreadCounts(ctx context.Context, txn *sql.Tx, userID string, fromExcl, toIncl types.StreamPosition) (map[string]*eventutil.NotificationData, error)
|
||||||
SelectMaxID(ctx context.Context, txn *sql.Tx) (int64, error)
|
SelectMaxID(ctx context.Context, txn *sql.Tx) (int64, error)
|
||||||
|
PurgeNotificationData(ctx context.Context, txn *sql.Tx, roomID string) error
|
||||||
}
|
}
|
||||||
|
|
||||||
type Ignores interface {
|
type Ignores interface {
|
||||||
|
|
Loading…
Reference in a new issue