syncapi SQLite purge support

This commit is contained in:
Till Faelligen 2022-12-22 15:08:07 +01:00
parent 1536a6245f
commit 455321dcbf
No known key found for this signature in database
GPG key ID: ACCDC9606D472758
8 changed files with 66 additions and 13 deletions

View file

@ -17,7 +17,6 @@ package sqlite3
import ( import (
"context" "context"
"database/sql" "database/sql"
"fmt"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
@ -48,11 +47,15 @@ const selectBackwardExtremitiesForRoomSQL = "" +
const deleteBackwardExtremitySQL = "" + const deleteBackwardExtremitySQL = "" +
"DELETE FROM syncapi_backward_extremities WHERE room_id = $1 AND prev_event_id = $2" "DELETE FROM syncapi_backward_extremities WHERE room_id = $1 AND prev_event_id = $2"
const purgeBackwardExtremitiesSQL = "" +
"DELETE FROM syncapi_backward_extremities WHERE room_id = $1"
type backwardExtremitiesStatements struct { type backwardExtremitiesStatements struct {
db *sql.DB db *sql.DB
insertBackwardExtremityStmt *sql.Stmt insertBackwardExtremityStmt *sql.Stmt
selectBackwardExtremitiesForRoomStmt *sql.Stmt selectBackwardExtremitiesForRoomStmt *sql.Stmt
deleteBackwardExtremityStmt *sql.Stmt deleteBackwardExtremityStmt *sql.Stmt
purgeBackwardExtremitiesStmt *sql.Stmt
} }
func NewSqliteBackwardsExtremitiesTable(db *sql.DB) (tables.BackwardsExtremities, error) { func NewSqliteBackwardsExtremitiesTable(db *sql.DB) (tables.BackwardsExtremities, error) {
@ -72,6 +75,9 @@ func NewSqliteBackwardsExtremitiesTable(db *sql.DB) (tables.BackwardsExtremities
if s.deleteBackwardExtremityStmt, err = db.Prepare(deleteBackwardExtremitySQL); err != nil { if s.deleteBackwardExtremityStmt, err = db.Prepare(deleteBackwardExtremitySQL); err != nil {
return nil, err return nil, err
} }
if s.purgeBackwardExtremitiesStmt, err = db.Prepare(purgeBackwardExtremitiesSQL); err != nil {
return nil, err
}
return s, nil return s, nil
} }
@ -114,5 +120,6 @@ func (s *backwardExtremitiesStatements) DeleteBackwardExtremity(
func (s *backwardExtremitiesStatements) PurgeBackwardExtremities( func (s *backwardExtremitiesStatements) PurgeBackwardExtremities(
ctx context.Context, txn *sql.Tx, roomID string, ctx context.Context, txn *sql.Tx, roomID string,
) error { ) error {
return fmt.Errorf("not implemented on SQLite") _, err := sqlutil.TxStmt(txn, s.purgeBackwardExtremitiesStmt).ExecContext(ctx, roomID)
return err
} }

View file

@ -19,7 +19,6 @@ import (
"context" "context"
"database/sql" "database/sql"
"encoding/json" "encoding/json"
"fmt"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
@ -58,6 +57,9 @@ const selectInviteEventsInRangeSQL = "" +
const selectMaxInviteIDSQL = "" + const selectMaxInviteIDSQL = "" +
"SELECT MAX(id) FROM syncapi_invite_events" "SELECT MAX(id) FROM syncapi_invite_events"
const purgeInvitesSQL = "" +
"DELETE FROM syncapi_invite_events WHERE room_id = $1"
type inviteEventsStatements struct { type inviteEventsStatements struct {
db *sql.DB db *sql.DB
streamIDStatements *StreamIDStatements streamIDStatements *StreamIDStatements
@ -65,6 +67,7 @@ type inviteEventsStatements struct {
selectInviteEventsInRangeStmt *sql.Stmt selectInviteEventsInRangeStmt *sql.Stmt
deleteInviteEventStmt *sql.Stmt deleteInviteEventStmt *sql.Stmt
selectMaxInviteIDStmt *sql.Stmt selectMaxInviteIDStmt *sql.Stmt
purgeInvitesStmt *sql.Stmt
} }
func NewSqliteInvitesTable(db *sql.DB, streamID *StreamIDStatements) (tables.Invites, error) { func NewSqliteInvitesTable(db *sql.DB, streamID *StreamIDStatements) (tables.Invites, error) {
@ -88,6 +91,9 @@ func NewSqliteInvitesTable(db *sql.DB, streamID *StreamIDStatements) (tables.Inv
if s.selectMaxInviteIDStmt, err = db.Prepare(selectMaxInviteIDSQL); err != nil { if s.selectMaxInviteIDStmt, err = db.Prepare(selectMaxInviteIDSQL); err != nil {
return nil, err return nil, err
} }
if s.purgeInvitesStmt, err = db.Prepare(purgeInvitesSQL); err != nil {
return nil, err
}
return s, nil return s, nil
} }
@ -197,5 +203,6 @@ func (s *inviteEventsStatements) SelectMaxInviteID(
func (s *inviteEventsStatements) PurgeInvites( func (s *inviteEventsStatements) PurgeInvites(
ctx context.Context, txn *sql.Tx, roomID string, ctx context.Context, txn *sql.Tx, roomID string,
) error { ) error {
return fmt.Errorf("not implemented on SQLite") _, err := sqlutil.TxStmt(txn, s.purgeInvitesStmt).ExecContext(ctx, roomID)
return err
} }

View file

@ -77,6 +77,9 @@ SELECT event_id FROM
AND ($4 IS NULL OR t.membership <> $4) AND ($4 IS NULL OR t.membership <> $4)
` `
const purgeMembershipsSQL = "" +
"DELETE FROM syncapi_memberships WHERE room_id = $1"
type membershipsStatements struct { type membershipsStatements struct {
db *sql.DB db *sql.DB
upsertMembershipStmt *sql.Stmt upsertMembershipStmt *sql.Stmt
@ -84,6 +87,7 @@ type membershipsStatements struct {
//selectHeroesStmt *sql.Stmt - prepared at runtime due to variadic //selectHeroesStmt *sql.Stmt - prepared at runtime due to variadic
selectMembershipForUserStmt *sql.Stmt selectMembershipForUserStmt *sql.Stmt
selectMembersStmt *sql.Stmt selectMembersStmt *sql.Stmt
purgeMembershipsStmt *sql.Stmt
} }
func NewSqliteMembershipsTable(db *sql.DB) (tables.Memberships, error) { func NewSqliteMembershipsTable(db *sql.DB) (tables.Memberships, error) {
@ -99,6 +103,7 @@ func NewSqliteMembershipsTable(db *sql.DB) (tables.Memberships, error) {
{&s.selectMembershipCountStmt, selectMembershipCountSQL}, {&s.selectMembershipCountStmt, selectMembershipCountSQL},
{&s.selectMembershipForUserStmt, selectMembershipBeforeSQL}, {&s.selectMembershipForUserStmt, selectMembershipBeforeSQL},
{&s.selectMembersStmt, selectMembersSQL}, {&s.selectMembersStmt, selectMembersSQL},
{&s.purgeMembershipsStmt, purgeMembershipsSQL},
// {&s.selectHeroesStmt, selectHeroesSQL}, - prepared at runtime due to variadic // {&s.selectHeroesStmt, selectHeroesSQL}, - prepared at runtime due to variadic
}.Prepare(db) }.Prepare(db)
} }
@ -184,7 +189,8 @@ func (s *membershipsStatements) SelectMembershipForUser(
func (s *membershipsStatements) PurgeMemberships( func (s *membershipsStatements) PurgeMemberships(
ctx context.Context, txn *sql.Tx, roomID string, ctx context.Context, txn *sql.Tx, roomID string,
) error { ) error {
return fmt.Errorf("not implemented on SQLite") _, err := sqlutil.TxStmt(txn, s.purgeMembershipsStmt).ExecContext(ctx, roomID)
return err
} }
func (s *membershipsStatements) SelectMemberships( func (s *membershipsStatements) SelectMemberships(

View file

@ -17,7 +17,6 @@ package sqlite3
import ( import (
"context" "context"
"database/sql" "database/sql"
"fmt"
"strings" "strings"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
@ -39,6 +38,7 @@ func NewSqliteNotificationDataTable(db *sql.DB, streamID *StreamIDStatements) (t
return r, sqlutil.StatementList{ return r, sqlutil.StatementList{
{&r.upsertRoomUnreadCounts, upsertRoomUnreadNotificationCountsSQL}, {&r.upsertRoomUnreadCounts, upsertRoomUnreadNotificationCountsSQL},
{&r.selectMaxID, selectMaxNotificationIDSQL}, {&r.selectMaxID, selectMaxNotificationIDSQL},
{&r.purgeNotificationData, purgeNotificationDataSQL},
// {&r.selectUserUnreadCountsForRooms, selectUserUnreadNotificationsForRooms}, // used at runtime // {&r.selectUserUnreadCountsForRooms, selectUserUnreadNotificationsForRooms}, // used at runtime
}.Prepare(db) }.Prepare(db)
} }
@ -48,6 +48,7 @@ type notificationDataStatements struct {
streamIDStatements *StreamIDStatements streamIDStatements *StreamIDStatements
upsertRoomUnreadCounts *sql.Stmt upsertRoomUnreadCounts *sql.Stmt
selectMaxID *sql.Stmt selectMaxID *sql.Stmt
purgeNotificationData *sql.Stmt
//selectUserUnreadCountsForRooms *sql.Stmt //selectUserUnreadCountsForRooms *sql.Stmt
} }
@ -74,6 +75,9 @@ const selectUserUnreadNotificationsForRooms = `SELECT room_id, notification_coun
const selectMaxNotificationIDSQL = `SELECT CASE COUNT(*) WHEN 0 THEN 0 ELSE MAX(id) END FROM syncapi_notification_data` const selectMaxNotificationIDSQL = `SELECT CASE COUNT(*) WHEN 0 THEN 0 ELSE MAX(id) END FROM syncapi_notification_data`
const purgeNotificationDataSQL = "" +
"DELETE FROM syncapi_notification_data WHERE room_id = $1"
func (r *notificationDataStatements) UpsertRoomUnreadCounts(ctx context.Context, txn *sql.Tx, userID, roomID string, notificationCount, highlightCount int) (pos types.StreamPosition, err error) { func (r *notificationDataStatements) UpsertRoomUnreadCounts(ctx context.Context, txn *sql.Tx, userID, roomID string, notificationCount, highlightCount int) (pos types.StreamPosition, err error) {
pos, err = r.streamIDStatements.nextNotificationID(ctx, nil) pos, err = r.streamIDStatements.nextNotificationID(ctx, nil)
if err != nil { if err != nil {
@ -129,5 +133,6 @@ func (r *notificationDataStatements) SelectMaxID(ctx context.Context, txn *sql.T
func (s *notificationDataStatements) PurgeNotificationData( func (s *notificationDataStatements) PurgeNotificationData(
ctx context.Context, txn *sql.Tx, roomID string, ctx context.Context, txn *sql.Tx, roomID string,
) error { ) error {
return fmt.Errorf("not implemented on SQLite") _, err := sqlutil.TxStmt(txn, s.purgeNotificationData).ExecContext(ctx, roomID)
return err
} }

View file

@ -120,6 +120,9 @@ const selectContextAfterEventSQL = "" +
const selectSearchSQL = "SELECT id, event_id, headered_event_json FROM syncapi_output_room_events WHERE type IN ($1) AND id > $2 LIMIT $3 ORDER BY id ASC" const selectSearchSQL = "SELECT id, event_id, headered_event_json FROM syncapi_output_room_events WHERE type IN ($1) AND id > $2 LIMIT $3 ORDER BY id ASC"
const purgeEventsSQL = "" +
"DELETE FROM syncapi_output_room_events WHERE room_id = $1"
type outputRoomEventsStatements struct { type outputRoomEventsStatements struct {
db *sql.DB db *sql.DB
streamIDStatements *StreamIDStatements streamIDStatements *StreamIDStatements
@ -130,6 +133,7 @@ type outputRoomEventsStatements struct {
selectContextEventStmt *sql.Stmt selectContextEventStmt *sql.Stmt
selectContextBeforeEventStmt *sql.Stmt selectContextBeforeEventStmt *sql.Stmt
selectContextAfterEventStmt *sql.Stmt selectContextAfterEventStmt *sql.Stmt
purgeEventsStmt *sql.Stmt
//selectSearchStmt *sql.Stmt - prepared at runtime //selectSearchStmt *sql.Stmt - prepared at runtime
} }
@ -163,6 +167,7 @@ func NewSqliteEventsTable(db *sql.DB, streamID *StreamIDStatements) (tables.Even
{&s.selectContextEventStmt, selectContextEventSQL}, {&s.selectContextEventStmt, selectContextEventSQL},
{&s.selectContextBeforeEventStmt, selectContextBeforeEventSQL}, {&s.selectContextBeforeEventStmt, selectContextBeforeEventSQL},
{&s.selectContextAfterEventStmt, selectContextAfterEventSQL}, {&s.selectContextAfterEventStmt, selectContextAfterEventSQL},
{&s.purgeEventsStmt, purgeEventsSQL},
//{&s.selectSearchStmt, selectSearchSQL}, - prepared at runtime //{&s.selectSearchStmt, selectSearchSQL}, - prepared at runtime
}.Prepare(db) }.Prepare(db)
} }
@ -669,7 +674,8 @@ func unmarshalStateIDs(addIDsJSON, delIDsJSON string) (addIDs []string, delIDs [
func (s *outputRoomEventsStatements) PurgeEvents( func (s *outputRoomEventsStatements) PurgeEvents(
ctx context.Context, txn *sql.Tx, roomID string, ctx context.Context, txn *sql.Tx, roomID string,
) error { ) error {
return fmt.Errorf("not implemented on SQLite") _, err := sqlutil.TxStmt(txn, s.purgeEventsStmt).ExecContext(ctx, roomID)
return err
} }
func (s *outputRoomEventsStatements) ReIndex(ctx context.Context, txn *sql.Tx, limit, afterID int64, types []string) (map[int64]gomatrixserverlib.HeaderedEvent, error) { func (s *outputRoomEventsStatements) ReIndex(ctx context.Context, txn *sql.Tx, limit, afterID int64, types []string) (map[int64]gomatrixserverlib.HeaderedEvent, error) {

View file

@ -17,7 +17,6 @@ package sqlite3
import ( import (
"context" "context"
"database/sql" "database/sql"
"fmt"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/syncapi/storage/tables" "github.com/matrix-org/dendrite/syncapi/storage/tables"
@ -72,6 +71,9 @@ const selectStreamToTopologicalPositionAscSQL = "" +
const selectStreamToTopologicalPositionDescSQL = "" + const selectStreamToTopologicalPositionDescSQL = "" +
"SELECT topological_position FROM syncapi_output_room_events_topology WHERE room_id = $1 AND stream_position <= $2 ORDER BY topological_position DESC LIMIT 1;" "SELECT topological_position FROM syncapi_output_room_events_topology WHERE room_id = $1 AND stream_position <= $2 ORDER BY topological_position DESC LIMIT 1;"
const purgeEventsTopologySQL = "" +
"DELETE FROM syncapi_output_room_events_topology WHERE room_id = $1"
type outputRoomEventsTopologyStatements struct { type outputRoomEventsTopologyStatements struct {
db *sql.DB db *sql.DB
insertEventInTopologyStmt *sql.Stmt insertEventInTopologyStmt *sql.Stmt
@ -81,6 +83,7 @@ type outputRoomEventsTopologyStatements struct {
selectMaxPositionInTopologyStmt *sql.Stmt selectMaxPositionInTopologyStmt *sql.Stmt
selectStreamToTopologicalPositionAscStmt *sql.Stmt selectStreamToTopologicalPositionAscStmt *sql.Stmt
selectStreamToTopologicalPositionDescStmt *sql.Stmt selectStreamToTopologicalPositionDescStmt *sql.Stmt
purgeEventsTopologyStmt *sql.Stmt
} }
func NewSqliteTopologyTable(db *sql.DB) (tables.Topology, error) { func NewSqliteTopologyTable(db *sql.DB) (tables.Topology, error) {
@ -112,6 +115,9 @@ func NewSqliteTopologyTable(db *sql.DB) (tables.Topology, error) {
if s.selectStreamToTopologicalPositionDescStmt, err = db.Prepare(selectStreamToTopologicalPositionDescSQL); err != nil { if s.selectStreamToTopologicalPositionDescStmt, err = db.Prepare(selectStreamToTopologicalPositionDescSQL); err != nil {
return nil, err return nil, err
} }
if s.purgeEventsTopologyStmt, err = db.Prepare(purgeEventsTopologySQL); err != nil {
return nil, err
}
return s, nil return s, nil
} }
@ -195,5 +201,6 @@ func (s *outputRoomEventsTopologyStatements) SelectMaxPositionInTopology(
func (s *outputRoomEventsTopologyStatements) PurgeEventsTopology( func (s *outputRoomEventsTopologyStatements) PurgeEventsTopology(
ctx context.Context, txn *sql.Tx, roomID string, ctx context.Context, txn *sql.Tx, roomID string,
) error { ) error {
return fmt.Errorf("not implemented on SQLite") _, err := sqlutil.TxStmt(txn, s.purgeEventsTopologyStmt).ExecContext(ctx, roomID)
return err
} }

View file

@ -17,7 +17,6 @@ package sqlite3
import ( import (
"context" "context"
"database/sql" "database/sql"
"fmt"
"time" "time"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
@ -65,6 +64,9 @@ const selectPeekingDevicesSQL = "" +
const selectMaxPeekIDSQL = "" + const selectMaxPeekIDSQL = "" +
"SELECT MAX(id) FROM syncapi_peeks" "SELECT MAX(id) FROM syncapi_peeks"
const purgePeeksSQL = "" +
"DELETE FROM syncapi_peeks WHERE room_id = $1"
type peekStatements struct { type peekStatements struct {
db *sql.DB db *sql.DB
streamIDStatements *StreamIDStatements streamIDStatements *StreamIDStatements
@ -74,6 +76,7 @@ type peekStatements struct {
selectPeeksInRangeStmt *sql.Stmt selectPeeksInRangeStmt *sql.Stmt
selectPeekingDevicesStmt *sql.Stmt selectPeekingDevicesStmt *sql.Stmt
selectMaxPeekIDStmt *sql.Stmt selectMaxPeekIDStmt *sql.Stmt
purgePeeksStmt *sql.Stmt
} }
func NewSqlitePeeksTable(db *sql.DB, streamID *StreamIDStatements) (tables.Peeks, error) { func NewSqlitePeeksTable(db *sql.DB, streamID *StreamIDStatements) (tables.Peeks, error) {
@ -103,6 +106,9 @@ func NewSqlitePeeksTable(db *sql.DB, streamID *StreamIDStatements) (tables.Peeks
if s.selectMaxPeekIDStmt, err = db.Prepare(selectMaxPeekIDSQL); err != nil { if s.selectMaxPeekIDStmt, err = db.Prepare(selectMaxPeekIDSQL); err != nil {
return nil, err return nil, err
} }
if s.purgePeeksStmt, err = db.Prepare(purgePeeksSQL); err != nil {
return nil, err
}
return s, nil return s, nil
} }
@ -209,5 +215,6 @@ func (s *peekStatements) SelectMaxPeekID(
func (s *peekStatements) PurgePeeks( func (s *peekStatements) PurgePeeks(
ctx context.Context, txn *sql.Tx, roomID string, ctx context.Context, txn *sql.Tx, roomID string,
) error { ) error {
return fmt.Errorf("not implemented on SQLite") _, err := sqlutil.TxStmt(txn, s.purgePeeksStmt).ExecContext(ctx, roomID)
return err
} }

View file

@ -58,12 +58,16 @@ const selectRoomReceipts = "" +
const selectMaxReceiptIDSQL = "" + const selectMaxReceiptIDSQL = "" +
"SELECT MAX(id) FROM syncapi_receipts" "SELECT MAX(id) FROM syncapi_receipts"
const purgeReceiptsSQL = "" +
"DELETE FROM syncapi_receipts WHERE room_id = $1"
type receiptStatements struct { type receiptStatements struct {
db *sql.DB db *sql.DB
streamIDStatements *StreamIDStatements streamIDStatements *StreamIDStatements
upsertReceipt *sql.Stmt upsertReceipt *sql.Stmt
selectRoomReceipts *sql.Stmt selectRoomReceipts *sql.Stmt
selectMaxReceiptID *sql.Stmt selectMaxReceiptID *sql.Stmt
purgeReceiptsStmt *sql.Stmt
} }
func NewSqliteReceiptsTable(db *sql.DB, streamID *StreamIDStatements) (tables.Receipts, error) { func NewSqliteReceiptsTable(db *sql.DB, streamID *StreamIDStatements) (tables.Receipts, error) {
@ -93,6 +97,9 @@ func NewSqliteReceiptsTable(db *sql.DB, streamID *StreamIDStatements) (tables.Re
if r.selectMaxReceiptID, err = db.Prepare(selectMaxReceiptIDSQL); err != nil { if r.selectMaxReceiptID, err = db.Prepare(selectMaxReceiptIDSQL); err != nil {
return nil, fmt.Errorf("unable to prepare selectRoomReceipts statement: %w", err) return nil, fmt.Errorf("unable to prepare selectRoomReceipts statement: %w", err)
} }
if r.purgeReceiptsStmt, err = db.Prepare(purgeReceiptsSQL); err != nil {
return nil, fmt.Errorf("unable to prepare purgeReceiptsStmt statement: %w", err)
}
return r, nil return r, nil
} }
@ -157,5 +164,6 @@ func (s *receiptStatements) SelectMaxReceiptID(
func (s *receiptStatements) PurgeReceipts( func (s *receiptStatements) PurgeReceipts(
ctx context.Context, txn *sql.Tx, roomID string, ctx context.Context, txn *sql.Tx, roomID string,
) error { ) error {
return fmt.Errorf("not implemented on SQLite") _, err := sqlutil.TxStmt(txn, s.purgeReceiptsStmt).ExecContext(ctx, roomID)
return err
} }