From 722fe5a04628882b787d096942459961db159b06 Mon Sep 17 00:00:00 2001 From: Till Faelligen Date: Sat, 17 Oct 2020 15:15:35 +0200 Subject: [PATCH] Add postgres migration --- .../20201017140347_create_receipt_table.go | 46 +++++++++++++++++++ syncapi/storage/postgres/receipt_table.go | 12 +++-- syncapi/storage/postgres/syncserver.go | 46 ++++++++++++++----- 3 files changed, 87 insertions(+), 17 deletions(-) create mode 100644 syncapi/storage/postgres/deltas/20201017140347_create_receipt_table.go diff --git a/syncapi/storage/postgres/deltas/20201017140347_create_receipt_table.go b/syncapi/storage/postgres/deltas/20201017140347_create_receipt_table.go new file mode 100644 index 000000000..12925b299 --- /dev/null +++ b/syncapi/storage/postgres/deltas/20201017140347_create_receipt_table.go @@ -0,0 +1,46 @@ +package deltas + +import ( + "database/sql" + "fmt" + + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/pressly/goose" +) + +func LoadFromGoose() { + goose.AddMigration(UpCreateReceiptTable, DownCreateReceiptTable) +} + +func LoadCreateReceiptTable(m *sqlutil.Migrations) { + m.AddMigration(UpCreateReceiptTable, DownCreateReceiptTable) +} + +func UpCreateReceiptTable(tx *sql.Tx) error { + _, err := tx.Exec(` +-- Stores data about receipts +CREATE TABLE IF NOT EXISTS syncapi_receipts ( + -- The ID + id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_stream_id'), + room_id TEXT NOT NULL, + receipt_type TEXT NOT NULL, + user_id TEXT NOT NULL, + event_id TEXT NOT NULL, + receipt_ts BIGINT NOT NULL, + CONSTRAINT syncapi_receipts_unique UNIQUE (room_id, receipt_type, user_id) +); +CREATE INDEX IF NOT EXISTS syncapi_receipts_room_id ON syncapi_receipts(room_id); +`) + if err != nil { + return fmt.Errorf("failed to execute upgrade: %w", err) + } + return nil +} + +func DownCreateReceiptTable(tx *sql.Tx) error { + _, err := tx.Exec("DROP TABLE IF EXISTS syncapi_receipts;") + if err != nil { + return fmt.Errorf("failed to execute downgrade: %w", err) + } + return nil +} diff --git a/syncapi/storage/postgres/receipt_table.go b/syncapi/storage/postgres/receipt_table.go index 5b98c7251..74ba39221 100644 --- a/syncapi/storage/postgres/receipt_table.go +++ b/syncapi/storage/postgres/receipt_table.go @@ -54,7 +54,7 @@ const upsertReceipt = "" + const selectRoomReceipts = "" + "SELECT room_id, receipt_type, user_id, event_id, receipt_ts" + " FROM syncapi_receipts" + - " WHERE room_id in $1 AND id > $2" + " WHERE room_id = $1 AND id > $2" type receiptStatements struct { db *sql.DB @@ -63,10 +63,7 @@ type receiptStatements struct { } func NewPostgresReceiptsTable(db *sql.DB) (tables.Receipts, error) { - _, err := db.Exec(receiptsSchema) - if err != nil { - return nil, err - } + var err error r := &receiptStatements{ db: db, } @@ -79,6 +76,11 @@ func NewPostgresReceiptsTable(db *sql.DB) (tables.Receipts, error) { return r, nil } +func (r *receiptStatements) execSchema(db *sql.DB) error { + _, err := db.Exec(receiptsSchema) + return err +} + func (r *receiptStatements) UpsertReceipt(ctx context.Context, txn *sql.Tx, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error) { stmt := sqlutil.TxStmt(txn, r.upsertReceipt) err = stmt.QueryRowContext(ctx, roomId, receiptType, userId, eventId, timestamp).Scan(&pos) diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go index 979e19a0b..fa2df26e9 100644 --- a/syncapi/storage/postgres/syncserver.go +++ b/syncapi/storage/postgres/syncserver.go @@ -20,9 +20,11 @@ import ( // Import the postgres database driver. _ "github.com/lib/pq" + "github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/syncapi/storage/postgres/deltas" "github.com/matrix-org/dendrite/syncapi/storage/shared" ) @@ -43,48 +45,68 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e return nil, err } d.writer = sqlutil.NewDummyWriter() - if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "syncapi"); err != nil { + + // Create tables before executing migrations so we don't fail if the table is missing, + // and THEN prepare statements so we don't fail due to referencing new columns + r := receiptStatements{} + if err = r.execSchema(d.db); err != nil { return nil, err } + m := sqlutil.NewMigrations() + deltas.LoadCreateReceiptTable(m) + if err = m.RunDeltas(d.db, dbProperties); err != nil { + return nil, err + } + if err = d.prepare(); err != nil { + return nil, err + } + + return &d, nil +} + +func (d *SyncServerDatasource) prepare() (err error) { + if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "syncapi"); err != nil { + return err + } accountData, err := NewPostgresAccountDataTable(d.db) if err != nil { - return nil, err + return err } events, err := NewPostgresEventsTable(d.db) if err != nil { - return nil, err + return err } currState, err := NewPostgresCurrentRoomStateTable(d.db) if err != nil { - return nil, err + return err } invites, err := NewPostgresInvitesTable(d.db) if err != nil { - return nil, err + return err } peeks, err := NewPostgresPeeksTable(d.db) if err != nil { - return nil, err + return err } topology, err := NewPostgresTopologyTable(d.db) if err != nil { - return nil, err + return err } backwardExtremities, err := NewPostgresBackwardsExtremitiesTable(d.db) if err != nil { - return nil, err + return err } sendToDevice, err := NewPostgresSendToDeviceTable(d.db) if err != nil { - return nil, err + return err } filter, err := NewPostgresFilterTable(d.db) if err != nil { - return nil, err + return err } receipts, err := NewPostgresReceiptsTable(d.db) if err != nil { - return nil, err + return err } d.Database = shared.Database{ DB: d.db, @@ -101,5 +123,5 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e Receipts: receipts, EDUCache: cache.New(), } - return &d, nil + return nil }