mirror of
https://github.com/matrix-org/dendrite.git
synced 2024-11-30 02:01:56 -06:00
Fix lock errors in federation sender (#1347)
* Fix lock errors in federation sender * Additional fix to writers
This commit is contained in:
parent
421b6b2313
commit
7466e6b718
|
@ -21,7 +21,6 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -33,7 +32,7 @@ func (d *Database) AssociateEDUWithDestination(
|
||||||
serverName gomatrixserverlib.ServerName,
|
serverName gomatrixserverlib.ServerName,
|
||||||
receipt *Receipt,
|
receipt *Receipt,
|
||||||
) error {
|
) error {
|
||||||
return sqlutil.WithTransaction(d.DB, func(txn *sql.Tx) error {
|
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||||
for _, nid := range receipt.nids {
|
for _, nid := range receipt.nids {
|
||||||
if err := d.FederationSenderQueueEDUs.InsertQueueEDU(
|
if err := d.FederationSenderQueueEDUs.InsertQueueEDU(
|
||||||
ctx, // context
|
ctx, // context
|
||||||
|
@ -60,7 +59,7 @@ func (d *Database) GetNextTransactionEDUs(
|
||||||
receipt *Receipt,
|
receipt *Receipt,
|
||||||
err error,
|
err error,
|
||||||
) {
|
) {
|
||||||
err = sqlutil.WithTransaction(d.DB, func(txn *sql.Tx) error {
|
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||||
nids, err := d.FederationSenderQueueEDUs.SelectQueueEDUs(ctx, txn, serverName, limit)
|
nids, err := d.FederationSenderQueueEDUs.SelectQueueEDUs(ctx, txn, serverName, limit)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("SelectQueueEDUs: %w", err)
|
return fmt.Errorf("SelectQueueEDUs: %w", err)
|
||||||
|
@ -99,7 +98,7 @@ func (d *Database) CleanEDUs(
|
||||||
return errors.New("expected receipt")
|
return errors.New("expected receipt")
|
||||||
}
|
}
|
||||||
|
|
||||||
return sqlutil.WithTransaction(d.DB, func(txn *sql.Tx) error {
|
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||||
if err := d.FederationSenderQueueEDUs.DeleteQueueEDUs(ctx, txn, serverName, receipt.nids); err != nil {
|
if err := d.FederationSenderQueueEDUs.DeleteQueueEDUs(ctx, txn, serverName, receipt.nids); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,7 +34,7 @@ func (d *Database) AssociatePDUWithDestination(
|
||||||
serverName gomatrixserverlib.ServerName,
|
serverName gomatrixserverlib.ServerName,
|
||||||
receipt *Receipt,
|
receipt *Receipt,
|
||||||
) error {
|
) error {
|
||||||
return sqlutil.WithTransaction(d.DB, func(txn *sql.Tx) error {
|
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||||
for _, nid := range receipt.nids {
|
for _, nid := range receipt.nids {
|
||||||
if err := d.FederationSenderQueuePDUs.InsertQueuePDU(
|
if err := d.FederationSenderQueuePDUs.InsertQueuePDU(
|
||||||
ctx, // context
|
ctx, // context
|
||||||
|
@ -111,7 +111,7 @@ func (d *Database) CleanPDUs(
|
||||||
return errors.New("expected receipt")
|
return errors.New("expected receipt")
|
||||||
}
|
}
|
||||||
|
|
||||||
return sqlutil.WithTransaction(d.DB, func(txn *sql.Tx) error {
|
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||||
if err := d.FederationSenderQueuePDUs.DeleteQueuePDUs(ctx, txn, serverName, receipt.nids); err != nil {
|
if err := d.FederationSenderQueuePDUs.DeleteQueuePDUs(ctx, txn, serverName, receipt.nids); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue