Fix migrations

This commit is contained in:
Till Faelligen 2022-08-03 09:03:49 +02:00
parent 3f6008cf0c
commit d0f15d3aef
No known key found for this signature in database
GPG key ID: 3DF82D8AB9211D4E
4 changed files with 48 additions and 24 deletions

View file

@ -15,32 +15,28 @@
package deltas package deltas
import ( import (
"context"
"database/sql" "database/sql"
"fmt" "fmt"
"time" "time"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
) )
func LoadAddExpiresAt(m *sqlutil.Migrations) { func UpAddexpiresat(ctx context.Context, tx *sql.Tx) error {
m.AddMigration(upAddexpiresat, downAddexpiresat) _, err := tx.ExecContext(ctx, "ALTER TABLE federationsender_queue_edus ADD COLUMN IF NOT EXISTS expires_at BIGINT NOT NULL DEFAULT 0;")
}
func upAddexpiresat(tx *sql.Tx) error {
_, err := tx.Exec("ALTER TABLE federationsender_queue_edus ADD COLUMN IF NOT EXISTS expires_at BIGINT NOT NULL DEFAULT 0;")
if err != nil { if err != nil {
return fmt.Errorf("failed to execute upgrade: %w", err) return fmt.Errorf("failed to execute upgrade: %w", err)
} }
_, err = tx.Exec("UPDATE federationsender_queue_edus SET expires_at = $1", gomatrixserverlib.AsTimestamp(time.Now().Add(time.Hour*24))) _, err = tx.ExecContext(ctx, "UPDATE federationsender_queue_edus SET expires_at = $1", gomatrixserverlib.AsTimestamp(time.Now().Add(time.Hour*24)))
if err != nil { if err != nil {
return fmt.Errorf("failed to update queue_edus: %w", err) return fmt.Errorf("failed to update queue_edus: %w", err)
} }
return nil return nil
} }
func downAddexpiresat(tx *sql.Tx) error { func DownAddexpiresat(ctx context.Context, tx *sql.Tx) error {
_, err := tx.Exec("ALTER TABLE federationsender_queue_edus DROP COLUMN expires_at;") _, err := tx.ExecContext(ctx, "ALTER TABLE federationsender_queue_edus DROP COLUMN expires_at;")
if err != nil { if err != nil {
return fmt.Errorf("failed to execute downgrade: %w", err) return fmt.Errorf("failed to execute downgrade: %w", err)
} }

View file

@ -19,6 +19,7 @@ import (
"database/sql" "database/sql"
"github.com/lib/pq" "github.com/lib/pq"
"github.com/matrix-org/dendrite/federationapi/storage/postgres/deltas"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
@ -90,7 +91,22 @@ func NewPostgresQueueEDUsTable(db *sql.DB) (s *queueEDUsStatements, err error) {
db: db, db: db,
} }
_, err = s.db.Exec(queueEDUsSchema) _, err = s.db.Exec(queueEDUsSchema)
return s, err if err != nil {
return s, err
}
m := sqlutil.NewMigrator(db)
m.AddMigrations(
sqlutil.Migration{
Version: "federationapi: add expiresat column",
Up: deltas.UpAddexpiresat,
},
)
if err := m.Up(context.Background()); err != nil {
return s, err
}
return s, nil
} }
func (s *queueEDUsStatements) Prepare() error { func (s *queueEDUsStatements) Prepare() error {

View file

@ -15,25 +15,21 @@
package deltas package deltas
import ( import (
"context"
"database/sql" "database/sql"
"fmt" "fmt"
"time" "time"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
) )
func LoadAddExpiresAt(m *sqlutil.Migrations) { func UpAddexpiresat(ctx context.Context, tx *sql.Tx) error {
m.AddMigration(upAddexpiresat, downAddexpiresat) _, err := tx.ExecContext(ctx, "ALTER TABLE federationsender_queue_edus RENAME TO federationsender_queue_edus_old;")
}
func upAddexpiresat(tx *sql.Tx) error {
_, err := tx.Exec("ALTER TABLE federationsender_queue_edus RENAME TO federationsender_queue_edus_old;")
if err != nil { if err != nil {
return fmt.Errorf("failed to rename table: %w", err) return fmt.Errorf("failed to rename table: %w", err)
} }
_, err = tx.Exec(` _, err = tx.ExecContext(ctx, `
CREATE TABLE IF NOT EXISTS federationsender_queue_edus ( CREATE TABLE IF NOT EXISTS federationsender_queue_edus (
edu_type TEXT NOT NULL, edu_type TEXT NOT NULL,
server_name TEXT NOT NULL, server_name TEXT NOT NULL,
@ -47,7 +43,7 @@ CREATE UNIQUE INDEX IF NOT EXISTS federationsender_queue_edus_json_nid_idx
if err != nil { if err != nil {
return fmt.Errorf("failed to create new table: %w", err) return fmt.Errorf("failed to create new table: %w", err)
} }
_, err = tx.Exec(` _, err = tx.ExecContext(ctx, `
INSERT INSERT
INTO federationsender_queue_edus ( INTO federationsender_queue_edus (
edu_type, server_name, json_nid, expires_at edu_type, server_name, json_nid, expires_at
@ -56,15 +52,15 @@ INSERT
if err != nil { if err != nil {
return fmt.Errorf("failed to update queue_edus: %w", err) return fmt.Errorf("failed to update queue_edus: %w", err)
} }
_, err = tx.Exec("UPDATE federationsender_queue_edus SET expires_at = $1", gomatrixserverlib.AsTimestamp(time.Now().Add(time.Hour*24))) _, err = tx.ExecContext(ctx, "UPDATE federationsender_queue_edus SET expires_at = $1", gomatrixserverlib.AsTimestamp(time.Now().Add(time.Hour*24)))
if err != nil { if err != nil {
return fmt.Errorf("failed to update queue_edus: %w", err) return fmt.Errorf("failed to update queue_edus: %w", err)
} }
return nil return nil
} }
func downAddexpiresat(tx *sql.Tx) error { func DownAddexpiresat(ctx context.Context, tx *sql.Tx) error {
_, err := tx.Exec("ALTER TABLE federationsender_queue_edus DROP COLUMN expires_at;") _, err := tx.ExecContext(ctx, "ALTER TABLE federationsender_queue_edus DROP COLUMN expires_at;")
if err != nil { if err != nil {
return fmt.Errorf("failed to rename table: %w", err) return fmt.Errorf("failed to rename table: %w", err)
} }

View file

@ -20,6 +20,7 @@ import (
"fmt" "fmt"
"strings" "strings"
"github.com/matrix-org/dendrite/federationapi/storage/sqlite3/deltas"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
@ -91,7 +92,22 @@ func NewSQLiteQueueEDUsTable(db *sql.DB) (s *queueEDUsStatements, err error) {
db: db, db: db,
} }
_, err = db.Exec(queueEDUsSchema) _, err = db.Exec(queueEDUsSchema)
return s, err if err != nil {
return s, err
}
m := sqlutil.NewMigrator(db)
m.AddMigrations(
sqlutil.Migration{
Version: "federationapi: add expiresat column",
Up: deltas.UpAddexpiresat,
},
)
if err := m.Up(context.Background()); err != nil {
return s, err
}
return s, nil
} }
func (s *queueEDUsStatements) Prepare() error { func (s *queueEDUsStatements) Prepare() error {