Revert "Add postgres migration"

This reverts commit 722fe5a046.
This commit is contained in:
S7evinK 2020-11-09 17:31:05 +01:00
parent 8012204000
commit 7169d0d2e4
3 changed files with 17 additions and 92 deletions

View file

@ -1,46 +0,0 @@
package deltas
import (
"database/sql"
"fmt"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/pressly/goose"
)
func LoadFromGoose() {
goose.AddMigration(UpCreateReceiptTable, DownCreateReceiptTable)
}
func LoadCreateReceiptTable(m *sqlutil.Migrations) {
m.AddMigration(UpCreateReceiptTable, DownCreateReceiptTable)
}
func UpCreateReceiptTable(tx *sql.Tx) error {
_, err := tx.Exec(`
-- Stores data about receipts
CREATE TABLE IF NOT EXISTS syncapi_receipts (
-- The ID
id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_stream_id'),
room_id TEXT NOT NULL,
receipt_type TEXT NOT NULL,
user_id TEXT NOT NULL,
event_id TEXT NOT NULL,
receipt_ts BIGINT NOT NULL,
CONSTRAINT syncapi_receipts_unique UNIQUE (room_id, receipt_type, user_id)
);
CREATE INDEX IF NOT EXISTS syncapi_receipts_room_id ON syncapi_receipts(room_id);
`)
if err != nil {
return fmt.Errorf("failed to execute upgrade: %w", err)
}
return nil
}
func DownCreateReceiptTable(tx *sql.Tx) error {
_, err := tx.Exec("DROP TABLE IF EXISTS syncapi_receipts;")
if err != nil {
return fmt.Errorf("failed to execute downgrade: %w", err)
}
return nil
}

View file

@ -55,7 +55,7 @@ const upsertReceipt = "" +
const selectRoomReceipts = "" + const selectRoomReceipts = "" +
"SELECT room_id, receipt_type, user_id, event_id, receipt_ts" + "SELECT room_id, receipt_type, user_id, event_id, receipt_ts" +
" FROM syncapi_receipts" + " FROM syncapi_receipts" +
" WHERE room_id = ANY($1) AND id > $2" " WHERE room_id in $1 AND id > $2"
type receiptStatements struct { type receiptStatements struct {
db *sql.DB db *sql.DB
@ -64,7 +64,10 @@ type receiptStatements struct {
} }
func NewPostgresReceiptsTable(db *sql.DB) (tables.Receipts, error) { func NewPostgresReceiptsTable(db *sql.DB) (tables.Receipts, error) {
var err error _, err := db.Exec(receiptsSchema)
if err != nil {
return nil, err
}
r := &receiptStatements{ r := &receiptStatements{
db: db, db: db,
} }
@ -77,11 +80,6 @@ func NewPostgresReceiptsTable(db *sql.DB) (tables.Receipts, error) {
return r, nil return r, nil
} }
func (r *receiptStatements) execSchema(db *sql.DB) error {
_, err := db.Exec(receiptsSchema)
return err
}
func (r *receiptStatements) UpsertReceipt(ctx context.Context, txn *sql.Tx, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error) { func (r *receiptStatements) UpsertReceipt(ctx context.Context, txn *sql.Tx, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error) {
stmt := sqlutil.TxStmt(txn, r.upsertReceipt) stmt := sqlutil.TxStmt(txn, r.upsertReceipt)
err = stmt.QueryRowContext(ctx, roomId, receiptType, userId, eventId, timestamp).Scan(&pos) err = stmt.QueryRowContext(ctx, roomId, receiptType, userId, eventId, timestamp).Scan(&pos)

View file

@ -20,11 +20,9 @@ import (
// Import the postgres database driver. // Import the postgres database driver.
_ "github.com/lib/pq" _ "github.com/lib/pq"
"github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/syncapi/storage/postgres/deltas"
"github.com/matrix-org/dendrite/syncapi/storage/shared" "github.com/matrix-org/dendrite/syncapi/storage/shared"
) )
@ -47,73 +45,48 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e
return nil, err return nil, err
} }
d.writer = sqlutil.NewDummyWriter() d.writer = sqlutil.NewDummyWriter()
// Make sure the required sequence, for most tables, exists. Must be executed before anything else.
if _, err = d.db.Exec(createSequence); err != nil {
return nil, err
}
// Create tables before executing migrations so we don't fail if the table is missing,
// and THEN prepare statements so we don't fail due to referencing new columns
r := receiptStatements{}
if err = r.execSchema(d.db); err != nil {
return nil, err
}
m := sqlutil.NewMigrations()
deltas.LoadCreateReceiptTable(m)
if err = m.RunDeltas(d.db, dbProperties); err != nil {
return nil, err
}
if err = d.prepare(); err != nil {
return nil, err
}
return &d, nil
}
func (d *SyncServerDatasource) prepare() (err error) {
if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "syncapi"); err != nil { if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "syncapi"); err != nil {
return err return nil, err
} }
accountData, err := NewPostgresAccountDataTable(d.db) accountData, err := NewPostgresAccountDataTable(d.db)
if err != nil { if err != nil {
return err return nil, err
} }
events, err := NewPostgresEventsTable(d.db) events, err := NewPostgresEventsTable(d.db)
if err != nil { if err != nil {
return err return nil, err
} }
currState, err := NewPostgresCurrentRoomStateTable(d.db) currState, err := NewPostgresCurrentRoomStateTable(d.db)
if err != nil { if err != nil {
return err return nil, err
} }
invites, err := NewPostgresInvitesTable(d.db) invites, err := NewPostgresInvitesTable(d.db)
if err != nil { if err != nil {
return err return nil, err
} }
peeks, err := NewPostgresPeeksTable(d.db) peeks, err := NewPostgresPeeksTable(d.db)
if err != nil { if err != nil {
return err return nil, err
} }
topology, err := NewPostgresTopologyTable(d.db) topology, err := NewPostgresTopologyTable(d.db)
if err != nil { if err != nil {
return err return nil, err
} }
backwardExtremities, err := NewPostgresBackwardsExtremitiesTable(d.db) backwardExtremities, err := NewPostgresBackwardsExtremitiesTable(d.db)
if err != nil { if err != nil {
return err return nil, err
} }
sendToDevice, err := NewPostgresSendToDeviceTable(d.db) sendToDevice, err := NewPostgresSendToDeviceTable(d.db)
if err != nil { if err != nil {
return err return nil, err
} }
filter, err := NewPostgresFilterTable(d.db) filter, err := NewPostgresFilterTable(d.db)
if err != nil { if err != nil {
return err return nil, err
} }
receipts, err := NewPostgresReceiptsTable(d.db) receipts, err := NewPostgresReceiptsTable(d.db)
if err != nil { if err != nil {
return err return nil, err
} }
d.Database = shared.Database{ d.Database = shared.Database{
DB: d.db, DB: d.db,
@ -130,5 +103,5 @@ func (d *SyncServerDatasource) prepare() (err error) {
Receipts: receipts, Receipts: receipts,
EDUCache: cache.New(), EDUCache: cache.New(),
} }
return nil return &d, nil
} }