Return errors from Do, fix dendritejs

This commit is contained in:
Neil Alexander 2020-06-01 14:46:32 +01:00
parent e323bba3fd
commit 6158a8dcc5
3 changed files with 15 additions and 16 deletions

View file

@ -209,7 +209,7 @@ func main() {
p2pPublicRoomProvider := NewLibP2PPublicRoomsProvider(node)
rsAPI := roomserver.SetupRoomServerComponent(base, keyRing, federation)
eduInputAPI := eduserver.SetupEDUServerComponent(base, cache.New())
eduInputAPI := eduserver.SetupEDUServerComponent(base, cache.New(), deviceDB)
asQuery := appservice.SetupAppServiceAPIComponent(
base, accountDB, deviceDB, federation, rsAPI, transactions.New(),
)

View file

@ -123,14 +123,14 @@ type TransactionWriter struct {
// transactionWriterTask represents a specific task.
type transactionWriterTask struct {
db *sql.DB
f func(txn *sql.Tx)
wait chan struct{}
f func(txn *sql.Tx) error
wait chan error
}
// Do queues a task to be run by a TransactionWriter. The function
// provided will be ran within a transaction as supplied by the
// database parameter. This will block until the task is finished.
func (w *TransactionWriter) Do(db *sql.DB, f func(txn *sql.Tx)) {
func (w *TransactionWriter) Do(db *sql.DB, f func(txn *sql.Tx) error) error {
if w.todo == nil {
w.todo = make(chan transactionWriterTask)
}
@ -140,10 +140,10 @@ func (w *TransactionWriter) Do(db *sql.DB, f func(txn *sql.Tx)) {
task := transactionWriterTask{
db: db,
f: f,
wait: make(chan struct{}),
wait: make(chan error, 1),
}
w.todo <- task
<-task.wait
return <-task.wait
}
// run processes the tasks for a given transaction writer. Only one
@ -156,9 +156,8 @@ func (w *TransactionWriter) run() {
}
defer w.running.Store(false)
for task := range w.todo {
_ = WithTransaction(task.db, func(txn *sql.Tx) error {
task.f(txn)
return nil
task.wait <- WithTransaction(task.db, func(txn *sql.Tx) error {
return task.f(txn)
})
close(task.wait)
}

View file

@ -1078,8 +1078,8 @@ func (d *Database) StoreNewSendForDeviceMessage(
}
// Delegate the database write task to the SendToDeviceWriter. It'll guarantee
// that we don't lock the table for writes in more than one place.
d.SendToDeviceWriter.Do(d.DB, func(txn *sql.Tx) {
err = d.AddSendToDeviceEvent(
err = d.SendToDeviceWriter.Do(d.DB, func(txn *sql.Tx) error {
return d.AddSendToDeviceEvent(
ctx, txn, userID, deviceID, string(j),
)
})
@ -1143,18 +1143,18 @@ func (d *Database) CleanSendToDeviceUpdates(
// If we need to write to the database then we'll ask the SendToDeviceWriter to
// do that for us. It'll guarantee that we don't lock the table for writes in
// more than one place.
d.SendToDeviceWriter.Do(d.DB, func(txn *sql.Tx) {
err = d.SendToDeviceWriter.Do(d.DB, func(txn *sql.Tx) error {
// Delete any send-to-device messages marked for deletion.
if e := d.SendToDevice.DeleteSendToDeviceMessages(ctx, txn, toDelete); e != nil {
err = fmt.Errorf("d.SendToDevice.DeleteSendToDeviceMessages: %w", e)
return
return fmt.Errorf("d.SendToDevice.DeleteSendToDeviceMessages: %w", e)
}
// Now update any outstanding send-to-device messages with the new sync token.
if e := d.SendToDevice.UpdateSentSendToDeviceMessages(ctx, txn, token.String(), toUpdate); e != nil {
err = fmt.Errorf("d.SendToDevice.UpdateSentSendToDeviceMessages: %w", err)
return
return fmt.Errorf("d.SendToDevice.UpdateSentSendToDeviceMessages: %w", err)
}
return nil
})
return
}