Use TransactionWriter to reduce database lock issues on SQLite (#1192)
This commit is contained in:
parent
99b50f30a0
commit
9cc52f47f3
|
@ -36,6 +36,8 @@ type Database struct {
|
||||||
queueJSONStatements
|
queueJSONStatements
|
||||||
sqlutil.PartitionOffsetStatements
|
sqlutil.PartitionOffsetStatements
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
|
queuePDUsWriter *sqlutil.TransactionWriter
|
||||||
|
queueJSONWriter *sqlutil.TransactionWriter
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewDatabase opens a new database
|
// NewDatabase opens a new database
|
||||||
|
@ -74,6 +76,9 @@ func (d *Database) prepare() error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
d.queuePDUsWriter = sqlutil.NewTransactionWriter()
|
||||||
|
d.queueJSONWriter = sqlutil.NewTransactionWriter()
|
||||||
|
|
||||||
return d.PartitionOffsetStatements.Prepare(d.db, "federationsender")
|
return d.PartitionOffsetStatements.Prepare(d.db, "federationsender")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -145,12 +150,16 @@ func (d *Database) GetJoinedHosts(
|
||||||
// metadata entries.
|
// metadata entries.
|
||||||
func (d *Database) StoreJSON(
|
func (d *Database) StoreJSON(
|
||||||
ctx context.Context, js string,
|
ctx context.Context, js string,
|
||||||
) (int64, error) {
|
) (nid int64, err error) {
|
||||||
nid, err := d.insertQueueJSON(ctx, nil, js)
|
err = d.queueJSONWriter.Do(d.db, func(txn *sql.Tx) error {
|
||||||
if err != nil {
|
n, e := d.insertQueueJSON(ctx, nil, js)
|
||||||
return 0, fmt.Errorf("d.insertQueueJSON: %w", err)
|
if e != nil {
|
||||||
|
return fmt.Errorf("d.insertQueueJSON: %w", e)
|
||||||
}
|
}
|
||||||
return nid, nil
|
nid = n
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// AssociatePDUWithDestination creates an association that the
|
// AssociatePDUWithDestination creates an association that the
|
||||||
|
@ -162,7 +171,7 @@ func (d *Database) AssociatePDUWithDestination(
|
||||||
serverName gomatrixserverlib.ServerName,
|
serverName gomatrixserverlib.ServerName,
|
||||||
nids []int64,
|
nids []int64,
|
||||||
) error {
|
) error {
|
||||||
return sqlutil.WithTransaction(d.db, func(txn *sql.Tx) error {
|
return d.queuePDUsWriter.Do(d.db, func(txn *sql.Tx) error {
|
||||||
for _, nid := range nids {
|
for _, nid := range nids {
|
||||||
if err := d.insertQueuePDU(
|
if err := d.insertQueuePDU(
|
||||||
ctx, // context
|
ctx, // context
|
||||||
|
@ -230,18 +239,18 @@ func (d *Database) CleanTransactionPDUs(
|
||||||
serverName gomatrixserverlib.ServerName,
|
serverName gomatrixserverlib.ServerName,
|
||||||
transactionID gomatrixserverlib.TransactionID,
|
transactionID gomatrixserverlib.TransactionID,
|
||||||
) error {
|
) error {
|
||||||
return sqlutil.WithTransaction(d.db, func(txn *sql.Tx) error {
|
var err error
|
||||||
nids, err := d.selectQueuePDUs(ctx, txn, serverName, transactionID, 50)
|
var nids []int64
|
||||||
|
var deleteNIDs []int64
|
||||||
|
if err = d.queuePDUsWriter.Do(d.db, func(txn *sql.Tx) error {
|
||||||
|
nids, err = d.selectQueuePDUs(ctx, txn, serverName, transactionID, 50)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("d.selectQueuePDUs: %w", err)
|
return fmt.Errorf("d.selectQueuePDUs: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = d.deleteQueueTransaction(ctx, txn, serverName, transactionID); err != nil {
|
if err = d.deleteQueueTransaction(ctx, txn, serverName, transactionID); err != nil {
|
||||||
return fmt.Errorf("d.deleteQueueTransaction: %w", err)
|
return fmt.Errorf("d.deleteQueueTransaction: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var count int64
|
var count int64
|
||||||
var deleteNIDs []int64
|
|
||||||
for _, nid := range nids {
|
for _, nid := range nids {
|
||||||
count, err = d.selectQueueReferenceJSONCount(ctx, txn, nid)
|
count, err = d.selectQueueReferenceJSONCount(ctx, txn, nid)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -251,15 +260,19 @@ func (d *Database) CleanTransactionPDUs(
|
||||||
deleteNIDs = append(deleteNIDs, nid)
|
deleteNIDs = append(deleteNIDs, nid)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return nil
|
||||||
|
}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
err = d.queueJSONWriter.Do(d.db, func(txn *sql.Tx) error {
|
||||||
if len(deleteNIDs) > 0 {
|
if len(deleteNIDs) > 0 {
|
||||||
if err = d.deleteQueueJSON(ctx, txn, deleteNIDs); err != nil {
|
if err = d.deleteQueueJSON(ctx, txn, deleteNIDs); err != nil {
|
||||||
return fmt.Errorf("d.deleteQueueJSON: %w", err)
|
return fmt.Errorf("d.deleteQueueJSON: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetPendingPDUCount returns the number of PDUs waiting to be
|
// GetPendingPDUCount returns the number of PDUs waiting to be
|
||||||
|
|
Loading…
Reference in a new issue