Add sqlite3 migration

This commit is contained in:
Till Faelligen 2020-10-17 15:15:21 +02:00
parent c51dcc25b6
commit d113b03f64
3 changed files with 65 additions and 4 deletions

View file

@ -0,0 +1,46 @@
package deltas
import (
"database/sql"
"fmt"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/pressly/goose"
)
func LoadFromGoose() {
goose.AddMigration(UpCreateReceiptTable, DownCreateReceiptTable)
}
func LoadCreateReceiptTable(m *sqlutil.Migrations) {
m.AddMigration(UpCreateReceiptTable, DownCreateReceiptTable)
}
func UpCreateReceiptTable(tx *sql.Tx) error {
_, err := tx.Exec(`
-- Stores data about receipts
CREATE TABLE IF NOT EXISTS syncapi_receipts (
-- The ID
id BIGINT,
room_id TEXT NOT NULL,
receipt_type TEXT NOT NULL,
user_id TEXT NOT NULL,
event_id TEXT NOT NULL,
receipt_ts BIGINT NOT NULL,
CONSTRAINT syncapi_receipts_unique UNIQUE (room_id, receipt_type, user_id)
);
CREATE INDEX IF NOT EXISTS syncapi_receipts_room_id_idx ON syncapi_receipts(room_id);
`)
if err != nil {
return fmt.Errorf("failed to execute upgrade: %w", err)
}
return nil
}
func DownCreateReceiptTable(tx *sql.Tx) error {
_, err := tx.Exec("DROP TABLE IF EXISTS syncapi_receipts;")
if err != nil {
return fmt.Errorf("failed to execute downgrade: %w", err)
}
return nil
}

View file

@ -63,10 +63,7 @@ type receiptStatements struct {
}
func NewSqliteReceiptsTable(db *sql.DB, streamID *streamIDStatements) (tables.Receipts, error) {
_, err := db.Exec(receiptsSchema)
if err != nil {
return nil, err
}
var err error
r := &receiptStatements{
db: db,
streamIDStatements: streamID,
@ -80,6 +77,11 @@ func NewSqliteReceiptsTable(db *sql.DB, streamID *streamIDStatements) (tables.Re
return r, nil
}
func (r *receiptStatements) execSchema(db *sql.DB) error {
_, err := db.Exec(receiptsSchema)
return err
}
// UpsertReceipt creates new user receipts
func (r *receiptStatements) UpsertReceipt(ctx context.Context, txn *sql.Tx, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error) {
pos, err = r.streamIDStatements.nextStreamID(ctx, txn)

View file

@ -25,6 +25,7 @@ import (
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/syncapi/storage/shared"
"github.com/matrix-org/dendrite/syncapi/storage/sqlite3/deltas"
)
// SyncServerDatasource represents a sync server datasource which manages
@ -46,6 +47,18 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e
return nil, err
}
d.writer = sqlutil.NewExclusiveWriter()
// Create tables before executing migrations so we don't fail if the table is missing,
// and THEN prepare statements so we don't fail due to referencing new columns
r := receiptStatements{}
if err = r.execSchema(d.db); err != nil {
return nil, err
}
m := sqlutil.NewMigrations()
deltas.LoadCreateReceiptTable(m)
if err = m.RunDeltas(d.db, dbProperties); err != nil {
return nil, err
}
if err = d.prepare(); err != nil {
return nil, err
}