Offset updates take place using TransactionWriter

This commit is contained in:
Neil Alexander 2020-08-20 16:41:48 +01:00
parent 068a3d3c9f
commit c958068bdf
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
11 changed files with 67 additions and 48 deletions

View file

@ -32,6 +32,7 @@ type Database struct {
events eventsStatements events eventsStatements
txnID txnStatements txnID txnStatements
db *sql.DB db *sql.DB
writer sqlutil.TransactionWriter
} }
// NewDatabase opens a new database // NewDatabase opens a new database
@ -41,10 +42,11 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) {
if result.db, err = sqlutil.Open(dbProperties); err != nil { if result.db, err = sqlutil.Open(dbProperties); err != nil {
return nil, err return nil, err
} }
result.writer = sqlutil.NewDummyTransactionWriter()
if err = result.prepare(); err != nil { if err = result.prepare(); err != nil {
return nil, err return nil, err
} }
if err = result.PartitionOffsetStatements.Prepare(result.db, "appservice"); err != nil { if err = result.PartitionOffsetStatements.Prepare(result.db, result.writer, "appservice"); err != nil {
return nil, err return nil, err
} }
return &result, nil return &result, nil

View file

@ -32,6 +32,7 @@ type Database struct {
events eventsStatements events eventsStatements
txnID txnStatements txnID txnStatements
db *sql.DB db *sql.DB
writer sqlutil.TransactionWriter
} }
// NewDatabase opens a new database // NewDatabase opens a new database
@ -41,10 +42,11 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) {
if result.db, err = sqlutil.Open(dbProperties); err != nil { if result.db, err = sqlutil.Open(dbProperties); err != nil {
return nil, err return nil, err
} }
result.writer = sqlutil.NewTransactionWriter()
if err = result.prepare(); err != nil { if err = result.prepare(); err != nil {
return nil, err return nil, err
} }
if err = result.PartitionOffsetStatements.Prepare(result.db, "appservice"); err != nil { if err = result.PartitionOffsetStatements.Prepare(result.db, result.writer, "appservice"); err != nil {
return nil, err return nil, err
} }
return &result, nil return &result, nil

View file

@ -11,6 +11,7 @@ import (
type Database struct { type Database struct {
shared.Database shared.Database
db *sql.DB db *sql.DB
writer sqlutil.TransactionWriter
sqlutil.PartitionOffsetStatements sqlutil.PartitionOffsetStatements
} }
@ -21,7 +22,8 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) {
if d.db, err = sqlutil.Open(dbProperties); err != nil { if d.db, err = sqlutil.Open(dbProperties); err != nil {
return nil, err return nil, err
} }
if err = d.PartitionOffsetStatements.Prepare(d.db, "currentstate"); err != nil { d.writer = sqlutil.NewDummyTransactionWriter()
if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "currentstate"); err != nil {
return nil, err return nil, err
} }
currRoomState, err := NewPostgresCurrentRoomStateTable(d.db) currRoomState, err := NewPostgresCurrentRoomStateTable(d.db)

View file

@ -11,6 +11,7 @@ import (
type Database struct { type Database struct {
shared.Database shared.Database
db *sql.DB db *sql.DB
writer sqlutil.TransactionWriter
sqlutil.PartitionOffsetStatements sqlutil.PartitionOffsetStatements
} }
@ -22,7 +23,8 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) {
if d.db, err = sqlutil.Open(dbProperties); err != nil { if d.db, err = sqlutil.Open(dbProperties); err != nil {
return nil, err return nil, err
} }
if err = d.PartitionOffsetStatements.Prepare(d.db, "currentstate"); err != nil { d.writer = sqlutil.NewTransactionWriter()
if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "currentstate"); err != nil {
return nil, err return nil, err
} }
currRoomState, err := NewSqliteCurrentRoomStateTable(d.db) currRoomState, err := NewSqliteCurrentRoomStateTable(d.db)

View file

@ -28,6 +28,7 @@ type Database struct {
shared.Database shared.Database
sqlutil.PartitionOffsetStatements sqlutil.PartitionOffsetStatements
db *sql.DB db *sql.DB
writer sqlutil.TransactionWriter
} }
// NewDatabase opens a new database // NewDatabase opens a new database
@ -37,6 +38,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) {
if d.db, err = sqlutil.Open(dbProperties); err != nil { if d.db, err = sqlutil.Open(dbProperties); err != nil {
return nil, err return nil, err
} }
d.writer = sqlutil.NewDummyTransactionWriter()
joinedHosts, err := NewPostgresJoinedHostsTable(d.db) joinedHosts, err := NewPostgresJoinedHostsTable(d.db)
if err != nil { if err != nil {
return nil, err return nil, err
@ -70,7 +72,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) {
FederationSenderRooms: rooms, FederationSenderRooms: rooms,
FederationSenderBlacklist: blacklist, FederationSenderBlacklist: blacklist,
} }
if err = d.PartitionOffsetStatements.Prepare(d.db, "federationsender"); err != nil { if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "federationsender"); err != nil {
return nil, err return nil, err
} }
return &d, nil return &d, nil

View file

@ -30,6 +30,7 @@ type Database struct {
shared.Database shared.Database
sqlutil.PartitionOffsetStatements sqlutil.PartitionOffsetStatements
db *sql.DB db *sql.DB
writer sqlutil.TransactionWriter
} }
// NewDatabase opens a new database // NewDatabase opens a new database
@ -39,6 +40,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) {
if d.db, err = sqlutil.Open(dbProperties); err != nil { if d.db, err = sqlutil.Open(dbProperties); err != nil {
return nil, err return nil, err
} }
d.writer = sqlutil.NewTransactionWriter()
joinedHosts, err := NewSQLiteJoinedHostsTable(d.db) joinedHosts, err := NewSQLiteJoinedHostsTable(d.db)
if err != nil { if err != nil {
return nil, err return nil, err
@ -72,7 +74,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) {
FederationSenderRooms: rooms, FederationSenderRooms: rooms,
FederationSenderBlacklist: blacklist, FederationSenderBlacklist: blacklist,
} }
if err = d.PartitionOffsetStatements.Prepare(d.db, "federationsender"); err != nil { if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "federationsender"); err != nil {
return nil, err return nil, err
} }
return &d, nil return &d, nil

View file

@ -53,6 +53,8 @@ const upsertPartitionOffsetsSQL = "" +
// PartitionOffsetStatements represents a set of statements that can be run on a partition_offsets table. // PartitionOffsetStatements represents a set of statements that can be run on a partition_offsets table.
type PartitionOffsetStatements struct { type PartitionOffsetStatements struct {
db *sql.DB
writer TransactionWriter
selectPartitionOffsetsStmt *sql.Stmt selectPartitionOffsetsStmt *sql.Stmt
upsertPartitionOffsetStmt *sql.Stmt upsertPartitionOffsetStmt *sql.Stmt
} }
@ -60,7 +62,9 @@ type PartitionOffsetStatements struct {
// Prepare converts the raw SQL statements into prepared statements. // Prepare converts the raw SQL statements into prepared statements.
// Takes a prefix to prepend to the table name used to store the partition offsets. // Takes a prefix to prepend to the table name used to store the partition offsets.
// This allows multiple components to share the same database schema. // This allows multiple components to share the same database schema.
func (s *PartitionOffsetStatements) Prepare(db *sql.DB, prefix string) (err error) { func (s *PartitionOffsetStatements) Prepare(db *sql.DB, writer TransactionWriter, prefix string) (err error) {
s.db = db
s.writer = writer
_, err = db.Exec(strings.Replace(partitionOffsetsSchema, "${prefix}", prefix, -1)) _, err = db.Exec(strings.Replace(partitionOffsetsSchema, "${prefix}", prefix, -1))
if err != nil { if err != nil {
return return
@ -121,6 +125,9 @@ func (s *PartitionOffsetStatements) selectPartitionOffsets(
func (s *PartitionOffsetStatements) upsertPartitionOffset( func (s *PartitionOffsetStatements) upsertPartitionOffset(
ctx context.Context, topic string, partition int32, offset int64, ctx context.Context, topic string, partition int32, offset int64,
) error { ) error {
_, err := s.upsertPartitionOffsetStmt.ExecContext(ctx, topic, partition, offset) return s.writer.Do(s.db, nil, func(txn *sql.Tx) error {
stmt := TxStmt(txn, s.upsertPartitionOffsetStmt)
_, err := stmt.ExecContext(ctx, topic, partition, offset)
return err return err
})
} }

View file

@ -31,6 +31,7 @@ import (
type SyncServerDatasource struct { type SyncServerDatasource struct {
shared.Database shared.Database
db *sql.DB db *sql.DB
writer sqlutil.TransactionWriter
sqlutil.PartitionOffsetStatements sqlutil.PartitionOffsetStatements
} }
@ -41,7 +42,8 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e
if d.db, err = sqlutil.Open(dbProperties); err != nil { if d.db, err = sqlutil.Open(dbProperties); err != nil {
return nil, err return nil, err
} }
if err = d.PartitionOffsetStatements.Prepare(d.db, "syncapi"); err != nil { d.writer = sqlutil.NewDummyTransactionWriter()
if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "syncapi"); err != nil {
return nil, err return nil, err
} }
accountData, err := NewPostgresAccountDataTable(d.db) accountData, err := NewPostgresAccountDataTable(d.db)

View file

@ -32,6 +32,7 @@ import (
type SyncServerDatasource struct { type SyncServerDatasource struct {
shared.Database shared.Database
db *sql.DB db *sql.DB
writer sqlutil.TransactionWriter
sqlutil.PartitionOffsetStatements sqlutil.PartitionOffsetStatements
streamID streamIDStatements streamID streamIDStatements
} }
@ -44,6 +45,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e
if d.db, err = sqlutil.Open(dbProperties); err != nil { if d.db, err = sqlutil.Open(dbProperties); err != nil {
return nil, err return nil, err
} }
d.writer = sqlutil.NewTransactionWriter()
if err = d.prepare(); err != nil { if err = d.prepare(); err != nil {
return nil, err return nil, err
} }
@ -51,7 +53,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e
} }
func (d *SyncServerDatasource) prepare() (err error) { func (d *SyncServerDatasource) prepare() (err error) {
if err = d.PartitionOffsetStatements.Prepare(d.db, "syncapi"); err != nil { if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "syncapi"); err != nil {
return err return err
} }
if err = d.streamID.prepare(d.db); err != nil { if err = d.streamID.prepare(d.db); err != nil {

View file

@ -35,6 +35,7 @@ import (
// Database represents an account database // Database represents an account database
type Database struct { type Database struct {
db *sql.DB db *sql.DB
writer sqlutil.TransactionWriter
sqlutil.PartitionOffsetStatements sqlutil.PartitionOffsetStatements
accounts accountsStatements accounts accountsStatements
profiles profilesStatements profiles profilesStatements
@ -49,27 +50,27 @@ func NewDatabase(dbProperties *config.DatabaseOptions, serverName gomatrixserver
if err != nil { if err != nil {
return nil, err return nil, err
} }
partitions := sqlutil.PartitionOffsetStatements{} d := &Database{
if err = partitions.Prepare(db, "account"); err != nil { serverName: serverName,
db: db,
writer: sqlutil.NewDummyTransactionWriter(),
}
if err = d.PartitionOffsetStatements.Prepare(db, d.writer, "account"); err != nil {
return nil, err return nil, err
} }
a := accountsStatements{} if err = d.accounts.prepare(db, serverName); err != nil {
if err = a.prepare(db, serverName); err != nil {
return nil, err return nil, err
} }
p := profilesStatements{} if err = d.profiles.prepare(db); err != nil {
if err = p.prepare(db); err != nil {
return nil, err return nil, err
} }
ac := accountDataStatements{} if err = d.accountDatas.prepare(db); err != nil {
if err = ac.prepare(db); err != nil {
return nil, err return nil, err
} }
t := threepidStatements{} if err = d.threepids.prepare(db); err != nil {
if err = t.prepare(db); err != nil {
return nil, err return nil, err
} }
return &Database{db, partitions, a, p, ac, t, serverName}, nil return d, nil
} }
// GetAccountByPassword returns the account associated with the given localpart and password. // GetAccountByPassword returns the account associated with the given localpart and password.

View file

@ -34,6 +34,8 @@ import (
// Database represents an account database // Database represents an account database
type Database struct { type Database struct {
db *sql.DB db *sql.DB
writer sqlutil.TransactionWriter
sqlutil.PartitionOffsetStatements sqlutil.PartitionOffsetStatements
accounts accountsStatements accounts accountsStatements
profiles profilesStatements profiles profilesStatements
@ -53,35 +55,28 @@ func NewDatabase(dbProperties *config.DatabaseOptions, serverName gomatrixserver
if err != nil { if err != nil {
return nil, err return nil, err
} }
partitions := sqlutil.PartitionOffsetStatements{} d := &Database{
if err = partitions.Prepare(db, "account"); err != nil {
return nil, err
}
a := accountsStatements{}
if err = a.prepare(db, serverName); err != nil {
return nil, err
}
p := profilesStatements{}
if err = p.prepare(db); err != nil {
return nil, err
}
ac := accountDataStatements{}
if err = ac.prepare(db); err != nil {
return nil, err
}
t := threepidStatements{}
if err = t.prepare(db); err != nil {
return nil, err
}
return &Database{
db: db,
PartitionOffsetStatements: partitions,
accounts: a,
profiles: p,
accountDatas: ac,
threepids: t,
serverName: serverName, serverName: serverName,
}, nil db: db,
writer: sqlutil.NewTransactionWriter(),
}
partitions := sqlutil.PartitionOffsetStatements{}
if err = partitions.Prepare(db, d.writer, "account"); err != nil {
return nil, err
}
if err = d.accounts.prepare(db, serverName); err != nil {
return nil, err
}
if err = d.profiles.prepare(db); err != nil {
return nil, err
}
if err = d.accountDatas.prepare(db); err != nil {
return nil, err
}
if err = d.threepids.prepare(db); err != nil {
return nil, err
}
return d, nil
} }
// GetAccountByPassword returns the account associated with the given localpart and password. // GetAccountByPassword returns the account associated with the given localpart and password.