Set a boundary for old to new block/snapshot IDs so we don't rewrite them more than once accidentally

This commit is contained in:
Neil Alexander 2021-04-21 09:49:21 +01:00
parent 5475f8a7d9
commit 8859e0e86b
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
3 changed files with 31 additions and 16 deletions

View file

@ -44,6 +44,21 @@ func UpStateBlocksRefactor(tx *sql.Tx) error {
logrus.Warn("Performing state storage upgrade. Please wait, this may take some time!") logrus.Warn("Performing state storage upgrade. Please wait, this may take some time!")
defer logrus.Warn("State storage upgrade complete") defer logrus.Warn("State storage upgrade complete")
var snapshotcount int
var maxsnapshotid int
var maxblockid int
if err := tx.QueryRow(`SELECT COUNT(DISTINCT state_snapshot_nid) FROM _roomserver_state_snapshots;`).Scan(&snapshotcount); err != nil {
return fmt.Errorf("tx.QueryRow.Scan (count snapshots): %w", err)
}
if err := tx.QueryRow(`SELECT MAX(state_snapshot_nid) FROM _roomserver_state_snapshots;`).Scan(&maxsnapshotid); err != nil {
return fmt.Errorf("tx.QueryRow.Scan (count snapshots): %w", err)
}
if err := tx.QueryRow(`SELECT MAX(state_block_nid) FROM _roomserver_state_block;`).Scan(&maxblockid); err != nil {
return fmt.Errorf("tx.QueryRow.Scan (count snapshots): %w", err)
}
maxsnapshotid++
maxblockid++
if _, err := tx.Exec(`ALTER TABLE roomserver_state_block RENAME TO _roomserver_state_block;`); err != nil { if _, err := tx.Exec(`ALTER TABLE roomserver_state_block RENAME TO _roomserver_state_block;`); err != nil {
return fmt.Errorf("tx.Exec: %w", err) return fmt.Errorf("tx.Exec: %w", err)
} }
@ -51,36 +66,34 @@ func UpStateBlocksRefactor(tx *sql.Tx) error {
return fmt.Errorf("tx.Exec: %w", err) return fmt.Errorf("tx.Exec: %w", err)
} }
_, err := tx.Exec(` _, err := tx.Exec(`
DROP SEQUENCE IF EXISTS roomserver_state_block_nid_seq;
CREATE SEQUENCE roomserver_state_block_nid_seq START AT $1;
CREATE TABLE IF NOT EXISTS roomserver_state_block ( CREATE TABLE IF NOT EXISTS roomserver_state_block (
state_block_nid bigserial PRIMARY KEY, state_block_nid bigint PRIMARY KEY DEFAULT nextval('roomserver_state_block_nid_seq'),
state_block_hash BYTEA UNIQUE, state_block_hash BYTEA UNIQUE,
event_nids bigint[] NOT NULL event_nids bigint[] NOT NULL
); );
`) `, maxblockid)
if err != nil { if err != nil {
return fmt.Errorf("tx.Exec: %w", err) return fmt.Errorf("tx.Exec: %w", err)
} }
_, err = tx.Exec(` _, err = tx.Exec(`
DROP SEQUENCE IF EXISTS roomserver_state_snapshot_nid_seq;
CREATE SEQUENCE roomserver_state_snapshot_nid_seq START AT $1;
CREATE TABLE IF NOT EXISTS roomserver_state_snapshots ( CREATE TABLE IF NOT EXISTS roomserver_state_snapshots (
state_snapshot_nid bigserial PRIMARY KEY, state_snapshot_nid bigint PRIMARY KEY DEFAULT nextval('roomserver_state_snapshot_nid_seq'),
state_snapshot_hash BYTEA UNIQUE, state_snapshot_hash BYTEA UNIQUE,
room_nid bigint NOT NULL, room_nid bigint NOT NULL,
state_block_nids bigint[] NOT NULL state_block_nids bigint[] NOT NULL
); );
`) `, maxsnapshotid)
if err != nil { if err != nil {
return fmt.Errorf("tx.Exec: %w", err) return fmt.Errorf("tx.Exec: %w", err)
} }
logrus.Warn("New tables created...") logrus.Warn("New tables created...")
var snapshotcount int
err = tx.QueryRow(`
SELECT COUNT(DISTINCT state_snapshot_nid) FROM _roomserver_state_snapshots;
`).Scan(&snapshotcount)
if err != nil {
return fmt.Errorf("tx.QueryRow.Scan (count snapshots): %w", err)
}
batchsize := 100 batchsize := 100
for batchoffset := 0; batchoffset < snapshotcount; batchoffset += batchsize { for batchoffset := 0; batchoffset < snapshotcount; batchoffset += batchsize {
var snapshotrows *sql.Rows var snapshotrows *sql.Rows
@ -173,11 +186,11 @@ func UpStateBlocksRefactor(tx *sql.Tx) error {
return fmt.Errorf("tx.QueryRow.Scan (insert new snapshot): %w", err) return fmt.Errorf("tx.QueryRow.Scan (insert new snapshot): %w", err)
} }
if _, err = tx.Exec(`UPDATE roomserver_events SET state_snapshot_nid=$1 WHERE state_snapshot_nid=$2`, newNID, snapshotdata.StateSnapshotNID); err != nil { if _, err = tx.Exec(`UPDATE roomserver_events SET state_snapshot_nid=$1 WHERE state_snapshot_nid=$2 AND state_snapshot_nid<$3`, newNID, snapshotdata.StateSnapshotNID, maxsnapshotid); err != nil {
return fmt.Errorf("tx.Exec (update events): %w", err) return fmt.Errorf("tx.Exec (update events): %w", err)
} }
if _, err = tx.Exec(`UPDATE roomserver_rooms SET state_snapshot_nid=$1 WHERE state_snapshot_nid=$2`, newNID, snapshotdata.StateSnapshotNID); err != nil { if _, err = tx.Exec(`UPDATE roomserver_rooms SET state_snapshot_nid=$1 WHERE state_snapshot_nid=$2 AND state_snapshot_nid<$3`, newNID, snapshotdata.StateSnapshotNID, maxsnapshotid); err != nil {
return fmt.Errorf("tx.Exec (update rooms): %w", err) return fmt.Errorf("tx.Exec (update rooms): %w", err)
} }
} }

View file

@ -39,8 +39,9 @@ const stateDataSchema = `
-- lookup a specific (type, state_key) pair for an event. It also makes it easy -- lookup a specific (type, state_key) pair for an event. It also makes it easy
-- to read the state for a given state_block_nid ordered by (type, state_key) -- to read the state for a given state_block_nid ordered by (type, state_key)
-- which in turn makes it easier to merge state data blocks. -- which in turn makes it easier to merge state data blocks.
CREATE SEQUENCE roomserver_state_block_nid_seq START AT $1;
CREATE TABLE IF NOT EXISTS roomserver_state_block ( CREATE TABLE IF NOT EXISTS roomserver_state_block (
state_block_nid bigserial PRIMARY KEY, state_block_nid bigint PRIMARY KEY DEFAULT nextval('roomserver_state_block_nid_seq'),
state_block_hash BYTEA UNIQUE, state_block_hash BYTEA UNIQUE,
event_nids bigint[] NOT NULL event_nids bigint[] NOT NULL
); );

View file

@ -39,8 +39,9 @@ const stateSnapshotSchema = `
-- because room state tends to accumulate small changes over time. Although if -- because room state tends to accumulate small changes over time. Although if
-- the list of deltas becomes too long it becomes more efficient to encode -- the list of deltas becomes too long it becomes more efficient to encode
-- the full state under single state_block_nid. -- the full state under single state_block_nid.
CREATE SEQUENCE roomserver_state_snapshot_nid_seq START AT $1;
CREATE TABLE IF NOT EXISTS roomserver_state_snapshots ( CREATE TABLE IF NOT EXISTS roomserver_state_snapshots (
state_snapshot_nid bigserial PRIMARY KEY, state_snapshot_nid bigint PRIMARY KEY DEFAULT nextval('roomserver_state_snapshot_nid_seq'),
state_snapshot_hash BYTEA UNIQUE, state_snapshot_hash BYTEA UNIQUE,
room_nid bigint NOT NULL, room_nid bigint NOT NULL,
state_block_nids bigint[] NOT NULL state_block_nids bigint[] NOT NULL