Take input transaction when setting up updaters

This commit is contained in:
Neil Alexander 2020-08-19 11:59:55 +01:00
parent 3d58417555
commit e9e7a76a5a
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
4 changed files with 15 additions and 37 deletions

View file

@ -18,21 +18,9 @@ type membershipUpdater struct {
} }
func NewMembershipUpdater( func NewMembershipUpdater(
ctx context.Context, d *Database, roomID, targetUserID string, ctx context.Context, d *Database, txn *sql.Tx, roomID, targetUserID string,
targetLocal bool, roomVersion gomatrixserverlib.RoomVersion, targetLocal bool, roomVersion gomatrixserverlib.RoomVersion,
useTxns bool,
) (types.MembershipUpdater, error) { ) (types.MembershipUpdater, error) {
txn, err := d.DB.Begin()
if err != nil {
return nil, err
}
succeeded := false
defer func() {
if !succeeded {
txn.Rollback() // nolint: errcheck
}
}()
roomNID, err := d.assignRoomNID(ctx, txn, roomID, roomVersion) roomNID, err := d.assignRoomNID(ctx, txn, roomID, roomVersion)
if err != nil { if err != nil {
return nil, err return nil, err
@ -43,17 +31,7 @@ func NewMembershipUpdater(
return nil, err return nil, err
} }
updater, err := d.membershipUpdaterTxn(ctx, txn, roomNID, targetUserNID, targetLocal) return d.membershipUpdaterTxn(ctx, txn, roomNID, targetUserNID, targetLocal)
if err != nil {
return nil, err
}
succeeded = true
if !useTxns {
txn.Commit() // nolint: errcheck
updater.transaction.txn = nil
}
return updater, nil
} }
func (d *Database) membershipUpdaterTxn( func (d *Database) membershipUpdaterTxn(

View file

@ -17,11 +17,7 @@ type roomRecentEventsUpdater struct {
currentStateSnapshotNID types.StateSnapshotNID currentStateSnapshotNID types.StateSnapshotNID
} }
func NewRoomRecentEventsUpdater(d *Database, ctx context.Context, roomNID types.RoomNID, useTxns bool) (types.RoomRecentEventsUpdater, error) { func NewRoomRecentEventsUpdater(ctx context.Context, d *Database, txn *sql.Tx, roomNID types.RoomNID) (types.RoomRecentEventsUpdater, error) {
txn, err := d.DB.Begin()
if err != nil {
return nil, err
}
eventNIDs, lastEventNIDSent, currentStateSnapshotNID, err := eventNIDs, lastEventNIDSent, currentStateSnapshotNID, err :=
d.RoomsTable.SelectLatestEventsNIDsForUpdate(ctx, txn, roomNID) d.RoomsTable.SelectLatestEventsNIDsForUpdate(ctx, txn, roomNID)
if err != nil { if err != nil {
@ -41,10 +37,6 @@ func NewRoomRecentEventsUpdater(d *Database, ctx context.Context, roomNID types.
return nil, err return nil, err
} }
} }
if !useTxns {
txn.Commit() // nolint: errcheck
txn = nil
}
return &roomRecentEventsUpdater{ return &roomRecentEventsUpdater{
transaction{ctx, txn}, d, roomNID, stateAndRefs, lastEventIDSent, currentStateSnapshotNID, transaction{ctx, txn}, d, roomNID, stateAndRefs, lastEventIDSent, currentStateSnapshotNID,
}, nil }, nil

View file

@ -333,13 +333,21 @@ func (d *Database) MembershipUpdater(
ctx context.Context, roomID, targetUserID string, ctx context.Context, roomID, targetUserID string,
targetLocal bool, roomVersion gomatrixserverlib.RoomVersion, targetLocal bool, roomVersion gomatrixserverlib.RoomVersion,
) (types.MembershipUpdater, error) { ) (types.MembershipUpdater, error) {
return NewMembershipUpdater(ctx, d, roomID, targetUserID, targetLocal, roomVersion, true) txn, err := d.DB.Begin()
if err != nil {
return nil, err
}
return NewMembershipUpdater(ctx, d, txn, roomID, targetUserID, targetLocal, roomVersion)
} }
func (d *Database) GetLatestEventsForUpdate( func (d *Database) GetLatestEventsForUpdate(
ctx context.Context, roomNID types.RoomNID, ctx context.Context, roomNID types.RoomNID,
) (types.RoomRecentEventsUpdater, error) { ) (types.RoomRecentEventsUpdater, error) {
return NewRoomRecentEventsUpdater(d, ctx, roomNID, true) txn, err := d.DB.Begin()
if err != nil {
return nil, err
}
return NewRoomRecentEventsUpdater(ctx, d, txn, roomNID)
} }
func (d *Database) StoreEvent( func (d *Database) StoreEvent(

View file

@ -146,7 +146,7 @@ func (d *Database) GetLatestEventsForUpdate(
// 'database is locked' errors. As sqlite doesn't support multi-process on the // 'database is locked' errors. As sqlite doesn't support multi-process on the
// same DB anyway, and we only execute updates sequentially, the only worries // same DB anyway, and we only execute updates sequentially, the only worries
// are for rolling back when things go wrong. (atomicity) // are for rolling back when things go wrong. (atomicity)
return shared.NewRoomRecentEventsUpdater(&d.Database, ctx, roomNID, false) return shared.NewRoomRecentEventsUpdater(ctx, &d.Database, nil, roomNID)
} }
func (d *Database) MembershipUpdater( func (d *Database) MembershipUpdater(
@ -159,5 +159,5 @@ func (d *Database) MembershipUpdater(
// 'database is locked' errors. As sqlite doesn't support multi-process on the // 'database is locked' errors. As sqlite doesn't support multi-process on the
// same DB anyway, and we only execute updates sequentially, the only worries // same DB anyway, and we only execute updates sequentially, the only worries
// are for rolling back when things go wrong. (atomicity) // are for rolling back when things go wrong. (atomicity)
return shared.NewMembershipUpdater(ctx, &d.Database, roomID, targetUserID, targetLocal, roomVersion, false) return shared.NewMembershipUpdater(ctx, &d.Database, nil, roomID, targetUserID, targetLocal, roomVersion)
} }