mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-20 05:13:11 -06:00
Constructor for TransactionWriter
This commit is contained in:
parent
44b25a7f9e
commit
9f834726cd
|
|
@ -18,6 +18,7 @@ package internal
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"runtime"
|
"runtime"
|
||||||
"time"
|
"time"
|
||||||
|
|
@ -120,6 +121,12 @@ type TransactionWriter struct {
|
||||||
todo chan transactionWriterTask
|
todo chan transactionWriterTask
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func NewTransactionWriter() *TransactionWriter {
|
||||||
|
return &TransactionWriter{
|
||||||
|
todo: make(chan transactionWriterTask),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// transactionWriterTask represents a specific task.
|
// transactionWriterTask represents a specific task.
|
||||||
type transactionWriterTask struct {
|
type transactionWriterTask struct {
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
|
|
@ -132,7 +139,7 @@ type transactionWriterTask struct {
|
||||||
// database parameter. This will block until the task is finished.
|
// database parameter. This will block until the task is finished.
|
||||||
func (w *TransactionWriter) Do(db *sql.DB, f func(txn *sql.Tx) error) error {
|
func (w *TransactionWriter) Do(db *sql.DB, f func(txn *sql.Tx) error) error {
|
||||||
if w.todo == nil {
|
if w.todo == nil {
|
||||||
w.todo = make(chan transactionWriterTask)
|
return errors.New("not initialised")
|
||||||
}
|
}
|
||||||
if !w.running.Load() {
|
if !w.running.Load() {
|
||||||
go w.run()
|
go w.run()
|
||||||
|
|
|
||||||
|
|
@ -82,6 +82,7 @@ func NewDatabase(dbDataSourceName string, dbProperties internal.DbProperties) (*
|
||||||
CurrentRoomState: currState,
|
CurrentRoomState: currState,
|
||||||
BackwardExtremities: backwardExtremities,
|
BackwardExtremities: backwardExtremities,
|
||||||
SendToDevice: sendToDevice,
|
SendToDevice: sendToDevice,
|
||||||
|
SendToDeviceWriter: internal.NewTransactionWriter(),
|
||||||
EDUCache: cache.New(),
|
EDUCache: cache.New(),
|
||||||
}
|
}
|
||||||
return &d, nil
|
return &d, nil
|
||||||
|
|
|
||||||
|
|
@ -42,7 +42,7 @@ type Database struct {
|
||||||
CurrentRoomState tables.CurrentRoomState
|
CurrentRoomState tables.CurrentRoomState
|
||||||
BackwardExtremities tables.BackwardsExtremities
|
BackwardExtremities tables.BackwardsExtremities
|
||||||
SendToDevice tables.SendToDevice
|
SendToDevice tables.SendToDevice
|
||||||
SendToDeviceWriter internal.TransactionWriter
|
SendToDeviceWriter *internal.TransactionWriter
|
||||||
EDUCache *cache.EDUCache
|
EDUCache *cache.EDUCache
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -108,6 +108,7 @@ func (d *SyncServerDatasource) prepare() (err error) {
|
||||||
CurrentRoomState: roomState,
|
CurrentRoomState: roomState,
|
||||||
Topology: topology,
|
Topology: topology,
|
||||||
SendToDevice: sendToDevice,
|
SendToDevice: sendToDevice,
|
||||||
|
SendToDeviceWriter: internal.NewTransactionWriter(),
|
||||||
EDUCache: cache.New(),
|
EDUCache: cache.New(),
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue