diff --git a/roomserver/storage/postgres/sql.go b/roomserver/storage/postgres/sql.go index 914f269c5..49538c4ab 100644 --- a/roomserver/storage/postgres/sql.go +++ b/roomserver/storage/postgres/sql.go @@ -38,8 +38,6 @@ func (s *statements) prepare(db *sql.DB) error { var err error for _, prepare := range []func(db *sql.DB) error{ - s.stateSnapshotStatements.prepare, - s.stateBlockStatements.prepare, s.previousEventStatements.prepare, s.roomAliasesStatements.prepare, s.inviteStatements.prepare, diff --git a/roomserver/storage/postgres/state_block_table.go b/roomserver/storage/postgres/state_block_table.go index 38334fa9e..d1aaaa003 100644 --- a/roomserver/storage/postgres/state_block_table.go +++ b/roomserver/storage/postgres/state_block_table.go @@ -24,6 +24,7 @@ import ( "github.com/matrix-org/dendrite/internal" "github.com/lib/pq" + "github.com/matrix-org/dendrite/roomserver/storage/tables" "github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/util" ) @@ -87,13 +88,14 @@ type stateBlockStatements struct { bulkSelectFilteredStateBlockEntriesStmt *sql.Stmt } -func (s *stateBlockStatements) prepare(db *sql.DB) (err error) { - _, err = db.Exec(stateDataSchema) +func NewPostgresStateBlockTable(db *sql.DB) (tables.StateBlock, error) { + s := &stateBlockStatements{} + _, err := db.Exec(stateDataSchema) if err != nil { - return + return nil, err } - return statementList{ + return s, statementList{ {&s.insertStateDataStmt, insertStateDataSQL}, {&s.selectNextStateBlockNIDStmt, selectNextStateBlockNIDSQL}, {&s.bulkSelectStateBlockEntriesStmt, bulkSelectStateBlockEntriesSQL}, @@ -101,11 +103,15 @@ func (s *stateBlockStatements) prepare(db *sql.DB) (err error) { }.prepare(db) } -func (s *stateBlockStatements) bulkInsertStateData( +func (s *stateBlockStatements) BulkInsertStateData( ctx context.Context, - stateBlockNID types.StateBlockNID, + txn *sql.Tx, entries []types.StateEntry, -) error { +) (types.StateBlockNID, error) { + stateBlockNID, err := s.selectNextStateBlockNID(ctx) + if err != nil { + return 0, err + } for _, entry := range entries { _, err := s.insertStateDataStmt.ExecContext( ctx, @@ -115,10 +121,10 @@ func (s *stateBlockStatements) bulkInsertStateData( int64(entry.EventNID), ) if err != nil { - return err + return 0, err } } - return nil + return stateBlockNID, nil } func (s *stateBlockStatements) selectNextStateBlockNID( @@ -129,7 +135,7 @@ func (s *stateBlockStatements) selectNextStateBlockNID( return types.StateBlockNID(stateBlockNID), err } -func (s *stateBlockStatements) bulkSelectStateBlockEntries( +func (s *stateBlockStatements) BulkSelectStateBlockEntries( ctx context.Context, stateBlockNIDs []types.StateBlockNID, ) ([]types.StateEntryList, error) { nids := make([]int64, len(stateBlockNIDs)) @@ -180,7 +186,7 @@ func (s *stateBlockStatements) bulkSelectStateBlockEntries( return results, err } -func (s *stateBlockStatements) bulkSelectFilteredStateBlockEntries( +func (s *stateBlockStatements) BulkSelectFilteredStateBlockEntries( ctx context.Context, stateBlockNIDs []types.StateBlockNID, stateKeyTuples []types.StateKeyTuple, diff --git a/roomserver/storage/postgres/state_snapshot_table.go b/roomserver/storage/postgres/state_snapshot_table.go index a1f26e228..8971292fc 100644 --- a/roomserver/storage/postgres/state_snapshot_table.go +++ b/roomserver/storage/postgres/state_snapshot_table.go @@ -21,6 +21,7 @@ import ( "fmt" "github.com/lib/pq" + "github.com/matrix-org/dendrite/roomserver/storage/tables" "github.com/matrix-org/dendrite/roomserver/types" ) @@ -64,30 +65,31 @@ type stateSnapshotStatements struct { bulkSelectStateBlockNIDsStmt *sql.Stmt } -func (s *stateSnapshotStatements) prepare(db *sql.DB) (err error) { - _, err = db.Exec(stateSnapshotSchema) +func NewPostgresStateSnapshotTable(db *sql.DB) (tables.StateSnapshot, error) { + s := &stateSnapshotStatements{} + _, err := db.Exec(stateSnapshotSchema) if err != nil { - return + return nil, err } - return statementList{ + return s, statementList{ {&s.insertStateStmt, insertStateSQL}, {&s.bulkSelectStateBlockNIDsStmt, bulkSelectStateBlockNIDsSQL}, }.prepare(db) } -func (s *stateSnapshotStatements) insertState( - ctx context.Context, roomNID types.RoomNID, stateBlockNIDs []types.StateBlockNID, +func (s *stateSnapshotStatements) InsertState( + ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, stateBlockNIDs []types.StateBlockNID, ) (stateNID types.StateSnapshotNID, err error) { nids := make([]int64, len(stateBlockNIDs)) for i := range stateBlockNIDs { nids[i] = int64(stateBlockNIDs[i]) } - err = s.insertStateStmt.QueryRowContext(ctx, int64(roomNID), pq.Int64Array(nids)).Scan(&stateNID) + err = txn.Stmt(s.insertStateStmt).QueryRowContext(ctx, int64(roomNID), pq.Int64Array(nids)).Scan(&stateNID) return } -func (s *stateSnapshotStatements) bulkSelectStateBlockNIDs( +func (s *stateSnapshotStatements) BulkSelectStateBlockNIDs( ctx context.Context, stateNIDs []types.StateSnapshotNID, ) ([]types.StateBlockNIDList, error) { nids := make([]int64, len(stateNIDs)) diff --git a/roomserver/storage/postgres/storage.go b/roomserver/storage/postgres/storage.go index d44da8585..037271613 100644 --- a/roomserver/storage/postgres/storage.go +++ b/roomserver/storage/postgres/storage.go @@ -77,6 +77,14 @@ func Open(dataSourceName string, dbProperties internal.DbProperties) (*Database, if err != nil { return nil, err } + stateBlock, err := NewPostgresStateBlockTable(d.db) + if err != nil { + return nil, err + } + stateSnapshot, err := NewPostgresStateSnapshotTable(d.db) + if err != nil { + return nil, err + } d.Database = shared.Database{ DB: d.db, EventTypesTable: d.eventTypes, @@ -85,6 +93,8 @@ func Open(dataSourceName string, dbProperties internal.DbProperties) (*Database, EventsTable: d.events, RoomsTable: d.rooms, TransactionsTable: d.transactions, + StateBlockTable: stateBlock, + StateSnapshotTable: stateSnapshot, } return &d, nil } @@ -122,41 +132,6 @@ func (d *Database) assignStateKeyNID( return eventStateKeyNID, err } -// AddState implements input.EventDatabase -func (d *Database) AddState( - ctx context.Context, - roomNID types.RoomNID, - stateBlockNIDs []types.StateBlockNID, - state []types.StateEntry, -) (types.StateSnapshotNID, error) { - if len(state) > 0 { - stateBlockNID, err := d.statements.selectNextStateBlockNID(ctx) - if err != nil { - return 0, err - } - if err = d.statements.bulkInsertStateData(ctx, stateBlockNID, state); err != nil { - return 0, err - } - stateBlockNIDs = append(stateBlockNIDs[:len(stateBlockNIDs):len(stateBlockNIDs)], stateBlockNID) - } - - return d.statements.insertState(ctx, roomNID, stateBlockNIDs) -} - -// StateBlockNIDs implements state.RoomStateDatabase -func (d *Database) StateBlockNIDs( - ctx context.Context, stateNIDs []types.StateSnapshotNID, -) ([]types.StateBlockNIDList, error) { - return d.statements.bulkSelectStateBlockNIDs(ctx, stateNIDs) -} - -// StateEntries implements state.RoomStateDatabase -func (d *Database) StateEntries( - ctx context.Context, stateBlockNIDs []types.StateBlockNID, -) ([]types.StateEntryList, error) { - return d.statements.bulkSelectStateBlockEntries(ctx, stateBlockNIDs) -} - // GetLatestEventsForUpdate implements input.EventDatabase func (d *Database) GetLatestEventsForUpdate( ctx context.Context, roomNID types.RoomNID, @@ -303,17 +278,6 @@ func (d *Database) RemoveRoomAlias(ctx context.Context, alias string) error { return d.statements.deleteRoomAlias(ctx, alias) } -// StateEntriesForTuples implements state.RoomStateDatabase -func (d *Database) StateEntriesForTuples( - ctx context.Context, - stateBlockNIDs []types.StateBlockNID, - stateKeyTuples []types.StateKeyTuple, -) ([]types.StateEntryList, error) { - return d.statements.bulkSelectFilteredStateBlockEntries( - ctx, stateBlockNIDs, stateKeyTuples, - ) -} - // MembershipUpdater implements input.RoomEventDatabase func (d *Database) MembershipUpdater( ctx context.Context, roomID, targetUserID string, diff --git a/roomserver/storage/shared/storage.go b/roomserver/storage/shared/storage.go index 814b6e812..a209990cd 100644 --- a/roomserver/storage/shared/storage.go +++ b/roomserver/storage/shared/storage.go @@ -20,6 +20,8 @@ type Database struct { EventStateKeysTable tables.EventStateKeys RoomsTable tables.Rooms TransactionsTable tables.Transactions + StateSnapshotTable tables.StateSnapshot + StateBlockTable tables.StateBlock } // EventTypeNIDs implements state.RoomStateDatabase @@ -50,6 +52,42 @@ func (d *Database) StateEntriesForEventIDs( return d.EventsTable.BulkSelectStateEventByID(ctx, eventIDs) } +// StateEntriesForTuples implements state.RoomStateDatabase +func (d *Database) StateEntriesForTuples( + ctx context.Context, + stateBlockNIDs []types.StateBlockNID, + stateKeyTuples []types.StateKeyTuple, +) ([]types.StateEntryList, error) { + return d.StateBlockTable.BulkSelectFilteredStateBlockEntries( + ctx, stateBlockNIDs, stateKeyTuples, + ) +} + +// AddState implements input.EventDatabase +func (d *Database) AddState( + ctx context.Context, + roomNID types.RoomNID, + stateBlockNIDs []types.StateBlockNID, + state []types.StateEntry, +) (stateNID types.StateSnapshotNID, err error) { + err = internal.WithTransaction(d.DB, func(txn *sql.Tx) error { + if len(state) > 0 { + var stateBlockNID types.StateBlockNID + stateBlockNID, err = d.StateBlockTable.BulkInsertStateData(ctx, txn, state) + if err != nil { + return err + } + stateBlockNIDs = append(stateBlockNIDs[:len(stateBlockNIDs):len(stateBlockNIDs)], stateBlockNID) + } + stateNID, err = d.StateSnapshotTable.InsertState(ctx, txn, roomNID, stateBlockNIDs) + return err + }) + if err != nil { + return 0, err + } + return +} + // EventNIDs implements query.RoomserverQueryAPIDatabase func (d *Database) EventNIDs( ctx context.Context, eventIDs []string, @@ -150,6 +188,20 @@ func (d *Database) LatestEventIDs( return } +// StateBlockNIDs implements state.RoomStateDatabase +func (d *Database) StateBlockNIDs( + ctx context.Context, stateNIDs []types.StateSnapshotNID, +) ([]types.StateBlockNIDList, error) { + return d.StateSnapshotTable.BulkSelectStateBlockNIDs(ctx, stateNIDs) +} + +// StateEntries implements state.RoomStateDatabase +func (d *Database) StateEntries( + ctx context.Context, stateBlockNIDs []types.StateBlockNID, +) ([]types.StateEntryList, error) { + return d.StateBlockTable.BulkSelectStateBlockEntries(ctx, stateBlockNIDs) +} + func (d *Database) GetRoomVersionForRoom( ctx context.Context, roomID string, ) (gomatrixserverlib.RoomVersion, error) { diff --git a/roomserver/storage/sqlite3/sql.go b/roomserver/storage/sqlite3/sql.go index fe899174a..1704961c4 100644 --- a/roomserver/storage/sqlite3/sql.go +++ b/roomserver/storage/sqlite3/sql.go @@ -38,8 +38,6 @@ func (s *statements) prepare(db *sql.DB) error { var err error for _, prepare := range []func(db *sql.DB) error{ - s.stateSnapshotStatements.prepare, - s.stateBlockStatements.prepare, s.previousEventStatements.prepare, s.roomAliasesStatements.prepare, s.inviteStatements.prepare, diff --git a/roomserver/storage/sqlite3/state_block_table.go b/roomserver/storage/sqlite3/state_block_table.go index 861d76cf9..e3ca0b3c8 100644 --- a/roomserver/storage/sqlite3/state_block_table.go +++ b/roomserver/storage/sqlite3/state_block_table.go @@ -23,6 +23,7 @@ import ( "strings" "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/dendrite/roomserver/storage/tables" "github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/util" ) @@ -77,14 +78,15 @@ type stateBlockStatements struct { bulkSelectFilteredStateBlockEntriesStmt *sql.Stmt } -func (s *stateBlockStatements) prepare(db *sql.DB) (err error) { +func NewSqliteStateBlockTable(db *sql.DB) (tables.StateBlock, error) { + s := &stateBlockStatements{} s.db = db - _, err = db.Exec(stateDataSchema) + _, err := db.Exec(stateDataSchema) if err != nil { - return + return nil, err } - return statementList{ + return s, statementList{ {&s.insertStateDataStmt, insertStateDataSQL}, {&s.selectNextStateBlockNIDStmt, selectNextStateBlockNIDSQL}, {&s.bulkSelectStateBlockEntriesStmt, bulkSelectStateBlockEntriesSQL}, @@ -92,7 +94,7 @@ func (s *stateBlockStatements) prepare(db *sql.DB) (err error) { }.prepare(db) } -func (s *stateBlockStatements) bulkInsertStateData( +func (s *stateBlockStatements) BulkInsertStateData( ctx context.Context, txn *sql.Tx, entries []types.StateEntry, ) (types.StateBlockNID, error) { @@ -120,19 +122,18 @@ func (s *stateBlockStatements) bulkInsertStateData( return stateBlockNID, nil } -func (s *stateBlockStatements) bulkSelectStateBlockEntries( - ctx context.Context, txn *sql.Tx, stateBlockNIDs []types.StateBlockNID, +func (s *stateBlockStatements) BulkSelectStateBlockEntries( + ctx context.Context, stateBlockNIDs []types.StateBlockNID, ) ([]types.StateEntryList, error) { nids := make([]interface{}, len(stateBlockNIDs)) for k, v := range stateBlockNIDs { nids[k] = v } selectOrig := strings.Replace(bulkSelectStateBlockEntriesSQL, "($1)", internal.QueryVariadic(len(nids)), 1) - selectPrep, err := s.db.Prepare(selectOrig) + selectStmt, err := s.db.Prepare(selectOrig) if err != nil { return nil, err } - selectStmt := internal.TxStmt(txn, selectPrep) rows, err := selectStmt.QueryContext(ctx, nids...) if err != nil { return nil, err @@ -174,8 +175,8 @@ func (s *stateBlockStatements) bulkSelectStateBlockEntries( return results, nil } -func (s *stateBlockStatements) bulkSelectFilteredStateBlockEntries( - ctx context.Context, txn *sql.Tx, // nolint: unparam +func (s *stateBlockStatements) BulkSelectFilteredStateBlockEntries( + ctx context.Context, stateBlockNIDs []types.StateBlockNID, stateKeyTuples []types.StateKeyTuple, ) ([]types.StateEntryList, error) { diff --git a/roomserver/storage/sqlite3/state_snapshot_table.go b/roomserver/storage/sqlite3/state_snapshot_table.go index 8682627a0..42eae1a7f 100644 --- a/roomserver/storage/sqlite3/state_snapshot_table.go +++ b/roomserver/storage/sqlite3/state_snapshot_table.go @@ -23,6 +23,7 @@ import ( "strings" "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/dendrite/roomserver/storage/tables" "github.com/matrix-org/dendrite/roomserver/types" ) @@ -51,20 +52,21 @@ type stateSnapshotStatements struct { bulkSelectStateBlockNIDsStmt *sql.Stmt } -func (s *stateSnapshotStatements) prepare(db *sql.DB) (err error) { +func NewSqliteStateSnapshotTable(db *sql.DB) (tables.StateSnapshot, error) { + s := &stateSnapshotStatements{} s.db = db - _, err = db.Exec(stateSnapshotSchema) + _, err := db.Exec(stateSnapshotSchema) if err != nil { - return + return nil, err } - return statementList{ + return s, statementList{ {&s.insertStateStmt, insertStateSQL}, {&s.bulkSelectStateBlockNIDsStmt, bulkSelectStateBlockNIDsSQL}, }.prepare(db) } -func (s *stateSnapshotStatements) insertState( +func (s *stateSnapshotStatements) InsertState( ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, stateBlockNIDs []types.StateBlockNID, ) (stateNID types.StateSnapshotNID, err error) { stateBlockNIDsJSON, err := json.Marshal(stateBlockNIDs) @@ -82,15 +84,15 @@ func (s *stateSnapshotStatements) insertState( return } -func (s *stateSnapshotStatements) bulkSelectStateBlockNIDs( - ctx context.Context, txn *sql.Tx, stateNIDs []types.StateSnapshotNID, +func (s *stateSnapshotStatements) BulkSelectStateBlockNIDs( + ctx context.Context, stateNIDs []types.StateSnapshotNID, ) ([]types.StateBlockNIDList, error) { nids := make([]interface{}, len(stateNIDs)) for k, v := range stateNIDs { nids[k] = v } selectOrig := strings.Replace(bulkSelectStateBlockNIDsSQL, "($1)", internal.QueryVariadic(len(nids)), 1) - selectStmt, err := txn.Prepare(selectOrig) + selectStmt, err := s.db.Prepare(selectOrig) if err != nil { return nil, err } diff --git a/roomserver/storage/sqlite3/storage.go b/roomserver/storage/sqlite3/storage.go index a0a1b5684..d5bcc2bf5 100644 --- a/roomserver/storage/sqlite3/storage.go +++ b/roomserver/storage/sqlite3/storage.go @@ -97,6 +97,14 @@ func Open(dataSourceName string) (*Database, error) { if err != nil { return nil, err } + stateBlock, err := NewSqliteStateBlockTable(d.db) + if err != nil { + return nil, err + } + stateSnapshot, err := NewSqliteStateSnapshotTable(d.db) + if err != nil { + return nil, err + } d.Database = shared.Database{ DB: d.db, EventsTable: d.events, @@ -105,6 +113,8 @@ func Open(dataSourceName string) (*Database, error) { EventJSONTable: d.eventJSON, RoomsTable: d.rooms, TransactionsTable: d.transactions, + StateBlockTable: stateBlock, + StateSnapshotTable: stateSnapshot, } return &d, nil } @@ -142,53 +152,6 @@ func (d *Database) assignStateKeyNID( return } -// AddState implements input.EventDatabase -func (d *Database) AddState( - ctx context.Context, - roomNID types.RoomNID, - stateBlockNIDs []types.StateBlockNID, - state []types.StateEntry, -) (stateNID types.StateSnapshotNID, err error) { - err = internal.WithTransaction(d.db, func(txn *sql.Tx) error { - if len(state) > 0 { - var stateBlockNID types.StateBlockNID - stateBlockNID, err = d.statements.bulkInsertStateData(ctx, txn, state) - if err != nil { - return err - } - stateBlockNIDs = append(stateBlockNIDs[:len(stateBlockNIDs):len(stateBlockNIDs)], stateBlockNID) - } - stateNID, err = d.statements.insertState(ctx, txn, roomNID, stateBlockNIDs) - return err - }) - if err != nil { - return 0, err - } - return -} - -// StateBlockNIDs implements state.RoomStateDatabase -func (d *Database) StateBlockNIDs( - ctx context.Context, stateNIDs []types.StateSnapshotNID, -) (sl []types.StateBlockNIDList, err error) { - err = internal.WithTransaction(d.db, func(txn *sql.Tx) error { - sl, err = d.statements.bulkSelectStateBlockNIDs(ctx, txn, stateNIDs) - return err - }) - return -} - -// StateEntries implements state.RoomStateDatabase -func (d *Database) StateEntries( - ctx context.Context, stateBlockNIDs []types.StateBlockNID, -) (sel []types.StateEntryList, err error) { - err = internal.WithTransaction(d.db, func(txn *sql.Tx) error { - sel, err = d.statements.bulkSelectStateBlockEntries(ctx, txn, stateBlockNIDs) - return err - }) - return -} - // GetLatestEventsForUpdate implements input.EventDatabase func (d *Database) GetLatestEventsForUpdate( ctx context.Context, roomNID types.RoomNID, @@ -366,17 +329,6 @@ func (d *Database) RemoveRoomAlias(ctx context.Context, alias string) error { return d.statements.deleteRoomAlias(ctx, nil, alias) } -// StateEntriesForTuples implements state.RoomStateDatabase -func (d *Database) StateEntriesForTuples( - ctx context.Context, - stateBlockNIDs []types.StateBlockNID, - stateKeyTuples []types.StateKeyTuple, -) ([]types.StateEntryList, error) { - return d.statements.bulkSelectFilteredStateBlockEntries( - ctx, nil, stateBlockNIDs, stateKeyTuples, - ) -} - // MembershipUpdater implements input.RoomEventDatabase func (d *Database) MembershipUpdater( ctx context.Context, roomID, targetUserID string, diff --git a/roomserver/storage/tables/interface.go b/roomserver/storage/tables/interface.go index e913de6b4..89bff28d2 100644 --- a/roomserver/storage/tables/interface.go +++ b/roomserver/storage/tables/interface.go @@ -70,3 +70,14 @@ type Transactions interface { InsertTransaction(ctx context.Context, txn *sql.Tx, transactionID string, sessionID int64, userID string, eventID string) error SelectTransactionEventID(ctx context.Context, transactionID string, sessionID int64, userID string) (eventID string, err error) } + +type StateSnapshot interface { + InsertState(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, stateBlockNIDs []types.StateBlockNID) (stateNID types.StateSnapshotNID, err error) + BulkSelectStateBlockNIDs(ctx context.Context, stateNIDs []types.StateSnapshotNID) ([]types.StateBlockNIDList, error) +} + +type StateBlock interface { + BulkInsertStateData(ctx context.Context, txn *sql.Tx, entries []types.StateEntry) (types.StateBlockNID, error) + BulkSelectStateBlockEntries(ctx context.Context, stateBlockNIDs []types.StateBlockNID) ([]types.StateEntryList, error) + BulkSelectFilteredStateBlockEntries(ctx context.Context, stateBlockNIDs []types.StateBlockNID, stateKeyTuples []types.StateKeyTuple) ([]types.StateEntryList, error) +}