* db migration: fix #1844 and add additional assertions - Migration scripts will now check to see if there are any unconverted snapshot IDs and fail the migration if there are any. This should prevent people from getting a corrupt database in the event the root cause is still unknown. - Add an ORDER BY clause when doing batch queries in the postgres migration. LIMIT and OFFSET without ORDER BY are undefined and must not be relied upon to produce a deterministic ordering (e.g row order). See https://www.postgresql.org/docs/current/queries-limit.html * Linting Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
This commit is contained in:
parent
e2b6a90d90
commit
c849e74dfc
|
@ -119,11 +119,15 @@ func UpStateBlocksRefactor(tx *sql.Tx) error {
|
||||||
_roomserver_state_snapshots
|
_roomserver_state_snapshots
|
||||||
JOIN _roomserver_state_block ON _roomserver_state_block.state_block_nid = ANY (_roomserver_state_snapshots.state_block_nids)
|
JOIN _roomserver_state_block ON _roomserver_state_block.state_block_nid = ANY (_roomserver_state_snapshots.state_block_nids)
|
||||||
WHERE
|
WHERE
|
||||||
_roomserver_state_snapshots.state_snapshot_nid = ANY ( SELECT DISTINCT
|
_roomserver_state_snapshots.state_snapshot_nid = ANY (
|
||||||
|
SELECT
|
||||||
_roomserver_state_snapshots.state_snapshot_nid
|
_roomserver_state_snapshots.state_snapshot_nid
|
||||||
FROM
|
FROM
|
||||||
_roomserver_state_snapshots
|
_roomserver_state_snapshots
|
||||||
LIMIT $1 OFFSET $2)) AS _roomserver_state_block
|
ORDER BY _roomserver_state_snapshots.state_snapshot_nid ASC
|
||||||
|
LIMIT $1 OFFSET $2
|
||||||
|
)
|
||||||
|
) AS _roomserver_state_block
|
||||||
GROUP BY
|
GROUP BY
|
||||||
state_snapshot_nid,
|
state_snapshot_nid,
|
||||||
room_nid,
|
room_nid,
|
||||||
|
@ -202,6 +206,23 @@ func UpStateBlocksRefactor(tx *sql.Tx) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// By this point we should have no more state_snapshot_nids below maxsnapshotid in either roomserver_rooms or roomserver_events
|
||||||
|
// If we do, this is a problem if Dendrite tries to load the snapshot as it will not exist
|
||||||
|
// in roomserver_state_snapshots
|
||||||
|
var count int64
|
||||||
|
if err = tx.QueryRow(`SELECT COUNT(*) FROM roomserver_events WHERE state_snapshot_nid < $1 AND state_snapshot_nid != 0`, maxsnapshotid).Scan(&count); err != nil {
|
||||||
|
return fmt.Errorf("assertion query failed: %s", err)
|
||||||
|
}
|
||||||
|
if count > 0 {
|
||||||
|
return fmt.Errorf("%d events exist in roomserver_events which have not been converted to a new state_snapshot_nid; this is a bug, please report", count)
|
||||||
|
}
|
||||||
|
if err = tx.QueryRow(`SELECT COUNT(*) FROM roomserver_rooms WHERE state_snapshot_nid < $1 AND state_snapshot_nid != 0`, maxsnapshotid).Scan(&count); err != nil {
|
||||||
|
return fmt.Errorf("assertion query failed: %s", err)
|
||||||
|
}
|
||||||
|
if count > 0 {
|
||||||
|
return fmt.Errorf("%d rooms exist in roomserver_rooms which have not been converted to a new state_snapshot_nid; this is a bug, please report", count)
|
||||||
|
}
|
||||||
|
|
||||||
if _, err = tx.Exec(`
|
if _, err = tx.Exec(`
|
||||||
DROP TABLE _roomserver_state_snapshots;
|
DROP TABLE _roomserver_state_snapshots;
|
||||||
DROP SEQUENCE roomserver_state_snapshot_nid_seq;
|
DROP SEQUENCE roomserver_state_snapshot_nid_seq;
|
||||||
|
|
|
@ -31,6 +31,7 @@ func LoadStateBlocksRefactor(m *sqlutil.Migrations) {
|
||||||
m.AddMigration(UpStateBlocksRefactor, DownStateBlocksRefactor)
|
m.AddMigration(UpStateBlocksRefactor, DownStateBlocksRefactor)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// nolint:gocyclo
|
||||||
func UpStateBlocksRefactor(tx *sql.Tx) error {
|
func UpStateBlocksRefactor(tx *sql.Tx) error {
|
||||||
logrus.Warn("Performing state storage upgrade. Please wait, this may take some time!")
|
logrus.Warn("Performing state storage upgrade. Please wait, this may take some time!")
|
||||||
defer logrus.Warn("State storage upgrade complete")
|
defer logrus.Warn("State storage upgrade complete")
|
||||||
|
@ -45,6 +46,7 @@ func UpStateBlocksRefactor(tx *sql.Tx) error {
|
||||||
}
|
}
|
||||||
maxsnapshotid++
|
maxsnapshotid++
|
||||||
maxblockid++
|
maxblockid++
|
||||||
|
oldMaxSnapshotID := maxsnapshotid
|
||||||
|
|
||||||
if _, err := tx.Exec(`ALTER TABLE roomserver_state_block RENAME TO _roomserver_state_block;`); err != nil {
|
if _, err := tx.Exec(`ALTER TABLE roomserver_state_block RENAME TO _roomserver_state_block;`); err != nil {
|
||||||
return fmt.Errorf("tx.Exec: %w", err)
|
return fmt.Errorf("tx.Exec: %w", err)
|
||||||
|
@ -133,6 +135,7 @@ func UpStateBlocksRefactor(tx *sql.Tx) error {
|
||||||
if jerr != nil {
|
if jerr != nil {
|
||||||
return fmt.Errorf("json.Marshal (new blocks): %w", jerr)
|
return fmt.Errorf("json.Marshal (new blocks): %w", jerr)
|
||||||
}
|
}
|
||||||
|
|
||||||
var newsnapshot types.StateSnapshotNID
|
var newsnapshot types.StateSnapshotNID
|
||||||
err = tx.QueryRow(`
|
err = tx.QueryRow(`
|
||||||
INSERT INTO roomserver_state_snapshots (state_snapshot_nid, state_snapshot_hash, room_nid, state_block_nids)
|
INSERT INTO roomserver_state_snapshots (state_snapshot_nid, state_snapshot_hash, room_nid, state_block_nids)
|
||||||
|
@ -144,7 +147,8 @@ func UpStateBlocksRefactor(tx *sql.Tx) error {
|
||||||
return fmt.Errorf("tx.QueryRow.Scan (insert new snapshot): %w", err)
|
return fmt.Errorf("tx.QueryRow.Scan (insert new snapshot): %w", err)
|
||||||
}
|
}
|
||||||
maxsnapshotid++
|
maxsnapshotid++
|
||||||
if _, err = tx.Exec(`UPDATE roomserver_events SET state_snapshot_nid=$1 WHERE state_snapshot_nid=$2 AND state_snapshot_nid<$3`, newsnapshot, snapshot, maxsnapshotid); err != nil {
|
_, err = tx.Exec(`UPDATE roomserver_events SET state_snapshot_nid=$1 WHERE state_snapshot_nid=$2 AND state_snapshot_nid<$3`, newsnapshot, snapshot, maxsnapshotid)
|
||||||
|
if err != nil {
|
||||||
return fmt.Errorf("tx.Exec (update events): %w", err)
|
return fmt.Errorf("tx.Exec (update events): %w", err)
|
||||||
}
|
}
|
||||||
if _, err = tx.Exec(`UPDATE roomserver_rooms SET state_snapshot_nid=$1 WHERE state_snapshot_nid=$2 AND state_snapshot_nid<$3`, newsnapshot, snapshot, maxsnapshotid); err != nil {
|
if _, err = tx.Exec(`UPDATE roomserver_rooms SET state_snapshot_nid=$1 WHERE state_snapshot_nid=$2 AND state_snapshot_nid<$3`, newsnapshot, snapshot, maxsnapshotid); err != nil {
|
||||||
|
@ -153,6 +157,23 @@ func UpStateBlocksRefactor(tx *sql.Tx) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// By this point we should have no more state_snapshot_nids below oldMaxSnapshotID in either roomserver_rooms or roomserver_events
|
||||||
|
// If we do, this is a problem if Dendrite tries to load the snapshot as it will not exist
|
||||||
|
// in roomserver_state_snapshots
|
||||||
|
var count int64
|
||||||
|
if err = tx.QueryRow(`SELECT COUNT(*) FROM roomserver_events WHERE state_snapshot_nid < $1 AND state_snapshot_nid != 0`, oldMaxSnapshotID).Scan(&count); err != nil {
|
||||||
|
return fmt.Errorf("assertion query failed: %s", err)
|
||||||
|
}
|
||||||
|
if count > 0 {
|
||||||
|
return fmt.Errorf("%d events exist in roomserver_events which have not been converted to a new state_snapshot_nid; this is a bug, please report", count)
|
||||||
|
}
|
||||||
|
if err = tx.QueryRow(`SELECT COUNT(*) FROM roomserver_rooms WHERE state_snapshot_nid < $1 AND state_snapshot_nid != 0`, oldMaxSnapshotID).Scan(&count); err != nil {
|
||||||
|
return fmt.Errorf("assertion query failed: %s", err)
|
||||||
|
}
|
||||||
|
if count > 0 {
|
||||||
|
return fmt.Errorf("%d rooms exist in roomserver_rooms which have not been converted to a new state_snapshot_nid; this is a bug, please report", count)
|
||||||
|
}
|
||||||
|
|
||||||
if _, err = tx.Exec(`DROP TABLE _roomserver_state_snapshots;`); err != nil {
|
if _, err = tx.Exec(`DROP TABLE _roomserver_state_snapshots;`); err != nil {
|
||||||
return fmt.Errorf("tx.Exec (delete old snapshot table): %w", err)
|
return fmt.Errorf("tx.Exec (delete old snapshot table): %w", err)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue