mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-01-19 02:14:28 -06:00
46902e5766
Given that #2714 wasn't merged but we are now at a minimum supported Go version of 1.20 (soon to be 1.21), I wanted to carry over some of the changes. Namely: - Fix the log typo - Simplify build constraints for unix - Use stdlib atomic package ### Pull Request Checklist <!-- Please read https://matrix-org.github.io/dendrite/development/contributing before submitting your pull request --> * [x] I have added Go unit tests or [Complement integration tests](https://github.com/matrix-org/complement) for this PR _or_ I have justified why this PR doesn't need tests * [x] Pull request includes a [sign off below using a legally identifiable name](https://matrix-org.github.io/dendrite/development/contributing#sign-off) _or_ I have already signed off privately Signed-off-by: `0x1a8510f2 <admin@0x1a8510f2.space>` --------- Co-authored-by: devonh <devon.dmytro@gmail.com>
77 lines
2 KiB
Go
77 lines
2 KiB
Go
package sqlutil
|
|
|
|
import (
|
|
"database/sql"
|
|
"errors"
|
|
"sync/atomic"
|
|
)
|
|
|
|
// ExclusiveWriter implements sqlutil.Writer.
|
|
// ExclusiveWriter allows queuing database writes so that you don't
|
|
// contend on database locks in, e.g. SQLite. Only one task will run
|
|
// at a time on a given ExclusiveWriter.
|
|
type ExclusiveWriter struct {
|
|
running atomic.Bool
|
|
todo chan transactionWriterTask
|
|
}
|
|
|
|
func NewExclusiveWriter() Writer {
|
|
return &ExclusiveWriter{
|
|
todo: make(chan transactionWriterTask),
|
|
}
|
|
}
|
|
|
|
// transactionWriterTask is a single unit of work queued on an
// ExclusiveWriter.
type transactionWriterTask struct {
	// db is the database used to open a new transaction when txn is nil.
	db *sql.DB
	// txn is an already-open transaction supplied by the caller, if any.
	txn *sql.Tx
	// f is the work to perform.
	f func(txn *sql.Tx) error
	// wait receives the result of f once the task has been processed.
	wait chan error
}
|
|
|
|
// Do queues a task to be run by a TransactionWriter. The function
|
|
// provided will be ran within a transaction as supplied by the
|
|
// txn parameter if one is supplied, and if not, will take out a
|
|
// new transaction from the database supplied in the database
|
|
// parameter. Either way, this will block until the task is done.
|
|
func (w *ExclusiveWriter) Do(db *sql.DB, txn *sql.Tx, f func(txn *sql.Tx) error) error {
|
|
if w.todo == nil {
|
|
return errors.New("not initialised")
|
|
}
|
|
if !w.running.Load() {
|
|
go w.run()
|
|
}
|
|
task := transactionWriterTask{
|
|
db: db,
|
|
txn: txn,
|
|
f: f,
|
|
wait: make(chan error, 1),
|
|
}
|
|
w.todo <- task
|
|
return <-task.wait
|
|
}
|
|
|
|
// run processes the tasks for a given transaction writer. Only one
|
|
// of these goroutines will run at a time. A transaction will be
|
|
// opened using the database object from the task and then this will
|
|
// be passed as a parameter to the task function.
|
|
func (w *ExclusiveWriter) run() {
|
|
if !w.running.CompareAndSwap(false, true) {
|
|
return
|
|
}
|
|
|
|
defer w.running.Store(false)
|
|
for task := range w.todo {
|
|
if task.db != nil && task.txn != nil {
|
|
task.wait <- task.f(task.txn)
|
|
} else if task.db != nil && task.txn == nil {
|
|
task.wait <- WithTransaction(task.db, func(txn *sql.Tx) error {
|
|
return task.f(txn)
|
|
})
|
|
} else {
|
|
task.wait <- task.f(nil)
|
|
}
|
|
close(task.wait)
|
|
}
|
|
}
|