Add postgres migration
This commit is contained in:
parent
d113b03f64
commit
722fe5a046
|
@ -0,0 +1,46 @@
|
||||||
|
package deltas
|
||||||
|
|
||||||
|
import (
|
||||||
|
"database/sql"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
|
"github.com/pressly/goose"
|
||||||
|
)
|
||||||
|
|
||||||
|
func LoadFromGoose() {
|
||||||
|
goose.AddMigration(UpCreateReceiptTable, DownCreateReceiptTable)
|
||||||
|
}
|
||||||
|
|
||||||
|
func LoadCreateReceiptTable(m *sqlutil.Migrations) {
|
||||||
|
m.AddMigration(UpCreateReceiptTable, DownCreateReceiptTable)
|
||||||
|
}
|
||||||
|
|
||||||
|
func UpCreateReceiptTable(tx *sql.Tx) error {
|
||||||
|
_, err := tx.Exec(`
|
||||||
|
-- Stores data about receipts
|
||||||
|
CREATE TABLE IF NOT EXISTS syncapi_receipts (
|
||||||
|
-- The ID
|
||||||
|
id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_stream_id'),
|
||||||
|
room_id TEXT NOT NULL,
|
||||||
|
receipt_type TEXT NOT NULL,
|
||||||
|
user_id TEXT NOT NULL,
|
||||||
|
event_id TEXT NOT NULL,
|
||||||
|
receipt_ts BIGINT NOT NULL,
|
||||||
|
CONSTRAINT syncapi_receipts_unique UNIQUE (room_id, receipt_type, user_id)
|
||||||
|
);
|
||||||
|
CREATE INDEX IF NOT EXISTS syncapi_receipts_room_id ON syncapi_receipts(room_id);
|
||||||
|
`)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to execute upgrade: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func DownCreateReceiptTable(tx *sql.Tx) error {
|
||||||
|
_, err := tx.Exec("DROP TABLE IF EXISTS syncapi_receipts;")
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to execute downgrade: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -54,7 +54,7 @@ const upsertReceipt = "" +
|
||||||
const selectRoomReceipts = "" +
|
const selectRoomReceipts = "" +
|
||||||
"SELECT room_id, receipt_type, user_id, event_id, receipt_ts" +
|
"SELECT room_id, receipt_type, user_id, event_id, receipt_ts" +
|
||||||
" FROM syncapi_receipts" +
|
" FROM syncapi_receipts" +
|
||||||
" WHERE room_id in $1 AND id > $2"
|
" WHERE room_id = $1 AND id > $2"
|
||||||
|
|
||||||
type receiptStatements struct {
|
type receiptStatements struct {
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
|
@ -63,10 +63,7 @@ type receiptStatements struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPostgresReceiptsTable(db *sql.DB) (tables.Receipts, error) {
|
func NewPostgresReceiptsTable(db *sql.DB) (tables.Receipts, error) {
|
||||||
_, err := db.Exec(receiptsSchema)
|
var err error
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
r := &receiptStatements{
|
r := &receiptStatements{
|
||||||
db: db,
|
db: db,
|
||||||
}
|
}
|
||||||
|
@ -79,6 +76,11 @@ func NewPostgresReceiptsTable(db *sql.DB) (tables.Receipts, error) {
|
||||||
return r, nil
|
return r, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *receiptStatements) execSchema(db *sql.DB) error {
|
||||||
|
_, err := db.Exec(receiptsSchema)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
func (r *receiptStatements) UpsertReceipt(ctx context.Context, txn *sql.Tx, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error) {
|
func (r *receiptStatements) UpsertReceipt(ctx context.Context, txn *sql.Tx, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error) {
|
||||||
stmt := sqlutil.TxStmt(txn, r.upsertReceipt)
|
stmt := sqlutil.TxStmt(txn, r.upsertReceipt)
|
||||||
err = stmt.QueryRowContext(ctx, roomId, receiptType, userId, eventId, timestamp).Scan(&pos)
|
err = stmt.QueryRowContext(ctx, roomId, receiptType, userId, eventId, timestamp).Scan(&pos)
|
||||||
|
|
|
@ -20,9 +20,11 @@ import (
|
||||||
|
|
||||||
// Import the postgres database driver.
|
// Import the postgres database driver.
|
||||||
_ "github.com/lib/pq"
|
_ "github.com/lib/pq"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/eduserver/cache"
|
"github.com/matrix-org/dendrite/eduserver/cache"
|
||||||
"github.com/matrix-org/dendrite/internal/config"
|
"github.com/matrix-org/dendrite/internal/config"
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
|
"github.com/matrix-org/dendrite/syncapi/storage/postgres/deltas"
|
||||||
"github.com/matrix-org/dendrite/syncapi/storage/shared"
|
"github.com/matrix-org/dendrite/syncapi/storage/shared"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -43,48 +45,68 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
d.writer = sqlutil.NewDummyWriter()
|
d.writer = sqlutil.NewDummyWriter()
|
||||||
if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "syncapi"); err != nil {
|
|
||||||
|
// Create tables before executing migrations so we don't fail if the table is missing,
|
||||||
|
// and THEN prepare statements so we don't fail due to referencing new columns
|
||||||
|
r := receiptStatements{}
|
||||||
|
if err = r.execSchema(d.db); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
m := sqlutil.NewMigrations()
|
||||||
|
deltas.LoadCreateReceiptTable(m)
|
||||||
|
if err = m.RunDeltas(d.db, dbProperties); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if err = d.prepare(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &d, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *SyncServerDatasource) prepare() (err error) {
|
||||||
|
if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "syncapi"); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
accountData, err := NewPostgresAccountDataTable(d.db)
|
accountData, err := NewPostgresAccountDataTable(d.db)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return err
|
||||||
}
|
}
|
||||||
events, err := NewPostgresEventsTable(d.db)
|
events, err := NewPostgresEventsTable(d.db)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return err
|
||||||
}
|
}
|
||||||
currState, err := NewPostgresCurrentRoomStateTable(d.db)
|
currState, err := NewPostgresCurrentRoomStateTable(d.db)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return err
|
||||||
}
|
}
|
||||||
invites, err := NewPostgresInvitesTable(d.db)
|
invites, err := NewPostgresInvitesTable(d.db)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return err
|
||||||
}
|
}
|
||||||
peeks, err := NewPostgresPeeksTable(d.db)
|
peeks, err := NewPostgresPeeksTable(d.db)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return err
|
||||||
}
|
}
|
||||||
topology, err := NewPostgresTopologyTable(d.db)
|
topology, err := NewPostgresTopologyTable(d.db)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return err
|
||||||
}
|
}
|
||||||
backwardExtremities, err := NewPostgresBackwardsExtremitiesTable(d.db)
|
backwardExtremities, err := NewPostgresBackwardsExtremitiesTable(d.db)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return err
|
||||||
}
|
}
|
||||||
sendToDevice, err := NewPostgresSendToDeviceTable(d.db)
|
sendToDevice, err := NewPostgresSendToDeviceTable(d.db)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return err
|
||||||
}
|
}
|
||||||
filter, err := NewPostgresFilterTable(d.db)
|
filter, err := NewPostgresFilterTable(d.db)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return err
|
||||||
}
|
}
|
||||||
receipts, err := NewPostgresReceiptsTable(d.db)
|
receipts, err := NewPostgresReceiptsTable(d.db)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return err
|
||||||
}
|
}
|
||||||
d.Database = shared.Database{
|
d.Database = shared.Database{
|
||||||
DB: d.db,
|
DB: d.db,
|
||||||
|
@ -101,5 +123,5 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e
|
||||||
Receipts: receipts,
|
Receipts: receipts,
|
||||||
EDUCache: cache.New(),
|
EDUCache: cache.New(),
|
||||||
}
|
}
|
||||||
return &d, nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue