mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-26 00:03:09 -06:00
parent
7169d0d2e4
commit
749ebfbd93
|
|
@ -1,46 +0,0 @@
|
||||||
package deltas
|
|
||||||
|
|
||||||
import (
|
|
||||||
"database/sql"
|
|
||||||
"fmt"
|
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
|
||||||
"github.com/pressly/goose"
|
|
||||||
)
|
|
||||||
|
|
||||||
func LoadFromGoose() {
|
|
||||||
goose.AddMigration(UpCreateReceiptTable, DownCreateReceiptTable)
|
|
||||||
}
|
|
||||||
|
|
||||||
func LoadCreateReceiptTable(m *sqlutil.Migrations) {
|
|
||||||
m.AddMigration(UpCreateReceiptTable, DownCreateReceiptTable)
|
|
||||||
}
|
|
||||||
|
|
||||||
func UpCreateReceiptTable(tx *sql.Tx) error {
|
|
||||||
_, err := tx.Exec(`
|
|
||||||
-- Stores data about receipts
|
|
||||||
CREATE TABLE IF NOT EXISTS syncapi_receipts (
|
|
||||||
-- The ID
|
|
||||||
id BIGINT,
|
|
||||||
room_id TEXT NOT NULL,
|
|
||||||
receipt_type TEXT NOT NULL,
|
|
||||||
user_id TEXT NOT NULL,
|
|
||||||
event_id TEXT NOT NULL,
|
|
||||||
receipt_ts BIGINT NOT NULL,
|
|
||||||
CONSTRAINT syncapi_receipts_unique UNIQUE (room_id, receipt_type, user_id)
|
|
||||||
);
|
|
||||||
CREATE INDEX IF NOT EXISTS syncapi_receipts_room_id_idx ON syncapi_receipts(room_id);
|
|
||||||
`)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to execute upgrade: %w", err)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func DownCreateReceiptTable(tx *sql.Tx) error {
|
|
||||||
_, err := tx.Exec("DROP TABLE IF EXISTS syncapi_receipts;")
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to execute downgrade: %w", err)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
@ -63,7 +63,10 @@ type receiptStatements struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSqliteReceiptsTable(db *sql.DB, streamID *streamIDStatements) (tables.Receipts, error) {
|
func NewSqliteReceiptsTable(db *sql.DB, streamID *streamIDStatements) (tables.Receipts, error) {
|
||||||
var err error
|
_, err := db.Exec(receiptsSchema)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
r := &receiptStatements{
|
r := &receiptStatements{
|
||||||
db: db,
|
db: db,
|
||||||
streamIDStatements: streamID,
|
streamIDStatements: streamID,
|
||||||
|
|
@ -77,11 +80,6 @@ func NewSqliteReceiptsTable(db *sql.DB, streamID *streamIDStatements) (tables.Re
|
||||||
return r, nil
|
return r, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *receiptStatements) execSchema(db *sql.DB) error {
|
|
||||||
_, err := db.Exec(receiptsSchema)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// UpsertReceipt creates new user receipts
|
// UpsertReceipt creates new user receipts
|
||||||
func (r *receiptStatements) UpsertReceipt(ctx context.Context, txn *sql.Tx, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error) {
|
func (r *receiptStatements) UpsertReceipt(ctx context.Context, txn *sql.Tx, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error) {
|
||||||
pos, err = r.streamIDStatements.nextStreamID(ctx, txn)
|
pos, err = r.streamIDStatements.nextStreamID(ctx, txn)
|
||||||
|
|
|
||||||
|
|
@ -25,7 +25,6 @@ import (
|
||||||
"github.com/matrix-org/dendrite/internal/config"
|
"github.com/matrix-org/dendrite/internal/config"
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
"github.com/matrix-org/dendrite/syncapi/storage/shared"
|
"github.com/matrix-org/dendrite/syncapi/storage/shared"
|
||||||
"github.com/matrix-org/dendrite/syncapi/storage/sqlite3/deltas"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// SyncServerDatasource represents a sync server datasource which manages
|
// SyncServerDatasource represents a sync server datasource which manages
|
||||||
|
|
@ -47,18 +46,6 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
d.writer = sqlutil.NewExclusiveWriter()
|
d.writer = sqlutil.NewExclusiveWriter()
|
||||||
|
|
||||||
// Create tables before executing migrations so we don't fail if the table is missing,
|
|
||||||
// and THEN prepare statements so we don't fail due to referencing new columns
|
|
||||||
r := receiptStatements{}
|
|
||||||
if err = r.execSchema(d.db); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
m := sqlutil.NewMigrations()
|
|
||||||
deltas.LoadCreateReceiptTable(m)
|
|
||||||
if err = m.RunDeltas(d.db, dbProperties); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if err = d.prepare(); err != nil {
|
if err = d.prepare(); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue