Convert state_snapshot and state_block tables

This commit is contained in:
Kegan Dougal 2020-05-26 18:43:33 +01:00
parent 19aa44ecae
commit bf1b44e6ba
10 changed files with 132 additions and 146 deletions

View file

@ -38,8 +38,6 @@ func (s *statements) prepare(db *sql.DB) error {
var err error var err error
for _, prepare := range []func(db *sql.DB) error{ for _, prepare := range []func(db *sql.DB) error{
s.stateSnapshotStatements.prepare,
s.stateBlockStatements.prepare,
s.previousEventStatements.prepare, s.previousEventStatements.prepare,
s.roomAliasesStatements.prepare, s.roomAliasesStatements.prepare,
s.inviteStatements.prepare, s.inviteStatements.prepare,

View file

@ -24,6 +24,7 @@ import (
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
"github.com/lib/pq" "github.com/lib/pq"
"github.com/matrix-org/dendrite/roomserver/storage/tables"
"github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/util" "github.com/matrix-org/util"
) )
@ -87,13 +88,14 @@ type stateBlockStatements struct {
bulkSelectFilteredStateBlockEntriesStmt *sql.Stmt bulkSelectFilteredStateBlockEntriesStmt *sql.Stmt
} }
func (s *stateBlockStatements) prepare(db *sql.DB) (err error) { func NewPostgresStateBlockTable(db *sql.DB) (tables.StateBlock, error) {
_, err = db.Exec(stateDataSchema) s := &stateBlockStatements{}
_, err := db.Exec(stateDataSchema)
if err != nil { if err != nil {
return return nil, err
} }
return statementList{ return s, statementList{
{&s.insertStateDataStmt, insertStateDataSQL}, {&s.insertStateDataStmt, insertStateDataSQL},
{&s.selectNextStateBlockNIDStmt, selectNextStateBlockNIDSQL}, {&s.selectNextStateBlockNIDStmt, selectNextStateBlockNIDSQL},
{&s.bulkSelectStateBlockEntriesStmt, bulkSelectStateBlockEntriesSQL}, {&s.bulkSelectStateBlockEntriesStmt, bulkSelectStateBlockEntriesSQL},
@ -101,11 +103,15 @@ func (s *stateBlockStatements) prepare(db *sql.DB) (err error) {
}.prepare(db) }.prepare(db)
} }
func (s *stateBlockStatements) bulkInsertStateData( func (s *stateBlockStatements) BulkInsertStateData(
ctx context.Context, ctx context.Context,
stateBlockNID types.StateBlockNID, txn *sql.Tx,
entries []types.StateEntry, entries []types.StateEntry,
) error { ) (types.StateBlockNID, error) {
stateBlockNID, err := s.selectNextStateBlockNID(ctx)
if err != nil {
return 0, err
}
for _, entry := range entries { for _, entry := range entries {
_, err := s.insertStateDataStmt.ExecContext( _, err := s.insertStateDataStmt.ExecContext(
ctx, ctx,
@ -115,10 +121,10 @@ func (s *stateBlockStatements) bulkInsertStateData(
int64(entry.EventNID), int64(entry.EventNID),
) )
if err != nil { if err != nil {
return err return 0, err
} }
} }
return nil return stateBlockNID, nil
} }
func (s *stateBlockStatements) selectNextStateBlockNID( func (s *stateBlockStatements) selectNextStateBlockNID(
@ -129,7 +135,7 @@ func (s *stateBlockStatements) selectNextStateBlockNID(
return types.StateBlockNID(stateBlockNID), err return types.StateBlockNID(stateBlockNID), err
} }
func (s *stateBlockStatements) bulkSelectStateBlockEntries( func (s *stateBlockStatements) BulkSelectStateBlockEntries(
ctx context.Context, stateBlockNIDs []types.StateBlockNID, ctx context.Context, stateBlockNIDs []types.StateBlockNID,
) ([]types.StateEntryList, error) { ) ([]types.StateEntryList, error) {
nids := make([]int64, len(stateBlockNIDs)) nids := make([]int64, len(stateBlockNIDs))
@ -180,7 +186,7 @@ func (s *stateBlockStatements) bulkSelectStateBlockEntries(
return results, err return results, err
} }
func (s *stateBlockStatements) bulkSelectFilteredStateBlockEntries( func (s *stateBlockStatements) BulkSelectFilteredStateBlockEntries(
ctx context.Context, ctx context.Context,
stateBlockNIDs []types.StateBlockNID, stateBlockNIDs []types.StateBlockNID,
stateKeyTuples []types.StateKeyTuple, stateKeyTuples []types.StateKeyTuple,

View file

@ -21,6 +21,7 @@ import (
"fmt" "fmt"
"github.com/lib/pq" "github.com/lib/pq"
"github.com/matrix-org/dendrite/roomserver/storage/tables"
"github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/roomserver/types"
) )
@ -64,30 +65,31 @@ type stateSnapshotStatements struct {
bulkSelectStateBlockNIDsStmt *sql.Stmt bulkSelectStateBlockNIDsStmt *sql.Stmt
} }
func (s *stateSnapshotStatements) prepare(db *sql.DB) (err error) { func NewPostgresStateSnapshotTable(db *sql.DB) (tables.StateSnapshot, error) {
_, err = db.Exec(stateSnapshotSchema) s := &stateSnapshotStatements{}
_, err := db.Exec(stateSnapshotSchema)
if err != nil { if err != nil {
return return nil, err
} }
return statementList{ return s, statementList{
{&s.insertStateStmt, insertStateSQL}, {&s.insertStateStmt, insertStateSQL},
{&s.bulkSelectStateBlockNIDsStmt, bulkSelectStateBlockNIDsSQL}, {&s.bulkSelectStateBlockNIDsStmt, bulkSelectStateBlockNIDsSQL},
}.prepare(db) }.prepare(db)
} }
func (s *stateSnapshotStatements) insertState( func (s *stateSnapshotStatements) InsertState(
ctx context.Context, roomNID types.RoomNID, stateBlockNIDs []types.StateBlockNID, ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, stateBlockNIDs []types.StateBlockNID,
) (stateNID types.StateSnapshotNID, err error) { ) (stateNID types.StateSnapshotNID, err error) {
nids := make([]int64, len(stateBlockNIDs)) nids := make([]int64, len(stateBlockNIDs))
for i := range stateBlockNIDs { for i := range stateBlockNIDs {
nids[i] = int64(stateBlockNIDs[i]) nids[i] = int64(stateBlockNIDs[i])
} }
err = s.insertStateStmt.QueryRowContext(ctx, int64(roomNID), pq.Int64Array(nids)).Scan(&stateNID) err = txn.Stmt(s.insertStateStmt).QueryRowContext(ctx, int64(roomNID), pq.Int64Array(nids)).Scan(&stateNID)
return return
} }
func (s *stateSnapshotStatements) bulkSelectStateBlockNIDs( func (s *stateSnapshotStatements) BulkSelectStateBlockNIDs(
ctx context.Context, stateNIDs []types.StateSnapshotNID, ctx context.Context, stateNIDs []types.StateSnapshotNID,
) ([]types.StateBlockNIDList, error) { ) ([]types.StateBlockNIDList, error) {
nids := make([]int64, len(stateNIDs)) nids := make([]int64, len(stateNIDs))

View file

@ -77,6 +77,14 @@ func Open(dataSourceName string, dbProperties internal.DbProperties) (*Database,
if err != nil { if err != nil {
return nil, err return nil, err
} }
stateBlock, err := NewPostgresStateBlockTable(d.db)
if err != nil {
return nil, err
}
stateSnapshot, err := NewPostgresStateSnapshotTable(d.db)
if err != nil {
return nil, err
}
d.Database = shared.Database{ d.Database = shared.Database{
DB: d.db, DB: d.db,
EventTypesTable: d.eventTypes, EventTypesTable: d.eventTypes,
@ -85,6 +93,8 @@ func Open(dataSourceName string, dbProperties internal.DbProperties) (*Database,
EventsTable: d.events, EventsTable: d.events,
RoomsTable: d.rooms, RoomsTable: d.rooms,
TransactionsTable: d.transactions, TransactionsTable: d.transactions,
StateBlockTable: stateBlock,
StateSnapshotTable: stateSnapshot,
} }
return &d, nil return &d, nil
} }
@ -122,41 +132,6 @@ func (d *Database) assignStateKeyNID(
return eventStateKeyNID, err return eventStateKeyNID, err
} }
// AddState implements input.EventDatabase
func (d *Database) AddState(
ctx context.Context,
roomNID types.RoomNID,
stateBlockNIDs []types.StateBlockNID,
state []types.StateEntry,
) (types.StateSnapshotNID, error) {
if len(state) > 0 {
stateBlockNID, err := d.statements.selectNextStateBlockNID(ctx)
if err != nil {
return 0, err
}
if err = d.statements.bulkInsertStateData(ctx, stateBlockNID, state); err != nil {
return 0, err
}
stateBlockNIDs = append(stateBlockNIDs[:len(stateBlockNIDs):len(stateBlockNIDs)], stateBlockNID)
}
return d.statements.insertState(ctx, roomNID, stateBlockNIDs)
}
// StateBlockNIDs implements state.RoomStateDatabase
func (d *Database) StateBlockNIDs(
ctx context.Context, stateNIDs []types.StateSnapshotNID,
) ([]types.StateBlockNIDList, error) {
return d.statements.bulkSelectStateBlockNIDs(ctx, stateNIDs)
}
// StateEntries implements state.RoomStateDatabase
func (d *Database) StateEntries(
ctx context.Context, stateBlockNIDs []types.StateBlockNID,
) ([]types.StateEntryList, error) {
return d.statements.bulkSelectStateBlockEntries(ctx, stateBlockNIDs)
}
// GetLatestEventsForUpdate implements input.EventDatabase // GetLatestEventsForUpdate implements input.EventDatabase
func (d *Database) GetLatestEventsForUpdate( func (d *Database) GetLatestEventsForUpdate(
ctx context.Context, roomNID types.RoomNID, ctx context.Context, roomNID types.RoomNID,
@ -303,17 +278,6 @@ func (d *Database) RemoveRoomAlias(ctx context.Context, alias string) error {
return d.statements.deleteRoomAlias(ctx, alias) return d.statements.deleteRoomAlias(ctx, alias)
} }
// StateEntriesForTuples implements state.RoomStateDatabase
func (d *Database) StateEntriesForTuples(
ctx context.Context,
stateBlockNIDs []types.StateBlockNID,
stateKeyTuples []types.StateKeyTuple,
) ([]types.StateEntryList, error) {
return d.statements.bulkSelectFilteredStateBlockEntries(
ctx, stateBlockNIDs, stateKeyTuples,
)
}
// MembershipUpdater implements input.RoomEventDatabase // MembershipUpdater implements input.RoomEventDatabase
func (d *Database) MembershipUpdater( func (d *Database) MembershipUpdater(
ctx context.Context, roomID, targetUserID string, ctx context.Context, roomID, targetUserID string,

View file

@ -20,6 +20,8 @@ type Database struct {
EventStateKeysTable tables.EventStateKeys EventStateKeysTable tables.EventStateKeys
RoomsTable tables.Rooms RoomsTable tables.Rooms
TransactionsTable tables.Transactions TransactionsTable tables.Transactions
StateSnapshotTable tables.StateSnapshot
StateBlockTable tables.StateBlock
} }
// EventTypeNIDs implements state.RoomStateDatabase // EventTypeNIDs implements state.RoomStateDatabase
@ -50,6 +52,42 @@ func (d *Database) StateEntriesForEventIDs(
return d.EventsTable.BulkSelectStateEventByID(ctx, eventIDs) return d.EventsTable.BulkSelectStateEventByID(ctx, eventIDs)
} }
// StateEntriesForTuples implements state.RoomStateDatabase
func (d *Database) StateEntriesForTuples(
ctx context.Context,
stateBlockNIDs []types.StateBlockNID,
stateKeyTuples []types.StateKeyTuple,
) ([]types.StateEntryList, error) {
return d.StateBlockTable.BulkSelectFilteredStateBlockEntries(
ctx, stateBlockNIDs, stateKeyTuples,
)
}
// AddState implements input.EventDatabase
func (d *Database) AddState(
ctx context.Context,
roomNID types.RoomNID,
stateBlockNIDs []types.StateBlockNID,
state []types.StateEntry,
) (stateNID types.StateSnapshotNID, err error) {
err = internal.WithTransaction(d.DB, func(txn *sql.Tx) error {
if len(state) > 0 {
var stateBlockNID types.StateBlockNID
stateBlockNID, err = d.StateBlockTable.BulkInsertStateData(ctx, txn, state)
if err != nil {
return err
}
stateBlockNIDs = append(stateBlockNIDs[:len(stateBlockNIDs):len(stateBlockNIDs)], stateBlockNID)
}
stateNID, err = d.StateSnapshotTable.InsertState(ctx, txn, roomNID, stateBlockNIDs)
return err
})
if err != nil {
return 0, err
}
return
}
// EventNIDs implements query.RoomserverQueryAPIDatabase // EventNIDs implements query.RoomserverQueryAPIDatabase
func (d *Database) EventNIDs( func (d *Database) EventNIDs(
ctx context.Context, eventIDs []string, ctx context.Context, eventIDs []string,
@ -150,6 +188,20 @@ func (d *Database) LatestEventIDs(
return return
} }
// StateBlockNIDs implements state.RoomStateDatabase
func (d *Database) StateBlockNIDs(
ctx context.Context, stateNIDs []types.StateSnapshotNID,
) ([]types.StateBlockNIDList, error) {
return d.StateSnapshotTable.BulkSelectStateBlockNIDs(ctx, stateNIDs)
}
// StateEntries implements state.RoomStateDatabase
func (d *Database) StateEntries(
ctx context.Context, stateBlockNIDs []types.StateBlockNID,
) ([]types.StateEntryList, error) {
return d.StateBlockTable.BulkSelectStateBlockEntries(ctx, stateBlockNIDs)
}
func (d *Database) GetRoomVersionForRoom( func (d *Database) GetRoomVersionForRoom(
ctx context.Context, roomID string, ctx context.Context, roomID string,
) (gomatrixserverlib.RoomVersion, error) { ) (gomatrixserverlib.RoomVersion, error) {

View file

@ -38,8 +38,6 @@ func (s *statements) prepare(db *sql.DB) error {
var err error var err error
for _, prepare := range []func(db *sql.DB) error{ for _, prepare := range []func(db *sql.DB) error{
s.stateSnapshotStatements.prepare,
s.stateBlockStatements.prepare,
s.previousEventStatements.prepare, s.previousEventStatements.prepare,
s.roomAliasesStatements.prepare, s.roomAliasesStatements.prepare,
s.inviteStatements.prepare, s.inviteStatements.prepare,

View file

@ -23,6 +23,7 @@ import (
"strings" "strings"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/roomserver/storage/tables"
"github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/util" "github.com/matrix-org/util"
) )
@ -77,14 +78,15 @@ type stateBlockStatements struct {
bulkSelectFilteredStateBlockEntriesStmt *sql.Stmt bulkSelectFilteredStateBlockEntriesStmt *sql.Stmt
} }
func (s *stateBlockStatements) prepare(db *sql.DB) (err error) { func NewSqliteStateBlockTable(db *sql.DB) (tables.StateBlock, error) {
s := &stateBlockStatements{}
s.db = db s.db = db
_, err = db.Exec(stateDataSchema) _, err := db.Exec(stateDataSchema)
if err != nil { if err != nil {
return return nil, err
} }
return statementList{ return s, statementList{
{&s.insertStateDataStmt, insertStateDataSQL}, {&s.insertStateDataStmt, insertStateDataSQL},
{&s.selectNextStateBlockNIDStmt, selectNextStateBlockNIDSQL}, {&s.selectNextStateBlockNIDStmt, selectNextStateBlockNIDSQL},
{&s.bulkSelectStateBlockEntriesStmt, bulkSelectStateBlockEntriesSQL}, {&s.bulkSelectStateBlockEntriesStmt, bulkSelectStateBlockEntriesSQL},
@ -92,7 +94,7 @@ func (s *stateBlockStatements) prepare(db *sql.DB) (err error) {
}.prepare(db) }.prepare(db)
} }
func (s *stateBlockStatements) bulkInsertStateData( func (s *stateBlockStatements) BulkInsertStateData(
ctx context.Context, txn *sql.Tx, ctx context.Context, txn *sql.Tx,
entries []types.StateEntry, entries []types.StateEntry,
) (types.StateBlockNID, error) { ) (types.StateBlockNID, error) {
@ -120,19 +122,18 @@ func (s *stateBlockStatements) bulkInsertStateData(
return stateBlockNID, nil return stateBlockNID, nil
} }
func (s *stateBlockStatements) bulkSelectStateBlockEntries( func (s *stateBlockStatements) BulkSelectStateBlockEntries(
ctx context.Context, txn *sql.Tx, stateBlockNIDs []types.StateBlockNID, ctx context.Context, stateBlockNIDs []types.StateBlockNID,
) ([]types.StateEntryList, error) { ) ([]types.StateEntryList, error) {
nids := make([]interface{}, len(stateBlockNIDs)) nids := make([]interface{}, len(stateBlockNIDs))
for k, v := range stateBlockNIDs { for k, v := range stateBlockNIDs {
nids[k] = v nids[k] = v
} }
selectOrig := strings.Replace(bulkSelectStateBlockEntriesSQL, "($1)", internal.QueryVariadic(len(nids)), 1) selectOrig := strings.Replace(bulkSelectStateBlockEntriesSQL, "($1)", internal.QueryVariadic(len(nids)), 1)
selectPrep, err := s.db.Prepare(selectOrig) selectStmt, err := s.db.Prepare(selectOrig)
if err != nil { if err != nil {
return nil, err return nil, err
} }
selectStmt := internal.TxStmt(txn, selectPrep)
rows, err := selectStmt.QueryContext(ctx, nids...) rows, err := selectStmt.QueryContext(ctx, nids...)
if err != nil { if err != nil {
return nil, err return nil, err
@ -174,8 +175,8 @@ func (s *stateBlockStatements) bulkSelectStateBlockEntries(
return results, nil return results, nil
} }
func (s *stateBlockStatements) bulkSelectFilteredStateBlockEntries( func (s *stateBlockStatements) BulkSelectFilteredStateBlockEntries(
ctx context.Context, txn *sql.Tx, // nolint: unparam ctx context.Context,
stateBlockNIDs []types.StateBlockNID, stateBlockNIDs []types.StateBlockNID,
stateKeyTuples []types.StateKeyTuple, stateKeyTuples []types.StateKeyTuple,
) ([]types.StateEntryList, error) { ) ([]types.StateEntryList, error) {

View file

@ -23,6 +23,7 @@ import (
"strings" "strings"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/roomserver/storage/tables"
"github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/roomserver/types"
) )
@ -51,20 +52,21 @@ type stateSnapshotStatements struct {
bulkSelectStateBlockNIDsStmt *sql.Stmt bulkSelectStateBlockNIDsStmt *sql.Stmt
} }
func (s *stateSnapshotStatements) prepare(db *sql.DB) (err error) { func NewSqliteStateSnapshotTable(db *sql.DB) (tables.StateSnapshot, error) {
s := &stateSnapshotStatements{}
s.db = db s.db = db
_, err = db.Exec(stateSnapshotSchema) _, err := db.Exec(stateSnapshotSchema)
if err != nil { if err != nil {
return return nil, err
} }
return statementList{ return s, statementList{
{&s.insertStateStmt, insertStateSQL}, {&s.insertStateStmt, insertStateSQL},
{&s.bulkSelectStateBlockNIDsStmt, bulkSelectStateBlockNIDsSQL}, {&s.bulkSelectStateBlockNIDsStmt, bulkSelectStateBlockNIDsSQL},
}.prepare(db) }.prepare(db)
} }
func (s *stateSnapshotStatements) insertState( func (s *stateSnapshotStatements) InsertState(
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, stateBlockNIDs []types.StateBlockNID, ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, stateBlockNIDs []types.StateBlockNID,
) (stateNID types.StateSnapshotNID, err error) { ) (stateNID types.StateSnapshotNID, err error) {
stateBlockNIDsJSON, err := json.Marshal(stateBlockNIDs) stateBlockNIDsJSON, err := json.Marshal(stateBlockNIDs)
@ -82,15 +84,15 @@ func (s *stateSnapshotStatements) insertState(
return return
} }
func (s *stateSnapshotStatements) bulkSelectStateBlockNIDs( func (s *stateSnapshotStatements) BulkSelectStateBlockNIDs(
ctx context.Context, txn *sql.Tx, stateNIDs []types.StateSnapshotNID, ctx context.Context, stateNIDs []types.StateSnapshotNID,
) ([]types.StateBlockNIDList, error) { ) ([]types.StateBlockNIDList, error) {
nids := make([]interface{}, len(stateNIDs)) nids := make([]interface{}, len(stateNIDs))
for k, v := range stateNIDs { for k, v := range stateNIDs {
nids[k] = v nids[k] = v
} }
selectOrig := strings.Replace(bulkSelectStateBlockNIDsSQL, "($1)", internal.QueryVariadic(len(nids)), 1) selectOrig := strings.Replace(bulkSelectStateBlockNIDsSQL, "($1)", internal.QueryVariadic(len(nids)), 1)
selectStmt, err := txn.Prepare(selectOrig) selectStmt, err := s.db.Prepare(selectOrig)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -97,6 +97,14 @@ func Open(dataSourceName string) (*Database, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
stateBlock, err := NewSqliteStateBlockTable(d.db)
if err != nil {
return nil, err
}
stateSnapshot, err := NewSqliteStateSnapshotTable(d.db)
if err != nil {
return nil, err
}
d.Database = shared.Database{ d.Database = shared.Database{
DB: d.db, DB: d.db,
EventsTable: d.events, EventsTable: d.events,
@ -105,6 +113,8 @@ func Open(dataSourceName string) (*Database, error) {
EventJSONTable: d.eventJSON, EventJSONTable: d.eventJSON,
RoomsTable: d.rooms, RoomsTable: d.rooms,
TransactionsTable: d.transactions, TransactionsTable: d.transactions,
StateBlockTable: stateBlock,
StateSnapshotTable: stateSnapshot,
} }
return &d, nil return &d, nil
} }
@ -142,53 +152,6 @@ func (d *Database) assignStateKeyNID(
return return
} }
// AddState implements input.EventDatabase
func (d *Database) AddState(
ctx context.Context,
roomNID types.RoomNID,
stateBlockNIDs []types.StateBlockNID,
state []types.StateEntry,
) (stateNID types.StateSnapshotNID, err error) {
err = internal.WithTransaction(d.db, func(txn *sql.Tx) error {
if len(state) > 0 {
var stateBlockNID types.StateBlockNID
stateBlockNID, err = d.statements.bulkInsertStateData(ctx, txn, state)
if err != nil {
return err
}
stateBlockNIDs = append(stateBlockNIDs[:len(stateBlockNIDs):len(stateBlockNIDs)], stateBlockNID)
}
stateNID, err = d.statements.insertState(ctx, txn, roomNID, stateBlockNIDs)
return err
})
if err != nil {
return 0, err
}
return
}
// StateBlockNIDs implements state.RoomStateDatabase
func (d *Database) StateBlockNIDs(
ctx context.Context, stateNIDs []types.StateSnapshotNID,
) (sl []types.StateBlockNIDList, err error) {
err = internal.WithTransaction(d.db, func(txn *sql.Tx) error {
sl, err = d.statements.bulkSelectStateBlockNIDs(ctx, txn, stateNIDs)
return err
})
return
}
// StateEntries implements state.RoomStateDatabase
func (d *Database) StateEntries(
ctx context.Context, stateBlockNIDs []types.StateBlockNID,
) (sel []types.StateEntryList, err error) {
err = internal.WithTransaction(d.db, func(txn *sql.Tx) error {
sel, err = d.statements.bulkSelectStateBlockEntries(ctx, txn, stateBlockNIDs)
return err
})
return
}
// GetLatestEventsForUpdate implements input.EventDatabase // GetLatestEventsForUpdate implements input.EventDatabase
func (d *Database) GetLatestEventsForUpdate( func (d *Database) GetLatestEventsForUpdate(
ctx context.Context, roomNID types.RoomNID, ctx context.Context, roomNID types.RoomNID,
@ -366,17 +329,6 @@ func (d *Database) RemoveRoomAlias(ctx context.Context, alias string) error {
return d.statements.deleteRoomAlias(ctx, nil, alias) return d.statements.deleteRoomAlias(ctx, nil, alias)
} }
// StateEntriesForTuples implements state.RoomStateDatabase
func (d *Database) StateEntriesForTuples(
ctx context.Context,
stateBlockNIDs []types.StateBlockNID,
stateKeyTuples []types.StateKeyTuple,
) ([]types.StateEntryList, error) {
return d.statements.bulkSelectFilteredStateBlockEntries(
ctx, nil, stateBlockNIDs, stateKeyTuples,
)
}
// MembershipUpdater implements input.RoomEventDatabase // MembershipUpdater implements input.RoomEventDatabase
func (d *Database) MembershipUpdater( func (d *Database) MembershipUpdater(
ctx context.Context, roomID, targetUserID string, ctx context.Context, roomID, targetUserID string,

View file

@ -70,3 +70,14 @@ type Transactions interface {
InsertTransaction(ctx context.Context, txn *sql.Tx, transactionID string, sessionID int64, userID string, eventID string) error InsertTransaction(ctx context.Context, txn *sql.Tx, transactionID string, sessionID int64, userID string, eventID string) error
SelectTransactionEventID(ctx context.Context, transactionID string, sessionID int64, userID string) (eventID string, err error) SelectTransactionEventID(ctx context.Context, transactionID string, sessionID int64, userID string) (eventID string, err error)
} }
type StateSnapshot interface {
InsertState(ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, stateBlockNIDs []types.StateBlockNID) (stateNID types.StateSnapshotNID, err error)
BulkSelectStateBlockNIDs(ctx context.Context, stateNIDs []types.StateSnapshotNID) ([]types.StateBlockNIDList, error)
}
type StateBlock interface {
BulkInsertStateData(ctx context.Context, txn *sql.Tx, entries []types.StateEntry) (types.StateBlockNID, error)
BulkSelectStateBlockEntries(ctx context.Context, stateBlockNIDs []types.StateBlockNID) ([]types.StateEntryList, error)
BulkSelectFilteredStateBlockEntries(ctx context.Context, stateBlockNIDs []types.StateBlockNID, stateKeyTuples []types.StateKeyTuple) ([]types.StateEntryList, error)
}