mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-21 05:43:09 -06:00
Rename TransactionWriters to Writers
This commit is contained in:
parent
c97f3d3f83
commit
bc1023ea19
|
|
@ -32,7 +32,7 @@ type Database struct {
|
||||||
events eventsStatements
|
events eventsStatements
|
||||||
txnID txnStatements
|
txnID txnStatements
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
writer sqlutil.TransactionWriter
|
writer sqlutil.Writer
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewDatabase opens a new database
|
// NewDatabase opens a new database
|
||||||
|
|
@ -42,7 +42,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) {
|
||||||
if result.db, err = sqlutil.Open(dbProperties); err != nil {
|
if result.db, err = sqlutil.Open(dbProperties); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
result.writer = sqlutil.NewDummyTransactionWriter()
|
result.writer = sqlutil.NewDummyWriter()
|
||||||
if err = result.prepare(); err != nil {
|
if err = result.prepare(); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -67,7 +67,7 @@ const (
|
||||||
|
|
||||||
type eventsStatements struct {
|
type eventsStatements struct {
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
writer sqlutil.TransactionWriter
|
writer sqlutil.Writer
|
||||||
selectEventsByApplicationServiceIDStmt *sql.Stmt
|
selectEventsByApplicationServiceIDStmt *sql.Stmt
|
||||||
countEventsByApplicationServiceIDStmt *sql.Stmt
|
countEventsByApplicationServiceIDStmt *sql.Stmt
|
||||||
insertEventStmt *sql.Stmt
|
insertEventStmt *sql.Stmt
|
||||||
|
|
@ -75,7 +75,7 @@ type eventsStatements struct {
|
||||||
deleteEventsBeforeAndIncludingIDStmt *sql.Stmt
|
deleteEventsBeforeAndIncludingIDStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *eventsStatements) prepare(db *sql.DB, writer sqlutil.TransactionWriter) (err error) {
|
func (s *eventsStatements) prepare(db *sql.DB, writer sqlutil.Writer) (err error) {
|
||||||
s.db = db
|
s.db = db
|
||||||
s.writer = writer
|
s.writer = writer
|
||||||
_, err = db.Exec(appserviceEventsSchema)
|
_, err = db.Exec(appserviceEventsSchema)
|
||||||
|
|
|
||||||
|
|
@ -32,7 +32,7 @@ type Database struct {
|
||||||
events eventsStatements
|
events eventsStatements
|
||||||
txnID txnStatements
|
txnID txnStatements
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
writer sqlutil.TransactionWriter
|
writer sqlutil.Writer
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewDatabase opens a new database
|
// NewDatabase opens a new database
|
||||||
|
|
@ -42,7 +42,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) {
|
||||||
if result.db, err = sqlutil.Open(dbProperties); err != nil {
|
if result.db, err = sqlutil.Open(dbProperties); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
result.writer = sqlutil.NewTransactionWriter()
|
result.writer = sqlutil.NewExclusiveWriter()
|
||||||
if err = result.prepare(); err != nil {
|
if err = result.prepare(); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -38,11 +38,11 @@ const selectTxnIDSQL = `
|
||||||
|
|
||||||
type txnStatements struct {
|
type txnStatements struct {
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
writer sqlutil.TransactionWriter
|
writer sqlutil.Writer
|
||||||
selectTxnIDStmt *sql.Stmt
|
selectTxnIDStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *txnStatements) prepare(db *sql.DB, writer sqlutil.TransactionWriter) (err error) {
|
func (s *txnStatements) prepare(db *sql.DB, writer sqlutil.Writer) (err error) {
|
||||||
s.db = db
|
s.db = db
|
||||||
s.writer = writer
|
s.writer = writer
|
||||||
_, err = db.Exec(txnIDSchema)
|
_, err = db.Exec(txnIDSchema)
|
||||||
|
|
|
||||||
|
|
@ -11,7 +11,7 @@ import (
|
||||||
type Database struct {
|
type Database struct {
|
||||||
shared.Database
|
shared.Database
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
writer sqlutil.TransactionWriter
|
writer sqlutil.Writer
|
||||||
sqlutil.PartitionOffsetStatements
|
sqlutil.PartitionOffsetStatements
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -22,7 +22,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) {
|
||||||
if d.db, err = sqlutil.Open(dbProperties); err != nil {
|
if d.db, err = sqlutil.Open(dbProperties); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
d.writer = sqlutil.NewDummyTransactionWriter()
|
d.writer = sqlutil.NewDummyWriter()
|
||||||
if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "currentstate"); err != nil {
|
if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "currentstate"); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -27,7 +27,7 @@ import (
|
||||||
|
|
||||||
type Database struct {
|
type Database struct {
|
||||||
DB *sql.DB
|
DB *sql.DB
|
||||||
Writer sqlutil.TransactionWriter
|
Writer sqlutil.Writer
|
||||||
CurrentRoomState tables.CurrentRoomState
|
CurrentRoomState tables.CurrentRoomState
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -83,7 +83,7 @@ const selectKnownUsersSQL = "" +
|
||||||
|
|
||||||
type currentRoomStateStatements struct {
|
type currentRoomStateStatements struct {
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
writer sqlutil.TransactionWriter
|
writer sqlutil.Writer
|
||||||
upsertRoomStateStmt *sql.Stmt
|
upsertRoomStateStmt *sql.Stmt
|
||||||
deleteRoomStateByEventIDStmt *sql.Stmt
|
deleteRoomStateByEventIDStmt *sql.Stmt
|
||||||
selectRoomIDsWithMembershipStmt *sql.Stmt
|
selectRoomIDsWithMembershipStmt *sql.Stmt
|
||||||
|
|
@ -96,7 +96,7 @@ type currentRoomStateStatements struct {
|
||||||
func NewSqliteCurrentRoomStateTable(db *sql.DB) (tables.CurrentRoomState, error) {
|
func NewSqliteCurrentRoomStateTable(db *sql.DB) (tables.CurrentRoomState, error) {
|
||||||
s := ¤tRoomStateStatements{
|
s := ¤tRoomStateStatements{
|
||||||
db: db,
|
db: db,
|
||||||
writer: sqlutil.NewTransactionWriter(),
|
writer: sqlutil.NewExclusiveWriter(),
|
||||||
}
|
}
|
||||||
_, err := db.Exec(currentRoomStateSchema)
|
_, err := db.Exec(currentRoomStateSchema)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
||||||
|
|
@ -11,7 +11,7 @@ import (
|
||||||
type Database struct {
|
type Database struct {
|
||||||
shared.Database
|
shared.Database
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
writer sqlutil.TransactionWriter
|
writer sqlutil.Writer
|
||||||
sqlutil.PartitionOffsetStatements
|
sqlutil.PartitionOffsetStatements
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -23,7 +23,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) {
|
||||||
if d.db, err = sqlutil.Open(dbProperties); err != nil {
|
if d.db, err = sqlutil.Open(dbProperties); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
d.writer = sqlutil.NewTransactionWriter()
|
d.writer = sqlutil.NewExclusiveWriter()
|
||||||
if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "currentstate"); err != nil {
|
if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "currentstate"); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -28,7 +28,7 @@ type Database struct {
|
||||||
shared.Database
|
shared.Database
|
||||||
sqlutil.PartitionOffsetStatements
|
sqlutil.PartitionOffsetStatements
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
writer sqlutil.TransactionWriter
|
writer sqlutil.Writer
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewDatabase opens a new database
|
// NewDatabase opens a new database
|
||||||
|
|
@ -38,7 +38,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) {
|
||||||
if d.db, err = sqlutil.Open(dbProperties); err != nil {
|
if d.db, err = sqlutil.Open(dbProperties); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
d.writer = sqlutil.NewDummyTransactionWriter()
|
d.writer = sqlutil.NewDummyWriter()
|
||||||
joinedHosts, err := NewPostgresJoinedHostsTable(d.db)
|
joinedHosts, err := NewPostgresJoinedHostsTable(d.db)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
|
||||||
|
|
@ -28,7 +28,7 @@ import (
|
||||||
|
|
||||||
type Database struct {
|
type Database struct {
|
||||||
DB *sql.DB
|
DB *sql.DB
|
||||||
Writer sqlutil.TransactionWriter
|
Writer sqlutil.Writer
|
||||||
FederationSenderQueuePDUs tables.FederationSenderQueuePDUs
|
FederationSenderQueuePDUs tables.FederationSenderQueuePDUs
|
||||||
FederationSenderQueueEDUs tables.FederationSenderQueueEDUs
|
FederationSenderQueueEDUs tables.FederationSenderQueueEDUs
|
||||||
FederationSenderQueueJSON tables.FederationSenderQueueJSON
|
FederationSenderQueueJSON tables.FederationSenderQueueJSON
|
||||||
|
|
|
||||||
|
|
@ -30,7 +30,7 @@ type Database struct {
|
||||||
shared.Database
|
shared.Database
|
||||||
sqlutil.PartitionOffsetStatements
|
sqlutil.PartitionOffsetStatements
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
writer sqlutil.TransactionWriter
|
writer sqlutil.Writer
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewDatabase opens a new database
|
// NewDatabase opens a new database
|
||||||
|
|
@ -40,7 +40,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) {
|
||||||
if d.db, err = sqlutil.Open(dbProperties); err != nil {
|
if d.db, err = sqlutil.Open(dbProperties); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
d.writer = sqlutil.NewTransactionWriter()
|
d.writer = sqlutil.NewExclusiveWriter()
|
||||||
joinedHosts, err := NewSQLiteJoinedHostsTable(d.db)
|
joinedHosts, err := NewSQLiteJoinedHostsTable(d.db)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
|
||||||
|
|
@ -54,7 +54,7 @@ const upsertPartitionOffsetsSQL = "" +
|
||||||
// PartitionOffsetStatements represents a set of statements that can be run on a partition_offsets table.
|
// PartitionOffsetStatements represents a set of statements that can be run on a partition_offsets table.
|
||||||
type PartitionOffsetStatements struct {
|
type PartitionOffsetStatements struct {
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
writer TransactionWriter
|
writer Writer
|
||||||
selectPartitionOffsetsStmt *sql.Stmt
|
selectPartitionOffsetsStmt *sql.Stmt
|
||||||
upsertPartitionOffsetStmt *sql.Stmt
|
upsertPartitionOffsetStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
@ -62,7 +62,7 @@ type PartitionOffsetStatements struct {
|
||||||
// Prepare converts the raw SQL statements into prepared statements.
|
// Prepare converts the raw SQL statements into prepared statements.
|
||||||
// Takes a prefix to prepend to the table name used to store the partition offsets.
|
// Takes a prefix to prepend to the table name used to store the partition offsets.
|
||||||
// This allows multiple components to share the same database schema.
|
// This allows multiple components to share the same database schema.
|
||||||
func (s *PartitionOffsetStatements) Prepare(db *sql.DB, writer TransactionWriter, prefix string) (err error) {
|
func (s *PartitionOffsetStatements) Prepare(db *sql.DB, writer Writer, prefix string) (err error) {
|
||||||
s.db = db
|
s.db = db
|
||||||
s.writer = writer
|
s.writer = writer
|
||||||
_, err = db.Exec(strings.Replace(partitionOffsetsSchema, "${prefix}", prefix, -1))
|
_, err = db.Exec(strings.Replace(partitionOffsetsSchema, "${prefix}", prefix, -1))
|
||||||
|
|
|
||||||
|
|
@ -104,6 +104,6 @@ func SQLiteDriverName() string {
|
||||||
return "sqlite3"
|
return "sqlite3"
|
||||||
}
|
}
|
||||||
|
|
||||||
type TransactionWriter interface {
|
type Writer interface {
|
||||||
Do(db *sql.DB, txn *sql.Tx, f func(txn *sql.Tx) error) error
|
Do(db *sql.DB, txn *sql.Tx, f func(txn *sql.Tx) error) error
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -4,15 +4,15 @@ import (
|
||||||
"database/sql"
|
"database/sql"
|
||||||
)
|
)
|
||||||
|
|
||||||
type DummyTransactionWriter struct {
|
type DummyWriter struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewDummyTransactionWriter() TransactionWriter {
|
func NewDummyWriter() Writer {
|
||||||
return &DummyTransactionWriter{}
|
return &DummyWriter{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *DummyTransactionWriter) Do(db *sql.DB, txn *sql.Tx, f func(txn *sql.Tx) error) error {
|
func (w *DummyWriter) Do(db *sql.DB, txn *sql.Tx, f func(txn *sql.Tx) error) error {
|
||||||
if txn == nil {
|
if db != nil && txn == nil {
|
||||||
return WithTransaction(db, func(txn *sql.Tx) error {
|
return WithTransaction(db, func(txn *sql.Tx) error {
|
||||||
return f(txn)
|
return f(txn)
|
||||||
})
|
})
|
||||||
|
|
|
||||||
|
|
@ -7,16 +7,16 @@ import (
|
||||||
"go.uber.org/atomic"
|
"go.uber.org/atomic"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ExclusiveTransactionWriter allows queuing database writes so that you don't
|
// ExclusiveWriter allows queuing database writes so that you don't
|
||||||
// contend on database locks in, e.g. SQLite. Only one task will run
|
// contend on database locks in, e.g. SQLite. Only one task will run
|
||||||
// at a time on a given ExclusiveTransactionWriter.
|
// at a time on a given ExclusiveWriter.
|
||||||
type ExclusiveTransactionWriter struct {
|
type ExclusiveWriter struct {
|
||||||
running atomic.Bool
|
running atomic.Bool
|
||||||
todo chan transactionWriterTask
|
todo chan transactionWriterTask
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewTransactionWriter() TransactionWriter {
|
func NewExclusiveWriter() Writer {
|
||||||
return &ExclusiveTransactionWriter{
|
return &ExclusiveWriter{
|
||||||
todo: make(chan transactionWriterTask),
|
todo: make(chan transactionWriterTask),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -34,7 +34,7 @@ type transactionWriterTask struct {
|
||||||
// txn parameter if one is supplied, and if not, will take out a
|
// txn parameter if one is supplied, and if not, will take out a
|
||||||
// new transaction from the database supplied in the database
|
// new transaction from the database supplied in the database
|
||||||
// parameter. Either way, this will block until the task is done.
|
// parameter. Either way, this will block until the task is done.
|
||||||
func (w *ExclusiveTransactionWriter) Do(db *sql.DB, txn *sql.Tx, f func(txn *sql.Tx) error) error {
|
func (w *ExclusiveWriter) Do(db *sql.DB, txn *sql.Tx, f func(txn *sql.Tx) error) error {
|
||||||
if w.todo == nil {
|
if w.todo == nil {
|
||||||
return errors.New("not initialised")
|
return errors.New("not initialised")
|
||||||
}
|
}
|
||||||
|
|
@ -55,15 +55,15 @@ func (w *ExclusiveTransactionWriter) Do(db *sql.DB, txn *sql.Tx, f func(txn *sql
|
||||||
// of these goroutines will run at a time. A transaction will be
|
// of these goroutines will run at a time. A transaction will be
|
||||||
// opened using the database object from the task and then this will
|
// opened using the database object from the task and then this will
|
||||||
// be passed as a parameter to the task function.
|
// be passed as a parameter to the task function.
|
||||||
func (w *ExclusiveTransactionWriter) run() {
|
func (w *ExclusiveWriter) run() {
|
||||||
if !w.running.CAS(false, true) {
|
if !w.running.CAS(false, true) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer w.running.Store(false)
|
defer w.running.Store(false)
|
||||||
for task := range w.todo {
|
for task := range w.todo {
|
||||||
if task.txn != nil {
|
if task.db != nil && task.txn != nil {
|
||||||
task.wait <- task.f(task.txn)
|
task.wait <- task.f(task.txn)
|
||||||
} else if task.db != nil {
|
} else if task.db != nil && task.txn == nil {
|
||||||
task.wait <- WithTransaction(task.db, func(txn *sql.Tx) error {
|
task.wait <- WithTransaction(task.db, func(txn *sql.Tx) error {
|
||||||
return task.f(txn)
|
return task.f(txn)
|
||||||
})
|
})
|
||||||
|
|
|
||||||
|
|
@ -63,7 +63,7 @@ const deleteAllDeviceKeysSQL = "" +
|
||||||
|
|
||||||
type deviceKeysStatements struct {
|
type deviceKeysStatements struct {
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
writer sqlutil.TransactionWriter
|
writer sqlutil.Writer
|
||||||
upsertDeviceKeysStmt *sql.Stmt
|
upsertDeviceKeysStmt *sql.Stmt
|
||||||
selectDeviceKeysStmt *sql.Stmt
|
selectDeviceKeysStmt *sql.Stmt
|
||||||
selectBatchDeviceKeysStmt *sql.Stmt
|
selectBatchDeviceKeysStmt *sql.Stmt
|
||||||
|
|
@ -71,7 +71,7 @@ type deviceKeysStatements struct {
|
||||||
deleteAllDeviceKeysStmt *sql.Stmt
|
deleteAllDeviceKeysStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSqliteDeviceKeysTable(db *sql.DB, writer sqlutil.TransactionWriter) (tables.DeviceKeys, error) {
|
func NewSqliteDeviceKeysTable(db *sql.DB, writer sqlutil.Writer) (tables.DeviceKeys, error) {
|
||||||
s := &deviceKeysStatements{
|
s := &deviceKeysStatements{
|
||||||
db: db,
|
db: db,
|
||||||
writer: writer,
|
writer: writer,
|
||||||
|
|
|
||||||
|
|
@ -52,12 +52,12 @@ const selectKeyChangesSQL = "" +
|
||||||
|
|
||||||
type keyChangesStatements struct {
|
type keyChangesStatements struct {
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
writer sqlutil.TransactionWriter
|
writer sqlutil.Writer
|
||||||
upsertKeyChangeStmt *sql.Stmt
|
upsertKeyChangeStmt *sql.Stmt
|
||||||
selectKeyChangesStmt *sql.Stmt
|
selectKeyChangesStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSqliteKeyChangesTable(db *sql.DB, writer sqlutil.TransactionWriter) (tables.KeyChanges, error) {
|
func NewSqliteKeyChangesTable(db *sql.DB, writer sqlutil.Writer) (tables.KeyChanges, error) {
|
||||||
s := &keyChangesStatements{
|
s := &keyChangesStatements{
|
||||||
db: db,
|
db: db,
|
||||||
writer: writer,
|
writer: writer,
|
||||||
|
|
|
||||||
|
|
@ -60,7 +60,7 @@ const selectKeyByAlgorithmSQL = "" +
|
||||||
|
|
||||||
type oneTimeKeysStatements struct {
|
type oneTimeKeysStatements struct {
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
writer sqlutil.TransactionWriter
|
writer sqlutil.Writer
|
||||||
upsertKeysStmt *sql.Stmt
|
upsertKeysStmt *sql.Stmt
|
||||||
selectKeysStmt *sql.Stmt
|
selectKeysStmt *sql.Stmt
|
||||||
selectKeysCountStmt *sql.Stmt
|
selectKeysCountStmt *sql.Stmt
|
||||||
|
|
@ -68,7 +68,7 @@ type oneTimeKeysStatements struct {
|
||||||
deleteOneTimeKeyStmt *sql.Stmt
|
deleteOneTimeKeyStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSqliteOneTimeKeysTable(db *sql.DB, writer sqlutil.TransactionWriter) (tables.OneTimeKeys, error) {
|
func NewSqliteOneTimeKeysTable(db *sql.DB, writer sqlutil.Writer) (tables.OneTimeKeys, error) {
|
||||||
s := &oneTimeKeysStatements{
|
s := &oneTimeKeysStatements{
|
||||||
db: db,
|
db: db,
|
||||||
writer: writer,
|
writer: writer,
|
||||||
|
|
|
||||||
|
|
@ -51,13 +51,13 @@ const selectStaleDeviceListsSQL = "" +
|
||||||
|
|
||||||
type staleDeviceListsStatements struct {
|
type staleDeviceListsStatements struct {
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
writer sqlutil.TransactionWriter
|
writer sqlutil.Writer
|
||||||
upsertStaleDeviceListStmt *sql.Stmt
|
upsertStaleDeviceListStmt *sql.Stmt
|
||||||
selectStaleDeviceListsWithDomainsStmt *sql.Stmt
|
selectStaleDeviceListsWithDomainsStmt *sql.Stmt
|
||||||
selectStaleDeviceListsStmt *sql.Stmt
|
selectStaleDeviceListsStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSqliteStaleDeviceListsTable(db *sql.DB, writer sqlutil.TransactionWriter) (tables.StaleDeviceLists, error) {
|
func NewSqliteStaleDeviceListsTable(db *sql.DB, writer sqlutil.Writer) (tables.StaleDeviceLists, error) {
|
||||||
s := &staleDeviceListsStatements{
|
s := &staleDeviceListsStatements{
|
||||||
db: db,
|
db: db,
|
||||||
writer: writer,
|
writer: writer,
|
||||||
|
|
|
||||||
|
|
@ -25,7 +25,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*shared.Database, error)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
writer := sqlutil.NewTransactionWriter()
|
writer := sqlutil.NewExclusiveWriter()
|
||||||
otk, err := NewSqliteOneTimeKeysTable(db, writer)
|
otk, err := NewSqliteOneTimeKeysTable(db, writer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
|
||||||
|
|
@ -62,12 +62,12 @@ SELECT content_type, file_size_bytes, creation_ts, upload_name, base64hash, user
|
||||||
|
|
||||||
type mediaStatements struct {
|
type mediaStatements struct {
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
writer sqlutil.TransactionWriter
|
writer sqlutil.Writer
|
||||||
insertMediaStmt *sql.Stmt
|
insertMediaStmt *sql.Stmt
|
||||||
selectMediaStmt *sql.Stmt
|
selectMediaStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *mediaStatements) prepare(db *sql.DB, writer sqlutil.TransactionWriter) (err error) {
|
func (s *mediaStatements) prepare(db *sql.DB, writer sqlutil.Writer) (err error) {
|
||||||
s.db = db
|
s.db = db
|
||||||
s.writer = writer
|
s.writer = writer
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -26,7 +26,7 @@ type statements struct {
|
||||||
thumbnail thumbnailStatements
|
thumbnail thumbnailStatements
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *statements) prepare(db *sql.DB, writer sqlutil.TransactionWriter) (err error) {
|
func (s *statements) prepare(db *sql.DB, writer sqlutil.Writer) (err error) {
|
||||||
if err = s.media.prepare(db, writer); err != nil {
|
if err = s.media.prepare(db, writer); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -31,13 +31,13 @@ import (
|
||||||
type Database struct {
|
type Database struct {
|
||||||
statements statements
|
statements statements
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
writer sqlutil.TransactionWriter
|
writer sqlutil.Writer
|
||||||
}
|
}
|
||||||
|
|
||||||
// Open opens a postgres database.
|
// Open opens a postgres database.
|
||||||
func Open(dbProperties *config.DatabaseOptions) (*Database, error) {
|
func Open(dbProperties *config.DatabaseOptions) (*Database, error) {
|
||||||
d := Database{
|
d := Database{
|
||||||
writer: sqlutil.NewTransactionWriter(),
|
writer: sqlutil.NewExclusiveWriter(),
|
||||||
}
|
}
|
||||||
var err error
|
var err error
|
||||||
if d.db, err = sqlutil.Open(dbProperties); err != nil {
|
if d.db, err = sqlutil.Open(dbProperties); err != nil {
|
||||||
|
|
|
||||||
|
|
@ -59,13 +59,13 @@ SELECT content_type, file_size_bytes, creation_ts, width, height, resize_method
|
||||||
|
|
||||||
type thumbnailStatements struct {
|
type thumbnailStatements struct {
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
writer sqlutil.TransactionWriter
|
writer sqlutil.Writer
|
||||||
insertThumbnailStmt *sql.Stmt
|
insertThumbnailStmt *sql.Stmt
|
||||||
selectThumbnailStmt *sql.Stmt
|
selectThumbnailStmt *sql.Stmt
|
||||||
selectThumbnailsStmt *sql.Stmt
|
selectThumbnailsStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *thumbnailStatements) prepare(db *sql.DB, writer sqlutil.TransactionWriter) (err error) {
|
func (s *thumbnailStatements) prepare(db *sql.DB, writer sqlutil.Writer) (err error) {
|
||||||
_, err = db.Exec(thumbnailSchema)
|
_, err = db.Exec(thumbnailSchema)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
|
|
|
||||||
|
|
@ -98,7 +98,7 @@ func Open(dbProperties *config.DatabaseOptions) (*Database, error) {
|
||||||
}
|
}
|
||||||
d.Database = shared.Database{
|
d.Database = shared.Database{
|
||||||
DB: db,
|
DB: db,
|
||||||
Writer: sqlutil.NewDummyTransactionWriter(),
|
Writer: sqlutil.NewDummyWriter(),
|
||||||
EventTypesTable: eventTypes,
|
EventTypesTable: eventTypes,
|
||||||
EventStateKeysTable: eventStateKeys,
|
EventStateKeysTable: eventStateKeys,
|
||||||
EventJSONTable: eventJSON,
|
EventJSONTable: eventJSON,
|
||||||
|
|
|
||||||
|
|
@ -27,7 +27,7 @@ const redactionsArePermanent = false
|
||||||
|
|
||||||
type Database struct {
|
type Database struct {
|
||||||
DB *sql.DB
|
DB *sql.DB
|
||||||
Writer sqlutil.TransactionWriter
|
Writer sqlutil.Writer
|
||||||
EventsTable tables.Events
|
EventsTable tables.Events
|
||||||
EventJSONTable tables.EventJSON
|
EventJSONTable tables.EventJSON
|
||||||
EventTypesTable tables.EventTypes
|
EventTypesTable tables.EventTypes
|
||||||
|
|
|
||||||
|
|
@ -41,7 +41,7 @@ type Database struct {
|
||||||
invites tables.Invites
|
invites tables.Invites
|
||||||
membership tables.Membership
|
membership tables.Membership
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
writer sqlutil.TransactionWriter
|
writer sqlutil.Writer
|
||||||
}
|
}
|
||||||
|
|
||||||
// Open a sqlite database.
|
// Open a sqlite database.
|
||||||
|
|
@ -52,7 +52,7 @@ func Open(dbProperties *config.DatabaseOptions) (*Database, error) {
|
||||||
if d.db, err = sqlutil.Open(dbProperties); err != nil {
|
if d.db, err = sqlutil.Open(dbProperties); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
d.writer = sqlutil.NewTransactionWriter()
|
d.writer = sqlutil.NewExclusiveWriter()
|
||||||
//d.db.Exec("PRAGMA journal_mode=WAL;")
|
//d.db.Exec("PRAGMA journal_mode=WAL;")
|
||||||
//d.db.Exec("PRAGMA read_uncommitted = true;")
|
//d.db.Exec("PRAGMA read_uncommitted = true;")
|
||||||
|
|
||||||
|
|
@ -120,7 +120,7 @@ func Open(dbProperties *config.DatabaseOptions) (*Database, error) {
|
||||||
}
|
}
|
||||||
d.Database = shared.Database{
|
d.Database = shared.Database{
|
||||||
DB: d.db,
|
DB: d.db,
|
||||||
Writer: sqlutil.NewTransactionWriter(),
|
Writer: sqlutil.NewExclusiveWriter(),
|
||||||
EventsTable: d.events,
|
EventsTable: d.events,
|
||||||
EventTypesTable: d.eventTypes,
|
EventTypesTable: d.eventTypes,
|
||||||
EventStateKeysTable: d.eventStateKeys,
|
EventStateKeysTable: d.eventStateKeys,
|
||||||
|
|
|
||||||
|
|
@ -30,7 +30,7 @@ import (
|
||||||
// A Database implements gomatrixserverlib.KeyDatabase and is used to store
|
// A Database implements gomatrixserverlib.KeyDatabase and is used to store
|
||||||
// the public keys for other matrix servers.
|
// the public keys for other matrix servers.
|
||||||
type Database struct {
|
type Database struct {
|
||||||
writer sqlutil.TransactionWriter
|
writer sqlutil.Writer
|
||||||
statements serverKeyStatements
|
statements serverKeyStatements
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -49,7 +49,7 @@ func NewDatabase(
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
d := &Database{
|
d := &Database{
|
||||||
writer: sqlutil.NewTransactionWriter(),
|
writer: sqlutil.NewExclusiveWriter(),
|
||||||
}
|
}
|
||||||
err = d.statements.prepare(db, d.writer)
|
err = d.statements.prepare(db, d.writer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
||||||
|
|
@ -63,12 +63,12 @@ const upsertServerKeysSQL = "" +
|
||||||
|
|
||||||
type serverKeyStatements struct {
|
type serverKeyStatements struct {
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
writer sqlutil.TransactionWriter
|
writer sqlutil.Writer
|
||||||
bulkSelectServerKeysStmt *sql.Stmt
|
bulkSelectServerKeysStmt *sql.Stmt
|
||||||
upsertServerKeysStmt *sql.Stmt
|
upsertServerKeysStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *serverKeyStatements) prepare(db *sql.DB, writer sqlutil.TransactionWriter) (err error) {
|
func (s *serverKeyStatements) prepare(db *sql.DB, writer sqlutil.Writer) (err error) {
|
||||||
s.db = db
|
s.db = db
|
||||||
s.writer = writer
|
s.writer = writer
|
||||||
_, err = db.Exec(serverKeysSchema)
|
_, err = db.Exec(serverKeysSchema)
|
||||||
|
|
|
||||||
|
|
@ -31,7 +31,7 @@ import (
|
||||||
type SyncServerDatasource struct {
|
type SyncServerDatasource struct {
|
||||||
shared.Database
|
shared.Database
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
writer sqlutil.TransactionWriter
|
writer sqlutil.Writer
|
||||||
sqlutil.PartitionOffsetStatements
|
sqlutil.PartitionOffsetStatements
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -42,7 +42,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e
|
||||||
if d.db, err = sqlutil.Open(dbProperties); err != nil {
|
if d.db, err = sqlutil.Open(dbProperties); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
d.writer = sqlutil.NewDummyTransactionWriter()
|
d.writer = sqlutil.NewDummyWriter()
|
||||||
if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "syncapi"); err != nil {
|
if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "syncapi"); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
@ -88,7 +88,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e
|
||||||
BackwardExtremities: backwardExtremities,
|
BackwardExtremities: backwardExtremities,
|
||||||
Filter: filter,
|
Filter: filter,
|
||||||
SendToDevice: sendToDevice,
|
SendToDevice: sendToDevice,
|
||||||
SendToDeviceWriter: sqlutil.NewTransactionWriter(),
|
SendToDeviceWriter: sqlutil.NewExclusiveWriter(),
|
||||||
EDUCache: cache.New(),
|
EDUCache: cache.New(),
|
||||||
}
|
}
|
||||||
return &d, nil
|
return &d, nil
|
||||||
|
|
|
||||||
|
|
@ -45,7 +45,7 @@ type Database struct {
|
||||||
BackwardExtremities tables.BackwardsExtremities
|
BackwardExtremities tables.BackwardsExtremities
|
||||||
SendToDevice tables.SendToDevice
|
SendToDevice tables.SendToDevice
|
||||||
Filter tables.Filter
|
Filter tables.Filter
|
||||||
SendToDeviceWriter sqlutil.TransactionWriter
|
SendToDeviceWriter sqlutil.Writer
|
||||||
EDUCache *cache.EDUCache
|
EDUCache *cache.EDUCache
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -51,14 +51,14 @@ const selectMaxAccountDataIDSQL = "" +
|
||||||
|
|
||||||
type accountDataStatements struct {
|
type accountDataStatements struct {
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
writer sqlutil.TransactionWriter
|
writer sqlutil.Writer
|
||||||
streamIDStatements *streamIDStatements
|
streamIDStatements *streamIDStatements
|
||||||
insertAccountDataStmt *sql.Stmt
|
insertAccountDataStmt *sql.Stmt
|
||||||
selectMaxAccountDataIDStmt *sql.Stmt
|
selectMaxAccountDataIDStmt *sql.Stmt
|
||||||
selectAccountDataInRangeStmt *sql.Stmt
|
selectAccountDataInRangeStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSqliteAccountDataTable(db *sql.DB, writer sqlutil.TransactionWriter, streamID *streamIDStatements) (tables.AccountData, error) {
|
func NewSqliteAccountDataTable(db *sql.DB, writer sqlutil.Writer, streamID *streamIDStatements) (tables.AccountData, error) {
|
||||||
s := &accountDataStatements{
|
s := &accountDataStatements{
|
||||||
db: db,
|
db: db,
|
||||||
writer: writer,
|
writer: writer,
|
||||||
|
|
|
||||||
|
|
@ -49,13 +49,13 @@ const deleteBackwardExtremitySQL = "" +
|
||||||
|
|
||||||
type backwardExtremitiesStatements struct {
|
type backwardExtremitiesStatements struct {
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
writer sqlutil.TransactionWriter
|
writer sqlutil.Writer
|
||||||
insertBackwardExtremityStmt *sql.Stmt
|
insertBackwardExtremityStmt *sql.Stmt
|
||||||
selectBackwardExtremitiesForRoomStmt *sql.Stmt
|
selectBackwardExtremitiesForRoomStmt *sql.Stmt
|
||||||
deleteBackwardExtremityStmt *sql.Stmt
|
deleteBackwardExtremityStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSqliteBackwardsExtremitiesTable(db *sql.DB, writer sqlutil.TransactionWriter) (tables.BackwardsExtremities, error) {
|
func NewSqliteBackwardsExtremitiesTable(db *sql.DB, writer sqlutil.Writer) (tables.BackwardsExtremities, error) {
|
||||||
s := &backwardExtremitiesStatements{
|
s := &backwardExtremitiesStatements{
|
||||||
db: db,
|
db: db,
|
||||||
writer: writer,
|
writer: writer,
|
||||||
|
|
|
||||||
|
|
@ -85,7 +85,7 @@ const selectEventsWithEventIDsSQL = "" +
|
||||||
|
|
||||||
type currentRoomStateStatements struct {
|
type currentRoomStateStatements struct {
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
writer sqlutil.TransactionWriter
|
writer sqlutil.Writer
|
||||||
streamIDStatements *streamIDStatements
|
streamIDStatements *streamIDStatements
|
||||||
upsertRoomStateStmt *sql.Stmt
|
upsertRoomStateStmt *sql.Stmt
|
||||||
deleteRoomStateByEventIDStmt *sql.Stmt
|
deleteRoomStateByEventIDStmt *sql.Stmt
|
||||||
|
|
@ -95,7 +95,7 @@ type currentRoomStateStatements struct {
|
||||||
selectStateEventStmt *sql.Stmt
|
selectStateEventStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSqliteCurrentRoomStateTable(db *sql.DB, writer sqlutil.TransactionWriter, streamID *streamIDStatements) (tables.CurrentRoomState, error) {
|
func NewSqliteCurrentRoomStateTable(db *sql.DB, writer sqlutil.Writer, streamID *streamIDStatements) (tables.CurrentRoomState, error) {
|
||||||
s := ¤tRoomStateStatements{
|
s := ¤tRoomStateStatements{
|
||||||
db: db,
|
db: db,
|
||||||
writer: writer,
|
writer: writer,
|
||||||
|
|
|
||||||
|
|
@ -52,13 +52,13 @@ const insertFilterSQL = "" +
|
||||||
|
|
||||||
type filterStatements struct {
|
type filterStatements struct {
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
writer sqlutil.TransactionWriter
|
writer sqlutil.Writer
|
||||||
selectFilterStmt *sql.Stmt
|
selectFilterStmt *sql.Stmt
|
||||||
selectFilterIDByContentStmt *sql.Stmt
|
selectFilterIDByContentStmt *sql.Stmt
|
||||||
insertFilterStmt *sql.Stmt
|
insertFilterStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSqliteFilterTable(db *sql.DB, writer sqlutil.TransactionWriter) (tables.Filter, error) {
|
func NewSqliteFilterTable(db *sql.DB, writer sqlutil.Writer) (tables.Filter, error) {
|
||||||
_, err := db.Exec(filterSchema)
|
_, err := db.Exec(filterSchema)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
|
||||||
|
|
@ -59,7 +59,7 @@ const selectMaxInviteIDSQL = "" +
|
||||||
|
|
||||||
type inviteEventsStatements struct {
|
type inviteEventsStatements struct {
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
writer sqlutil.TransactionWriter
|
writer sqlutil.Writer
|
||||||
streamIDStatements *streamIDStatements
|
streamIDStatements *streamIDStatements
|
||||||
insertInviteEventStmt *sql.Stmt
|
insertInviteEventStmt *sql.Stmt
|
||||||
selectInviteEventsInRangeStmt *sql.Stmt
|
selectInviteEventsInRangeStmt *sql.Stmt
|
||||||
|
|
@ -67,7 +67,7 @@ type inviteEventsStatements struct {
|
||||||
selectMaxInviteIDStmt *sql.Stmt
|
selectMaxInviteIDStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSqliteInvitesTable(db *sql.DB, writer sqlutil.TransactionWriter, streamID *streamIDStatements) (tables.Invites, error) {
|
func NewSqliteInvitesTable(db *sql.DB, writer sqlutil.Writer, streamID *streamIDStatements) (tables.Invites, error) {
|
||||||
s := &inviteEventsStatements{
|
s := &inviteEventsStatements{
|
||||||
db: db,
|
db: db,
|
||||||
writer: writer,
|
writer: writer,
|
||||||
|
|
|
||||||
|
|
@ -105,7 +105,7 @@ const selectStateInRangeSQL = "" +
|
||||||
|
|
||||||
type outputRoomEventsStatements struct {
|
type outputRoomEventsStatements struct {
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
writer sqlutil.TransactionWriter
|
writer sqlutil.Writer
|
||||||
streamIDStatements *streamIDStatements
|
streamIDStatements *streamIDStatements
|
||||||
insertEventStmt *sql.Stmt
|
insertEventStmt *sql.Stmt
|
||||||
selectEventsStmt *sql.Stmt
|
selectEventsStmt *sql.Stmt
|
||||||
|
|
@ -117,7 +117,7 @@ type outputRoomEventsStatements struct {
|
||||||
updateEventJSONStmt *sql.Stmt
|
updateEventJSONStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSqliteEventsTable(db *sql.DB, writer sqlutil.TransactionWriter, streamID *streamIDStatements) (tables.Events, error) {
|
func NewSqliteEventsTable(db *sql.DB, writer sqlutil.Writer, streamID *streamIDStatements) (tables.Events, error) {
|
||||||
s := &outputRoomEventsStatements{
|
s := &outputRoomEventsStatements{
|
||||||
db: db,
|
db: db,
|
||||||
writer: writer,
|
writer: writer,
|
||||||
|
|
|
||||||
|
|
@ -67,7 +67,7 @@ const selectMaxPositionInTopologySQL = "" +
|
||||||
|
|
||||||
type outputRoomEventsTopologyStatements struct {
|
type outputRoomEventsTopologyStatements struct {
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
writer sqlutil.TransactionWriter
|
writer sqlutil.Writer
|
||||||
insertEventInTopologyStmt *sql.Stmt
|
insertEventInTopologyStmt *sql.Stmt
|
||||||
selectEventIDsInRangeASCStmt *sql.Stmt
|
selectEventIDsInRangeASCStmt *sql.Stmt
|
||||||
selectEventIDsInRangeDESCStmt *sql.Stmt
|
selectEventIDsInRangeDESCStmt *sql.Stmt
|
||||||
|
|
@ -75,7 +75,7 @@ type outputRoomEventsTopologyStatements struct {
|
||||||
selectMaxPositionInTopologyStmt *sql.Stmt
|
selectMaxPositionInTopologyStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSqliteTopologyTable(db *sql.DB, writer sqlutil.TransactionWriter) (tables.Topology, error) {
|
func NewSqliteTopologyTable(db *sql.DB, writer sqlutil.Writer) (tables.Topology, error) {
|
||||||
s := &outputRoomEventsTopologyStatements{
|
s := &outputRoomEventsTopologyStatements{
|
||||||
db: db,
|
db: db,
|
||||||
writer: writer,
|
writer: writer,
|
||||||
|
|
|
||||||
|
|
@ -73,13 +73,13 @@ const deleteSendToDeviceMessagesSQL = `
|
||||||
|
|
||||||
type sendToDeviceStatements struct {
|
type sendToDeviceStatements struct {
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
writer sqlutil.TransactionWriter
|
writer sqlutil.Writer
|
||||||
insertSendToDeviceMessageStmt *sql.Stmt
|
insertSendToDeviceMessageStmt *sql.Stmt
|
||||||
selectSendToDeviceMessagesStmt *sql.Stmt
|
selectSendToDeviceMessagesStmt *sql.Stmt
|
||||||
countSendToDeviceMessagesStmt *sql.Stmt
|
countSendToDeviceMessagesStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSqliteSendToDeviceTable(db *sql.DB, writer sqlutil.TransactionWriter) (tables.SendToDevice, error) {
|
func NewSqliteSendToDeviceTable(db *sql.DB, writer sqlutil.Writer) (tables.SendToDevice, error) {
|
||||||
s := &sendToDeviceStatements{
|
s := &sendToDeviceStatements{
|
||||||
db: db,
|
db: db,
|
||||||
writer: writer,
|
writer: writer,
|
||||||
|
|
|
||||||
|
|
@ -28,12 +28,12 @@ const selectStreamIDStmt = "" +
|
||||||
|
|
||||||
type streamIDStatements struct {
|
type streamIDStatements struct {
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
writer sqlutil.TransactionWriter
|
writer sqlutil.Writer
|
||||||
increaseStreamIDStmt *sql.Stmt
|
increaseStreamIDStmt *sql.Stmt
|
||||||
selectStreamIDStmt *sql.Stmt
|
selectStreamIDStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *streamIDStatements) prepare(db *sql.DB, writer sqlutil.TransactionWriter) (err error) {
|
func (s *streamIDStatements) prepare(db *sql.DB, writer sqlutil.Writer) (err error) {
|
||||||
s.db = db
|
s.db = db
|
||||||
s.writer = writer
|
s.writer = writer
|
||||||
_, err = db.Exec(streamIDTableSchema)
|
_, err = db.Exec(streamIDTableSchema)
|
||||||
|
|
|
||||||
|
|
@ -32,7 +32,7 @@ import (
|
||||||
type SyncServerDatasource struct {
|
type SyncServerDatasource struct {
|
||||||
shared.Database
|
shared.Database
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
writer sqlutil.TransactionWriter
|
writer sqlutil.Writer
|
||||||
sqlutil.PartitionOffsetStatements
|
sqlutil.PartitionOffsetStatements
|
||||||
streamID streamIDStatements
|
streamID streamIDStatements
|
||||||
}
|
}
|
||||||
|
|
@ -45,7 +45,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e
|
||||||
if d.db, err = sqlutil.Open(dbProperties); err != nil {
|
if d.db, err = sqlutil.Open(dbProperties); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
d.writer = sqlutil.NewTransactionWriter()
|
d.writer = sqlutil.NewExclusiveWriter()
|
||||||
if err = d.prepare(); err != nil {
|
if err = d.prepare(); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
@ -101,7 +101,7 @@ func (d *SyncServerDatasource) prepare() (err error) {
|
||||||
Topology: topology,
|
Topology: topology,
|
||||||
Filter: filter,
|
Filter: filter,
|
||||||
SendToDevice: sendToDevice,
|
SendToDevice: sendToDevice,
|
||||||
SendToDeviceWriter: sqlutil.NewTransactionWriter(),
|
SendToDeviceWriter: sqlutil.NewExclusiveWriter(),
|
||||||
EDUCache: cache.New(),
|
EDUCache: cache.New(),
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|
|
||||||
|
|
@ -35,7 +35,7 @@ import (
|
||||||
// Database represents an account database
|
// Database represents an account database
|
||||||
type Database struct {
|
type Database struct {
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
writer sqlutil.TransactionWriter
|
writer sqlutil.Writer
|
||||||
sqlutil.PartitionOffsetStatements
|
sqlutil.PartitionOffsetStatements
|
||||||
accounts accountsStatements
|
accounts accountsStatements
|
||||||
profiles profilesStatements
|
profiles profilesStatements
|
||||||
|
|
@ -53,7 +53,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions, serverName gomatrixserver
|
||||||
d := &Database{
|
d := &Database{
|
||||||
serverName: serverName,
|
serverName: serverName,
|
||||||
db: db,
|
db: db,
|
||||||
writer: sqlutil.NewDummyTransactionWriter(),
|
writer: sqlutil.NewDummyWriter(),
|
||||||
}
|
}
|
||||||
if err = d.PartitionOffsetStatements.Prepare(db, d.writer, "account"); err != nil {
|
if err = d.PartitionOffsetStatements.Prepare(db, d.writer, "account"); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
|
||||||
|
|
@ -51,13 +51,13 @@ const selectAccountDataByTypeSQL = "" +
|
||||||
|
|
||||||
type accountDataStatements struct {
|
type accountDataStatements struct {
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
writer sqlutil.TransactionWriter
|
writer sqlutil.Writer
|
||||||
insertAccountDataStmt *sql.Stmt
|
insertAccountDataStmt *sql.Stmt
|
||||||
selectAccountDataStmt *sql.Stmt
|
selectAccountDataStmt *sql.Stmt
|
||||||
selectAccountDataByTypeStmt *sql.Stmt
|
selectAccountDataByTypeStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *accountDataStatements) prepare(db *sql.DB, writer sqlutil.TransactionWriter) (err error) {
|
func (s *accountDataStatements) prepare(db *sql.DB, writer sqlutil.Writer) (err error) {
|
||||||
s.db = db
|
s.db = db
|
||||||
s.writer = writer
|
s.writer = writer
|
||||||
_, err = db.Exec(accountDataSchema)
|
_, err = db.Exec(accountDataSchema)
|
||||||
|
|
|
||||||
|
|
@ -59,7 +59,7 @@ const selectNewNumericLocalpartSQL = "" +
|
||||||
|
|
||||||
type accountsStatements struct {
|
type accountsStatements struct {
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
writer sqlutil.TransactionWriter
|
writer sqlutil.Writer
|
||||||
insertAccountStmt *sql.Stmt
|
insertAccountStmt *sql.Stmt
|
||||||
selectAccountByLocalpartStmt *sql.Stmt
|
selectAccountByLocalpartStmt *sql.Stmt
|
||||||
selectPasswordHashStmt *sql.Stmt
|
selectPasswordHashStmt *sql.Stmt
|
||||||
|
|
@ -67,7 +67,7 @@ type accountsStatements struct {
|
||||||
serverName gomatrixserverlib.ServerName
|
serverName gomatrixserverlib.ServerName
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *accountsStatements) prepare(db *sql.DB, writer sqlutil.TransactionWriter, server gomatrixserverlib.ServerName) (err error) {
|
func (s *accountsStatements) prepare(db *sql.DB, writer sqlutil.Writer, server gomatrixserverlib.ServerName) (err error) {
|
||||||
s.db = db
|
s.db = db
|
||||||
s.writer = writer
|
s.writer = writer
|
||||||
_, err = db.Exec(accountsSchema)
|
_, err = db.Exec(accountsSchema)
|
||||||
|
|
|
||||||
|
|
@ -53,7 +53,7 @@ const selectProfilesBySearchSQL = "" +
|
||||||
|
|
||||||
type profilesStatements struct {
|
type profilesStatements struct {
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
writer sqlutil.TransactionWriter
|
writer sqlutil.Writer
|
||||||
insertProfileStmt *sql.Stmt
|
insertProfileStmt *sql.Stmt
|
||||||
selectProfileByLocalpartStmt *sql.Stmt
|
selectProfileByLocalpartStmt *sql.Stmt
|
||||||
setAvatarURLStmt *sql.Stmt
|
setAvatarURLStmt *sql.Stmt
|
||||||
|
|
@ -61,7 +61,7 @@ type profilesStatements struct {
|
||||||
selectProfilesBySearchStmt *sql.Stmt
|
selectProfilesBySearchStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *profilesStatements) prepare(db *sql.DB, writer sqlutil.TransactionWriter) (err error) {
|
func (s *profilesStatements) prepare(db *sql.DB, writer sqlutil.Writer) (err error) {
|
||||||
s.db = db
|
s.db = db
|
||||||
s.writer = writer
|
s.writer = writer
|
||||||
_, err = db.Exec(profilesSchema)
|
_, err = db.Exec(profilesSchema)
|
||||||
|
|
|
||||||
|
|
@ -34,7 +34,7 @@ import (
|
||||||
// Database represents an account database
|
// Database represents an account database
|
||||||
type Database struct {
|
type Database struct {
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
writer sqlutil.TransactionWriter
|
writer sqlutil.Writer
|
||||||
|
|
||||||
sqlutil.PartitionOffsetStatements
|
sqlutil.PartitionOffsetStatements
|
||||||
accounts accountsStatements
|
accounts accountsStatements
|
||||||
|
|
@ -58,7 +58,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions, serverName gomatrixserver
|
||||||
d := &Database{
|
d := &Database{
|
||||||
serverName: serverName,
|
serverName: serverName,
|
||||||
db: db,
|
db: db,
|
||||||
writer: sqlutil.NewTransactionWriter(),
|
writer: sqlutil.NewExclusiveWriter(),
|
||||||
}
|
}
|
||||||
partitions := sqlutil.PartitionOffsetStatements{}
|
partitions := sqlutil.PartitionOffsetStatements{}
|
||||||
if err = partitions.Prepare(db, d.writer, "account"); err != nil {
|
if err = partitions.Prepare(db, d.writer, "account"); err != nil {
|
||||||
|
|
|
||||||
|
|
@ -54,14 +54,14 @@ const deleteThreePIDSQL = "" +
|
||||||
|
|
||||||
type threepidStatements struct {
|
type threepidStatements struct {
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
writer sqlutil.TransactionWriter
|
writer sqlutil.Writer
|
||||||
selectLocalpartForThreePIDStmt *sql.Stmt
|
selectLocalpartForThreePIDStmt *sql.Stmt
|
||||||
selectThreePIDsForLocalpartStmt *sql.Stmt
|
selectThreePIDsForLocalpartStmt *sql.Stmt
|
||||||
insertThreePIDStmt *sql.Stmt
|
insertThreePIDStmt *sql.Stmt
|
||||||
deleteThreePIDStmt *sql.Stmt
|
deleteThreePIDStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *threepidStatements) prepare(db *sql.DB, writer sqlutil.TransactionWriter) (err error) {
|
func (s *threepidStatements) prepare(db *sql.DB, writer sqlutil.Writer) (err error) {
|
||||||
s.db = db
|
s.db = db
|
||||||
s.writer = writer
|
s.writer = writer
|
||||||
_, err = db.Exec(threepidSchema)
|
_, err = db.Exec(threepidSchema)
|
||||||
|
|
|
||||||
|
|
@ -78,7 +78,7 @@ const selectDevicesByIDSQL = "" +
|
||||||
|
|
||||||
type devicesStatements struct {
|
type devicesStatements struct {
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
writer sqlutil.TransactionWriter
|
writer sqlutil.Writer
|
||||||
insertDeviceStmt *sql.Stmt
|
insertDeviceStmt *sql.Stmt
|
||||||
selectDevicesCountStmt *sql.Stmt
|
selectDevicesCountStmt *sql.Stmt
|
||||||
selectDeviceByTokenStmt *sql.Stmt
|
selectDeviceByTokenStmt *sql.Stmt
|
||||||
|
|
@ -91,7 +91,7 @@ type devicesStatements struct {
|
||||||
serverName gomatrixserverlib.ServerName
|
serverName gomatrixserverlib.ServerName
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *devicesStatements) prepare(db *sql.DB, writer sqlutil.TransactionWriter, server gomatrixserverlib.ServerName) (err error) {
|
func (s *devicesStatements) prepare(db *sql.DB, writer sqlutil.Writer, server gomatrixserverlib.ServerName) (err error) {
|
||||||
s.db = db
|
s.db = db
|
||||||
s.writer = writer
|
s.writer = writer
|
||||||
_, err = db.Exec(devicesSchema)
|
_, err = db.Exec(devicesSchema)
|
||||||
|
|
|
||||||
|
|
@ -34,7 +34,7 @@ var deviceIDByteLength = 6
|
||||||
// Database represents a device database.
|
// Database represents a device database.
|
||||||
type Database struct {
|
type Database struct {
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
writer sqlutil.TransactionWriter
|
writer sqlutil.Writer
|
||||||
devices devicesStatements
|
devices devicesStatements
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -44,7 +44,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions, serverName gomatrixserver
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
writer := sqlutil.NewTransactionWriter()
|
writer := sqlutil.NewExclusiveWriter()
|
||||||
d := devicesStatements{}
|
d := devicesStatements{}
|
||||||
if err = d.prepare(db, writer, serverName); err != nil {
|
if err = d.prepare(db, writer, serverName); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue