From ec5d23005ea462608baeba7f24f117b245f28e15 Mon Sep 17 00:00:00 2001 From: Till Faelligen <2353100+S7evinK@users.noreply.github.com> Date: Wed, 17 May 2023 11:28:03 +0200 Subject: [PATCH] Drop column in roomserver_previous_events --- .../20230516154000_drop_reference_sha.go | 10 +++++- roomserver/storage/postgres/events_table.go | 2 +- .../storage/postgres/previous_events_table.go | 28 ++++++++++----- roomserver/storage/shared/room_updater.go | 4 +-- .../20230516154000_drop_reference_sha.go | 34 ++++++++++++++++++ .../storage/sqlite3/previous_events_table.go | 35 +++++++++++++++++-- roomserver/storage/sqlite3/storage.go | 3 +- .../storage/tables/events_table_test.go | 7 ++-- 8 files changed, 101 insertions(+), 22 deletions(-) diff --git a/roomserver/storage/postgres/deltas/20230516154000_drop_reference_sha.go b/roomserver/storage/postgres/deltas/20230516154000_drop_reference_sha.go index 2397231f8..f8677eb4e 100644 --- a/roomserver/storage/postgres/deltas/20230516154000_drop_reference_sha.go +++ b/roomserver/storage/postgres/deltas/20230516154000_drop_reference_sha.go @@ -20,7 +20,7 @@ import ( "fmt" ) -func UpDropEventReferenceSHA(ctx context.Context, tx *sql.Tx) error { +func UpDropEventReferenceSHAEvents(ctx context.Context, tx *sql.Tx) error { var count int err := tx.QueryRowContext(ctx, `SELECT count(*) FROM roomserver_events GROUP BY event_id HAVING count(event_id) > 1`). Scan(&count) @@ -36,3 +36,11 @@ func UpDropEventReferenceSHA(ctx context.Context, tx *sql.Tx) error { } return nil } + +func UpDropEventReferenceSHAPrevEvents(ctx context.Context, tx *sql.Tx) error { + _, err := tx.ExecContext(ctx, `ALTER TABLE roomserver_previous_events DROP COLUMN IF EXISTS previous_reference_sha256;`) + if err != nil { + return fmt.Errorf("failed to execute upgrade: %w", err) + } + return nil +} diff --git a/roomserver/storage/postgres/events_table.go b/roomserver/storage/postgres/events_table.go index 0129efdf9..b9203a69d 100644 --- a/roomserver/storage/postgres/events_table.go +++ b/roomserver/storage/postgres/events_table.go @@ -184,7 +184,7 @@ func CreateEventsTable(db *sql.DB) error { m.AddMigrations([]sqlutil.Migration{ { Version: "roomserver: drop column reference_sha from roomserver_events", - Up: deltas.UpDropEventReferenceSHA, + Up: deltas.UpDropEventReferenceSHAEvents, }, }...) return m.Up(context.Background()) diff --git a/roomserver/storage/postgres/previous_events_table.go b/roomserver/storage/postgres/previous_events_table.go index 1b4b94a1d..ceb5e26ba 100644 --- a/roomserver/storage/postgres/previous_events_table.go +++ b/roomserver/storage/postgres/previous_events_table.go @@ -20,6 +20,7 @@ import ( "database/sql" "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/roomserver/storage/postgres/deltas" "github.com/matrix-org/dendrite/roomserver/storage/tables" "github.com/matrix-org/dendrite/roomserver/types" ) @@ -32,11 +33,9 @@ const previousEventSchema = ` CREATE TABLE IF NOT EXISTS roomserver_previous_events ( -- The string event ID taken from the prev_events key of an event. previous_event_id TEXT NOT NULL, - -- The SHA256 reference hash taken from the prev_events key of an event. - previous_reference_sha256 BYTEA NOT NULL, -- A list of numeric event IDs of events that reference this prev_event. event_nids BIGINT[] NOT NULL, - CONSTRAINT roomserver_previous_event_id_unique UNIQUE (previous_event_id, previous_reference_sha256) + CONSTRAINT roomserver_previous_event_id_unique UNIQUE (previous_event_id) ); ` @@ -47,11 +46,11 @@ CREATE TABLE IF NOT EXISTS roomserver_previous_events ( // The lock is necessary to avoid data races when checking whether an event is already referenced by another event. const insertPreviousEventSQL = "" + "INSERT INTO roomserver_previous_events" + - " (previous_event_id, previous_reference_sha256, event_nids)" + - " VALUES ($1, $2, array_append('{}'::bigint[], $3))" + + " (previous_event_id, event_nids)" + + " VALUES ($1, array_append('{}'::bigint[], $2))" + " ON CONFLICT ON CONSTRAINT roomserver_previous_event_id_unique" + - " DO UPDATE SET event_nids = array_append(roomserver_previous_events.event_nids, $3)" + - " WHERE $3 != ALL(roomserver_previous_events.event_nids)" + " DO UPDATE SET event_nids = array_append(roomserver_previous_events.event_nids, $2)" + + " WHERE $2 != ALL(roomserver_previous_events.event_nids)" // Check if the event is referenced by another event in the table. // This should only be done while holding a "FOR UPDATE" lock on the row in the rooms table for this room. @@ -66,7 +65,18 @@ type previousEventStatements struct { func CreatePrevEventsTable(db *sql.DB) error { _, err := db.Exec(previousEventSchema) - return err + if err != nil { + return err + } + + m := sqlutil.NewMigrator(db) + m.AddMigrations([]sqlutil.Migration{ + { + Version: "roomserver: drop column reference_sha from roomserver_prev_events", + Up: deltas.UpDropEventReferenceSHAPrevEvents, + }, + }...) + return m.Up(context.Background()) } func PreparePrevEventsTable(db *sql.DB) (tables.PreviousEvents, error) { @@ -86,7 +96,7 @@ func (s *previousEventStatements) InsertPreviousEvent( ) error { stmt := sqlutil.TxStmt(txn, s.insertPreviousEventStmt) _, err := stmt.ExecContext( - ctx, previousEventID, []byte(""), int64(eventNID), + ctx, previousEventID, int64(eventNID), ) return err } diff --git a/roomserver/storage/shared/room_updater.go b/roomserver/storage/shared/room_updater.go index dd45af350..70672a33e 100644 --- a/roomserver/storage/shared/room_updater.go +++ b/roomserver/storage/shared/room_updater.go @@ -191,8 +191,8 @@ func (u *RoomUpdater) EventsFromIDs(ctx context.Context, roomInfo *types.RoomInf } // IsReferenced implements types.RoomRecentEventsUpdater -func (u *RoomUpdater) IsReferenced(eventReference gomatrixserverlib.EventReference) (bool, error) { - err := u.d.PrevEventsTable.SelectPreviousEventExists(u.ctx, u.txn, eventReference.EventID) +func (u *RoomUpdater) IsReferenced(eventID string) (bool, error) { + err := u.d.PrevEventsTable.SelectPreviousEventExists(u.ctx, u.txn, eventID) if err == nil { return true, nil } diff --git a/roomserver/storage/sqlite3/deltas/20230516154000_drop_reference_sha.go b/roomserver/storage/sqlite3/deltas/20230516154000_drop_reference_sha.go index c31593b4c..452d72ace 100644 --- a/roomserver/storage/sqlite3/deltas/20230516154000_drop_reference_sha.go +++ b/roomserver/storage/sqlite3/deltas/20230516154000_drop_reference_sha.go @@ -36,3 +36,37 @@ func UpDropEventReferenceSHA(ctx context.Context, tx *sql.Tx) error { } return nil } + +func UpDropEventReferenceSHAPrevEvents(ctx context.Context, tx *sql.Tx) error { + // rename the table + if _, err := tx.ExecContext(ctx, `ALTER TABLE roomserver_previous_events RENAME TO _roomserver_previous_events;`); err != nil { + return fmt.Errorf("tx.ExecContext: %w", err) + } + + // create new table + if _, err := tx.ExecContext(ctx, `CREATE TABLE IF NOT EXISTS roomserver_previous_events ( + previous_event_id TEXT NOT NULL, + event_nids TEXT NOT NULL, + UNIQUE (previous_event_id) + );`); err != nil { + return fmt.Errorf("tx.ExecContext: %w", err) + } + + // move data + if _, err := tx.ExecContext(ctx, ` +INSERT + INTO roomserver_previous_events ( + previous_event_id, event_nids + ) SELECT + previous_event_id, event_nids + FROM _roomserver_previous_events +;`); err != nil { + return fmt.Errorf("tx.ExecContext: %w", err) + } + // drop old table + _, err := tx.ExecContext(ctx, `DROP TABLE _roomserver_previous_events;`) + if err != nil { + return fmt.Errorf("failed to execute upgrade: %w", err) + } + return nil +} diff --git a/roomserver/storage/sqlite3/previous_events_table.go b/roomserver/storage/sqlite3/previous_events_table.go index f8e11257e..d9cf23195 100644 --- a/roomserver/storage/sqlite3/previous_events_table.go +++ b/roomserver/storage/sqlite3/previous_events_table.go @@ -18,12 +18,15 @@ package sqlite3 import ( "context" "database/sql" + "errors" "fmt" "strings" "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/roomserver/storage/sqlite3/deltas" "github.com/matrix-org/dendrite/roomserver/storage/tables" "github.com/matrix-org/dendrite/roomserver/types" + "github.com/sirupsen/logrus" ) // TODO: previous_reference_sha256 was NOT NULL before but it broke sytest because @@ -34,9 +37,8 @@ import ( const previousEventSchema = ` CREATE TABLE IF NOT EXISTS roomserver_previous_events ( previous_event_id TEXT NOT NULL, - previous_reference_sha256 BLOB, event_nids TEXT NOT NULL, - UNIQUE (previous_event_id, previous_reference_sha256) + UNIQUE (previous_event_id) ); ` @@ -72,7 +74,34 @@ type previousEventStatements struct { func CreatePrevEventsTable(db *sql.DB) error { _, err := db.Exec(previousEventSchema) - return err + if err != nil { + return err + } + // check if the column exists + var cName string + migrationName := "roomserver: drop column reference_sha from roomserver_prev_events" + err = db.QueryRowContext(context.Background(), `SELECT p.name FROM sqlite_master AS m JOIN pragma_table_info(m.name) AS p WHERE m.name = 'roomserver_previous_events' AND p.name = 'previous_reference_sha256'`).Scan(&cName) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { // migration was already executed, as the column was removed + if err = sqlutil.InsertMigration(context.Background(), db, migrationName); err != nil { + return fmt.Errorf("unable to manually insert migration '%s': %w", migrationName, err) + } + return nil + } + return err + } + m := sqlutil.NewMigrator(db) + m.AddMigrations([]sqlutil.Migration{ + { + Version: migrationName, + Up: deltas.UpDropEventReferenceSHAPrevEvents, + }, + }...) + logrus.Infof("cName: %s", cName) + if err := m.Up(context.Background()); err != nil { + panic(err) + } + return nil } func PreparePrevEventsTable(db *sql.DB) (tables.PreviousEvents, error) { diff --git a/roomserver/storage/sqlite3/storage.go b/roomserver/storage/sqlite3/storage.go index 89e16fc14..6ab427a84 100644 --- a/roomserver/storage/sqlite3/storage.go +++ b/roomserver/storage/sqlite3/storage.go @@ -21,14 +21,13 @@ import ( "errors" "fmt" - "github.com/matrix-org/gomatrixserverlib" - "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/roomserver/storage/shared" "github.com/matrix-org/dendrite/roomserver/storage/sqlite3/deltas" "github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/gomatrixserverlib" ) // A Database is used to store room events and stream offsets. diff --git a/roomserver/storage/tables/events_table_test.go b/roomserver/storage/tables/events_table_test.go index d2721bded..15c1b7445 100644 --- a/roomserver/storage/tables/events_table_test.go +++ b/roomserver/storage/tables/events_table_test.go @@ -75,8 +75,7 @@ func Test_EventsTable(t *testing.T) { assert.True(t, sentToOutput) eventIDs = append(eventIDs, ev.EventID()) - ref := ev.EventReference() - ref.EventSHA256 = nil + ref := gomatrixserverlib.EventReference{EventID: eventID} wantEventReferences = append(wantEventReferences, ref) // Set the stateSnapshot to 2 for some events to verify they are returned later @@ -99,8 +98,8 @@ func Test_EventsTable(t *testing.T) { } wantStateAtEvent = append(wantStateAtEvent, stateAtEvent) wantStateAtEventAndRefs = append(wantStateAtEventAndRefs, types.StateAtEventAndReference{ - StateAtEvent: stateAtEvent, - EventReference: ref, + StateAtEvent: stateAtEvent, + EventID: ev.EventID(), }) }