mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-01-18 09:54:27 -06:00
Add tests for the UpDropEventReferenceSHAPrevEvents
migration (#3087)
... as they could fail if there are duplicate events in `roomserver_previous_events`. This fixes the migration by trying to combine the `event_nids` if possible (same room) as mentioned by @kegsay in https://github.com/matrix-org/dendrite/pull/3083#discussion_r1195508963
This commit is contained in:
parent
3dcca4017c
commit
61341aca50
|
@ -18,19 +18,14 @@ import (
|
|||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
|
||||
"github.com/lib/pq"
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
"github.com/matrix-org/util"
|
||||
)
|
||||
|
||||
func UpDropEventReferenceSHAEvents(ctx context.Context, tx *sql.Tx) error {
|
||||
var count int
|
||||
err := tx.QueryRowContext(ctx, `SELECT count(*) FROM roomserver_events GROUP BY event_id HAVING count(event_id) > 1`).
|
||||
Scan(&count)
|
||||
if err != nil && err != sql.ErrNoRows {
|
||||
return fmt.Errorf("failed to query duplicate event ids")
|
||||
}
|
||||
if count > 0 {
|
||||
return fmt.Errorf("unable to drop column, as there are duplicate event ids")
|
||||
}
|
||||
_, err = tx.ExecContext(ctx, `ALTER TABLE roomserver_events DROP COLUMN IF EXISTS reference_sha256;`)
|
||||
_, err := tx.ExecContext(ctx, `ALTER TABLE roomserver_events DROP COLUMN IF EXISTS reference_sha256;`)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to execute upgrade: %w", err)
|
||||
}
|
||||
|
@ -46,9 +41,80 @@ func UpDropEventReferenceSHAPrevEvents(ctx context.Context, tx *sql.Tx) error {
|
|||
if err != nil {
|
||||
return fmt.Errorf("failed to execute upgrade: %w", err)
|
||||
}
|
||||
|
||||
// figure out if there are duplicates
|
||||
dupeRows, err := tx.QueryContext(ctx, `SELECT previous_event_id FROM roomserver_previous_events GROUP BY previous_event_id HAVING count(previous_event_id) > 1`)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to query duplicate event ids")
|
||||
}
|
||||
defer internal.CloseAndLogIfError(ctx, dupeRows, "failed to close rows")
|
||||
|
||||
var prevEvents []string
|
||||
var prevEventID string
|
||||
for dupeRows.Next() {
|
||||
if err = dupeRows.Scan(&prevEventID); err != nil {
|
||||
return err
|
||||
}
|
||||
prevEvents = append(prevEvents, prevEventID)
|
||||
}
|
||||
if dupeRows.Err() != nil {
|
||||
return dupeRows.Err()
|
||||
}
|
||||
|
||||
// if we found duplicates, check if we can combine them, e.g. they are in the same room
|
||||
for _, dupeID := range prevEvents {
|
||||
var dupeNIDsRows *sql.Rows
|
||||
dupeNIDsRows, err = tx.QueryContext(ctx, `SELECT event_nids FROM roomserver_previous_events WHERE previous_event_id = $1`, dupeID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to query duplicate event ids")
|
||||
}
|
||||
defer internal.CloseAndLogIfError(ctx, dupeNIDsRows, "failed to close rows")
|
||||
var dupeNIDs []int64
|
||||
for dupeNIDsRows.Next() {
|
||||
var nids pq.Int64Array
|
||||
if err = dupeNIDsRows.Scan(&nids); err != nil {
|
||||
return err
|
||||
}
|
||||
dupeNIDs = append(dupeNIDs, nids...)
|
||||
}
|
||||
|
||||
if dupeNIDsRows.Err() != nil {
|
||||
return dupeNIDsRows.Err()
|
||||
}
|
||||
// dedupe NIDs
|
||||
dupeNIDs = dupeNIDs[:util.SortAndUnique(nids(dupeNIDs))]
|
||||
// now that we have all NIDs, check which room they belong to
|
||||
var roomCount int
|
||||
err = tx.QueryRowContext(ctx, `SELECT count(distinct room_nid) FROM roomserver_events WHERE event_nid = ANY($1)`, pq.Array(dupeNIDs)).Scan(&roomCount)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// if the events are from different rooms, that's bad and we can't continue
|
||||
if roomCount > 1 {
|
||||
return fmt.Errorf("detected events (%v) referenced for different rooms (%v)", dupeNIDs, roomCount)
|
||||
}
|
||||
// otherwise delete the dupes
|
||||
_, err = tx.ExecContext(ctx, "DELETE FROM roomserver_previous_events WHERE previous_event_id = $1", dupeID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to delete duplicates: %w", err)
|
||||
}
|
||||
|
||||
// insert combined values
|
||||
_, err = tx.ExecContext(ctx, "INSERT INTO roomserver_previous_events (previous_event_id, event_nids) VALUES ($1, $2)", dupeID, pq.Array(dupeNIDs))
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to insert new event NIDs: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
_, err = tx.ExecContext(ctx, `ALTER TABLE roomserver_previous_events ADD CONSTRAINT roomserver_previous_event_id_unique UNIQUE (previous_event_id);`)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to execute upgrade: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type nids []int64
|
||||
|
||||
func (s nids) Len() int { return len(s) }
|
||||
func (s nids) Less(i, j int) bool { return s[i] < s[j] }
|
||||
func (s nids) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
|
||||
|
|
|
@ -0,0 +1,60 @@
|
|||
package deltas
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/lib/pq"
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
"github.com/matrix-org/dendrite/test"
|
||||
"github.com/matrix-org/dendrite/test/testrig"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestUpDropEventReferenceSHAPrevEvents(t *testing.T) {
|
||||
|
||||
cfg, ctx, close := testrig.CreateConfig(t, test.DBTypePostgres)
|
||||
defer close()
|
||||
|
||||
db, err := sqlutil.Open(&cfg.Global.DatabaseOptions, sqlutil.NewDummyWriter())
|
||||
assert.Nil(t, err)
|
||||
assert.NotNil(t, db)
|
||||
defer db.Close()
|
||||
|
||||
// create the table in the old layout
|
||||
_, err = db.ExecContext(ctx.Context(), `
|
||||
CREATE TABLE IF NOT EXISTS roomserver_previous_events (
|
||||
previous_event_id TEXT NOT NULL,
|
||||
event_nids BIGINT[] NOT NULL,
|
||||
previous_reference_sha256 BYTEA NOT NULL,
|
||||
CONSTRAINT roomserver_previous_event_id_unique UNIQUE (previous_event_id, previous_reference_sha256)
|
||||
);`)
|
||||
assert.Nil(t, err)
|
||||
|
||||
// create the events table as well, slimmed down with one eventNID
|
||||
_, err = db.ExecContext(ctx.Context(), `
|
||||
CREATE SEQUENCE IF NOT EXISTS roomserver_event_nid_seq;
|
||||
CREATE TABLE IF NOT EXISTS roomserver_events (
|
||||
event_nid BIGINT PRIMARY KEY DEFAULT nextval('roomserver_event_nid_seq'),
|
||||
room_nid BIGINT NOT NULL
|
||||
);
|
||||
|
||||
INSERT INTO roomserver_events (event_nid, room_nid) VALUES (1, 1)
|
||||
`)
|
||||
assert.Nil(t, err)
|
||||
|
||||
// insert duplicate prev events with different event_nids
|
||||
stmt, err := db.PrepareContext(ctx.Context(), `INSERT INTO roomserver_previous_events (previous_event_id, event_nids, previous_reference_sha256) VALUES ($1, $2, $3)`)
|
||||
assert.Nil(t, err)
|
||||
assert.NotNil(t, stmt)
|
||||
_, err = stmt.ExecContext(ctx.Context(), "1", pq.Array([]int64{1, 2}), "a")
|
||||
assert.Nil(t, err)
|
||||
_, err = stmt.ExecContext(ctx.Context(), "1", pq.Array([]int64{1, 2, 3}), "b")
|
||||
assert.Nil(t, err)
|
||||
// execute the migration
|
||||
txn, err := db.Begin()
|
||||
assert.Nil(t, err)
|
||||
assert.NotNil(t, txn)
|
||||
defer txn.Rollback()
|
||||
err = UpDropEventReferenceSHAPrevEvents(ctx.Context(), txn)
|
||||
assert.NoError(t, err)
|
||||
}
|
|
@ -18,6 +18,10 @@ import (
|
|||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
|
||||
"github.com/lib/pq"
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
"github.com/matrix-org/util"
|
||||
)
|
||||
|
||||
func UpDropEventReferenceSHA(ctx context.Context, tx *sql.Tx) error {
|
||||
|
@ -52,8 +56,72 @@ func UpDropEventReferenceSHAPrevEvents(ctx context.Context, tx *sql.Tx) error {
|
|||
return fmt.Errorf("tx.ExecContext: %w", err)
|
||||
}
|
||||
|
||||
// figure out if there are duplicates
|
||||
dupeRows, err := tx.QueryContext(ctx, `SELECT previous_event_id FROM _roomserver_previous_events GROUP BY previous_event_id HAVING count(previous_event_id) > 1`)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to query duplicate event ids")
|
||||
}
|
||||
defer internal.CloseAndLogIfError(ctx, dupeRows, "failed to close rows")
|
||||
|
||||
var prevEvents []string
|
||||
var prevEventID string
|
||||
for dupeRows.Next() {
|
||||
if err = dupeRows.Scan(&prevEventID); err != nil {
|
||||
return err
|
||||
}
|
||||
prevEvents = append(prevEvents, prevEventID)
|
||||
}
|
||||
if dupeRows.Err() != nil {
|
||||
return dupeRows.Err()
|
||||
}
|
||||
|
||||
// if we found duplicates, check if we can combine them, e.g. they are in the same room
|
||||
for _, dupeID := range prevEvents {
|
||||
var dupeNIDsRows *sql.Rows
|
||||
dupeNIDsRows, err = tx.QueryContext(ctx, `SELECT event_nids FROM _roomserver_previous_events WHERE previous_event_id = $1`, dupeID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to query duplicate event ids")
|
||||
}
|
||||
defer internal.CloseAndLogIfError(ctx, dupeNIDsRows, "failed to close rows")
|
||||
var dupeNIDs []int64
|
||||
for dupeNIDsRows.Next() {
|
||||
var nids pq.Int64Array
|
||||
if err = dupeNIDsRows.Scan(&nids); err != nil {
|
||||
return err
|
||||
}
|
||||
dupeNIDs = append(dupeNIDs, nids...)
|
||||
}
|
||||
|
||||
if dupeNIDsRows.Err() != nil {
|
||||
return dupeNIDsRows.Err()
|
||||
}
|
||||
// dedupe NIDs
|
||||
dupeNIDs = dupeNIDs[:util.SortAndUnique(nids(dupeNIDs))]
|
||||
// now that we have all NIDs, check which room they belong to
|
||||
var roomCount int
|
||||
err = tx.QueryRowContext(ctx, `SELECT count(distinct room_nid) FROM roomserver_events WHERE event_nid IN ($1)`, pq.Array(dupeNIDs)).Scan(&roomCount)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// if the events are from different rooms, that's bad and we can't continue
|
||||
if roomCount > 1 {
|
||||
return fmt.Errorf("detected events (%v) referenced for different rooms (%v)", dupeNIDs, roomCount)
|
||||
}
|
||||
// otherwise delete the dupes
|
||||
_, err = tx.ExecContext(ctx, "DELETE FROM _roomserver_previous_events WHERE previous_event_id = $1", dupeID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to delete duplicates: %w", err)
|
||||
}
|
||||
|
||||
// insert combined values
|
||||
_, err = tx.ExecContext(ctx, "INSERT INTO _roomserver_previous_events (previous_event_id, event_nids) VALUES ($1, $2)", dupeID, pq.Array(dupeNIDs))
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to insert new event NIDs: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// move data
|
||||
if _, err := tx.ExecContext(ctx, `
|
||||
if _, err = tx.ExecContext(ctx, `
|
||||
INSERT
|
||||
INTO roomserver_previous_events (
|
||||
previous_event_id, event_nids
|
||||
|
@ -64,9 +132,15 @@ INSERT
|
|||
return fmt.Errorf("tx.ExecContext: %w", err)
|
||||
}
|
||||
// drop old table
|
||||
_, err := tx.ExecContext(ctx, `DROP TABLE _roomserver_previous_events;`)
|
||||
_, err = tx.ExecContext(ctx, `DROP TABLE _roomserver_previous_events;`)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to execute upgrade: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type nids []int64
|
||||
|
||||
func (s nids) Len() int { return len(s) }
|
||||
func (s nids) Less(i, j int) bool { return s[i] < s[j] }
|
||||
func (s nids) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
|
||||
|
|
|
@ -0,0 +1,59 @@
|
|||
package deltas
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
"github.com/matrix-org/dendrite/test"
|
||||
"github.com/matrix-org/dendrite/test/testrig"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestUpDropEventReferenceSHAPrevEvents(t *testing.T) {
|
||||
|
||||
cfg, ctx, close := testrig.CreateConfig(t, test.DBTypeSQLite)
|
||||
defer close()
|
||||
|
||||
db, err := sqlutil.Open(&cfg.RoomServer.Database, sqlutil.NewExclusiveWriter())
|
||||
assert.Nil(t, err)
|
||||
assert.NotNil(t, db)
|
||||
defer db.Close()
|
||||
|
||||
// create the table in the old layout
|
||||
_, err = db.ExecContext(ctx.Context(), `
|
||||
CREATE TABLE IF NOT EXISTS roomserver_previous_events (
|
||||
previous_event_id TEXT NOT NULL,
|
||||
previous_reference_sha256 BLOB,
|
||||
event_nids TEXT NOT NULL,
|
||||
UNIQUE (previous_event_id, previous_reference_sha256)
|
||||
);`)
|
||||
assert.Nil(t, err)
|
||||
|
||||
// create the events table as well, slimmed down with one eventNID
|
||||
_, err = db.ExecContext(ctx.Context(), `
|
||||
CREATE TABLE IF NOT EXISTS roomserver_events (
|
||||
event_nid INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
room_nid INTEGER NOT NULL
|
||||
);
|
||||
|
||||
INSERT INTO roomserver_events (event_nid, room_nid) VALUES (1, 1)
|
||||
`)
|
||||
assert.Nil(t, err)
|
||||
|
||||
// insert duplicate prev events with different event_nids
|
||||
stmt, err := db.PrepareContext(ctx.Context(), `INSERT INTO roomserver_previous_events (previous_event_id, event_nids, previous_reference_sha256) VALUES ($1, $2, $3)`)
|
||||
assert.Nil(t, err)
|
||||
assert.NotNil(t, stmt)
|
||||
_, err = stmt.ExecContext(ctx.Context(), "1", "{1,2}", "a")
|
||||
assert.Nil(t, err)
|
||||
_, err = stmt.ExecContext(ctx.Context(), "1", "{1,2,3}", "b")
|
||||
assert.Nil(t, err)
|
||||
|
||||
// execute the migration
|
||||
txn, err := db.Begin()
|
||||
assert.Nil(t, err)
|
||||
assert.NotNil(t, txn)
|
||||
err = UpDropEventReferenceSHAPrevEvents(ctx.Context(), txn)
|
||||
defer txn.Rollback()
|
||||
assert.NoError(t, err)
|
||||
}
|
Loading…
Reference in a new issue