Initial purge room support for SQLite

This commit is contained in:
Till Faelligen 2022-12-22 14:47:16 +01:00
parent 2b7d1023ba
commit e779b5b3a8
No known key found for this signature in database
GPG key ID: ACCDC9606D472758
10 changed files with 273 additions and 19 deletions

View file

@ -124,6 +124,10 @@ type QueryProvider interface {
QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
}
type ExecProvider interface {
ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
}
// SQLite3MaxVariables is the default maximum number of host parameters in a single SQL statement
// SQLlite can handle. See https://www.sqlite.org/limits.html for more information.
const SQLite3MaxVariables = 999
@ -153,6 +157,22 @@ func RunLimitedVariablesQuery(ctx context.Context, query string, qp QueryProvide
return nil
}
// RunLimitedVariablesQuery split up a query with more variables than the used database can handle in multiple queries.
func RunLimitedVariablesExec(ctx context.Context, query string, qp ExecProvider, variables []interface{}, limit uint) error {
var start int
for start < len(variables) {
n := minOfInts(len(variables)-start, int(limit))
nextQuery := strings.Replace(query, "($1)", QueryVariadic(n), 1)
_, err := qp.ExecContext(ctx, nextQuery, variables[start:start+n]...)
if err != nil {
util.GetLogger(ctx).WithError(err).Error("QueryContext returned an error")
return err
}
start = start + n
}
return nil
}
// StatementList is a list of SQL statements to prepare and a pointer to where to store the resulting prepared statement.
type StatementList []struct {
Statement **sql.Stmt

View file

@ -241,9 +241,6 @@ func TestPurgeRoom(t *testing.T) {
ctx := context.Background()
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
if dbType == test.DBTypeSQLite {
t.Skip("purging rooms on SQLite is not yet implemented")
}
base, db, close := mustCreateDatabase(t, dbType)
defer close()

View file

@ -1,3 +1,17 @@
// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package postgres
import (

View file

@ -1449,9 +1449,6 @@ func (d *Database) ForgetRoom(ctx context.Context, userID, roomID string, forget
// PurgeRoom removes all information about a given room from the roomserver.
// For large rooms this operation may take a considerable amount of time.
func (d *Database) PurgeRoom(ctx context.Context, roomID string) error {
if d.Purge == nil {
return fmt.Errorf("not supported on this database engine")
}
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
roomNID, err := d.RoomsTable.SelectRoomNIDForUpdate(ctx, txn, roomID)
if err != nil {

View file

@ -0,0 +1,179 @@
// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sqlite3
import (
"context"
"database/sql"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/types"
)
const purgeEventJSONSQL = "" +
"DELETE FROM roomserver_event_json WHERE event_nid IN (" +
" SELECT event_nid FROM roomserver_events WHERE room_nid = $1" +
")"
const purgeEventsSQL = "" +
"DELETE FROM roomserver_events WHERE room_nid = $1"
const purgeInvitesSQL = "" +
"DELETE FROM roomserver_invites WHERE room_nid = $1"
const purgeMembershipsSQL = "" +
"DELETE FROM roomserver_membership WHERE room_nid = $1"
const purgePreviousEventsSQL = "" +
"DELETE FROM roomserver_previous_events WHERE event_nids IN(" +
" SELECT event_nid FROM roomserver_events WHERE room_nid = $1" +
")"
const purgePublishedSQL = "" +
"DELETE FROM roomserver_published WHERE room_id = $1"
const purgeRedactionsSQL = "" +
"DELETE FROM roomserver_redactions WHERE redaction_event_id IN(" +
" SELECT event_id FROM roomserver_events WHERE room_nid = $1" +
")"
const purgeRoomAliasesSQL = "" +
"DELETE FROM roomserver_room_aliases WHERE room_id = $1"
const purgeRoomSQL = "" +
"DELETE FROM roomserver_rooms WHERE room_nid = $1"
const purgeStateSnapshotEntriesSQL = "" +
"DELETE FROM roomserver_state_snapshots WHERE room_nid = $1"
type purgeStatements struct {
purgeEventJSONStmt *sql.Stmt
purgeEventsStmt *sql.Stmt
purgeInvitesStmt *sql.Stmt
purgeMembershipsStmt *sql.Stmt
purgePreviousEventsStmt *sql.Stmt
purgePublishedStmt *sql.Stmt
purgeRedactionStmt *sql.Stmt
purgeRoomAliasesStmt *sql.Stmt
purgeRoomStmt *sql.Stmt
purgeStateSnapshotEntriesStmt *sql.Stmt
stateBlock *StateBlockStatements
stateSnapshot *StateSnapshotStatements
}
func PreparePurgeStatements(db *sql.DB, stateBlock *StateBlockStatements, stateSnapshot *StateSnapshotStatements) (*purgeStatements, error) {
s := &purgeStatements{stateBlock: stateBlock, stateSnapshot: stateSnapshot}
return s, sqlutil.StatementList{
{&s.purgeEventJSONStmt, purgeEventJSONSQL},
{&s.purgeEventsStmt, purgeEventsSQL},
{&s.purgeInvitesStmt, purgeInvitesSQL},
{&s.purgeMembershipsStmt, purgeMembershipsSQL},
{&s.purgePublishedStmt, purgePublishedSQL},
{&s.purgePreviousEventsStmt, purgePreviousEventsSQL},
{&s.purgeRedactionStmt, purgeRedactionsSQL},
{&s.purgeRoomAliasesStmt, purgeRoomAliasesSQL},
{&s.purgeRoomStmt, purgeRoomSQL},
//{&s.purgeStateBlockEntriesStmt, purgeStateBlockEntriesSQL},
{&s.purgeStateSnapshotEntriesStmt, purgeStateSnapshotEntriesSQL},
}.Prepare(db)
}
func (s *purgeStatements) PurgeEventJSONs(
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID,
) error {
_, err := sqlutil.TxStmt(txn, s.purgeEventJSONStmt).ExecContext(ctx, roomNID)
return err
}
func (s *purgeStatements) PurgeEvents(
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID,
) error {
_, err := sqlutil.TxStmt(txn, s.purgeEventsStmt).ExecContext(ctx, roomNID)
return err
}
func (s *purgeStatements) PurgeInvites(
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID,
) error {
_, err := sqlutil.TxStmt(txn, s.purgeInvitesStmt).ExecContext(ctx, roomNID)
return err
}
func (s *purgeStatements) PurgeMemberships(
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID,
) error {
_, err := sqlutil.TxStmt(txn, s.purgeMembershipsStmt).ExecContext(ctx, roomNID)
return err
}
func (s *purgeStatements) PurgePreviousEvents(
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID,
) error {
_, err := sqlutil.TxStmt(txn, s.purgePreviousEventsStmt).ExecContext(ctx, roomNID)
return err
}
func (s *purgeStatements) PurgePublished(
ctx context.Context, txn *sql.Tx, roomID string,
) error {
_, err := sqlutil.TxStmt(txn, s.purgePublishedStmt).ExecContext(ctx, roomID)
return err
}
func (s *purgeStatements) PurgeRedactions(
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID,
) error {
_, err := sqlutil.TxStmt(txn, s.purgeRedactionStmt).ExecContext(ctx, roomNID)
return err
}
func (s *purgeStatements) PurgeRoomAliases(
ctx context.Context, txn *sql.Tx, roomID string,
) error {
_, err := sqlutil.TxStmt(txn, s.purgeRoomAliasesStmt).ExecContext(ctx, roomID)
return err
}
func (s *purgeStatements) PurgeRoom(
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID,
) error {
_, err := sqlutil.TxStmt(txn, s.purgeRoomStmt).ExecContext(ctx, roomNID)
return err
}
func (s *purgeStatements) PurgeStateBlocks(
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID,
) error {
// Get all stateBlockNIDs
stateBlockNIDs, err := s.stateSnapshot.selectStateBlockNIDsForRoomNID(ctx, txn, roomNID)
if err != nil {
return err
}
params := make([]interface{}, len(stateBlockNIDs)+1)
for k, v := range stateBlockNIDs {
params[k] = v
}
query := "DELETE FROM roomserver_state_block WHERE state_block_nid IN($1)"
return sqlutil.RunLimitedVariablesExec(ctx, query, txn, params, sqlutil.SQLite3MaxVariables)
}
func (s *purgeStatements) PurgeStateSnapshots(
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID,
) error {
_, err := sqlutil.TxStmt(txn, s.purgeStateSnapshotEntriesStmt).ExecContext(ctx, roomNID)
return err
}

View file

@ -74,10 +74,14 @@ const bulkSelectRoomIDsSQL = "" +
const bulkSelectRoomNIDsSQL = "" +
"SELECT room_nid FROM roomserver_rooms WHERE room_id IN ($1)"
const selectRoomNIDForUpdateSQL = "" +
"SELECT room_nid FROM roomserver_rooms WHERE room_id = $1"
type roomStatements struct {
db *sql.DB
insertRoomNIDStmt *sql.Stmt
selectRoomNIDStmt *sql.Stmt
selectRoomNIDForUpdateStmt *sql.Stmt
selectLatestEventNIDsStmt *sql.Stmt
selectLatestEventNIDsForUpdateStmt *sql.Stmt
updateLatestEventNIDsStmt *sql.Stmt
@ -105,6 +109,7 @@ func PrepareRoomsTable(db *sql.DB) (tables.Rooms, error) {
//{&s.selectRoomVersionForRoomNIDsStmt, selectRoomVersionForRoomNIDsSQL},
{&s.selectRoomInfoStmt, selectRoomInfoSQL},
{&s.selectRoomIDsStmt, selectRoomIDsSQL},
{&s.selectRoomNIDForUpdateStmt, selectRoomNIDForUpdateSQL},
}.Prepare(db)
}
@ -172,7 +177,10 @@ func (s *roomStatements) SelectRoomNID(
func (s *roomStatements) SelectRoomNIDForUpdate(
ctx context.Context, txn *sql.Tx, roomID string,
) (types.RoomNID, error) {
return 0, fmt.Errorf("not supported on SQLite")
var roomNID int64
stmt := sqlutil.TxStmt(txn, s.selectRoomNIDForUpdateStmt)
err := stmt.QueryRowContext(ctx, roomID).Scan(&roomNID)
return types.RoomNID(roomNID), err
}
func (s *roomStatements) SelectLatestEventNIDs(

View file

@ -24,7 +24,6 @@ import (
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/storage/tables"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/util"
)
@ -57,7 +56,7 @@ const bulkSelectStateBlockEntriesSQL = "" +
"SELECT state_block_nid, event_nids" +
" FROM roomserver_state_block WHERE state_block_nid IN ($1) ORDER BY state_block_nid ASC"
type stateBlockStatements struct {
type StateBlockStatements struct {
db *sql.DB
insertStateDataStmt *sql.Stmt
bulkSelectStateBlockEntriesStmt *sql.Stmt
@ -68,8 +67,8 @@ func CreateStateBlockTable(db *sql.DB) error {
return err
}
func PrepareStateBlockTable(db *sql.DB) (tables.StateBlock, error) {
s := &stateBlockStatements{
func PrepareStateBlockTable(db *sql.DB) (*StateBlockStatements, error) {
s := &StateBlockStatements{
db: db,
}
@ -79,7 +78,7 @@ func PrepareStateBlockTable(db *sql.DB) (tables.StateBlock, error) {
}.Prepare(db)
}
func (s *stateBlockStatements) BulkInsertStateData(
func (s *StateBlockStatements) BulkInsertStateData(
ctx context.Context, txn *sql.Tx,
entries types.StateEntries,
) (id types.StateBlockNID, err error) {
@ -99,7 +98,7 @@ func (s *stateBlockStatements) BulkInsertStateData(
return
}
func (s *stateBlockStatements) BulkSelectStateBlockEntries(
func (s *StateBlockStatements) BulkSelectStateBlockEntries(
ctx context.Context, txn *sql.Tx, stateBlockNIDs types.StateBlockNIDs,
) ([][]types.EventNID, error) {
intfs := make([]interface{}, len(stateBlockNIDs))

View file

@ -62,10 +62,14 @@ const bulkSelectStateBlockNIDsSQL = "" +
"SELECT state_snapshot_nid, state_block_nids FROM roomserver_state_snapshots" +
" WHERE state_snapshot_nid IN ($1) ORDER BY state_snapshot_nid ASC"
type stateSnapshotStatements struct {
const selectStateBlockNIDsForRoomNID = "" +
"SELECT state_block_nids FROM roomserver_state_snapshots WHERE room_nid = $1"
type StateSnapshotStatements struct {
db *sql.DB
insertStateStmt *sql.Stmt
bulkSelectStateBlockNIDsStmt *sql.Stmt
selectStateBlockNIDsStmt *sql.Stmt
}
func CreateStateSnapshotTable(db *sql.DB) error {
@ -73,18 +77,19 @@ func CreateStateSnapshotTable(db *sql.DB) error {
return err
}
func PrepareStateSnapshotTable(db *sql.DB) (tables.StateSnapshot, error) {
s := &stateSnapshotStatements{
func PrepareStateSnapshotTable(db *sql.DB) (*StateSnapshotStatements, error) {
s := &StateSnapshotStatements{
db: db,
}
return s, sqlutil.StatementList{
{&s.insertStateStmt, insertStateSQL},
{&s.bulkSelectStateBlockNIDsStmt, bulkSelectStateBlockNIDsSQL},
{&s.selectStateBlockNIDsStmt, selectStateBlockNIDsForRoomNID},
}.Prepare(db)
}
func (s *stateSnapshotStatements) InsertState(
func (s *StateSnapshotStatements) InsertState(
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, stateBlockNIDs types.StateBlockNIDs,
) (stateNID types.StateSnapshotNID, err error) {
if stateBlockNIDs == nil {
@ -103,7 +108,7 @@ func (s *stateSnapshotStatements) InsertState(
return
}
func (s *stateSnapshotStatements) BulkSelectStateBlockNIDs(
func (s *StateSnapshotStatements) BulkSelectStateBlockNIDs(
ctx context.Context, txn *sql.Tx, stateNIDs []types.StateSnapshotNID,
) ([]types.StateBlockNIDList, error) {
nids := make([]interface{}, len(stateNIDs))
@ -141,8 +146,34 @@ func (s *stateSnapshotStatements) BulkSelectStateBlockNIDs(
return results, nil
}
func (s *stateSnapshotStatements) BulkSelectStateForHistoryVisibility(
func (s *StateSnapshotStatements) BulkSelectStateForHistoryVisibility(
ctx context.Context, txn *sql.Tx, stateSnapshotNID types.StateSnapshotNID, domain string,
) ([]types.EventNID, error) {
return nil, tables.OptimisationNotSupportedError
}
func (s *StateSnapshotStatements) selectStateBlockNIDsForRoomNID(
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID,
) ([]types.StateBlockNID, error) {
var res []types.StateBlockNID
rows, err := sqlutil.TxStmt(txn, s.selectStateBlockNIDsStmt).QueryContext(ctx, roomNID)
if err != nil {
return res, nil
}
defer internal.CloseAndLogIfError(ctx, rows, "selectStateBlockNIDsForRoomNID: rows.close() failed")
var stateBlockNIDs []types.StateBlockNID
var stateBlockNIDsJSON string
for rows.Next() {
if err = rows.Scan(&stateBlockNIDsJSON); err != nil {
return nil, err
}
if err = json.Unmarshal([]byte(stateBlockNIDsJSON), &stateBlockNIDs); err != nil {
return nil, err
}
res = append(res, stateBlockNIDs...)
}
return res, rows.Err()
}

View file

@ -197,6 +197,11 @@ func (d *Database) prepare(db *sql.DB, writer sqlutil.Writer, cache caching.Room
if err != nil {
return err
}
purge, err := PreparePurgeStatements(db, stateBlock, stateSnapshot)
if err != nil {
return err
}
d.Database = shared.Database{
DB: db,
Cache: cache,
@ -215,6 +220,7 @@ func (d *Database) prepare(db *sql.DB, writer sqlutil.Writer, cache caching.Room
PublishedTable: published,
RedactionsTable: redactions,
GetRoomUpdaterFn: d.GetRoomUpdater,
Purge: purge,
}
return nil
}

View file

@ -130,6 +130,9 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Ms
err = s.onRedactEvent(s.ctx, *output.RedactedEvent)
case api.OutputTypePurgeRoom:
err = s.onPurgeRoom(s.ctx, *output.PurgeRoom)
if err != nil {
return true
}
default:
log.WithField("type", output.Type).Debug(
"roomserver output log: ignoring unknown output type",