diff --git a/syncapi/storage/postgres/deltas/20201211_sequences.go b/syncapi/storage/postgres/deltas/20201211_sequences.go index e43d2ff21..013ddca7c 100644 --- a/syncapi/storage/postgres/deltas/20201211_sequences.go +++ b/syncapi/storage/postgres/deltas/20201211_sequences.go @@ -17,8 +17,19 @@ package deltas import ( "database/sql" "fmt" + + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/pressly/goose" ) +func LoadFromGoose() { + goose.AddMigration(UpFixSequences, DownFixSequences) +} + +func LoadFixSequences(m *sqlutil.Migrations) { + m.AddMigration(UpFixSequences, DownFixSequences) +} + func UpFixSequences(tx *sql.Tx) error { _, err := tx.Exec(` -- We need to delete all of the existing receipts because the indexes diff --git a/syncapi/storage/postgres/deltas/deltas.go b/syncapi/storage/sqlite3/deltas/20201211_sequences.go similarity index 52% rename from syncapi/storage/postgres/deltas/deltas.go rename to syncapi/storage/sqlite3/deltas/20201211_sequences.go index 8c19e9dcc..7211d96f4 100644 --- a/syncapi/storage/postgres/deltas/deltas.go +++ b/syncapi/storage/sqlite3/deltas/20201211_sequences.go @@ -15,6 +15,9 @@ package deltas import ( + "database/sql" + "fmt" + "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/pressly/goose" ) @@ -26,3 +29,29 @@ func LoadFromGoose() { func LoadFixSequences(m *sqlutil.Migrations) { m.AddMigration(UpFixSequences, DownFixSequences) } + +func UpFixSequences(tx *sql.Tx) error { + _, err := tx.Exec(` + -- We need to delete all of the existing receipts because the indexes + -- will be wrong, and we'll get primary key violations if we try to + -- reuse existing stream IDs from a different sequence. + DELETE FROM syncapi_receipts; + `) + if err != nil { + return fmt.Errorf("failed to execute upgrade: %w", err) + } + return nil +} + +func DownFixSequences(tx *sql.Tx) error { + _, err := tx.Exec(` + -- We need to delete all of the existing receipts because the indexes + -- will be wrong, and we'll get primary key violations if we try to + -- reuse existing stream IDs from a different sequence. + DELETE FROM syncapi_receipts; + `) + if err != nil { + return fmt.Errorf("failed to execute downgrade: %w", err) + } + return nil +} diff --git a/syncapi/storage/sqlite3/syncserver.go b/syncapi/storage/sqlite3/syncserver.go index 7df71b384..f07359e7a 100644 --- a/syncapi/storage/sqlite3/syncserver.go +++ b/syncapi/storage/sqlite3/syncserver.go @@ -25,6 +25,7 @@ import ( "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/syncapi/storage/shared" + "github.com/matrix-org/dendrite/syncapi/storage/sqlite3/deltas" ) // SyncServerDatasource represents a sync server datasource which manages @@ -46,13 +47,13 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e return nil, err } d.writer = sqlutil.NewExclusiveWriter() - if err = d.prepare(); err != nil { + if err = d.prepare(dbProperties); err != nil { return nil, err } return &d, nil } -func (d *SyncServerDatasource) prepare() (err error) { +func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (err error) { if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "syncapi"); err != nil { return err } @@ -99,6 +100,11 @@ func (d *SyncServerDatasource) prepare() (err error) { if err != nil { return err } + m := sqlutil.NewMigrations() + deltas.LoadFixSequences(m) + if err = m.RunDeltas(d.db, dbProperties); err != nil { + return err + } d.Database = shared.Database{ DB: d.db, Writer: d.writer,