Refactor TransactionWriter in federation sender

This commit is contained in:
Neil Alexander 2020-08-20 17:08:56 +01:00
parent 910522ffd0
commit bd03445a50
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
9 changed files with 76 additions and 101 deletions

View file

@ -65,6 +65,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) {
} }
d.Database = shared.Database{ d.Database = shared.Database{
DB: d.db, DB: d.db,
Writer: d.writer,
FederationSenderJoinedHosts: joinedHosts, FederationSenderJoinedHosts: joinedHosts,
FederationSenderQueuePDUs: queuePDUs, FederationSenderQueuePDUs: queuePDUs,
FederationSenderQueueEDUs: queueEDUs, FederationSenderQueueEDUs: queueEDUs,

View file

@ -28,6 +28,7 @@ import (
type Database struct { type Database struct {
DB *sql.DB DB *sql.DB
Writer sqlutil.TransactionWriter
FederationSenderQueuePDUs tables.FederationSenderQueuePDUs FederationSenderQueuePDUs tables.FederationSenderQueuePDUs
FederationSenderQueueEDUs tables.FederationSenderQueueEDUs FederationSenderQueueEDUs tables.FederationSenderQueueEDUs
FederationSenderQueueJSON tables.FederationSenderQueueJSON FederationSenderQueueJSON tables.FederationSenderQueueJSON
@ -64,7 +65,7 @@ func (d *Database) UpdateRoom(
addHosts []types.JoinedHost, addHosts []types.JoinedHost,
removeHosts []string, removeHosts []string,
) (joinedHosts []types.JoinedHost, err error) { ) (joinedHosts []types.JoinedHost, err error) {
err = sqlutil.WithTransaction(d.DB, func(txn *sql.Tx) error { err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
err = d.FederationSenderRooms.InsertRoom(ctx, txn, roomID) err = d.FederationSenderRooms.InsertRoom(ctx, txn, roomID)
if err != nil { if err != nil {
return err return err
@ -133,7 +134,12 @@ func (d *Database) GetJoinedHostsForRooms(ctx context.Context, roomIDs []string)
func (d *Database) StoreJSON( func (d *Database) StoreJSON(
ctx context.Context, js string, ctx context.Context, js string,
) (*Receipt, error) { ) (*Receipt, error) {
nid, err := d.FederationSenderQueueJSON.InsertQueueJSON(ctx, nil, js) var nid int64
var err error
_ = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
nid, err = d.FederationSenderQueueJSON.InsertQueueJSON(ctx, txn, js)
return nil
})
if err != nil { if err != nil {
return nil, fmt.Errorf("d.insertQueueJSON: %w", err) return nil, fmt.Errorf("d.insertQueueJSON: %w", err)
} }
@ -143,11 +149,15 @@ func (d *Database) StoreJSON(
} }
func (d *Database) AddServerToBlacklist(serverName gomatrixserverlib.ServerName) error { func (d *Database) AddServerToBlacklist(serverName gomatrixserverlib.ServerName) error {
return d.FederationSenderBlacklist.InsertBlacklist(context.TODO(), nil, serverName) return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
return d.FederationSenderBlacklist.InsertBlacklist(context.TODO(), txn, serverName)
})
} }
func (d *Database) RemoveServerFromBlacklist(serverName gomatrixserverlib.ServerName) error { func (d *Database) RemoveServerFromBlacklist(serverName gomatrixserverlib.ServerName) error {
return d.FederationSenderBlacklist.DeleteBlacklist(context.TODO(), nil, serverName) return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
return d.FederationSenderBlacklist.DeleteBlacklist(context.TODO(), txn, serverName)
})
} }
func (d *Database) IsServerBlacklisted(serverName gomatrixserverlib.ServerName) (bool, error) { func (d *Database) IsServerBlacklisted(serverName gomatrixserverlib.ServerName) (bool, error) {

View file

@ -42,7 +42,6 @@ const deleteBlacklistSQL = "" +
type blacklistStatements struct { type blacklistStatements struct {
db *sql.DB db *sql.DB
writer sqlutil.TransactionWriter
insertBlacklistStmt *sql.Stmt insertBlacklistStmt *sql.Stmt
selectBlacklistStmt *sql.Stmt selectBlacklistStmt *sql.Stmt
deleteBlacklistStmt *sql.Stmt deleteBlacklistStmt *sql.Stmt
@ -50,8 +49,7 @@ type blacklistStatements struct {
func NewSQLiteBlacklistTable(db *sql.DB) (s *blacklistStatements, err error) { func NewSQLiteBlacklistTable(db *sql.DB) (s *blacklistStatements, err error) {
s = &blacklistStatements{ s = &blacklistStatements{
db: db, db: db,
writer: sqlutil.NewTransactionWriter(),
} }
_, err = db.Exec(blacklistSchema) _, err = db.Exec(blacklistSchema)
if err != nil { if err != nil {
@ -75,11 +73,9 @@ func NewSQLiteBlacklistTable(db *sql.DB) (s *blacklistStatements, err error) {
func (s *blacklistStatements) InsertBlacklist( func (s *blacklistStatements) InsertBlacklist(
ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName,
) error { ) error {
return s.writer.Do(s.db, txn, func(txn *sql.Tx) error { stmt := sqlutil.TxStmt(txn, s.insertBlacklistStmt)
stmt := sqlutil.TxStmt(txn, s.insertBlacklistStmt) _, err := stmt.ExecContext(ctx, serverName)
_, err := stmt.ExecContext(ctx, serverName) return err
return err
})
} }
// selectRoomForUpdate locks the row for the room and returns the last_event_id. // selectRoomForUpdate locks the row for the room and returns the last_event_id.
@ -105,9 +101,7 @@ func (s *blacklistStatements) SelectBlacklist(
func (s *blacklistStatements) DeleteBlacklist( func (s *blacklistStatements) DeleteBlacklist(
ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName,
) error { ) error {
return s.writer.Do(s.db, txn, func(txn *sql.Tx) error { stmt := sqlutil.TxStmt(txn, s.deleteBlacklistStmt)
stmt := sqlutil.TxStmt(txn, s.deleteBlacklistStmt) _, err := stmt.ExecContext(ctx, serverName)
_, err := stmt.ExecContext(ctx, serverName) return err
return err
})
} }

View file

@ -65,7 +65,6 @@ const selectJoinedHostsForRoomsSQL = "" +
type joinedHostsStatements struct { type joinedHostsStatements struct {
db *sql.DB db *sql.DB
writer sqlutil.TransactionWriter
insertJoinedHostsStmt *sql.Stmt insertJoinedHostsStmt *sql.Stmt
deleteJoinedHostsStmt *sql.Stmt deleteJoinedHostsStmt *sql.Stmt
selectJoinedHostsStmt *sql.Stmt selectJoinedHostsStmt *sql.Stmt
@ -75,8 +74,7 @@ type joinedHostsStatements struct {
func NewSQLiteJoinedHostsTable(db *sql.DB) (s *joinedHostsStatements, err error) { func NewSQLiteJoinedHostsTable(db *sql.DB) (s *joinedHostsStatements, err error) {
s = &joinedHostsStatements{ s = &joinedHostsStatements{
db: db, db: db,
writer: sqlutil.NewTransactionWriter(),
} }
_, err = db.Exec(joinedHostsSchema) _, err = db.Exec(joinedHostsSchema)
if err != nil { if err != nil {
@ -103,25 +101,21 @@ func (s *joinedHostsStatements) InsertJoinedHosts(
roomID, eventID string, roomID, eventID string,
serverName gomatrixserverlib.ServerName, serverName gomatrixserverlib.ServerName,
) error { ) error {
return s.writer.Do(s.db, txn, func(txn *sql.Tx) error { stmt := sqlutil.TxStmt(txn, s.insertJoinedHostsStmt)
stmt := sqlutil.TxStmt(txn, s.insertJoinedHostsStmt) _, err := stmt.ExecContext(ctx, roomID, eventID, serverName)
_, err := stmt.ExecContext(ctx, roomID, eventID, serverName) return err
return err
})
} }
func (s *joinedHostsStatements) DeleteJoinedHosts( func (s *joinedHostsStatements) DeleteJoinedHosts(
ctx context.Context, txn *sql.Tx, eventIDs []string, ctx context.Context, txn *sql.Tx, eventIDs []string,
) error { ) error {
return s.writer.Do(s.db, txn, func(txn *sql.Tx) error { for _, eventID := range eventIDs {
for _, eventID := range eventIDs { stmt := sqlutil.TxStmt(txn, s.deleteJoinedHostsStmt)
stmt := sqlutil.TxStmt(txn, s.deleteJoinedHostsStmt) if _, err := stmt.ExecContext(ctx, eventID); err != nil {
if _, err := stmt.ExecContext(ctx, eventID); err != nil { return err
return err
}
} }
return nil }
}) return nil
} }
func (s *joinedHostsStatements) SelectJoinedHostsWithTx( func (s *joinedHostsStatements) SelectJoinedHostsWithTx(

View file

@ -64,7 +64,6 @@ const selectQueueServerNamesSQL = "" +
type queueEDUsStatements struct { type queueEDUsStatements struct {
db *sql.DB db *sql.DB
writer sqlutil.TransactionWriter
insertQueueEDUStmt *sql.Stmt insertQueueEDUStmt *sql.Stmt
selectQueueEDUStmt *sql.Stmt selectQueueEDUStmt *sql.Stmt
selectQueueEDUReferenceJSONCountStmt *sql.Stmt selectQueueEDUReferenceJSONCountStmt *sql.Stmt
@ -74,8 +73,7 @@ type queueEDUsStatements struct {
func NewSQLiteQueueEDUsTable(db *sql.DB) (s *queueEDUsStatements, err error) { func NewSQLiteQueueEDUsTable(db *sql.DB) (s *queueEDUsStatements, err error) {
s = &queueEDUsStatements{ s = &queueEDUsStatements{
db: db, db: db,
writer: sqlutil.NewTransactionWriter(),
} }
_, err = db.Exec(queueEDUsSchema) _, err = db.Exec(queueEDUsSchema)
if err != nil { if err != nil {
@ -106,16 +104,14 @@ func (s *queueEDUsStatements) InsertQueueEDU(
serverName gomatrixserverlib.ServerName, serverName gomatrixserverlib.ServerName,
nid int64, nid int64,
) error { ) error {
return s.writer.Do(s.db, txn, func(txn *sql.Tx) error { stmt := sqlutil.TxStmt(txn, s.insertQueueEDUStmt)
stmt := sqlutil.TxStmt(txn, s.insertQueueEDUStmt) _, err := stmt.ExecContext(
_, err := stmt.ExecContext( ctx,
ctx, eduType, // the EDU type
eduType, // the EDU type serverName, // destination server name
serverName, // destination server name nid, // JSON blob NID
nid, // JSON blob NID )
) return err
return err
})
} }
func (s *queueEDUsStatements) DeleteQueueEDUs( func (s *queueEDUsStatements) DeleteQueueEDUs(
@ -135,11 +131,9 @@ func (s *queueEDUsStatements) DeleteQueueEDUs(
params[k+1] = v params[k+1] = v
} }
return s.writer.Do(s.db, txn, func(txn *sql.Tx) error { stmt := sqlutil.TxStmt(txn, deleteStmt)
stmt := sqlutil.TxStmt(txn, deleteStmt) _, err = stmt.ExecContext(ctx, params...)
_, err := stmt.ExecContext(ctx, params...) return err
return err
})
} }
func (s *queueEDUsStatements) SelectQueueEDUs( func (s *queueEDUsStatements) SelectQueueEDUs(

View file

@ -50,7 +50,6 @@ const selectJSONSQL = "" +
type queueJSONStatements struct { type queueJSONStatements struct {
db *sql.DB db *sql.DB
writer sqlutil.TransactionWriter
insertJSONStmt *sql.Stmt insertJSONStmt *sql.Stmt
//deleteJSONStmt *sql.Stmt - prepared at runtime due to variadic //deleteJSONStmt *sql.Stmt - prepared at runtime due to variadic
//selectJSONStmt *sql.Stmt - prepared at runtime due to variadic //selectJSONStmt *sql.Stmt - prepared at runtime due to variadic
@ -58,8 +57,7 @@ type queueJSONStatements struct {
func NewSQLiteQueueJSONTable(db *sql.DB) (s *queueJSONStatements, err error) { func NewSQLiteQueueJSONTable(db *sql.DB) (s *queueJSONStatements, err error) {
s = &queueJSONStatements{ s = &queueJSONStatements{
db: db, db: db,
writer: sqlutil.NewTransactionWriter(),
} }
_, err = db.Exec(queueJSONSchema) _, err = db.Exec(queueJSONSchema)
if err != nil { if err != nil {
@ -74,18 +72,15 @@ func NewSQLiteQueueJSONTable(db *sql.DB) (s *queueJSONStatements, err error) {
func (s *queueJSONStatements) InsertQueueJSON( func (s *queueJSONStatements) InsertQueueJSON(
ctx context.Context, txn *sql.Tx, json string, ctx context.Context, txn *sql.Tx, json string,
) (lastid int64, err error) { ) (lastid int64, err error) {
err = s.writer.Do(s.db, txn, func(txn *sql.Tx) error { stmt := sqlutil.TxStmt(txn, s.insertJSONStmt)
stmt := sqlutil.TxStmt(txn, s.insertJSONStmt) res, err := stmt.ExecContext(ctx, json)
res, err := stmt.ExecContext(ctx, json) if err != nil {
if err != nil { return 0, fmt.Errorf("stmt.QueryContext: %w", err)
return fmt.Errorf("stmt.QueryContext: %w", err) }
} lastid, err = res.LastInsertId()
lastid, err = res.LastInsertId() if err != nil {
if err != nil { return 0, fmt.Errorf("res.LastInsertId: %w", err)
return fmt.Errorf("res.LastInsertId: %w", err) }
}
return nil
})
return return
} }
@ -103,11 +98,9 @@ func (s *queueJSONStatements) DeleteQueueJSON(
iNIDs[k] = v iNIDs[k] = v
} }
return s.writer.Do(s.db, txn, func(txn *sql.Tx) error { stmt := sqlutil.TxStmt(txn, deleteStmt)
stmt := sqlutil.TxStmt(txn, deleteStmt) _, err = stmt.ExecContext(ctx, iNIDs...)
_, err = stmt.ExecContext(ctx, iNIDs...) return err
return err
})
} }
func (s *queueJSONStatements) SelectQueueJSON( func (s *queueJSONStatements) SelectQueueJSON(

View file

@ -71,7 +71,6 @@ const selectQueuePDUsServerNamesSQL = "" +
type queuePDUsStatements struct { type queuePDUsStatements struct {
db *sql.DB db *sql.DB
writer sqlutil.TransactionWriter
insertQueuePDUStmt *sql.Stmt insertQueuePDUStmt *sql.Stmt
selectQueueNextTransactionIDStmt *sql.Stmt selectQueueNextTransactionIDStmt *sql.Stmt
selectQueuePDUsByTransactionStmt *sql.Stmt selectQueuePDUsByTransactionStmt *sql.Stmt
@ -83,8 +82,7 @@ type queuePDUsStatements struct {
func NewSQLiteQueuePDUsTable(db *sql.DB) (s *queuePDUsStatements, err error) { func NewSQLiteQueuePDUsTable(db *sql.DB) (s *queuePDUsStatements, err error) {
s = &queuePDUsStatements{ s = &queuePDUsStatements{
db: db, db: db,
writer: sqlutil.NewTransactionWriter(),
} }
_, err = db.Exec(queuePDUsSchema) _, err = db.Exec(queuePDUsSchema)
if err != nil { if err != nil {
@ -121,16 +119,14 @@ func (s *queuePDUsStatements) InsertQueuePDU(
serverName gomatrixserverlib.ServerName, serverName gomatrixserverlib.ServerName,
nid int64, nid int64,
) error { ) error {
return s.writer.Do(s.db, txn, func(txn *sql.Tx) error { stmt := sqlutil.TxStmt(txn, s.insertQueuePDUStmt)
stmt := sqlutil.TxStmt(txn, s.insertQueuePDUStmt) _, err := stmt.ExecContext(
_, err := stmt.ExecContext( ctx,
ctx, transactionID, // the transaction ID that we initially attempted
transactionID, // the transaction ID that we initially attempted serverName, // destination server name
serverName, // destination server name nid, // JSON blob NID
nid, // JSON blob NID )
) return err
return err
})
} }
func (s *queuePDUsStatements) DeleteQueuePDUs( func (s *queuePDUsStatements) DeleteQueuePDUs(
@ -150,11 +146,9 @@ func (s *queuePDUsStatements) DeleteQueuePDUs(
params[k+1] = v params[k+1] = v
} }
return s.writer.Do(s.db, txn, func(txn *sql.Tx) error { stmt := sqlutil.TxStmt(txn, deleteStmt)
stmt := sqlutil.TxStmt(txn, deleteStmt) _, err = stmt.ExecContext(ctx, params...)
_, err := stmt.ExecContext(ctx, params...) return err
return err
})
} }
func (s *queuePDUsStatements) SelectQueuePDUNextTransactionID( func (s *queuePDUsStatements) SelectQueuePDUNextTransactionID(

View file

@ -44,7 +44,6 @@ const updateRoomSQL = "" +
type roomStatements struct { type roomStatements struct {
db *sql.DB db *sql.DB
writer sqlutil.TransactionWriter
insertRoomStmt *sql.Stmt insertRoomStmt *sql.Stmt
selectRoomForUpdateStmt *sql.Stmt selectRoomForUpdateStmt *sql.Stmt
updateRoomStmt *sql.Stmt updateRoomStmt *sql.Stmt
@ -52,8 +51,7 @@ type roomStatements struct {
func NewSQLiteRoomsTable(db *sql.DB) (s *roomStatements, err error) { func NewSQLiteRoomsTable(db *sql.DB) (s *roomStatements, err error) {
s = &roomStatements{ s = &roomStatements{
db: db, db: db,
writer: sqlutil.NewTransactionWriter(),
} }
_, err = db.Exec(roomSchema) _, err = db.Exec(roomSchema)
if err != nil { if err != nil {
@ -77,10 +75,8 @@ func NewSQLiteRoomsTable(db *sql.DB) (s *roomStatements, err error) {
func (s *roomStatements) InsertRoom( func (s *roomStatements) InsertRoom(
ctx context.Context, txn *sql.Tx, roomID string, ctx context.Context, txn *sql.Tx, roomID string,
) error { ) error {
return s.writer.Do(s.db, txn, func(txn *sql.Tx) error { _, err := sqlutil.TxStmt(txn, s.insertRoomStmt).ExecContext(ctx, roomID)
_, err := sqlutil.TxStmt(txn, s.insertRoomStmt).ExecContext(ctx, roomID) return err
return err
})
} }
// selectRoomForUpdate locks the row for the room and returns the last_event_id. // selectRoomForUpdate locks the row for the room and returns the last_event_id.
@ -103,9 +99,7 @@ func (s *roomStatements) SelectRoomForUpdate(
func (s *roomStatements) UpdateRoom( func (s *roomStatements) UpdateRoom(
ctx context.Context, txn *sql.Tx, roomID, lastEventID string, ctx context.Context, txn *sql.Tx, roomID, lastEventID string,
) error { ) error {
return s.writer.Do(s.db, txn, func(txn *sql.Tx) error { stmt := sqlutil.TxStmt(txn, s.updateRoomStmt)
stmt := sqlutil.TxStmt(txn, s.updateRoomStmt) _, err := stmt.ExecContext(ctx, roomID, lastEventID)
_, err := stmt.ExecContext(ctx, roomID, lastEventID) return err
return err
})
} }

View file

@ -67,6 +67,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) {
} }
d.Database = shared.Database{ d.Database = shared.Database{
DB: d.db, DB: d.db,
Writer: d.writer,
FederationSenderJoinedHosts: joinedHosts, FederationSenderJoinedHosts: joinedHosts,
FederationSenderQueuePDUs: queuePDUs, FederationSenderQueuePDUs: queuePDUs,
FederationSenderQueueEDUs: queueEDUs, FederationSenderQueueEDUs: queueEDUs,