Compare commits

...

47 commits

Author SHA1 Message Date
Neil Alexander 3b24f1908c
Merge branch 'main' into neilalexander/jassuko 2022-09-23 13:05:27 +01:00
Neil Alexander 1055206194
Merge branch 'main' into neilalexander/jassuko 2022-09-22 14:57:40 +01:00
Neil Alexander 181cc34af8
Merge branch 'main' into neilalexander/jassuko 2022-09-20 15:42:11 +01:00
Neil Alexander 1464403215
Merge branch 'main' into neilalexander/jassuko 2022-09-16 10:48:21 +01:00
Neil Alexander ec49e0919c
Merge branch 'neilalexander/downloadstate' into neilalexander/jassuko 2022-09-14 11:59:48 +01:00
Neil Alexander e87f576303
Send the glue event 2022-09-14 11:23:21 +01:00
Neil Alexander 07df8fdd42
Add an empty body 2022-09-14 11:22:00 +01:00
Neil Alexander 19e0ce855d
Get 2022-09-14 11:20:52 +01:00
Neil Alexander d4cdab0a44
Admin API to redownload room state from another server 2022-09-14 11:18:50 +01:00
Neil Alexander 1728a95239
Merge branch 'neilalexander/loadmembershipatevent' into neilalexander/jassuko 2022-09-13 11:16:00 +01:00
Neil Alexander 1ca58ec9d1
Merge branch 'main' into neilalexander/purgeroom 2022-09-13 11:15:14 +01:00
Neil Alexander 7928138034
Exclude events we don't know state for 2022-09-13 11:10:57 +01:00
Neil Alexander c279229f6f
Tweak LoadMembershipAtEvent 2022-09-13 10:48:50 +01:00
Neil Alexander 6df7b43378
Merge branch 'main' into neilalexander/purgeroom 2022-09-12 15:36:12 +01:00
Neil Alexander 31b71df1cd
Merge branch 'main' into neilalexander/purgeroom 2022-09-12 13:58:35 +01:00
Neil Alexander 47be521dd9
Merge branch 'main' into neilalexander/purgeroom 2022-09-11 20:42:32 +01:00
Neil Alexander 03e72e9832
Merge branch 'main' into neilalexander/purgeroom 2022-09-07 15:15:10 +01:00
Neil Alexander 049ebda94c
Merge branch 'main' into neilalexander/purgeroom 2022-09-07 13:50:07 +01:00
Neil Alexander 9c5df79799
Merge branch 'main' into neilalexander/purgeroom 2022-09-01 15:23:55 +01:00
Neil Alexander 4c3810a76b
Merge branch 'main' into neilalexander/purgeroom 2022-09-01 09:26:14 +01:00
Neil Alexander 9a4ff4f128
Merge branch 'main' into neilalexander/purgeroom 2022-08-30 22:22:25 +01:00
Neil Alexander e1eccbbfb7
Merge branch 'main' into neilalexander/purgeroom 2022-08-30 16:38:42 +01:00
Neil Alexander 08a0dd5932
Merge branch 'main' into neilalexander/purgeroom 2022-08-28 09:54:41 +01:00
Neil Alexander ef0dcf6bee
Use background context, don't fall through 2022-08-25 14:38:19 +01:00
Neil Alexander 1622df2499
Merge branch 'main' into neilalexander/purgeroom 2022-08-25 14:37:07 +01:00
Neil Alexander 11856a502e
Merge branch 'main' into neilalexander/purgeroom 2022-08-25 12:11:15 +01:00
Neil Alexander b7258dcc00
Let the roomserver work first 2022-08-25 12:11:00 +01:00
Neil Alexander c1d6e18152
Lock the room during a purge operation to prevent other input events from populating mid-operation 2022-08-25 12:09:34 +01:00
Neil Alexander 295ba38937
Merge branch 'main' into neilalexander/purgeroom 2022-08-25 10:00:27 +01:00
Neil Alexander 965f532bb0
Merge branch 'main' into neilalexander/purgeroom 2022-08-24 14:47:11 +01:00
Neil Alexander e31469682d
Send the output event first 2022-08-22 14:14:29 +01:00
Neil Alexander 4e4fc400a2
Consolidate purge statements as they need to be prepared after other tables are created 2022-08-22 13:48:09 +01:00
Neil Alexander 468d4b5bbe
Fix joined hosts 2022-08-22 13:15:56 +01:00
Neil Alexander b452d774e4
Purge the federation API 2022-08-22 13:13:58 +01:00
Neil Alexander cd81635cf4
Let us retry 2022-08-22 12:52:22 +01:00
Neil Alexander db9ca401b0
Wire that up 2022-08-22 12:49:04 +01:00
Neil Alexander 53e218914d
Purge sync API too 2022-08-22 12:45:58 +01:00
Neil Alexander c45bf118a8
Send output event for purge 2022-08-22 12:27:17 +01:00
Neil Alexander 4c4de0dfc6
Purge the room entry too 2022-08-22 12:05:38 +01:00
Neil Alexander b56011514b
Fix a query 2022-08-22 12:01:23 +01:00
Neil Alexander 83c7984dba
Fix table name 2022-08-22 11:58:17 +01:00
Neil Alexander 6442aea104
Fix API trace 2022-08-22 11:57:00 +01:00
Neil Alexander 856fe9d62b
Try adding an endpoint for it 2022-08-22 11:55:22 +01:00
Neil Alexander c1dcd5b218
SQLite stubs 2022-08-22 11:49:46 +01:00
Neil Alexander d11c243599
More PostgreSQL purging 2022-08-22 11:46:44 +01:00
Neil Alexander 537f300edf
Some PostgreSQL purging 2022-08-22 11:26:18 +01:00
Neil Alexander 4ef780c076
Define output message 2022-08-22 10:38:19 +01:00
42 changed files with 876 additions and 12 deletions

View file

@ -1,6 +1,7 @@
package routing
import (
"context"
"encoding/json"
"net/http"
@ -90,6 +91,37 @@ func AdminEvacuateUser(req *http.Request, cfg *config.ClientAPI, device *userapi
}
}
func AdminPurgeRoom(req *http.Request, cfg *config.ClientAPI, device *userapi.Device, rsAPI roomserverAPI.ClientRoomserverAPI) util.JSONResponse {
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
if err != nil {
return util.ErrorResponse(err)
}
roomID, ok := vars["roomID"]
if !ok {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.MissingArgument("Expecting room ID."),
}
}
res := &roomserverAPI.PerformAdminPurgeRoomResponse{}
if err := rsAPI.PerformAdminPurgeRoom(
context.Background(),
&roomserverAPI.PerformAdminPurgeRoomRequest{
RoomID: roomID,
},
res,
); err != nil {
return util.ErrorResponse(err)
}
if err := res.Error; err != nil {
return err.JSONResponse()
}
return util.JSONResponse{
Code: 200,
JSON: res,
}
}
func AdminResetPassword(req *http.Request, cfg *config.ClientAPI, device *userapi.Device, userAPI userapi.ClientUserAPI) util.JSONResponse {
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
if err != nil {
@ -138,3 +170,43 @@ func AdminResetPassword(req *http.Request, cfg *config.ClientAPI, device *userap
},
}
}
func AdminDownloadState(req *http.Request, cfg *config.ClientAPI, device *userapi.Device, rsAPI roomserverAPI.ClientRoomserverAPI) util.JSONResponse {
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
if err != nil {
return util.ErrorResponse(err)
}
roomID, ok := vars["roomID"]
if !ok {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.MissingArgument("Expecting room ID."),
}
}
serverName, ok := vars["serverName"]
if !ok {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.MissingArgument("Expecting remote server name."),
}
}
res := &roomserverAPI.PerformAdminDownloadStateResponse{}
if err := rsAPI.PerformAdminDownloadState(
req.Context(),
&roomserverAPI.PerformAdminDownloadStateRequest{
UserID: device.UserID,
RoomID: roomID,
ServerName: gomatrixserverlib.ServerName(serverName),
},
res,
); err != nil {
return jsonerror.InternalAPIError(req.Context(), err)
}
if err := res.Error; err != nil {
return err.JSONResponse()
}
return util.JSONResponse{
Code: 200,
JSON: map[string]interface{}{},
}
}

View file

@ -155,12 +155,24 @@ func Setup(
}),
).Methods(http.MethodGet, http.MethodOptions)
dendriteAdminRouter.Handle("/admin/purgeRoom/{roomID}",
httputil.MakeAdminAPI("admin_purge_room", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
return AdminPurgeRoom(req, cfg, device, rsAPI)
}),
).Methods(http.MethodGet, http.MethodOptions)
dendriteAdminRouter.Handle("/admin/resetPassword/{localpart}",
httputil.MakeAdminAPI("admin_reset_password", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
return AdminResetPassword(req, cfg, device, userAPI)
}),
).Methods(http.MethodPost, http.MethodOptions)
dendriteAdminRouter.Handle("/admin/downloadState/{serverName}/{roomID}",
httputil.MakeAdminAPI("admin_download_state", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
return AdminDownloadState(req, cfg, device, rsAPI)
}),
).Methods(http.MethodGet, http.MethodOptions)
// server notifications
if cfg.Matrix.ServerNotices.Enabled {
logrus.Info("Enabling server notices at /_synapse/admin/v1/send_server_notice")

View file

@ -21,6 +21,7 @@ import (
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
"github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/federationapi/queue"
@ -110,6 +111,14 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Ms
return false
}
case api.OutputTypePurgeRoom:
log.WithField("room_id", output.PurgeRoom.RoomID).Warn("Purging room from federation API")
if err := s.db.PurgeRoom(ctx, output.PurgeRoom.RoomID); err != nil {
logrus.WithField("room_id", output.PurgeRoom.RoomID).WithError(err).Error("Failed to purge room from federation API")
} else {
logrus.WithField("room_id", output.PurgeRoom.RoomID).Warn("Room purged from federation API")
}
default:
log.WithField("type", output.Type).Debug(
"roomserver output log: ignoring unknown output type",

View file

@ -73,4 +73,6 @@ type Database interface {
GetNotaryKeys(ctx context.Context, serverName gomatrixserverlib.ServerName, optKeyIDs []gomatrixserverlib.KeyID) ([]gomatrixserverlib.ServerKeys, error)
// DeleteExpiredEDUs cleans up expired EDUs
DeleteExpiredEDUs(ctx context.Context) error
PurgeRoom(ctx context.Context, roomID string) error
}

View file

@ -66,6 +66,9 @@ const selectAllJoinedHostsSQL = "" +
const selectJoinedHostsForRoomsSQL = "" +
"SELECT DISTINCT server_name FROM federationsender_joined_hosts WHERE room_id = ANY($1)"
const purgeJoinedHostsSQL = "" +
"DELETE FROM federationsender_joined_hosts WHERE room_id = $1"
type joinedHostsStatements struct {
db *sql.DB
insertJoinedHostsStmt *sql.Stmt
@ -74,6 +77,7 @@ type joinedHostsStatements struct {
selectJoinedHostsStmt *sql.Stmt
selectAllJoinedHostsStmt *sql.Stmt
selectJoinedHostsForRoomsStmt *sql.Stmt
purgeJoinedHostsStmt *sql.Stmt
}
func NewPostgresJoinedHostsTable(db *sql.DB) (s *joinedHostsStatements, err error) {
@ -102,6 +106,9 @@ func NewPostgresJoinedHostsTable(db *sql.DB) (s *joinedHostsStatements, err erro
if s.selectJoinedHostsForRoomsStmt, err = s.db.Prepare(selectJoinedHostsForRoomsSQL); err != nil {
return
}
if s.purgeJoinedHostsStmt, err = s.db.Prepare(purgeJoinedHostsSQL); err != nil {
return
}
return
}
@ -210,3 +217,10 @@ func joinedHostsFromStmt(
return result, rows.Err()
}
func (s *joinedHostsStatements) PurgeJoinedHosts(
ctx context.Context, txn *sql.Tx, roomID string,
) error {
_, err := sqlutil.TxStmt(txn, s.purgeJoinedHostsStmt).ExecContext(ctx, roomID)
return err
}

View file

@ -253,3 +253,18 @@ func (d *Database) GetNotaryKeys(
})
return sks, err
}
func (d *Database) PurgeRoom(ctx context.Context, roomID string) error {
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
if err := d.FederationJoinedHosts.PurgeJoinedHosts(ctx, txn, roomID); err != nil {
return fmt.Errorf("failed to purge joined hosts: %w", err)
}
if err := d.FederationInboundPeeks.DeleteInboundPeeks(ctx, txn, roomID); err != nil {
return fmt.Errorf("failed to purge inbound peeks: %w", err)
}
if err := d.FederationOutboundPeeks.DeleteOutboundPeeks(ctx, txn, roomID); err != nil {
return fmt.Errorf("failed to purge outbound peeks: %w", err)
}
return nil
})
}

View file

@ -18,6 +18,7 @@ package sqlite3
import (
"context"
"database/sql"
"fmt"
"strings"
"github.com/matrix-org/dendrite/federationapi/types"
@ -217,3 +218,9 @@ func joinedHostsFromStmt(
return result, nil
}
func (s *joinedHostsStatements) PurgeJoinedHosts(
ctx context.Context, txn *sql.Tx, roomID string,
) error {
return fmt.Errorf("not implemented on SQLite")
}

View file

@ -59,6 +59,7 @@ type FederationJoinedHosts interface {
SelectJoinedHosts(ctx context.Context, roomID string) ([]types.JoinedHost, error)
SelectAllJoinedHosts(ctx context.Context) ([]gomatrixserverlib.ServerName, error)
SelectJoinedHostsForRooms(ctx context.Context, roomIDs []string) ([]gomatrixserverlib.ServerName, error)
PurgeJoinedHosts(ctx context.Context, txn *sql.Tx, roomID string) error
}
type FederationBlacklist interface {

View file

@ -150,6 +150,8 @@ type ClientRoomserverAPI interface {
PerformRoomUpgrade(ctx context.Context, req *PerformRoomUpgradeRequest, resp *PerformRoomUpgradeResponse) error
PerformAdminEvacuateRoom(ctx context.Context, req *PerformAdminEvacuateRoomRequest, res *PerformAdminEvacuateRoomResponse) error
PerformAdminEvacuateUser(ctx context.Context, req *PerformAdminEvacuateUserRequest, res *PerformAdminEvacuateUserResponse) error
PerformAdminPurgeRoom(ctx context.Context, req *PerformAdminPurgeRoomRequest, res *PerformAdminPurgeRoomResponse) error
PerformAdminDownloadState(ctx context.Context, req *PerformAdminDownloadStateRequest, res *PerformAdminDownloadStateResponse) error
PerformPeek(ctx context.Context, req *PerformPeekRequest, res *PerformPeekResponse) error
PerformUnpeek(ctx context.Context, req *PerformUnpeekRequest, res *PerformUnpeekResponse) error
PerformInvite(ctx context.Context, req *PerformInviteRequest, res *PerformInviteResponse) error

View file

@ -131,6 +131,26 @@ func (t *RoomserverInternalAPITrace) PerformAdminEvacuateUser(
return err
}
func (t *RoomserverInternalAPITrace) PerformAdminPurgeRoom(
ctx context.Context,
req *PerformAdminPurgeRoomRequest,
res *PerformAdminPurgeRoomResponse,
) error {
err := t.Impl.PerformAdminPurgeRoom(ctx, req, res)
util.GetLogger(ctx).WithError(err).Infof("PerformAdminPurgeRoom req=%+v res=%+v", js(req), js(res))
return err
}
func (t *RoomserverInternalAPITrace) PerformAdminDownloadState(
ctx context.Context,
req *PerformAdminDownloadStateRequest,
res *PerformAdminDownloadStateResponse,
) error {
err := t.Impl.PerformAdminDownloadState(ctx, req, res)
util.GetLogger(ctx).WithError(err).Infof("PerformAdminDownloadState req=%+v res=%+v", js(req), js(res))
return err
}
func (t *RoomserverInternalAPITrace) PerformInboundPeek(
ctx context.Context,
req *PerformInboundPeekRequest,

View file

@ -55,6 +55,8 @@ const (
OutputTypeNewInboundPeek OutputType = "new_inbound_peek"
// OutputTypeRetirePeek indicates that the kafka event is an OutputRetirePeek
OutputTypeRetirePeek OutputType = "retire_peek"
// OutputTypePurgeRoom indicates the event is an OutputPurgeRoom
OutputTypePurgeRoom OutputType = "purge_room"
)
// An OutputEvent is an entry in the roomserver output kafka log.
@ -78,6 +80,8 @@ type OutputEvent struct {
NewInboundPeek *OutputNewInboundPeek `json:"new_inbound_peek,omitempty"`
// The content of event with type OutputTypeRetirePeek
RetirePeek *OutputRetirePeek `json:"retire_peek,omitempty"`
// The content of the event with type OutputPurgeRoom
PurgeRoom *OutputPurgeRoom `json:"purge_room,omitempty"`
}
// Type of the OutputNewRoomEvent.
@ -257,3 +261,7 @@ type OutputRetirePeek struct {
UserID string
DeviceID string
}
type OutputPurgeRoom struct {
RoomID string
}

View file

@ -234,3 +234,21 @@ type PerformAdminEvacuateUserResponse struct {
Affected []string `json:"affected"`
Error *PerformError
}
type PerformAdminPurgeRoomRequest struct {
RoomID string `json:"room_id"`
}
type PerformAdminPurgeRoomResponse struct {
Error *PerformError `json:"error,omitempty"`
}
type PerformAdminDownloadStateRequest struct {
RoomID string `json:"room_id"`
UserID string `json:"user_id"`
ServerName gomatrixserverlib.ServerName `json:"server_name"`
}
type PerformAdminDownloadStateResponse struct {
Error *PerformError `json:"error,omitempty"`
}

View file

@ -28,6 +28,7 @@ import (
"github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
)
type Admin struct {
@ -231,3 +232,181 @@ func (r *Admin) PerformAdminEvacuateUser(
}
return nil
}
func (r *Admin) PerformAdminPurgeRoom(
ctx context.Context,
req *api.PerformAdminPurgeRoomRequest,
res *api.PerformAdminPurgeRoomResponse,
) error {
if _, _, err := gomatrixserverlib.SplitID('!', req.RoomID); err != nil {
res.Error = &api.PerformError{
Code: api.PerformErrorBadRequest,
Msg: fmt.Sprintf("Malformed room ID: %s", err),
}
return nil
}
logrus.WithField("room_id", req.RoomID).Warn("Purging room from roomserver")
if err := r.DB.PurgeRoom(ctx, req.RoomID); err != nil {
logrus.WithField("room_id", req.RoomID).WithError(err).Warn("Failed to purge room from roomserver")
res.Error = &api.PerformError{
Code: api.PerformErrorBadRequest,
Msg: err.Error(),
}
return nil
} else {
logrus.WithField("room_id", req.RoomID).Warn("Room purged from roomserver")
}
return r.Inputer.OutputProducer.ProduceRoomEvents(req.RoomID, []api.OutputEvent{
{
Type: api.OutputTypePurgeRoom,
PurgeRoom: &api.OutputPurgeRoom{
RoomID: req.RoomID,
},
},
})
}
func (r *Admin) PerformAdminDownloadState(
ctx context.Context,
req *api.PerformAdminDownloadStateRequest,
res *api.PerformAdminDownloadStateResponse,
) error {
roomInfo, err := r.DB.RoomInfo(ctx, req.RoomID)
if err != nil {
res.Error = &api.PerformError{
Code: api.PerformErrorBadRequest,
Msg: fmt.Sprintf("r.DB.RoomInfo: %s", err),
}
return nil
}
if roomInfo == nil || roomInfo.IsStub() {
res.Error = &api.PerformError{
Code: api.PerformErrorBadRequest,
Msg: fmt.Sprintf("room %q not found", req.RoomID),
}
return nil
}
fwdExtremities, _, depth, err := r.DB.LatestEventIDs(ctx, roomInfo.RoomNID)
if err != nil {
res.Error = &api.PerformError{
Code: api.PerformErrorBadRequest,
Msg: fmt.Sprintf("r.DB.LatestEventIDs: %s", err),
}
return nil
}
authEventMap := map[string]*gomatrixserverlib.Event{}
stateEventMap := map[string]*gomatrixserverlib.Event{}
for _, fwdExtremity := range fwdExtremities {
var state gomatrixserverlib.RespState
state, err = r.Inputer.FSAPI.LookupState(ctx, req.ServerName, req.RoomID, fwdExtremity.EventID, roomInfo.RoomVersion)
if err != nil {
res.Error = &api.PerformError{
Code: api.PerformErrorBadRequest,
Msg: fmt.Sprintf("r.Inputer.FSAPI.LookupState (%q): %s", fwdExtremity.EventID, err),
}
return nil
}
for _, authEvent := range state.AuthEvents.UntrustedEvents(roomInfo.RoomVersion) {
if err = authEvent.VerifyEventSignatures(ctx, r.Inputer.KeyRing); err != nil {
continue
}
authEventMap[authEvent.EventID()] = authEvent
}
for _, stateEvent := range state.StateEvents.UntrustedEvents(roomInfo.RoomVersion) {
if err = stateEvent.VerifyEventSignatures(ctx, r.Inputer.KeyRing); err != nil {
continue
}
stateEventMap[stateEvent.EventID()] = stateEvent
}
}
authEvents := make([]*gomatrixserverlib.HeaderedEvent, 0, len(authEventMap))
stateEvents := make([]*gomatrixserverlib.HeaderedEvent, 0, len(stateEventMap))
stateIDs := make([]string, 0, len(stateEventMap))
for _, authEvent := range authEventMap {
authEvents = append(authEvents, authEvent.Headered(roomInfo.RoomVersion))
}
for _, stateEvent := range stateEventMap {
stateEvents = append(stateEvents, stateEvent.Headered(roomInfo.RoomVersion))
stateIDs = append(stateIDs, stateEvent.EventID())
}
builder := &gomatrixserverlib.EventBuilder{
Type: "org.matrix.dendrite.state_download",
Sender: req.UserID,
RoomID: req.RoomID,
Content: gomatrixserverlib.RawJSON("{}"),
}
eventsNeeded, err := gomatrixserverlib.StateNeededForEventBuilder(builder)
if err != nil {
res.Error = &api.PerformError{
Code: api.PerformErrorBadRequest,
Msg: fmt.Sprintf("gomatrixserverlib.StateNeededForEventBuilder: %s", err),
}
return nil
}
queryRes := &api.QueryLatestEventsAndStateResponse{
RoomExists: true,
RoomVersion: roomInfo.RoomVersion,
LatestEvents: fwdExtremities,
StateEvents: stateEvents,
Depth: depth,
}
ev, err := eventutil.BuildEvent(ctx, builder, r.Cfg.Matrix, time.Now(), &eventsNeeded, queryRes)
if err != nil {
res.Error = &api.PerformError{
Code: api.PerformErrorBadRequest,
Msg: fmt.Sprintf("eventutil.BuildEvent: %s", err),
}
return nil
}
inputReq := &api.InputRoomEventsRequest{
Asynchronous: false,
}
inputRes := &api.InputRoomEventsResponse{}
for _, authEvent := range append(authEvents, stateEvents...) {
inputReq.InputRoomEvents = append(inputReq.InputRoomEvents, api.InputRoomEvent{
Kind: api.KindOutlier,
Event: authEvent,
Origin: authEvent.Origin(),
})
}
inputReq.InputRoomEvents = append(inputReq.InputRoomEvents, api.InputRoomEvent{
Kind: api.KindNew,
Event: ev,
Origin: r.Cfg.Matrix.ServerName,
HasState: true,
StateEventIDs: stateIDs,
SendAsServer: string(r.Cfg.Matrix.ServerName),
})
if err := r.Inputer.InputRoomEvents(ctx, inputReq, inputRes); err != nil {
res.Error = &api.PerformError{
Code: api.PerformErrorBadRequest,
Msg: fmt.Sprintf("r.Inputer.InputRoomEvents: %s", err),
}
return nil
}
if inputRes.ErrMsg != "" {
res.Error = &api.PerformError{
Code: api.PerformErrorBadRequest,
Msg: inputRes.ErrMsg,
}
}
return nil
}

View file

@ -27,18 +27,20 @@ const (
RoomserverInputRoomEventsPath = "/roomserver/inputRoomEvents"
// Perform operations
RoomserverPerformInvitePath = "/roomserver/performInvite"
RoomserverPerformPeekPath = "/roomserver/performPeek"
RoomserverPerformUnpeekPath = "/roomserver/performUnpeek"
RoomserverPerformRoomUpgradePath = "/roomserver/performRoomUpgrade"
RoomserverPerformJoinPath = "/roomserver/performJoin"
RoomserverPerformLeavePath = "/roomserver/performLeave"
RoomserverPerformBackfillPath = "/roomserver/performBackfill"
RoomserverPerformPublishPath = "/roomserver/performPublish"
RoomserverPerformInboundPeekPath = "/roomserver/performInboundPeek"
RoomserverPerformForgetPath = "/roomserver/performForget"
RoomserverPerformAdminEvacuateRoomPath = "/roomserver/performAdminEvacuateRoom"
RoomserverPerformAdminEvacuateUserPath = "/roomserver/performAdminEvacuateUser"
RoomserverPerformInvitePath = "/roomserver/performInvite"
RoomserverPerformPeekPath = "/roomserver/performPeek"
RoomserverPerformUnpeekPath = "/roomserver/performUnpeek"
RoomserverPerformRoomUpgradePath = "/roomserver/performRoomUpgrade"
RoomserverPerformJoinPath = "/roomserver/performJoin"
RoomserverPerformLeavePath = "/roomserver/performLeave"
RoomserverPerformBackfillPath = "/roomserver/performBackfill"
RoomserverPerformPublishPath = "/roomserver/performPublish"
RoomserverPerformInboundPeekPath = "/roomserver/performInboundPeek"
RoomserverPerformForgetPath = "/roomserver/performForget"
RoomserverPerformAdminEvacuateRoomPath = "/roomserver/performAdminEvacuateRoom"
RoomserverPerformAdminEvacuateUserPath = "/roomserver/performAdminEvacuateUser"
RoomserverPerformAdminPurgeRoomPath = "/roomserver/performAdminPurgeRoom"
RoomserverPerformAdminDownloadStatePath = "/roomserver/performAdminDownloadState"
// Query operations
RoomserverQueryLatestEventsAndStatePath = "/roomserver/queryLatestEventsAndState"
@ -261,6 +263,17 @@ func (h *httpRoomserverInternalAPI) PerformAdminEvacuateRoom(
)
}
func (h *httpRoomserverInternalAPI) PerformAdminDownloadState(
ctx context.Context,
request *api.PerformAdminDownloadStateRequest,
response *api.PerformAdminDownloadStateResponse,
) error {
return httputil.CallInternalRPCAPI(
"PerformAdminDownloadState", h.roomserverURL+RoomserverPerformAdminDownloadStatePath,
h.httpClient, ctx, request, response,
)
}
func (h *httpRoomserverInternalAPI) PerformAdminEvacuateUser(
ctx context.Context,
request *api.PerformAdminEvacuateUserRequest,
@ -272,6 +285,17 @@ func (h *httpRoomserverInternalAPI) PerformAdminEvacuateUser(
)
}
func (h *httpRoomserverInternalAPI) PerformAdminPurgeRoom(
ctx context.Context,
request *api.PerformAdminPurgeRoomRequest,
response *api.PerformAdminPurgeRoomResponse,
) error {
return httputil.CallInternalRPCAPI(
"PerformAdminPurgeRoom", h.roomserverURL+RoomserverPerformAdminPurgeRoomPath,
h.httpClient, ctx, request, response,
)
}
// QueryLatestEventsAndState implements RoomserverQueryAPI
func (h *httpRoomserverInternalAPI) QueryLatestEventsAndState(
ctx context.Context,

View file

@ -65,6 +65,16 @@ func AddRoutes(r api.RoomserverInternalAPI, internalAPIMux *mux.Router) {
httputil.MakeInternalRPCAPI("RoomserverPerformAdminEvacuateUser", r.PerformAdminEvacuateUser),
)
internalAPIMux.Handle(
RoomserverPerformAdminPurgeRoomPath,
httputil.MakeInternalRPCAPI("RoomserverPerformAdminPurgeRoom", r.PerformAdminPurgeRoom),
)
internalAPIMux.Handle(
RoomserverPerformAdminDownloadStatePath,
httputil.MakeInternalRPCAPI("RoomserverPerformAdminDownloadState", r.PerformAdminDownloadState),
)
internalAPIMux.Handle(
RoomserverQueryPublishedRoomsPath,
httputil.MakeInternalRPCAPI("RoomserverQueryPublishedRooms", r.QueryPublishedRooms),

View file

@ -170,4 +170,5 @@ type Database interface {
ForgetRoom(ctx context.Context, userID, roomID string, forget bool) error
GetHistoryVisibilityState(ctx context.Context, roomInfo *types.RoomInfo, eventID string, domain string) ([]*gomatrixserverlib.Event, error)
PurgeRoom(ctx context.Context, roomID string) error
}

View file

@ -0,0 +1,159 @@
package postgres
import (
"context"
"database/sql"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/types"
)
const purgeEventJSONSQL = "" +
"DELETE FROM roomserver_event_json WHERE event_nid = ANY(" +
" SELECT event_nid FROM roomserver_events WHERE room_nid = $1" +
")"
const purgeEventsSQL = "" +
"DELETE FROM roomserver_events WHERE room_nid = $1"
const purgeInvitesSQL = "" +
"DELETE FROM roomserver_invites WHERE room_nid = $1"
const purgeMembershipsSQL = "" +
"DELETE FROM roomserver_membership WHERE room_nid = $1"
const purgePreviousEventsSQL = "" +
"DELETE FROM roomserver_previous_events WHERE event_nids && ANY(" +
" SELECT ARRAY_AGG(event_nid) FROM roomserver_events WHERE room_nid = $1" +
")"
const purgePublishedSQL = "" +
"DELETE FROM roomserver_published WHERE room_id = $1"
const purgeRedactionsSQL = "" +
"DELETE FROM roomserver_redactions WHERE redaction_event_id = ANY(" +
" SELECT event_id FROM roomserver_events WHERE room_nid = $1" +
")"
const purgeRoomAliasesSQL = "" +
"DELETE FROM roomserver_room_aliases WHERE room_id = $1"
const purgeRoomSQL = "" +
"DELETE FROM roomserver_rooms WHERE room_nid = $1"
const purgeStateBlockEntriesSQL = "" +
"DELETE FROM roomserver_state_block WHERE state_block_nid = ANY(" +
" SELECT DISTINCT UNNEST(state_block_nids) FROM roomserver_state_snapshots WHERE room_nid = $1" +
")"
const purgeStateSnapshotEntriesSQL = "" +
"DELETE FROM roomserver_state_snapshots WHERE room_nid = $1"
type purgeStatements struct {
purgeEventJSONStmt *sql.Stmt
purgeEventsStmt *sql.Stmt
purgeInvitesStmt *sql.Stmt
purgeMembershipsStmt *sql.Stmt
purgePreviousEventsStmt *sql.Stmt
purgePublishedStmt *sql.Stmt
purgeRedactionStmt *sql.Stmt
purgeRoomAliasesStmt *sql.Stmt
purgeRoomStmt *sql.Stmt
purgeStateBlockEntriesStmt *sql.Stmt
purgeStateSnapshotEntriesStmt *sql.Stmt
}
func PreparePurgeStatements(db *sql.DB) (*purgeStatements, error) {
s := &purgeStatements{}
return s, sqlutil.StatementList{
{&s.purgeEventJSONStmt, purgeEventJSONSQL},
{&s.purgeEventsStmt, purgeEventsSQL},
{&s.purgeInvitesStmt, purgeInvitesSQL},
{&s.purgeMembershipsStmt, purgeMembershipsSQL},
{&s.purgePublishedStmt, purgePublishedSQL},
{&s.purgePreviousEventsStmt, purgePreviousEventsSQL},
{&s.purgeRedactionStmt, purgeRedactionsSQL},
{&s.purgeRoomAliasesStmt, purgeRoomAliasesSQL},
{&s.purgeRoomStmt, purgeRoomSQL},
{&s.purgeStateBlockEntriesStmt, purgeStateBlockEntriesSQL},
{&s.purgeStateSnapshotEntriesStmt, purgeStateSnapshotEntriesSQL},
}.Prepare(db)
}
func (s *purgeStatements) PurgeEventJSONs(
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID,
) error {
_, err := sqlutil.TxStmt(txn, s.purgeEventJSONStmt).ExecContext(ctx, roomNID)
return err
}
func (s *purgeStatements) PurgeEvents(
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID,
) error {
_, err := sqlutil.TxStmt(txn, s.purgeEventsStmt).ExecContext(ctx, roomNID)
return err
}
func (s *purgeStatements) PurgeInvites(
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID,
) error {
_, err := sqlutil.TxStmt(txn, s.purgeInvitesStmt).ExecContext(ctx, roomNID)
return err
}
func (s *purgeStatements) PurgeMemberships(
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID,
) error {
_, err := sqlutil.TxStmt(txn, s.purgeMembershipsStmt).ExecContext(ctx, roomNID)
return err
}
func (s *purgeStatements) PurgePreviousEvents(
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID,
) error {
_, err := sqlutil.TxStmt(txn, s.purgePreviousEventsStmt).ExecContext(ctx, roomNID)
return err
}
func (s *purgeStatements) PurgePublished(
ctx context.Context, txn *sql.Tx, roomID string,
) error {
_, err := sqlutil.TxStmt(txn, s.purgePublishedStmt).ExecContext(ctx, roomID)
return err
}
func (s *purgeStatements) PurgeRedactions(
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID,
) error {
_, err := sqlutil.TxStmt(txn, s.purgeRedactionStmt).ExecContext(ctx, roomNID)
return err
}
func (s *purgeStatements) PurgeRoomAliases(
ctx context.Context, txn *sql.Tx, roomID string,
) error {
_, err := sqlutil.TxStmt(txn, s.purgeRoomAliasesStmt).ExecContext(ctx, roomID)
return err
}
func (s *purgeStatements) PurgeRoom(
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID,
) error {
_, err := sqlutil.TxStmt(txn, s.purgeRoomStmt).ExecContext(ctx, roomNID)
return err
}
func (s *purgeStatements) PurgeStateBlocks(
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID,
) error {
_, err := sqlutil.TxStmt(txn, s.purgeStateBlockEntriesStmt).ExecContext(ctx, roomNID)
return err
}
func (s *purgeStatements) PurgeStateSnapshots(
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID,
) error {
_, err := sqlutil.TxStmt(txn, s.purgeStateSnapshotEntriesStmt).ExecContext(ctx, roomNID)
return err
}

View file

@ -58,6 +58,9 @@ const insertRoomNIDSQL = "" +
const selectRoomNIDSQL = "" +
"SELECT room_nid FROM roomserver_rooms WHERE room_id = $1"
const selectRoomNIDForUpdateSQL = "" +
"SELECT room_nid FROM roomserver_rooms WHERE room_id = $1 FOR UPDATE"
const selectLatestEventNIDsSQL = "" +
"SELECT latest_event_nids, state_snapshot_nid FROM roomserver_rooms WHERE room_nid = $1"
@ -85,6 +88,7 @@ const bulkSelectRoomNIDsSQL = "" +
type roomStatements struct {
insertRoomNIDStmt *sql.Stmt
selectRoomNIDStmt *sql.Stmt
selectRoomNIDForUpdateStmt *sql.Stmt
selectLatestEventNIDsStmt *sql.Stmt
selectLatestEventNIDsForUpdateStmt *sql.Stmt
updateLatestEventNIDsStmt *sql.Stmt
@ -106,6 +110,7 @@ func PrepareRoomsTable(db *sql.DB) (tables.Rooms, error) {
return s, sqlutil.StatementList{
{&s.insertRoomNIDStmt, insertRoomNIDSQL},
{&s.selectRoomNIDStmt, selectRoomNIDSQL},
{&s.selectRoomNIDForUpdateStmt, selectRoomNIDForUpdateSQL},
{&s.selectLatestEventNIDsStmt, selectLatestEventNIDsSQL},
{&s.selectLatestEventNIDsForUpdateStmt, selectLatestEventNIDsForUpdateSQL},
{&s.updateLatestEventNIDsStmt, updateLatestEventNIDsSQL},
@ -169,6 +174,15 @@ func (s *roomStatements) SelectRoomNID(
return types.RoomNID(roomNID), err
}
func (s *roomStatements) SelectRoomNIDForUpdate(
ctx context.Context, txn *sql.Tx, roomID string,
) (types.RoomNID, error) {
var roomNID int64
stmt := sqlutil.TxStmt(txn, s.selectRoomNIDForUpdateStmt)
err := stmt.QueryRowContext(ctx, roomID).Scan(&roomNID)
return types.RoomNID(roomNID), err
}
func (s *roomStatements) SelectLatestEventNIDs(
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID,
) ([]types.EventNID, types.StateSnapshotNID, error) {

View file

@ -189,6 +189,10 @@ func (d *Database) prepare(db *sql.DB, writer sqlutil.Writer, cache caching.Room
if err != nil {
return err
}
purge, err := PreparePurgeStatements(db)
if err != nil {
return err
}
d.Database = shared.Database{
DB: db,
Cache: cache,
@ -206,6 +210,7 @@ func (d *Database) prepare(db *sql.DB, writer sqlutil.Writer, cache caching.Room
MembershipTable: membership,
PublishedTable: published,
RedactionsTable: redactions,
Purge: purge,
}
return nil
}

View file

@ -42,6 +42,7 @@ type Database struct {
MembershipTable tables.Membership
PublishedTable tables.Published
RedactionsTable tables.Redactions
Purge tables.Purge
GetRoomUpdaterFn func(ctx context.Context, roomInfo *types.RoomInfo) (*RoomUpdater, error)
}
@ -1362,6 +1363,57 @@ func (d *Database) ForgetRoom(ctx context.Context, userID, roomID string, forget
})
}
// PurgeRoom removes all information about a given room from the roomserver.
// For large rooms this operation may take a considerable amount of time.
func (d *Database) PurgeRoom(ctx context.Context, roomID string) error {
if d.Purge == nil {
return fmt.Errorf("not supported on this database engine")
}
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
roomNID, err := d.RoomsTable.SelectRoomNIDForUpdate(ctx, txn, roomID)
if err != nil {
if err == sql.ErrNoRows {
return nil
}
return fmt.Errorf("failed to lock the room: %w", err)
}
if err := d.Purge.PurgeStateBlocks(ctx, txn, roomNID); err != nil {
return fmt.Errorf("failed to purge state blocks: %w", err)
}
if err := d.Purge.PurgeStateSnapshots(ctx, txn, roomNID); err != nil {
return fmt.Errorf("failed to purge state blocks: %w", err)
}
if err := d.Purge.PurgeInvites(ctx, txn, roomNID); err != nil {
return fmt.Errorf("failed to purge invites: %w", err)
}
if err := d.Purge.PurgeMemberships(ctx, txn, roomNID); err != nil {
return fmt.Errorf("failed to purge memberships: %w", err)
}
if err := d.Purge.PurgeRoomAliases(ctx, txn, roomID); err != nil {
return fmt.Errorf("failed to purge room aliases: %w", err)
}
if err := d.Purge.PurgePublished(ctx, txn, roomID); err != nil {
return fmt.Errorf("failed to purge published: %w", err)
}
if err := d.Purge.PurgePreviousEvents(ctx, txn, roomNID); err != nil {
return fmt.Errorf("failed to purge previous events: %w", err)
}
if err := d.Purge.PurgeEventJSONs(ctx, txn, roomNID); err != nil {
return fmt.Errorf("failed to purge event JSONs: %w", err)
}
if err := d.Purge.PurgeRedactions(ctx, txn, roomNID); err != nil {
return fmt.Errorf("failed to purge redactions: %w", err)
}
if err := d.Purge.PurgeEvents(ctx, txn, roomNID); err != nil {
return fmt.Errorf("failed to purge events: %w", err)
}
if err := d.Purge.PurgeRoom(ctx, txn, roomNID); err != nil {
return fmt.Errorf("failed to purge room: %w", err)
}
return nil
})
}
// FIXME TODO: Remove all this - horrible dupe with roomserver/state. Can't use the original impl because of circular loops
// it should live in this package!

View file

@ -169,6 +169,12 @@ func (s *roomStatements) SelectRoomNID(
return types.RoomNID(roomNID), err
}
func (s *roomStatements) SelectRoomNIDForUpdate(
ctx context.Context, txn *sql.Tx, roomID string,
) (types.RoomNID, error) {
return 0, fmt.Errorf("not supported on SQLite")
}
func (s *roomStatements) SelectLatestEventNIDs(
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID,
) ([]types.EventNID, types.StateSnapshotNID, error) {

View file

@ -72,6 +72,7 @@ type Events interface {
type Rooms interface {
InsertRoomNID(ctx context.Context, txn *sql.Tx, roomID string, roomVersion gomatrixserverlib.RoomVersion) (types.RoomNID, error)
SelectRoomNID(ctx context.Context, txn *sql.Tx, roomID string) (types.RoomNID, error)
SelectRoomNIDForUpdate(ctx context.Context, txn *sql.Tx, roomID string) (types.RoomNID, error)
SelectLatestEventNIDs(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID) ([]types.EventNID, types.StateSnapshotNID, error)
SelectLatestEventsNIDsForUpdate(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID) ([]types.EventNID, types.EventNID, types.StateSnapshotNID, error)
UpdateLatestEventNIDs(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, eventNIDs []types.EventNID, lastEventSentNID types.EventNID, stateSnapshotNID types.StateSnapshotNID) error
@ -171,6 +172,20 @@ type Redactions interface {
MarkRedactionValidated(ctx context.Context, txn *sql.Tx, redactionEventID string, validated bool) error
}
type Purge interface {
PurgeEventJSONs(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID) error
PurgeEvents(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID) error
PurgeRoom(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID) error
PurgeStateSnapshots(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID) error
PurgeStateBlocks(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID) error
PurgePreviousEvents(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID) error
PurgeInvites(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID) error
PurgeMemberships(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID) error
PurgePublished(ctx context.Context, txn *sql.Tx, roomID string) error
PurgeRedactions(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID) error
PurgeRoomAliases(ctx context.Context, txn *sql.Tx, roomID string) error
}
// StrippedEvent represents a stripped event for returning extracted content values.
type StrippedEvent struct {
RoomID string

View file

@ -31,6 +31,7 @@ import (
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
"github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus"
)
@ -123,6 +124,8 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Ms
s.onRetirePeek(s.ctx, *output.RetirePeek)
case api.OutputTypeRedactedEvent:
err = s.onRedactEvent(s.ctx, *output.RedactedEvent)
case api.OutputTypePurgeRoom:
err = s.onPurgeRoom(s.ctx, *output.PurgeRoom)
default:
log.WithField("type", output.Type).Debug(
"roomserver output log: ignoring unknown output type",
@ -434,6 +437,20 @@ func (s *OutputRoomEventConsumer) onRetirePeek(
s.notifier.OnRetirePeek(msg.RoomID, msg.UserID, msg.DeviceID, types.StreamingToken{PDUPosition: sp})
}
func (s *OutputRoomEventConsumer) onPurgeRoom(
ctx context.Context, req api.OutputPurgeRoom,
) error {
logrus.WithField("room_id", req.RoomID).Warn("Purging room from sync API")
if err := s.db.PurgeRoom(ctx, req.RoomID); err != nil {
logrus.WithField("room_id", req.RoomID).WithError(err).Error("Failed to purge room from sync API")
return err
} else {
logrus.WithField("room_id", req.RoomID).Warn("Room purged from sync API")
return nil
}
}
func (s *OutputRoomEventConsumer) updateStateEvent(event *gomatrixserverlib.HeaderedEvent) (*gomatrixserverlib.HeaderedEvent, error) {
if event.StateKey() == nil {
return event, nil

View file

@ -76,6 +76,7 @@ type Database interface {
// PurgeRoomState completely purges room state from the sync API. This is done when
// receiving an output event that completely resets the state.
PurgeRoomState(ctx context.Context, roomID string) error
PurgeRoom(ctx context.Context, roomID string) error
// GetStateEvent returns the Matrix state event of a given type for a given room with a given state key
// If no event could be found, returns nil
// If there was an issue during the retrieval, returns an error

View file

@ -47,10 +47,14 @@ const selectBackwardExtremitiesForRoomSQL = "" +
const deleteBackwardExtremitySQL = "" +
"DELETE FROM syncapi_backward_extremities WHERE room_id = $1 AND prev_event_id = $2"
const purgeBackwardExtremitiesSQL = "" +
"DELETE FROM syncapi_backward_extremities WHERE room_id = $1"
type backwardExtremitiesStatements struct {
insertBackwardExtremityStmt *sql.Stmt
selectBackwardExtremitiesForRoomStmt *sql.Stmt
deleteBackwardExtremityStmt *sql.Stmt
purgeBackwardExtremitiesStmt *sql.Stmt
}
func NewPostgresBackwardsExtremitiesTable(db *sql.DB) (tables.BackwardsExtremities, error) {
@ -68,6 +72,9 @@ func NewPostgresBackwardsExtremitiesTable(db *sql.DB) (tables.BackwardsExtremiti
if s.deleteBackwardExtremityStmt, err = db.Prepare(deleteBackwardExtremitySQL); err != nil {
return nil, err
}
if s.purgeBackwardExtremitiesStmt, err = db.Prepare(purgeBackwardExtremitiesSQL); err != nil {
return nil, err
}
return s, nil
}
@ -106,3 +113,10 @@ func (s *backwardExtremitiesStatements) DeleteBackwardExtremity(
_, err = sqlutil.TxStmt(txn, s.deleteBackwardExtremityStmt).ExecContext(ctx, roomID, knownEventID)
return
}
func (s *backwardExtremitiesStatements) PurgeBackwardExtremities(
ctx context.Context, txn *sql.Tx, roomID string,
) error {
_, err := sqlutil.TxStmt(txn, s.purgeBackwardExtremitiesStmt).ExecContext(ctx, roomID)
return err
}

View file

@ -62,11 +62,15 @@ const selectInviteEventsInRangeSQL = "" +
const selectMaxInviteIDSQL = "" +
"SELECT MAX(id) FROM syncapi_invite_events"
const purgeInvitesSQL = "" +
"DELETE FROM syncapi_invite_events WHERE room_id = $1"
type inviteEventsStatements struct {
insertInviteEventStmt *sql.Stmt
selectInviteEventsInRangeStmt *sql.Stmt
deleteInviteEventStmt *sql.Stmt
selectMaxInviteIDStmt *sql.Stmt
purgeInvitesStmt *sql.Stmt
}
func NewPostgresInvitesTable(db *sql.DB) (tables.Invites, error) {
@ -87,6 +91,9 @@ func NewPostgresInvitesTable(db *sql.DB) (tables.Invites, error) {
if s.selectMaxInviteIDStmt, err = db.Prepare(selectMaxInviteIDSQL); err != nil {
return nil, err
}
if s.purgeInvitesStmt, err = db.Prepare(purgeInvitesSQL); err != nil {
return nil, err
}
return s, nil
}
@ -173,3 +180,10 @@ func (s *inviteEventsStatements) SelectMaxInviteID(
}
return
}
func (s *inviteEventsStatements) PurgeInvites(
ctx context.Context, txn *sql.Tx, roomID string,
) error {
_, err := sqlutil.TxStmt(txn, s.purgeInvitesStmt).ExecContext(ctx, roomID)
return err
}

View file

@ -69,11 +69,15 @@ const selectHeroesSQL = "" +
const selectMembershipBeforeSQL = "" +
"SELECT membership, topological_pos FROM syncapi_memberships WHERE room_id = $1 and user_id = $2 AND topological_pos <= $3 ORDER BY topological_pos DESC LIMIT 1"
const purgeMembershipsSQL = "" +
"DELETE FROM syncapi_memberships WHERE room_id = $1"
type membershipsStatements struct {
upsertMembershipStmt *sql.Stmt
selectMembershipCountStmt *sql.Stmt
selectHeroesStmt *sql.Stmt
selectMembershipForUserStmt *sql.Stmt
purgeMembershipsStmt *sql.Stmt
}
func NewPostgresMembershipsTable(db *sql.DB) (tables.Memberships, error) {
@ -87,6 +91,7 @@ func NewPostgresMembershipsTable(db *sql.DB) (tables.Memberships, error) {
{&s.selectMembershipCountStmt, selectMembershipCountSQL},
{&s.selectHeroesStmt, selectHeroesSQL},
{&s.selectMembershipForUserStmt, selectMembershipBeforeSQL},
{&s.purgeMembershipsStmt, purgeMembershipsSQL},
}.Prepare(db)
}
@ -154,3 +159,10 @@ func (s *membershipsStatements) SelectMembershipForUser(
}
return membership, topologyPos, nil
}
func (s *membershipsStatements) PurgeMemberships(
ctx context.Context, txn *sql.Tx, roomID string,
) error {
_, err := sqlutil.TxStmt(txn, s.purgeMembershipsStmt).ExecContext(ctx, roomID)
return err
}

View file

@ -35,6 +35,7 @@ func NewPostgresNotificationDataTable(db *sql.DB) (tables.NotificationData, erro
{&r.upsertRoomUnreadCounts, upsertRoomUnreadNotificationCountsSQL},
{&r.selectUserUnreadCounts, selectUserUnreadNotificationCountsSQL},
{&r.selectMaxID, selectMaxNotificationIDSQL},
{&r.purgeNotificationData, purgeNotificationDataSQL},
}.Prepare(db)
}
@ -42,6 +43,7 @@ type notificationDataStatements struct {
upsertRoomUnreadCounts *sql.Stmt
selectUserUnreadCounts *sql.Stmt
selectMaxID *sql.Stmt
purgeNotificationData *sql.Stmt
}
const notificationDataSchema = `
@ -70,6 +72,9 @@ const selectUserUnreadNotificationCountsSQL = `SELECT
const selectMaxNotificationIDSQL = `SELECT CASE COUNT(*) WHEN 0 THEN 0 ELSE MAX(id) END FROM syncapi_notification_data`
const purgeNotificationDataSQL = "" +
"DELETE FROM syncapi_notification_data WHERE room_id = $1"
func (r *notificationDataStatements) UpsertRoomUnreadCounts(ctx context.Context, txn *sql.Tx, userID, roomID string, notificationCount, highlightCount int) (pos types.StreamPosition, err error) {
err = sqlutil.TxStmt(txn, r.upsertRoomUnreadCounts).QueryRowContext(ctx, userID, roomID, notificationCount, highlightCount).Scan(&pos)
return
@ -106,3 +111,10 @@ func (r *notificationDataStatements) SelectMaxID(ctx context.Context, txn *sql.T
err := sqlutil.TxStmt(txn, r.selectMaxID).QueryRowContext(ctx).Scan(&id)
return id, err
}
func (s *notificationDataStatements) PurgeNotificationData(
ctx context.Context, txn *sql.Tx, roomID string,
) error {
_, err := sqlutil.TxStmt(txn, s.purgeNotificationData).ExecContext(ctx, roomID)
return err
}

View file

@ -166,6 +166,9 @@ const selectContextAfterEventSQL = "" +
" AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" +
" ORDER BY id ASC LIMIT $3"
const purgeEventsSQL = "" +
"DELETE FROM syncapi_output_room_events WHERE room_id = $1"
type outputRoomEventsStatements struct {
insertEventStmt *sql.Stmt
selectEventsStmt *sql.Stmt
@ -180,6 +183,7 @@ type outputRoomEventsStatements struct {
selectContextEventStmt *sql.Stmt
selectContextBeforeEventStmt *sql.Stmt
selectContextAfterEventStmt *sql.Stmt
purgeEventsStmt *sql.Stmt
}
func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
@ -215,6 +219,7 @@ func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
{&s.selectContextEventStmt, selectContextEventSQL},
{&s.selectContextBeforeEventStmt, selectContextBeforeEventSQL},
{&s.selectContextAfterEventStmt, selectContextAfterEventSQL},
{&s.purgeEventsStmt, purgeEventsSQL},
}.Prepare(db)
}
@ -632,3 +637,10 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
}
return result, rows.Err()
}
func (s *outputRoomEventsStatements) PurgeEvents(
ctx context.Context, txn *sql.Tx, roomID string,
) error {
_, err := sqlutil.TxStmt(txn, s.purgeEventsStmt).ExecContext(ctx, roomID)
return err
}

View file

@ -79,6 +79,9 @@ const selectStreamToTopologicalPositionAscSQL = "" +
const selectStreamToTopologicalPositionDescSQL = "" +
"SELECT topological_position FROM syncapi_output_room_events_topology WHERE room_id = $1 AND stream_position <= $2 ORDER BY topological_position DESC LIMIT 1;"
const purgeEventsTopologySQL = "" +
"DELETE FROM syncapi_output_room_events_topology WHERE room_id = $1"
type outputRoomEventsTopologyStatements struct {
insertEventInTopologyStmt *sql.Stmt
selectEventIDsInRangeASCStmt *sql.Stmt
@ -87,6 +90,7 @@ type outputRoomEventsTopologyStatements struct {
selectMaxPositionInTopologyStmt *sql.Stmt
selectStreamToTopologicalPositionAscStmt *sql.Stmt
selectStreamToTopologicalPositionDescStmt *sql.Stmt
purgeEventsTopologyStmt *sql.Stmt
}
func NewPostgresTopologyTable(db *sql.DB) (tables.Topology, error) {
@ -116,6 +120,9 @@ func NewPostgresTopologyTable(db *sql.DB) (tables.Topology, error) {
if s.selectStreamToTopologicalPositionDescStmt, err = db.Prepare(selectStreamToTopologicalPositionDescSQL); err != nil {
return nil, err
}
if s.purgeEventsTopologyStmt, err = db.Prepare(purgeEventsTopologySQL); err != nil {
return nil, err
}
return s, nil
}
@ -196,3 +203,10 @@ func (s *outputRoomEventsTopologyStatements) SelectMaxPositionInTopology(
err = s.selectMaxPositionInTopologyStmt.QueryRowContext(ctx, roomID).Scan(&pos, &spos)
return
}
func (s *outputRoomEventsTopologyStatements) PurgeEventsTopology(
ctx context.Context, txn *sql.Tx, roomID string,
) error {
_, err := sqlutil.TxStmt(txn, s.purgeEventsTopologyStmt).ExecContext(ctx, roomID)
return err
}

View file

@ -65,6 +65,9 @@ const selectPeekingDevicesSQL = "" +
const selectMaxPeekIDSQL = "" +
"SELECT MAX(id) FROM syncapi_peeks"
const purgePeeksSQL = "" +
"DELETE FROM syncapi_peeks WHERE room_id = $1"
type peekStatements struct {
db *sql.DB
insertPeekStmt *sql.Stmt
@ -73,6 +76,7 @@ type peekStatements struct {
selectPeeksInRangeStmt *sql.Stmt
selectPeekingDevicesStmt *sql.Stmt
selectMaxPeekIDStmt *sql.Stmt
purgePeeksStmt *sql.Stmt
}
func NewPostgresPeeksTable(db *sql.DB) (tables.Peeks, error) {
@ -101,6 +105,9 @@ func NewPostgresPeeksTable(db *sql.DB) (tables.Peeks, error) {
if s.selectMaxPeekIDStmt, err = db.Prepare(selectMaxPeekIDSQL); err != nil {
return nil, err
}
if s.purgePeeksStmt, err = db.Prepare(purgePeeksSQL); err != nil {
return nil, err
}
return s, nil
}
@ -184,3 +191,10 @@ func (s *peekStatements) SelectMaxPeekID(
}
return
}
func (s *peekStatements) PurgePeeks(
ctx context.Context, txn *sql.Tx, roomID string,
) error {
_, err := sqlutil.TxStmt(txn, s.purgePeeksStmt).ExecContext(ctx, roomID)
return err
}

View file

@ -62,11 +62,15 @@ const selectRoomReceipts = "" +
const selectMaxReceiptIDSQL = "" +
"SELECT MAX(id) FROM syncapi_receipts"
const purgeReceiptsSQL = "" +
"DELETE FROM syncapi_receipts WHERE room_id = $1"
type receiptStatements struct {
db *sql.DB
upsertReceipt *sql.Stmt
selectRoomReceipts *sql.Stmt
selectMaxReceiptID *sql.Stmt
purgeReceiptsStmt *sql.Stmt
}
func NewPostgresReceiptsTable(db *sql.DB) (tables.Receipts, error) {
@ -95,6 +99,9 @@ func NewPostgresReceiptsTable(db *sql.DB) (tables.Receipts, error) {
if r.selectMaxReceiptID, err = db.Prepare(selectMaxReceiptIDSQL); err != nil {
return nil, fmt.Errorf("unable to prepare selectRoomReceipts statement: %w", err)
}
if r.purgeReceiptsStmt, err = db.Prepare(purgeReceiptsSQL); err != nil {
return nil, fmt.Errorf("unable to prepare purgeReceiptsStmt statement: %w", err)
}
return r, nil
}
@ -138,3 +145,10 @@ func (s *receiptStatements) SelectMaxReceiptID(
}
return
}
func (s *receiptStatements) PurgeReceipts(
ctx context.Context, txn *sql.Tx, roomID string,
) error {
_, err := sqlutil.TxStmt(txn, s.purgeReceiptsStmt).ExecContext(ctx, roomID)
return err
}

View file

@ -1086,3 +1086,36 @@ func (d *Database) MaxStreamPositionForPresence(ctx context.Context) (types.Stre
func (d *Database) SelectMembershipForUser(ctx context.Context, roomID, userID string, pos int64) (membership string, topologicalPos int, err error) {
return d.Memberships.SelectMembershipForUser(ctx, nil, roomID, userID, pos)
}
func (d *Database) PurgeRoom(ctx context.Context, roomID string) error {
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
if err := d.BackwardExtremities.PurgeBackwardExtremities(ctx, txn, roomID); err != nil {
return fmt.Errorf("failed to purge backward extremities: %w", err)
}
if err := d.CurrentRoomState.DeleteRoomStateForRoom(ctx, txn, roomID); err != nil {
return fmt.Errorf("failed to purge current room state: %w", err)
}
if err := d.Invites.PurgeInvites(ctx, txn, roomID); err != nil {
return fmt.Errorf("failed to purge invites: %w", err)
}
if err := d.Memberships.PurgeMemberships(ctx, txn, roomID); err != nil {
return fmt.Errorf("failed to purge memberships: %w", err)
}
if err := d.NotificationData.PurgeNotificationData(ctx, txn, roomID); err != nil {
return fmt.Errorf("failed to purge notification data: %w", err)
}
if err := d.OutputEvents.PurgeEvents(ctx, txn, roomID); err != nil {
return fmt.Errorf("failed to purge events: %w", err)
}
if err := d.Topology.PurgeEventsTopology(ctx, txn, roomID); err != nil {
return fmt.Errorf("failed to purge events topology: %w", err)
}
if err := d.Peeks.PurgePeeks(ctx, txn, roomID); err != nil {
return fmt.Errorf("failed to purge peeks: %w", err)
}
if err := d.Receipts.PurgeReceipts(ctx, txn, roomID); err != nil {
return fmt.Errorf("failed to purge receipts: %w", err)
}
return nil
})
}

View file

@ -17,6 +17,7 @@ package sqlite3
import (
"context"
"database/sql"
"fmt"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
@ -109,3 +110,9 @@ func (s *backwardExtremitiesStatements) DeleteBackwardExtremity(
_, err = sqlutil.TxStmt(txn, s.deleteBackwardExtremityStmt).ExecContext(ctx, roomID, knownEventID)
return err
}
func (s *backwardExtremitiesStatements) PurgeBackwardExtremities(
ctx context.Context, txn *sql.Tx, roomID string,
) error {
return fmt.Errorf("not implemented on SQLite")
}

View file

@ -19,6 +19,7 @@ import (
"context"
"database/sql"
"encoding/json"
"fmt"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
@ -183,3 +184,9 @@ func (s *inviteEventsStatements) SelectMaxInviteID(
}
return
}
func (s *inviteEventsStatements) PurgeInvites(
ctx context.Context, txn *sql.Tx, roomID string,
) error {
return fmt.Errorf("not implemented on SQLite")
}

View file

@ -170,3 +170,9 @@ func (s *membershipsStatements) SelectMembershipForUser(
}
return membership, topologyPos, nil
}
func (s *membershipsStatements) PurgeMemberships(
ctx context.Context, txn *sql.Tx, roomID string,
) error {
return fmt.Errorf("not implemented on SQLite")
}

View file

@ -17,6 +17,7 @@ package sqlite3
import (
"context"
"database/sql"
"fmt"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/eventutil"
@ -112,3 +113,9 @@ func (r *notificationDataStatements) SelectMaxID(ctx context.Context, txn *sql.T
err := sqlutil.TxStmt(txn, r.selectMaxID).QueryRowContext(ctx).Scan(&id)
return id, err
}
func (s *notificationDataStatements) PurgeNotificationData(
ctx context.Context, txn *sql.Tx, roomID string,
) error {
return fmt.Errorf("not implemented on SQLite")
}

View file

@ -628,3 +628,9 @@ func unmarshalStateIDs(addIDsJSON, delIDsJSON string) (addIDs []string, delIDs [
}
return
}
func (s *outputRoomEventsStatements) PurgeEvents(
ctx context.Context, txn *sql.Tx, roomID string,
) error {
return fmt.Errorf("not implemented on SQLite")
}

View file

@ -17,6 +17,7 @@ package sqlite3
import (
"context"
"database/sql"
"fmt"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
@ -190,3 +191,9 @@ func (s *outputRoomEventsTopologyStatements) SelectMaxPositionInTopology(
err = stmt.QueryRowContext(ctx, roomID).Scan(&pos, &spos)
return
}
func (s *outputRoomEventsTopologyStatements) PurgeEventsTopology(
ctx context.Context, txn *sql.Tx, roomID string,
) error {
return fmt.Errorf("not implemented on SQLite")
}

View file

@ -17,6 +17,7 @@ package sqlite3
import (
"context"
"database/sql"
"fmt"
"time"
"github.com/matrix-org/dendrite/internal"
@ -204,3 +205,9 @@ func (s *peekStatements) SelectMaxPeekID(
}
return
}
func (s *peekStatements) PurgePeeks(
ctx context.Context, txn *sql.Tx, roomID string,
) error {
return fmt.Errorf("not implemented on SQLite")
}

View file

@ -148,3 +148,9 @@ func (s *receiptStatements) SelectMaxReceiptID(
}
return
}
func (s *receiptStatements) PurgeReceipts(
ctx context.Context, txn *sql.Tx, roomID string,
) error {
return fmt.Errorf("not implemented on SQLite")
}

View file

@ -39,6 +39,7 @@ type Invites interface {
// for the room.
SelectInviteEventsInRange(ctx context.Context, txn *sql.Tx, targetUserID string, r types.Range) (invites map[string]*gomatrixserverlib.HeaderedEvent, retired map[string]*gomatrixserverlib.HeaderedEvent, err error)
SelectMaxInviteID(ctx context.Context, txn *sql.Tx) (id int64, err error)
PurgeInvites(ctx context.Context, txn *sql.Tx, roomID string) error
}
type Peeks interface {
@ -48,6 +49,7 @@ type Peeks interface {
SelectPeeksInRange(ctxt context.Context, txn *sql.Tx, userID, deviceID string, r types.Range) (peeks []types.Peek, err error)
SelectPeekingDevices(ctxt context.Context) (peekingDevices map[string][]types.PeekingDevice, err error)
SelectMaxPeekID(ctx context.Context, txn *sql.Tx) (id int64, err error)
PurgePeeks(ctx context.Context, txn *sql.Tx, roomID string) error
}
type Events interface {
@ -75,6 +77,8 @@ type Events interface {
SelectContextEvent(ctx context.Context, txn *sql.Tx, roomID, eventID string) (int, gomatrixserverlib.HeaderedEvent, error)
SelectContextBeforeEvent(ctx context.Context, txn *sql.Tx, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) ([]*gomatrixserverlib.HeaderedEvent, error)
SelectContextAfterEvent(ctx context.Context, txn *sql.Tx, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error)
PurgeEvents(ctx context.Context, txn *sql.Tx, roomID string) error
}
// Topology keeps track of the depths and stream positions for all events.
@ -94,6 +98,7 @@ type Topology interface {
SelectMaxPositionInTopology(ctx context.Context, txn *sql.Tx, roomID string) (depth types.StreamPosition, spos types.StreamPosition, err error)
// SelectStreamToTopologicalPosition converts a stream position to a topological position by finding the nearest topological position in the room.
SelectStreamToTopologicalPosition(ctx context.Context, txn *sql.Tx, roomID string, streamPos types.StreamPosition, forward bool) (topoPos types.StreamPosition, err error)
PurgeEventsTopology(ctx context.Context, txn *sql.Tx, roomID string) error
}
type CurrentRoomState interface {
@ -144,6 +149,7 @@ type BackwardsExtremities interface {
SelectBackwardExtremitiesForRoom(ctx context.Context, roomID string) (bwExtrems map[string][]string, err error)
// DeleteBackwardExtremity removes a backwards extremity for a room, if one existed.
DeleteBackwardExtremity(ctx context.Context, txn *sql.Tx, roomID, knownEventID string) (err error)
PurgeBackwardExtremities(ctx context.Context, txn *sql.Tx, roomID string) error
}
// SendToDevice tracks send-to-device messages which are sent to individual
@ -179,6 +185,7 @@ type Receipts interface {
UpsertReceipt(ctx context.Context, txn *sql.Tx, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error)
SelectRoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []types.OutputReceiptEvent, error)
SelectMaxReceiptID(ctx context.Context, txn *sql.Tx) (id int64, err error)
PurgeReceipts(ctx context.Context, txn *sql.Tx, roomID string) error
}
type Memberships interface {
@ -186,12 +193,14 @@ type Memberships interface {
SelectMembershipCount(ctx context.Context, txn *sql.Tx, roomID, membership string, pos types.StreamPosition) (count int, err error)
SelectHeroes(ctx context.Context, txn *sql.Tx, roomID, userID string, memberships []string) (heroes []string, err error)
SelectMembershipForUser(ctx context.Context, txn *sql.Tx, roomID, userID string, pos int64) (membership string, topologicalPos int, err error)
PurgeMemberships(ctx context.Context, txn *sql.Tx, roomID string) error
}
type NotificationData interface {
UpsertRoomUnreadCounts(ctx context.Context, txn *sql.Tx, userID, roomID string, notificationCount, highlightCount int) (types.StreamPosition, error)
SelectUserUnreadCounts(ctx context.Context, txn *sql.Tx, userID string, fromExcl, toIncl types.StreamPosition) (map[string]*eventutil.NotificationData, error)
SelectMaxID(ctx context.Context, txn *sql.Tx) (int64, error)
PurgeNotificationData(ctx context.Context, txn *sql.Tx, roomID string) error
}
type Ignores interface {