mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-22 06:13:10 -06:00
Fix redaction deadlocks
This commit is contained in:
parent
352a4a896c
commit
2e65e0d160
|
|
@ -18,7 +18,7 @@ type roomRecentEventsUpdater struct {
|
||||||
currentStateSnapshotNID types.StateSnapshotNID
|
currentStateSnapshotNID types.StateSnapshotNID
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewRoomRecentEventsUpdater(d *Database, ctx context.Context, roomNID types.RoomNID) (types.RoomRecentEventsUpdater, func() error, error) {
|
func NewRoomRecentEventsUpdater(d *Database, ctx context.Context, roomNID types.RoomNID, useTxns bool) (types.RoomRecentEventsUpdater, func() error, error) {
|
||||||
eventNIDs, lastEventNIDSent, currentStateSnapshotNID, err :=
|
eventNIDs, lastEventNIDSent, currentStateSnapshotNID, err :=
|
||||||
d.RoomsTable.SelectLatestEventsNIDsForUpdate(ctx, nil, roomNID)
|
d.RoomsTable.SelectLatestEventsNIDsForUpdate(ctx, nil, roomNID)
|
||||||
if err != nil && err != sql.ErrNoRows {
|
if err != nil && err != sql.ErrNoRows {
|
||||||
|
|
@ -40,13 +40,13 @@ func NewRoomRecentEventsUpdater(d *Database, ctx context.Context, roomNID types.
|
||||||
}
|
}
|
||||||
var txn *sql.Tx
|
var txn *sql.Tx
|
||||||
cancel := func() error { return nil }
|
cancel := func() error { return nil }
|
||||||
/*
|
if useTxns {
|
||||||
txn, err := d.DB.Begin()
|
txn, err := d.DB.Begin()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, fmt.Errorf("d.DB.Begin: %w", err)
|
return nil, nil, fmt.Errorf("d.DB.Begin: %w", err)
|
||||||
}
|
}
|
||||||
cancel = func() error { return txn.Commit() }
|
cancel = func() error { return txn.Commit() }
|
||||||
*/
|
}
|
||||||
return &roomRecentEventsUpdater{
|
return &roomRecentEventsUpdater{
|
||||||
transaction{ctx, txn}, d, roomNID, stateAndRefs, lastEventIDSent, currentStateSnapshotNID,
|
transaction{ctx, txn}, d, roomNID, stateAndRefs, lastEventIDSent, currentStateSnapshotNID,
|
||||||
}, cancel, nil
|
}, cancel, nil
|
||||||
|
|
|
||||||
|
|
@ -353,7 +353,7 @@ func (d *Database) MembershipUpdater(
|
||||||
func (d *Database) GetLatestEventsForUpdate(
|
func (d *Database) GetLatestEventsForUpdate(
|
||||||
ctx context.Context, roomNID types.RoomNID,
|
ctx context.Context, roomNID types.RoomNID,
|
||||||
) (types.RoomRecentEventsUpdater, func() error, error) {
|
) (types.RoomRecentEventsUpdater, func() error, error) {
|
||||||
return NewRoomRecentEventsUpdater(d, ctx, roomNID)
|
return NewRoomRecentEventsUpdater(d, ctx, roomNID, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
// nolint:gocyclo
|
// nolint:gocyclo
|
||||||
|
|
@ -570,12 +570,10 @@ func (d *Database) handleRedactions(
|
||||||
return nil, "", nil
|
return nil, "", nil
|
||||||
}
|
}
|
||||||
|
|
||||||
err = d.Writer.Do(d.DB, txn, func(txn *sql.Tx) error {
|
err = d.RedactionsTable.InsertRedaction(ctx, txn, tables.RedactionInfo{
|
||||||
return d.RedactionsTable.InsertRedaction(ctx, txn, tables.RedactionInfo{
|
Validated: false,
|
||||||
Validated: false,
|
RedactionEventID: event.EventID(),
|
||||||
RedactionEventID: event.EventID(),
|
RedactsEventID: event.Redacts(),
|
||||||
RedactsEventID: event.Redacts(),
|
|
||||||
})
|
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, "", err
|
return nil, "", err
|
||||||
|
|
@ -604,9 +602,7 @@ func (d *Database) handleRedactions(
|
||||||
redactedEvent.Event = redactedEvent.Redact()
|
redactedEvent.Event = redactedEvent.Redact()
|
||||||
}
|
}
|
||||||
// overwrite the eventJSON table
|
// overwrite the eventJSON table
|
||||||
err = d.Writer.Do(d.DB, txn, func(txn *sql.Tx) error {
|
err = d.EventJSONTable.InsertEventJSON(ctx, txn, redactedEvent.EventNID, redactedEvent.JSON())
|
||||||
return d.EventJSONTable.InsertEventJSON(ctx, txn, redactedEvent.EventNID, redactedEvent.JSON())
|
|
||||||
})
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, "", err
|
return nil, "", err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -16,12 +16,14 @@
|
||||||
package sqlite3
|
package sqlite3
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/internal/config"
|
"github.com/matrix-org/dendrite/internal/config"
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
"github.com/matrix-org/dendrite/roomserver/storage/shared"
|
"github.com/matrix-org/dendrite/roomserver/storage/shared"
|
||||||
"github.com/matrix-org/dendrite/roomserver/storage/tables"
|
"github.com/matrix-org/dendrite/roomserver/storage/tables"
|
||||||
|
"github.com/matrix-org/dendrite/roomserver/types"
|
||||||
_ "github.com/mattn/go-sqlite3"
|
_ "github.com/mattn/go-sqlite3"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -133,3 +135,9 @@ func Open(dbProperties *config.DatabaseOptions) (*Database, error) {
|
||||||
}
|
}
|
||||||
return &d, nil
|
return &d, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (d *Database) GetLatestEventsForUpdate(
|
||||||
|
ctx context.Context, roomNID types.RoomNID,
|
||||||
|
) (types.RoomRecentEventsUpdater, func() error, error) {
|
||||||
|
return shared.NewRoomRecentEventsUpdater(d, ctx, roomNID, false)
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue