diff --git a/cmd/dendritejs/main.go b/cmd/dendritejs/main.go index b8517bf9d..add130aa8 100644 --- a/cmd/dendritejs/main.go +++ b/cmd/dendritejs/main.go @@ -209,7 +209,7 @@ func main() { p2pPublicRoomProvider := NewLibP2PPublicRoomsProvider(node) rsAPI := roomserver.SetupRoomServerComponent(base, keyRing, federation) - eduInputAPI := eduserver.SetupEDUServerComponent(base, cache.New()) + eduInputAPI := eduserver.SetupEDUServerComponent(base, cache.New(), deviceDB) asQuery := appservice.SetupAppServiceAPIComponent( base, accountDB, deviceDB, federation, rsAPI, transactions.New(), ) diff --git a/internal/sql.go b/internal/sql.go index c5da03572..4abab4d35 100644 --- a/internal/sql.go +++ b/internal/sql.go @@ -123,14 +123,14 @@ type TransactionWriter struct { // transactionWriterTask represents a specific task. type transactionWriterTask struct { db *sql.DB - f func(txn *sql.Tx) - wait chan struct{} + f func(txn *sql.Tx) error + wait chan error } // Do queues a task to be run by a TransactionWriter. The function // provided will be ran within a transaction as supplied by the // database parameter. This will block until the task is finished. -func (w *TransactionWriter) Do(db *sql.DB, f func(txn *sql.Tx)) { +func (w *TransactionWriter) Do(db *sql.DB, f func(txn *sql.Tx) error) error { if w.todo == nil { w.todo = make(chan transactionWriterTask) } @@ -140,10 +140,10 @@ func (w *TransactionWriter) Do(db *sql.DB, f func(txn *sql.Tx)) { task := transactionWriterTask{ db: db, f: f, - wait: make(chan struct{}), + wait: make(chan error, 1), } w.todo <- task - <-task.wait + return <-task.wait } // run processes the tasks for a given transaction writer. Only one @@ -156,9 +156,8 @@ func (w *TransactionWriter) run() { } defer w.running.Store(false) for task := range w.todo { - _ = WithTransaction(task.db, func(txn *sql.Tx) error { - task.f(txn) - return nil + task.wait <- WithTransaction(task.db, func(txn *sql.Tx) error { + return task.f(txn) }) close(task.wait) } diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index 841f2d536..86747f874 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -1078,8 +1078,8 @@ func (d *Database) StoreNewSendForDeviceMessage( } // Delegate the database write task to the SendToDeviceWriter. It'll guarantee // that we don't lock the table for writes in more than one place. - d.SendToDeviceWriter.Do(d.DB, func(txn *sql.Tx) { - err = d.AddSendToDeviceEvent( + err = d.SendToDeviceWriter.Do(d.DB, func(txn *sql.Tx) error { + return d.AddSendToDeviceEvent( ctx, txn, userID, deviceID, string(j), ) }) @@ -1143,18 +1143,18 @@ func (d *Database) CleanSendToDeviceUpdates( // If we need to write to the database then we'll ask the SendToDeviceWriter to // do that for us. It'll guarantee that we don't lock the table for writes in // more than one place. - d.SendToDeviceWriter.Do(d.DB, func(txn *sql.Tx) { + err = d.SendToDeviceWriter.Do(d.DB, func(txn *sql.Tx) error { // Delete any send-to-device messages marked for deletion. if e := d.SendToDevice.DeleteSendToDeviceMessages(ctx, txn, toDelete); e != nil { - err = fmt.Errorf("d.SendToDevice.DeleteSendToDeviceMessages: %w", e) - return + return fmt.Errorf("d.SendToDevice.DeleteSendToDeviceMessages: %w", e) } // Now update any outstanding send-to-device messages with the new sync token. if e := d.SendToDevice.UpdateSentSendToDeviceMessages(ctx, txn, token.String(), toUpdate); e != nil { - err = fmt.Errorf("d.SendToDevice.UpdateSentSendToDeviceMessages: %w", err) - return + return fmt.Errorf("d.SendToDevice.UpdateSentSendToDeviceMessages: %w", err) } + + return nil }) return }