From 8859e0e86bdd1f3177857530db153739a253b8cc Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 21 Apr 2021 09:49:21 +0100 Subject: [PATCH] Set a boundary for old to new block/snapshot IDs so we don't rewrite them more than once accidentally --- .../2021041615092700_state_blocks_refactor.go | 41 ++++++++++++------- .../storage/postgres/state_block_table.go | 3 +- .../storage/postgres/state_snapshot_table.go | 3 +- 3 files changed, 31 insertions(+), 16 deletions(-) diff --git a/roomserver/storage/postgres/deltas/2021041615092700_state_blocks_refactor.go b/roomserver/storage/postgres/deltas/2021041615092700_state_blocks_refactor.go index 73997484b..45393cb48 100644 --- a/roomserver/storage/postgres/deltas/2021041615092700_state_blocks_refactor.go +++ b/roomserver/storage/postgres/deltas/2021041615092700_state_blocks_refactor.go @@ -44,6 +44,21 @@ func UpStateBlocksRefactor(tx *sql.Tx) error { logrus.Warn("Performing state storage upgrade. Please wait, this may take some time!") defer logrus.Warn("State storage upgrade complete") + var snapshotcount int + var maxsnapshotid int + var maxblockid int + if err := tx.QueryRow(`SELECT COUNT(DISTINCT state_snapshot_nid) FROM _roomserver_state_snapshots;`).Scan(&snapshotcount); err != nil { + return fmt.Errorf("tx.QueryRow.Scan (count snapshots): %w", err) + } + if err := tx.QueryRow(`SELECT MAX(state_snapshot_nid) FROM _roomserver_state_snapshots;`).Scan(&maxsnapshotid); err != nil { + return fmt.Errorf("tx.QueryRow.Scan (count snapshots): %w", err) + } + if err := tx.QueryRow(`SELECT MAX(state_block_nid) FROM _roomserver_state_block;`).Scan(&maxblockid); err != nil { + return fmt.Errorf("tx.QueryRow.Scan (count snapshots): %w", err) + } + maxsnapshotid++ + maxblockid++ + if _, err := tx.Exec(`ALTER TABLE roomserver_state_block RENAME TO _roomserver_state_block;`); err != nil { return fmt.Errorf("tx.Exec: %w", err) } @@ -51,36 +66,34 @@ func UpStateBlocksRefactor(tx *sql.Tx) error { return fmt.Errorf("tx.Exec: %w", err) } _, err := tx.Exec(` + DROP SEQUENCE IF EXISTS roomserver_state_block_nid_seq; + CREATE SEQUENCE roomserver_state_block_nid_seq START AT $1; + CREATE TABLE IF NOT EXISTS roomserver_state_block ( - state_block_nid bigserial PRIMARY KEY, + state_block_nid bigint PRIMARY KEY DEFAULT nextval('roomserver_state_block_nid_seq'), state_block_hash BYTEA UNIQUE, event_nids bigint[] NOT NULL ); - `) + `, maxblockid) if err != nil { return fmt.Errorf("tx.Exec: %w", err) } _, err = tx.Exec(` + DROP SEQUENCE IF EXISTS roomserver_state_snapshot_nid_seq; + CREATE SEQUENCE roomserver_state_snapshot_nid_seq START AT $1; + CREATE TABLE IF NOT EXISTS roomserver_state_snapshots ( - state_snapshot_nid bigserial PRIMARY KEY, + state_snapshot_nid bigint PRIMARY KEY DEFAULT nextval('roomserver_state_snapshot_nid_seq'), state_snapshot_hash BYTEA UNIQUE, room_nid bigint NOT NULL, state_block_nids bigint[] NOT NULL ); - `) + `, maxsnapshotid) if err != nil { return fmt.Errorf("tx.Exec: %w", err) } logrus.Warn("New tables created...") - var snapshotcount int - err = tx.QueryRow(` - SELECT COUNT(DISTINCT state_snapshot_nid) FROM _roomserver_state_snapshots; - `).Scan(&snapshotcount) - if err != nil { - return fmt.Errorf("tx.QueryRow.Scan (count snapshots): %w", err) - } - batchsize := 100 for batchoffset := 0; batchoffset < snapshotcount; batchoffset += batchsize { var snapshotrows *sql.Rows @@ -173,11 +186,11 @@ func UpStateBlocksRefactor(tx *sql.Tx) error { return fmt.Errorf("tx.QueryRow.Scan (insert new snapshot): %w", err) } - if _, err = tx.Exec(`UPDATE roomserver_events SET state_snapshot_nid=$1 WHERE state_snapshot_nid=$2`, newNID, snapshotdata.StateSnapshotNID); err != nil { + if _, err = tx.Exec(`UPDATE roomserver_events SET state_snapshot_nid=$1 WHERE state_snapshot_nid=$2 AND state_snapshot_nid<$3`, newNID, snapshotdata.StateSnapshotNID, maxsnapshotid); err != nil { return fmt.Errorf("tx.Exec (update events): %w", err) } - if _, err = tx.Exec(`UPDATE roomserver_rooms SET state_snapshot_nid=$1 WHERE state_snapshot_nid=$2`, newNID, snapshotdata.StateSnapshotNID); err != nil { + if _, err = tx.Exec(`UPDATE roomserver_rooms SET state_snapshot_nid=$1 WHERE state_snapshot_nid=$2 AND state_snapshot_nid<$3`, newNID, snapshotdata.StateSnapshotNID, maxsnapshotid); err != nil { return fmt.Errorf("tx.Exec (update rooms): %w", err) } } diff --git a/roomserver/storage/postgres/state_block_table.go b/roomserver/storage/postgres/state_block_table.go index 38fb44e89..7c785060f 100644 --- a/roomserver/storage/postgres/state_block_table.go +++ b/roomserver/storage/postgres/state_block_table.go @@ -39,8 +39,9 @@ const stateDataSchema = ` -- lookup a specific (type, state_key) pair for an event. It also makes it easy -- to read the state for a given state_block_nid ordered by (type, state_key) -- which in turn makes it easier to merge state data blocks. +CREATE SEQUENCE roomserver_state_block_nid_seq START AT $1; CREATE TABLE IF NOT EXISTS roomserver_state_block ( - state_block_nid bigserial PRIMARY KEY, + state_block_nid bigint PRIMARY KEY DEFAULT nextval('roomserver_state_block_nid_seq'), state_block_hash BYTEA UNIQUE, event_nids bigint[] NOT NULL ); diff --git a/roomserver/storage/postgres/state_snapshot_table.go b/roomserver/storage/postgres/state_snapshot_table.go index b5641b732..c6f72a829 100644 --- a/roomserver/storage/postgres/state_snapshot_table.go +++ b/roomserver/storage/postgres/state_snapshot_table.go @@ -39,8 +39,9 @@ const stateSnapshotSchema = ` -- because room state tends to accumulate small changes over time. Although if -- the list of deltas becomes too long it becomes more efficient to encode -- the full state under single state_block_nid. +CREATE SEQUENCE roomserver_state_snapshot_nid_seq START AT $1; CREATE TABLE IF NOT EXISTS roomserver_state_snapshots ( - state_snapshot_nid bigserial PRIMARY KEY, + state_snapshot_nid bigint PRIMARY KEY DEFAULT nextval('roomserver_state_snapshot_nid_seq'), state_snapshot_hash BYTEA UNIQUE, room_nid bigint NOT NULL, state_block_nids bigint[] NOT NULL