From c958068bdf177c61ae56399859e02527332bcd50 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Thu, 20 Aug 2020 16:41:48 +0100 Subject: [PATCH] Offset updates take place using TransactionWriter --- appservice/storage/postgres/storage.go | 4 ++- appservice/storage/sqlite3/storage.go | 4 ++- .../storage/postgres/storage.go | 6 ++-- currentstateserver/storage/sqlite3/storage.go | 6 ++-- federationsender/storage/postgres/storage.go | 6 ++-- federationsender/storage/sqlite3/storage.go | 6 ++-- internal/sqlutil/partition_offset_table.go | 13 ++++++-- syncapi/storage/postgres/syncserver.go | 6 ++-- syncapi/storage/sqlite3/syncserver.go | 6 ++-- userapi/storage/accounts/postgres/storage.go | 25 +++++++------- userapi/storage/accounts/sqlite3/storage.go | 33 ++++++++----------- 11 files changed, 67 insertions(+), 48 deletions(-) diff --git a/appservice/storage/postgres/storage.go b/appservice/storage/postgres/storage.go index 9fda87ae9..82b637e0b 100644 --- a/appservice/storage/postgres/storage.go +++ b/appservice/storage/postgres/storage.go @@ -32,6 +32,7 @@ type Database struct { events eventsStatements txnID txnStatements db *sql.DB + writer sqlutil.TransactionWriter } // NewDatabase opens a new database @@ -41,10 +42,11 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) { if result.db, err = sqlutil.Open(dbProperties); err != nil { return nil, err } + result.writer = sqlutil.NewDummyTransactionWriter() if err = result.prepare(); err != nil { return nil, err } - if err = result.PartitionOffsetStatements.Prepare(result.db, "appservice"); err != nil { + if err = result.PartitionOffsetStatements.Prepare(result.db, result.writer, "appservice"); err != nil { return nil, err } return &result, nil diff --git a/appservice/storage/sqlite3/storage.go b/appservice/storage/sqlite3/storage.go index 59af9016d..dee21f6f4 100644 --- a/appservice/storage/sqlite3/storage.go +++ b/appservice/storage/sqlite3/storage.go @@ -32,6 +32,7 @@ type Database struct { events eventsStatements txnID txnStatements db *sql.DB + writer sqlutil.TransactionWriter } // NewDatabase opens a new database @@ -41,10 +42,11 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) { if result.db, err = sqlutil.Open(dbProperties); err != nil { return nil, err } + result.writer = sqlutil.NewTransactionWriter() if err = result.prepare(); err != nil { return nil, err } - if err = result.PartitionOffsetStatements.Prepare(result.db, "appservice"); err != nil { + if err = result.PartitionOffsetStatements.Prepare(result.db, result.writer, "appservice"); err != nil { return nil, err } return &result, nil diff --git a/currentstateserver/storage/postgres/storage.go b/currentstateserver/storage/postgres/storage.go index 0cd7e5553..96ff049b1 100644 --- a/currentstateserver/storage/postgres/storage.go +++ b/currentstateserver/storage/postgres/storage.go @@ -10,7 +10,8 @@ import ( type Database struct { shared.Database - db *sql.DB + db *sql.DB + writer sqlutil.TransactionWriter sqlutil.PartitionOffsetStatements } @@ -21,7 +22,8 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) { if d.db, err = sqlutil.Open(dbProperties); err != nil { return nil, err } - if err = d.PartitionOffsetStatements.Prepare(d.db, "currentstate"); err != nil { + d.writer = sqlutil.NewDummyTransactionWriter() + if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "currentstate"); err != nil { return nil, err } currRoomState, err := NewPostgresCurrentRoomStateTable(d.db) diff --git a/currentstateserver/storage/sqlite3/storage.go b/currentstateserver/storage/sqlite3/storage.go index 4454c9ed7..9210bcc7d 100644 --- a/currentstateserver/storage/sqlite3/storage.go +++ b/currentstateserver/storage/sqlite3/storage.go @@ -10,7 +10,8 @@ import ( type Database struct { shared.Database - db *sql.DB + db *sql.DB + writer sqlutil.TransactionWriter sqlutil.PartitionOffsetStatements } @@ -22,7 +23,8 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) { if d.db, err = sqlutil.Open(dbProperties); err != nil { return nil, err } - if err = d.PartitionOffsetStatements.Prepare(d.db, "currentstate"); err != nil { + d.writer = sqlutil.NewTransactionWriter() + if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "currentstate"); err != nil { return nil, err } currRoomState, err := NewSqliteCurrentRoomStateTable(d.db) diff --git a/federationsender/storage/postgres/storage.go b/federationsender/storage/postgres/storage.go index b65ff0b6d..09ce6e557 100644 --- a/federationsender/storage/postgres/storage.go +++ b/federationsender/storage/postgres/storage.go @@ -27,7 +27,8 @@ import ( type Database struct { shared.Database sqlutil.PartitionOffsetStatements - db *sql.DB + db *sql.DB + writer sqlutil.TransactionWriter } // NewDatabase opens a new database @@ -37,6 +38,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) { if d.db, err = sqlutil.Open(dbProperties); err != nil { return nil, err } + d.writer = sqlutil.NewDummyTransactionWriter() joinedHosts, err := NewPostgresJoinedHostsTable(d.db) if err != nil { return nil, err @@ -70,7 +72,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) { FederationSenderRooms: rooms, FederationSenderBlacklist: blacklist, } - if err = d.PartitionOffsetStatements.Prepare(d.db, "federationsender"); err != nil { + if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "federationsender"); err != nil { return nil, err } return &d, nil diff --git a/federationsender/storage/sqlite3/storage.go b/federationsender/storage/sqlite3/storage.go index 41b91871e..3da9586d5 100644 --- a/federationsender/storage/sqlite3/storage.go +++ b/federationsender/storage/sqlite3/storage.go @@ -29,7 +29,8 @@ import ( type Database struct { shared.Database sqlutil.PartitionOffsetStatements - db *sql.DB + db *sql.DB + writer sqlutil.TransactionWriter } // NewDatabase opens a new database @@ -39,6 +40,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) { if d.db, err = sqlutil.Open(dbProperties); err != nil { return nil, err } + d.writer = sqlutil.NewTransactionWriter() joinedHosts, err := NewSQLiteJoinedHostsTable(d.db) if err != nil { return nil, err @@ -72,7 +74,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) { FederationSenderRooms: rooms, FederationSenderBlacklist: blacklist, } - if err = d.PartitionOffsetStatements.Prepare(d.db, "federationsender"); err != nil { + if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "federationsender"); err != nil { return nil, err } return &d, nil diff --git a/internal/sqlutil/partition_offset_table.go b/internal/sqlutil/partition_offset_table.go index 348829025..e63deac65 100644 --- a/internal/sqlutil/partition_offset_table.go +++ b/internal/sqlutil/partition_offset_table.go @@ -53,6 +53,8 @@ const upsertPartitionOffsetsSQL = "" + // PartitionOffsetStatements represents a set of statements that can be run on a partition_offsets table. type PartitionOffsetStatements struct { + db *sql.DB + writer TransactionWriter selectPartitionOffsetsStmt *sql.Stmt upsertPartitionOffsetStmt *sql.Stmt } @@ -60,7 +62,9 @@ type PartitionOffsetStatements struct { // Prepare converts the raw SQL statements into prepared statements. // Takes a prefix to prepend to the table name used to store the partition offsets. // This allows multiple components to share the same database schema. -func (s *PartitionOffsetStatements) Prepare(db *sql.DB, prefix string) (err error) { +func (s *PartitionOffsetStatements) Prepare(db *sql.DB, writer TransactionWriter, prefix string) (err error) { + s.db = db + s.writer = writer _, err = db.Exec(strings.Replace(partitionOffsetsSchema, "${prefix}", prefix, -1)) if err != nil { return @@ -121,6 +125,9 @@ func (s *PartitionOffsetStatements) selectPartitionOffsets( func (s *PartitionOffsetStatements) upsertPartitionOffset( ctx context.Context, topic string, partition int32, offset int64, ) error { - _, err := s.upsertPartitionOffsetStmt.ExecContext(ctx, topic, partition, offset) - return err + return s.writer.Do(s.db, nil, func(txn *sql.Tx) error { + stmt := TxStmt(txn, s.upsertPartitionOffsetStmt) + _, err := stmt.ExecContext(ctx, topic, partition, offset) + return err + }) } diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go index 26ef082f5..ef6010d63 100644 --- a/syncapi/storage/postgres/syncserver.go +++ b/syncapi/storage/postgres/syncserver.go @@ -30,7 +30,8 @@ import ( // both the database for PDUs and caches for EDUs. type SyncServerDatasource struct { shared.Database - db *sql.DB + db *sql.DB + writer sqlutil.TransactionWriter sqlutil.PartitionOffsetStatements } @@ -41,7 +42,8 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e if d.db, err = sqlutil.Open(dbProperties); err != nil { return nil, err } - if err = d.PartitionOffsetStatements.Prepare(d.db, "syncapi"); err != nil { + d.writer = sqlutil.NewDummyTransactionWriter() + if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "syncapi"); err != nil { return nil, err } accountData, err := NewPostgresAccountDataTable(d.db) diff --git a/syncapi/storage/sqlite3/syncserver.go b/syncapi/storage/sqlite3/syncserver.go index 9564a23aa..23e2186f4 100644 --- a/syncapi/storage/sqlite3/syncserver.go +++ b/syncapi/storage/sqlite3/syncserver.go @@ -31,7 +31,8 @@ import ( // both the database for PDUs and caches for EDUs. type SyncServerDatasource struct { shared.Database - db *sql.DB + db *sql.DB + writer sqlutil.TransactionWriter sqlutil.PartitionOffsetStatements streamID streamIDStatements } @@ -44,6 +45,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e if d.db, err = sqlutil.Open(dbProperties); err != nil { return nil, err } + d.writer = sqlutil.NewTransactionWriter() if err = d.prepare(); err != nil { return nil, err } @@ -51,7 +53,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e } func (d *SyncServerDatasource) prepare() (err error) { - if err = d.PartitionOffsetStatements.Prepare(d.db, "syncapi"); err != nil { + if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "syncapi"); err != nil { return err } if err = d.streamID.prepare(d.db); err != nil { diff --git a/userapi/storage/accounts/postgres/storage.go b/userapi/storage/accounts/postgres/storage.go index 9653c019c..585d0eeaf 100644 --- a/userapi/storage/accounts/postgres/storage.go +++ b/userapi/storage/accounts/postgres/storage.go @@ -34,7 +34,8 @@ import ( // Database represents an account database type Database struct { - db *sql.DB + db *sql.DB + writer sqlutil.TransactionWriter sqlutil.PartitionOffsetStatements accounts accountsStatements profiles profilesStatements @@ -49,27 +50,27 @@ func NewDatabase(dbProperties *config.DatabaseOptions, serverName gomatrixserver if err != nil { return nil, err } - partitions := sqlutil.PartitionOffsetStatements{} - if err = partitions.Prepare(db, "account"); err != nil { + d := &Database{ + serverName: serverName, + db: db, + writer: sqlutil.NewDummyTransactionWriter(), + } + if err = d.PartitionOffsetStatements.Prepare(db, d.writer, "account"); err != nil { return nil, err } - a := accountsStatements{} - if err = a.prepare(db, serverName); err != nil { + if err = d.accounts.prepare(db, serverName); err != nil { return nil, err } - p := profilesStatements{} - if err = p.prepare(db); err != nil { + if err = d.profiles.prepare(db); err != nil { return nil, err } - ac := accountDataStatements{} - if err = ac.prepare(db); err != nil { + if err = d.accountDatas.prepare(db); err != nil { return nil, err } - t := threepidStatements{} - if err = t.prepare(db); err != nil { + if err = d.threepids.prepare(db); err != nil { return nil, err } - return &Database{db, partitions, a, p, ac, t, serverName}, nil + return d, nil } // GetAccountByPassword returns the account associated with the given localpart and password. diff --git a/userapi/storage/accounts/sqlite3/storage.go b/userapi/storage/accounts/sqlite3/storage.go index 4d2c5e51d..6e4dfaf3d 100644 --- a/userapi/storage/accounts/sqlite3/storage.go +++ b/userapi/storage/accounts/sqlite3/storage.go @@ -33,7 +33,9 @@ import ( // Database represents an account database type Database struct { - db *sql.DB + db *sql.DB + writer sqlutil.TransactionWriter + sqlutil.PartitionOffsetStatements accounts accountsStatements profiles profilesStatements @@ -53,35 +55,28 @@ func NewDatabase(dbProperties *config.DatabaseOptions, serverName gomatrixserver if err != nil { return nil, err } + d := &Database{ + serverName: serverName, + db: db, + writer: sqlutil.NewTransactionWriter(), + } partitions := sqlutil.PartitionOffsetStatements{} - if err = partitions.Prepare(db, "account"); err != nil { + if err = partitions.Prepare(db, d.writer, "account"); err != nil { return nil, err } - a := accountsStatements{} - if err = a.prepare(db, serverName); err != nil { + if err = d.accounts.prepare(db, serverName); err != nil { return nil, err } - p := profilesStatements{} - if err = p.prepare(db); err != nil { + if err = d.profiles.prepare(db); err != nil { return nil, err } - ac := accountDataStatements{} - if err = ac.prepare(db); err != nil { + if err = d.accountDatas.prepare(db); err != nil { return nil, err } - t := threepidStatements{} - if err = t.prepare(db); err != nil { + if err = d.threepids.prepare(db); err != nil { return nil, err } - return &Database{ - db: db, - PartitionOffsetStatements: partitions, - accounts: a, - profiles: p, - accountDatas: ac, - threepids: t, - serverName: serverName, - }, nil + return d, nil } // GetAccountByPassword returns the account associated with the given localpart and password.