From f91f309f4bf45a1b55e47fc564c9615cc3f03763 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 17 Aug 2020 18:03:53 +0100 Subject: [PATCH] Database-wide TransactionWriter --- .../sqlite3/appservice_events_table.go | 4 +-- appservice/storage/sqlite3/storage.go | 9 ++--- .../storage/sqlite3/txn_id_counter_table.go | 4 +-- .../sqlite3/current_room_state_table.go | 4 +-- currentstateserver/storage/sqlite3/storage.go | 3 +- .../storage/sqlite3/blacklist_table.go | 4 +-- .../storage/sqlite3/joined_hosts_table.go | 4 +-- .../storage/sqlite3/queue_edus_table.go | 4 +-- .../storage/sqlite3/queue_json_table.go | 4 +-- .../storage/sqlite3/queue_pdus_table.go | 4 +-- .../storage/sqlite3/room_table.go | 4 +-- federationsender/storage/sqlite3/storage.go | 13 +++---- .../storage/sqlite3/device_keys_table.go | 4 +-- .../storage/sqlite3/key_changes_table.go | 4 +-- .../storage/sqlite3/one_time_keys_table.go | 4 +-- .../storage/sqlite3/stale_device_lists.go | 16 ++++++--- keyserver/storage/sqlite3/storage.go | 9 ++--- .../storage/sqlite3/media_repository_table.go | 4 +-- mediaapi/storage/sqlite3/sql.go | 8 +++-- mediaapi/storage/sqlite3/storage.go | 4 ++- mediaapi/storage/sqlite3/thumbnail_table.go | 34 ++++++++++++------- .../storage/sqlite3/event_json_table.go | 4 +-- .../storage/sqlite3/event_state_keys_table.go | 4 +-- .../storage/sqlite3/event_types_table.go | 4 +-- roomserver/storage/sqlite3/events_table.go | 4 +-- roomserver/storage/sqlite3/invite_table.go | 6 ++-- .../storage/sqlite3/membership_table.go | 4 +-- .../storage/sqlite3/previous_events_table.go | 4 +-- roomserver/storage/sqlite3/published_table.go | 4 +-- .../storage/sqlite3/redactions_table.go | 4 +-- .../storage/sqlite3/room_aliases_table.go | 4 +-- roomserver/storage/sqlite3/rooms_table.go | 4 +-- .../storage/sqlite3/state_block_table.go | 4 +-- .../storage/sqlite3/state_snapshot_table.go | 4 +-- roomserver/storage/sqlite3/storage.go | 29 ++++++++-------- .../storage/sqlite3/transactions_table.go | 4 +-- serverkeyapi/storage/sqlite3/keydb.go | 10 ++++-- .../storage/sqlite3/server_key_table.go | 4 +-- syncapi/storage/sqlite3/account_data_table.go | 4 +-- .../sqlite3/backwards_extremities_table.go | 4 +-- .../sqlite3/current_room_state_table.go | 4 +-- syncapi/storage/sqlite3/filter_table.go | 4 +-- syncapi/storage/sqlite3/invites_table.go | 4 +-- .../sqlite3/output_room_events_table.go | 4 +-- .../output_room_events_topology_table.go | 4 +-- .../storage/sqlite3/send_to_device_table.go | 4 +-- syncapi/storage/sqlite3/stream_id_table.go | 4 +-- syncapi/storage/sqlite3/syncserver.go | 22 ++++++------ .../accounts/sqlite3/account_data_table.go | 4 +-- .../accounts/sqlite3/accounts_table.go | 4 +-- .../storage/accounts/sqlite3/profile_table.go | 4 +-- userapi/storage/accounts/sqlite3/storage.go | 14 +++++--- .../accounts/sqlite3/threepid_table.go | 4 +-- .../storage/devices/sqlite3/devices_table.go | 4 +-- userapi/storage/devices/sqlite3/storage.go | 6 ++-- 55 files changed, 193 insertions(+), 154 deletions(-) diff --git a/appservice/storage/sqlite3/appservice_events_table.go b/appservice/storage/sqlite3/appservice_events_table.go index da31f2359..684cfafc6 100644 --- a/appservice/storage/sqlite3/appservice_events_table.go +++ b/appservice/storage/sqlite3/appservice_events_table.go @@ -75,9 +75,9 @@ type eventsStatements struct { deleteEventsBeforeAndIncludingIDStmt *sql.Stmt } -func (s *eventsStatements) prepare(db *sql.DB) (err error) { +func (s *eventsStatements) prepare(db *sql.DB, writer *sqlutil.TransactionWriter) (err error) { s.db = db - s.writer = sqlutil.NewTransactionWriter() + s.writer = writer _, err = db.Exec(appserviceEventsSchema) if err != nil { return diff --git a/appservice/storage/sqlite3/storage.go b/appservice/storage/sqlite3/storage.go index 59af9016d..b2b3df0c7 100644 --- a/appservice/storage/sqlite3/storage.go +++ b/appservice/storage/sqlite3/storage.go @@ -41,7 +41,8 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) { if result.db, err = sqlutil.Open(dbProperties); err != nil { return nil, err } - if err = result.prepare(); err != nil { + writer := sqlutil.NewTransactionWriter() + if err = result.prepare(writer); err != nil { return nil, err } if err = result.PartitionOffsetStatements.Prepare(result.db, "appservice"); err != nil { @@ -50,12 +51,12 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) { return &result, nil } -func (d *Database) prepare() error { - if err := d.events.prepare(d.db); err != nil { +func (d *Database) prepare(writer *sqlutil.TransactionWriter) error { + if err := d.events.prepare(d.db, writer); err != nil { return err } - return d.txnID.prepare(d.db) + return d.txnID.prepare(d.db, writer) } // StoreEvent takes in a gomatrixserverlib.HeaderedEvent and stores it in the database diff --git a/appservice/storage/sqlite3/txn_id_counter_table.go b/appservice/storage/sqlite3/txn_id_counter_table.go index 501ab5aa7..9ed5403c5 100644 --- a/appservice/storage/sqlite3/txn_id_counter_table.go +++ b/appservice/storage/sqlite3/txn_id_counter_table.go @@ -42,9 +42,9 @@ type txnStatements struct { selectTxnIDStmt *sql.Stmt } -func (s *txnStatements) prepare(db *sql.DB) (err error) { +func (s *txnStatements) prepare(db *sql.DB, writer *sqlutil.TransactionWriter) (err error) { s.db = db - s.writer = sqlutil.NewTransactionWriter() + s.writer = writer _, err = db.Exec(txnIDSchema) if err != nil { return diff --git a/currentstateserver/storage/sqlite3/current_room_state_table.go b/currentstateserver/storage/sqlite3/current_room_state_table.go index 5c7e8b0a7..734392550 100644 --- a/currentstateserver/storage/sqlite3/current_room_state_table.go +++ b/currentstateserver/storage/sqlite3/current_room_state_table.go @@ -93,10 +93,10 @@ type currentRoomStateStatements struct { selectKnownUsersStmt *sql.Stmt } -func NewSqliteCurrentRoomStateTable(db *sql.DB) (tables.CurrentRoomState, error) { +func NewSqliteCurrentRoomStateTable(db *sql.DB, writer *sqlutil.TransactionWriter) (tables.CurrentRoomState, error) { s := ¤tRoomStateStatements{ db: db, - writer: sqlutil.NewTransactionWriter(), + writer: writer, } _, err := db.Exec(currentRoomStateSchema) if err != nil { diff --git a/currentstateserver/storage/sqlite3/storage.go b/currentstateserver/storage/sqlite3/storage.go index 4454c9ed7..658397a30 100644 --- a/currentstateserver/storage/sqlite3/storage.go +++ b/currentstateserver/storage/sqlite3/storage.go @@ -22,10 +22,11 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) { if d.db, err = sqlutil.Open(dbProperties); err != nil { return nil, err } + writer := sqlutil.NewTransactionWriter() if err = d.PartitionOffsetStatements.Prepare(d.db, "currentstate"); err != nil { return nil, err } - currRoomState, err := NewSqliteCurrentRoomStateTable(d.db) + currRoomState, err := NewSqliteCurrentRoomStateTable(d.db, writer) if err != nil { return nil, err } diff --git a/federationsender/storage/sqlite3/blacklist_table.go b/federationsender/storage/sqlite3/blacklist_table.go index a14fe0c40..165899c45 100644 --- a/federationsender/storage/sqlite3/blacklist_table.go +++ b/federationsender/storage/sqlite3/blacklist_table.go @@ -48,10 +48,10 @@ type blacklistStatements struct { deleteBlacklistStmt *sql.Stmt } -func NewSQLiteBlacklistTable(db *sql.DB) (s *blacklistStatements, err error) { +func NewSQLiteBlacklistTable(db *sql.DB, writer *sqlutil.TransactionWriter) (s *blacklistStatements, err error) { s = &blacklistStatements{ db: db, - writer: sqlutil.NewTransactionWriter(), + writer: writer, } _, err = db.Exec(blacklistSchema) if err != nil { diff --git a/federationsender/storage/sqlite3/joined_hosts_table.go b/federationsender/storage/sqlite3/joined_hosts_table.go index 53736fa16..bfe81163a 100644 --- a/federationsender/storage/sqlite3/joined_hosts_table.go +++ b/federationsender/storage/sqlite3/joined_hosts_table.go @@ -73,10 +73,10 @@ type joinedHostsStatements struct { // selectJoinedHostsForRoomsStmt *sql.Stmt - prepared at runtime due to variadic } -func NewSQLiteJoinedHostsTable(db *sql.DB) (s *joinedHostsStatements, err error) { +func NewSQLiteJoinedHostsTable(db *sql.DB, writer *sqlutil.TransactionWriter) (s *joinedHostsStatements, err error) { s = &joinedHostsStatements{ db: db, - writer: sqlutil.NewTransactionWriter(), + writer: writer, } _, err = db.Exec(joinedHostsSchema) if err != nil { diff --git a/federationsender/storage/sqlite3/queue_edus_table.go b/federationsender/storage/sqlite3/queue_edus_table.go index cd11a0ea8..632b9f387 100644 --- a/federationsender/storage/sqlite3/queue_edus_table.go +++ b/federationsender/storage/sqlite3/queue_edus_table.go @@ -72,10 +72,10 @@ type queueEDUsStatements struct { selectQueueEDUServerNamesStmt *sql.Stmt } -func NewSQLiteQueueEDUsTable(db *sql.DB) (s *queueEDUsStatements, err error) { +func NewSQLiteQueueEDUsTable(db *sql.DB, writer *sqlutil.TransactionWriter) (s *queueEDUsStatements, err error) { s = &queueEDUsStatements{ db: db, - writer: sqlutil.NewTransactionWriter(), + writer: writer, } _, err = db.Exec(queueEDUsSchema) if err != nil { diff --git a/federationsender/storage/sqlite3/queue_json_table.go b/federationsender/storage/sqlite3/queue_json_table.go index 46dfd9ab1..3130218ff 100644 --- a/federationsender/storage/sqlite3/queue_json_table.go +++ b/federationsender/storage/sqlite3/queue_json_table.go @@ -56,10 +56,10 @@ type queueJSONStatements struct { //selectJSONStmt *sql.Stmt - prepared at runtime due to variadic } -func NewSQLiteQueueJSONTable(db *sql.DB) (s *queueJSONStatements, err error) { +func NewSQLiteQueueJSONTable(db *sql.DB, writer *sqlutil.TransactionWriter) (s *queueJSONStatements, err error) { s = &queueJSONStatements{ db: db, - writer: sqlutil.NewTransactionWriter(), + writer: writer, } _, err = db.Exec(queueJSONSchema) if err != nil { diff --git a/federationsender/storage/sqlite3/queue_pdus_table.go b/federationsender/storage/sqlite3/queue_pdus_table.go index 1474bfc02..54e93bcb4 100644 --- a/federationsender/storage/sqlite3/queue_pdus_table.go +++ b/federationsender/storage/sqlite3/queue_pdus_table.go @@ -81,10 +81,10 @@ type queuePDUsStatements struct { // deleteQueuePDUsStmt *sql.Stmt - prepared at runtime due to variadic } -func NewSQLiteQueuePDUsTable(db *sql.DB) (s *queuePDUsStatements, err error) { +func NewSQLiteQueuePDUsTable(db *sql.DB, writer *sqlutil.TransactionWriter) (s *queuePDUsStatements, err error) { s = &queuePDUsStatements{ db: db, - writer: sqlutil.NewTransactionWriter(), + writer: writer, } _, err = db.Exec(queuePDUsSchema) if err != nil { diff --git a/federationsender/storage/sqlite3/room_table.go b/federationsender/storage/sqlite3/room_table.go index 517938745..21a528365 100644 --- a/federationsender/storage/sqlite3/room_table.go +++ b/federationsender/storage/sqlite3/room_table.go @@ -50,10 +50,10 @@ type roomStatements struct { updateRoomStmt *sql.Stmt } -func NewSQLiteRoomsTable(db *sql.DB) (s *roomStatements, err error) { +func NewSQLiteRoomsTable(db *sql.DB, writer *sqlutil.TransactionWriter) (s *roomStatements, err error) { s = &roomStatements{ db: db, - writer: sqlutil.NewTransactionWriter(), + writer: writer, } _, err = db.Exec(roomSchema) if err != nil { diff --git a/federationsender/storage/sqlite3/storage.go b/federationsender/storage/sqlite3/storage.go index 41b91871e..d0ad8c94a 100644 --- a/federationsender/storage/sqlite3/storage.go +++ b/federationsender/storage/sqlite3/storage.go @@ -39,27 +39,28 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) { if d.db, err = sqlutil.Open(dbProperties); err != nil { return nil, err } - joinedHosts, err := NewSQLiteJoinedHostsTable(d.db) + writer := sqlutil.NewTransactionWriter() + joinedHosts, err := NewSQLiteJoinedHostsTable(d.db, writer) if err != nil { return nil, err } - rooms, err := NewSQLiteRoomsTable(d.db) + rooms, err := NewSQLiteRoomsTable(d.db, writer) if err != nil { return nil, err } - queuePDUs, err := NewSQLiteQueuePDUsTable(d.db) + queuePDUs, err := NewSQLiteQueuePDUsTable(d.db, writer) if err != nil { return nil, err } - queueEDUs, err := NewSQLiteQueueEDUsTable(d.db) + queueEDUs, err := NewSQLiteQueueEDUsTable(d.db, writer) if err != nil { return nil, err } - queueJSON, err := NewSQLiteQueueJSONTable(d.db) + queueJSON, err := NewSQLiteQueueJSONTable(d.db, writer) if err != nil { return nil, err } - blacklist, err := NewSQLiteBlacklistTable(d.db) + blacklist, err := NewSQLiteBlacklistTable(d.db, writer) if err != nil { return nil, err } diff --git a/keyserver/storage/sqlite3/device_keys_table.go b/keyserver/storage/sqlite3/device_keys_table.go index a4d71fe13..505883b4a 100644 --- a/keyserver/storage/sqlite3/device_keys_table.go +++ b/keyserver/storage/sqlite3/device_keys_table.go @@ -71,10 +71,10 @@ type deviceKeysStatements struct { deleteAllDeviceKeysStmt *sql.Stmt } -func NewSqliteDeviceKeysTable(db *sql.DB) (tables.DeviceKeys, error) { +func NewSqliteDeviceKeysTable(db *sql.DB, writer *sqlutil.TransactionWriter) (tables.DeviceKeys, error) { s := &deviceKeysStatements{ db: db, - writer: sqlutil.NewTransactionWriter(), + writer: writer, } _, err := db.Exec(deviceKeysSchema) if err != nil { diff --git a/keyserver/storage/sqlite3/key_changes_table.go b/keyserver/storage/sqlite3/key_changes_table.go index 02b9d193e..ac2913c3b 100644 --- a/keyserver/storage/sqlite3/key_changes_table.go +++ b/keyserver/storage/sqlite3/key_changes_table.go @@ -57,10 +57,10 @@ type keyChangesStatements struct { selectKeyChangesStmt *sql.Stmt } -func NewSqliteKeyChangesTable(db *sql.DB) (tables.KeyChanges, error) { +func NewSqliteKeyChangesTable(db *sql.DB, writer *sqlutil.TransactionWriter) (tables.KeyChanges, error) { s := &keyChangesStatements{ db: db, - writer: sqlutil.NewTransactionWriter(), + writer: writer, } _, err := db.Exec(keyChangesSchema) if err != nil { diff --git a/keyserver/storage/sqlite3/one_time_keys_table.go b/keyserver/storage/sqlite3/one_time_keys_table.go index 907966a7a..f21b3a8ae 100644 --- a/keyserver/storage/sqlite3/one_time_keys_table.go +++ b/keyserver/storage/sqlite3/one_time_keys_table.go @@ -68,10 +68,10 @@ type oneTimeKeysStatements struct { deleteOneTimeKeyStmt *sql.Stmt } -func NewSqliteOneTimeKeysTable(db *sql.DB) (tables.OneTimeKeys, error) { +func NewSqliteOneTimeKeysTable(db *sql.DB, writer *sqlutil.TransactionWriter) (tables.OneTimeKeys, error) { s := &oneTimeKeysStatements{ db: db, - writer: sqlutil.NewTransactionWriter(), + writer: writer, } _, err := db.Exec(oneTimeKeysSchema) if err != nil { diff --git a/keyserver/storage/sqlite3/stale_device_lists.go b/keyserver/storage/sqlite3/stale_device_lists.go index a989476d1..19dcfff04 100644 --- a/keyserver/storage/sqlite3/stale_device_lists.go +++ b/keyserver/storage/sqlite3/stale_device_lists.go @@ -20,6 +20,7 @@ import ( "time" "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/keyserver/storage/tables" "github.com/matrix-org/gomatrixserverlib" ) @@ -49,13 +50,18 @@ const selectStaleDeviceListsSQL = "" + "SELECT user_id FROM keyserver_stale_device_lists WHERE is_stale = $1" type staleDeviceListsStatements struct { + db *sql.DB + writer *sqlutil.TransactionWriter upsertStaleDeviceListStmt *sql.Stmt selectStaleDeviceListsWithDomainsStmt *sql.Stmt selectStaleDeviceListsStmt *sql.Stmt } -func NewSqliteStaleDeviceListsTable(db *sql.DB) (tables.StaleDeviceLists, error) { - s := &staleDeviceListsStatements{} +func NewSqliteStaleDeviceListsTable(db *sql.DB, writer *sqlutil.TransactionWriter) (tables.StaleDeviceLists, error) { + s := &staleDeviceListsStatements{ + db: db, + writer: writer, + } _, err := db.Exec(staleDeviceListsSchema) if err != nil { return nil, err @@ -77,8 +83,10 @@ func (s *staleDeviceListsStatements) InsertStaleDeviceList(ctx context.Context, if err != nil { return err } - _, err = s.upsertStaleDeviceListStmt.ExecContext(ctx, userID, string(domain), isStale, time.Now().Unix()) - return err + return s.writer.Do(s.db, nil, func(txn *sql.Tx) error { + _, err = s.upsertStaleDeviceListStmt.ExecContext(ctx, userID, string(domain), isStale, time.Now().Unix()) + return err + }) } func (s *staleDeviceListsStatements) SelectUserIDsWithStaleDeviceLists(ctx context.Context, domains []gomatrixserverlib.ServerName) ([]string, error) { diff --git a/keyserver/storage/sqlite3/storage.go b/keyserver/storage/sqlite3/storage.go index bb2935582..950bd0cfc 100644 --- a/keyserver/storage/sqlite3/storage.go +++ b/keyserver/storage/sqlite3/storage.go @@ -25,19 +25,20 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*shared.Database, error) if err != nil { return nil, err } - otk, err := NewSqliteOneTimeKeysTable(db) + writer := sqlutil.NewTransactionWriter() + otk, err := NewSqliteOneTimeKeysTable(db, writer) if err != nil { return nil, err } - dk, err := NewSqliteDeviceKeysTable(db) + dk, err := NewSqliteDeviceKeysTable(db, writer) if err != nil { return nil, err } - kc, err := NewSqliteKeyChangesTable(db) + kc, err := NewSqliteKeyChangesTable(db, writer) if err != nil { return nil, err } - sdl, err := NewSqliteStaleDeviceListsTable(db) + sdl, err := NewSqliteStaleDeviceListsTable(db, writer) if err != nil { return nil, err } diff --git a/mediaapi/storage/sqlite3/media_repository_table.go b/mediaapi/storage/sqlite3/media_repository_table.go index f53f164d4..ea6839fdc 100644 --- a/mediaapi/storage/sqlite3/media_repository_table.go +++ b/mediaapi/storage/sqlite3/media_repository_table.go @@ -67,9 +67,9 @@ type mediaStatements struct { selectMediaStmt *sql.Stmt } -func (s *mediaStatements) prepare(db *sql.DB) (err error) { +func (s *mediaStatements) prepare(db *sql.DB, writer *sqlutil.TransactionWriter) (err error) { s.db = db - s.writer = sqlutil.NewTransactionWriter() + s.writer = writer _, err = db.Exec(mediaSchema) if err != nil { diff --git a/mediaapi/storage/sqlite3/sql.go b/mediaapi/storage/sqlite3/sql.go index 9cd78b8ee..883495b65 100644 --- a/mediaapi/storage/sqlite3/sql.go +++ b/mediaapi/storage/sqlite3/sql.go @@ -17,6 +17,8 @@ package sqlite3 import ( "database/sql" + + "github.com/matrix-org/dendrite/internal/sqlutil" ) type statements struct { @@ -24,11 +26,11 @@ type statements struct { thumbnail thumbnailStatements } -func (s *statements) prepare(db *sql.DB) (err error) { - if err = s.media.prepare(db); err != nil { +func (s *statements) prepare(db *sql.DB, writer *sqlutil.TransactionWriter) (err error) { + if err = s.media.prepare(db, writer); err != nil { return } - if err = s.thumbnail.prepare(db); err != nil { + if err = s.thumbnail.prepare(db, writer); err != nil { return } diff --git a/mediaapi/storage/sqlite3/storage.go b/mediaapi/storage/sqlite3/storage.go index a1e7fec7d..f275b1da3 100644 --- a/mediaapi/storage/sqlite3/storage.go +++ b/mediaapi/storage/sqlite3/storage.go @@ -31,6 +31,7 @@ import ( type Database struct { statements statements db *sql.DB + writer *sqlutil.TransactionWriter } // Open opens a postgres database. @@ -40,7 +41,8 @@ func Open(dbProperties *config.DatabaseOptions) (*Database, error) { if d.db, err = sqlutil.Open(dbProperties); err != nil { return nil, err } - if err = d.statements.prepare(d.db); err != nil { + d.writer = sqlutil.NewTransactionWriter() + if err = d.statements.prepare(d.db, d.writer); err != nil { return nil, err } return &d, nil diff --git a/mediaapi/storage/sqlite3/thumbnail_table.go b/mediaapi/storage/sqlite3/thumbnail_table.go index 432a1590c..dd924a6cf 100644 --- a/mediaapi/storage/sqlite3/thumbnail_table.go +++ b/mediaapi/storage/sqlite3/thumbnail_table.go @@ -21,6 +21,7 @@ import ( "time" "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/mediaapi/types" "github.com/matrix-org/gomatrixserverlib" ) @@ -57,12 +58,16 @@ SELECT content_type, file_size_bytes, creation_ts, width, height, resize_method ` type thumbnailStatements struct { + db *sql.DB + writer *sqlutil.TransactionWriter insertThumbnailStmt *sql.Stmt selectThumbnailStmt *sql.Stmt selectThumbnailsStmt *sql.Stmt } -func (s *thumbnailStatements) prepare(db *sql.DB) (err error) { +func (s *thumbnailStatements) prepare(db *sql.DB, writer *sqlutil.TransactionWriter) (err error) { + s.db = db + s.writer = writer _, err = db.Exec(thumbnailSchema) if err != nil { return @@ -79,18 +84,21 @@ func (s *thumbnailStatements) insertThumbnail( ctx context.Context, thumbnailMetadata *types.ThumbnailMetadata, ) error { thumbnailMetadata.MediaMetadata.CreationTimestamp = types.UnixMs(time.Now().UnixNano() / 1000000) - _, err := s.insertThumbnailStmt.ExecContext( - ctx, - thumbnailMetadata.MediaMetadata.MediaID, - thumbnailMetadata.MediaMetadata.Origin, - thumbnailMetadata.MediaMetadata.ContentType, - thumbnailMetadata.MediaMetadata.FileSizeBytes, - thumbnailMetadata.MediaMetadata.CreationTimestamp, - thumbnailMetadata.ThumbnailSize.Width, - thumbnailMetadata.ThumbnailSize.Height, - thumbnailMetadata.ThumbnailSize.ResizeMethod, - ) - return err + return s.writer.Do(s.db, nil, func(txn *sql.Tx) error { + stmt := sqlutil.TxStmt(txn, s.insertThumbnailStmt) + _, err := stmt.ExecContext( + ctx, + thumbnailMetadata.MediaMetadata.MediaID, + thumbnailMetadata.MediaMetadata.Origin, + thumbnailMetadata.MediaMetadata.ContentType, + thumbnailMetadata.MediaMetadata.FileSizeBytes, + thumbnailMetadata.MediaMetadata.CreationTimestamp, + thumbnailMetadata.ThumbnailSize.Width, + thumbnailMetadata.ThumbnailSize.Height, + thumbnailMetadata.ThumbnailSize.ResizeMethod, + ) + return err + }) } func (s *thumbnailStatements) selectThumbnail( diff --git a/roomserver/storage/sqlite3/event_json_table.go b/roomserver/storage/sqlite3/event_json_table.go index 64795d024..e8118ad76 100644 --- a/roomserver/storage/sqlite3/event_json_table.go +++ b/roomserver/storage/sqlite3/event_json_table.go @@ -54,10 +54,10 @@ type eventJSONStatements struct { bulkSelectEventJSONStmt *sql.Stmt } -func NewSqliteEventJSONTable(db *sql.DB) (tables.EventJSON, error) { +func NewSqliteEventJSONTable(db *sql.DB, writer *sqlutil.TransactionWriter) (tables.EventJSON, error) { s := &eventJSONStatements{ db: db, - writer: sqlutil.NewTransactionWriter(), + writer: writer, } _, err := db.Exec(eventJSONSchema) if err != nil { diff --git a/roomserver/storage/sqlite3/event_state_keys_table.go b/roomserver/storage/sqlite3/event_state_keys_table.go index 3e9f2e613..c8ad052bf 100644 --- a/roomserver/storage/sqlite3/event_state_keys_table.go +++ b/roomserver/storage/sqlite3/event_state_keys_table.go @@ -71,10 +71,10 @@ type eventStateKeyStatements struct { bulkSelectEventStateKeyStmt *sql.Stmt } -func NewSqliteEventStateKeysTable(db *sql.DB) (tables.EventStateKeys, error) { +func NewSqliteEventStateKeysTable(db *sql.DB, writer *sqlutil.TransactionWriter) (tables.EventStateKeys, error) { s := &eventStateKeyStatements{ db: db, - writer: sqlutil.NewTransactionWriter(), + writer: writer, } _, err := db.Exec(eventStateKeysSchema) if err != nil { diff --git a/roomserver/storage/sqlite3/event_types_table.go b/roomserver/storage/sqlite3/event_types_table.go index fd4a2e42f..4a645789d 100644 --- a/roomserver/storage/sqlite3/event_types_table.go +++ b/roomserver/storage/sqlite3/event_types_table.go @@ -85,10 +85,10 @@ type eventTypeStatements struct { bulkSelectEventTypeNIDStmt *sql.Stmt } -func NewSqliteEventTypesTable(db *sql.DB) (tables.EventTypes, error) { +func NewSqliteEventTypesTable(db *sql.DB, writer *sqlutil.TransactionWriter) (tables.EventTypes, error) { s := &eventTypeStatements{ db: db, - writer: sqlutil.NewTransactionWriter(), + writer: writer, } _, err := db.Exec(eventTypesSchema) if err != nil { diff --git a/roomserver/storage/sqlite3/events_table.go b/roomserver/storage/sqlite3/events_table.go index b3cfee07e..3ac30ca3d 100644 --- a/roomserver/storage/sqlite3/events_table.go +++ b/roomserver/storage/sqlite3/events_table.go @@ -115,10 +115,10 @@ type eventStatements struct { selectRoomNIDForEventNIDStmt *sql.Stmt } -func NewSqliteEventsTable(db *sql.DB) (tables.Events, error) { +func NewSqliteEventsTable(db *sql.DB, writer *sqlutil.TransactionWriter) (tables.Events, error) { s := &eventStatements{ db: db, - writer: sqlutil.NewTransactionWriter(), + writer: writer, } _, err := db.Exec(eventsSchema) if err != nil { diff --git a/roomserver/storage/sqlite3/invite_table.go b/roomserver/storage/sqlite3/invite_table.go index e806eab6d..1305f4a8a 100644 --- a/roomserver/storage/sqlite3/invite_table.go +++ b/roomserver/storage/sqlite3/invite_table.go @@ -71,10 +71,10 @@ type inviteStatements struct { selectInvitesAboutToRetireStmt *sql.Stmt } -func NewSqliteInvitesTable(db *sql.DB) (tables.Invites, error) { +func NewSqliteInvitesTable(db *sql.DB, writer *sqlutil.TransactionWriter) (tables.Invites, error) { s := &inviteStatements{ db: db, - writer: sqlutil.NewTransactionWriter(), + writer: writer, } _, err := db.Exec(inviteSchema) if err != nil { @@ -124,7 +124,7 @@ func (s *inviteStatements) UpdateInviteRetired( if err != nil { return err } - defer (func() { err = rows.Close() })() + defer internal.CloseAndLogIfError(ctx, rows, "UpdateInviteRetired: rows.close() failed") for rows.Next() { var inviteEventID string if err = rows.Scan(&inviteEventID); err != nil { diff --git a/roomserver/storage/sqlite3/membership_table.go b/roomserver/storage/sqlite3/membership_table.go index 6dd8bd83f..7b69cee32 100644 --- a/roomserver/storage/sqlite3/membership_table.go +++ b/roomserver/storage/sqlite3/membership_table.go @@ -88,10 +88,10 @@ type membershipStatements struct { updateMembershipStmt *sql.Stmt } -func NewSqliteMembershipTable(db *sql.DB) (tables.Membership, error) { +func NewSqliteMembershipTable(db *sql.DB, writer *sqlutil.TransactionWriter) (tables.Membership, error) { s := &membershipStatements{ db: db, - writer: sqlutil.NewTransactionWriter(), + writer: writer, } _, err := db.Exec(membershipSchema) if err != nil { diff --git a/roomserver/storage/sqlite3/previous_events_table.go b/roomserver/storage/sqlite3/previous_events_table.go index 28b5d18f0..ff804861c 100644 --- a/roomserver/storage/sqlite3/previous_events_table.go +++ b/roomserver/storage/sqlite3/previous_events_table.go @@ -59,10 +59,10 @@ type previousEventStatements struct { selectPreviousEventExistsStmt *sql.Stmt } -func NewSqlitePrevEventsTable(db *sql.DB) (tables.PreviousEvents, error) { +func NewSqlitePrevEventsTable(db *sql.DB, writer *sqlutil.TransactionWriter) (tables.PreviousEvents, error) { s := &previousEventStatements{ db: db, - writer: sqlutil.NewTransactionWriter(), + writer: writer, } _, err := db.Exec(previousEventSchema) if err != nil { diff --git a/roomserver/storage/sqlite3/published_table.go b/roomserver/storage/sqlite3/published_table.go index 85f1e0a49..a4a47aec9 100644 --- a/roomserver/storage/sqlite3/published_table.go +++ b/roomserver/storage/sqlite3/published_table.go @@ -51,10 +51,10 @@ type publishedStatements struct { selectPublishedStmt *sql.Stmt } -func NewSqlitePublishedTable(db *sql.DB) (tables.Published, error) { +func NewSqlitePublishedTable(db *sql.DB, writer *sqlutil.TransactionWriter) (tables.Published, error) { s := &publishedStatements{ db: db, - writer: sqlutil.NewTransactionWriter(), + writer: writer, } _, err := db.Exec(publishedSchema) if err != nil { diff --git a/roomserver/storage/sqlite3/redactions_table.go b/roomserver/storage/sqlite3/redactions_table.go index d2bd2a204..ad900a4ec 100644 --- a/roomserver/storage/sqlite3/redactions_table.go +++ b/roomserver/storage/sqlite3/redactions_table.go @@ -60,10 +60,10 @@ type redactionStatements struct { markRedactionValidatedStmt *sql.Stmt } -func NewSqliteRedactionsTable(db *sql.DB) (tables.Redactions, error) { +func NewSqliteRedactionsTable(db *sql.DB, writer *sqlutil.TransactionWriter) (tables.Redactions, error) { s := &redactionStatements{ db: db, - writer: sqlutil.NewTransactionWriter(), + writer: writer, } _, err := db.Exec(redactionsSchema) if err != nil { diff --git a/roomserver/storage/sqlite3/room_aliases_table.go b/roomserver/storage/sqlite3/room_aliases_table.go index 4a5357776..deba3ff55 100644 --- a/roomserver/storage/sqlite3/room_aliases_table.go +++ b/roomserver/storage/sqlite3/room_aliases_table.go @@ -65,10 +65,10 @@ type roomAliasesStatements struct { deleteRoomAliasStmt *sql.Stmt } -func NewSqliteRoomAliasesTable(db *sql.DB) (tables.RoomAliases, error) { +func NewSqliteRoomAliasesTable(db *sql.DB, writer *sqlutil.TransactionWriter) (tables.RoomAliases, error) { s := &roomAliasesStatements{ db: db, - writer: sqlutil.NewTransactionWriter(), + writer: writer, } _, err := db.Exec(roomAliasesSchema) if err != nil { diff --git a/roomserver/storage/sqlite3/rooms_table.go b/roomserver/storage/sqlite3/rooms_table.go index bb30a63b3..8bbec5080 100644 --- a/roomserver/storage/sqlite3/rooms_table.go +++ b/roomserver/storage/sqlite3/rooms_table.go @@ -76,10 +76,10 @@ type roomStatements struct { selectRoomVersionForRoomNIDStmt *sql.Stmt } -func NewSqliteRoomsTable(db *sql.DB) (tables.Rooms, error) { +func NewSqliteRoomsTable(db *sql.DB, writer *sqlutil.TransactionWriter) (tables.Rooms, error) { s := &roomStatements{ db: db, - writer: sqlutil.NewTransactionWriter(), + writer: writer, } _, err := db.Exec(roomsSchema) if err != nil { diff --git a/roomserver/storage/sqlite3/state_block_table.go b/roomserver/storage/sqlite3/state_block_table.go index 3d716b642..3e28e450b 100644 --- a/roomserver/storage/sqlite3/state_block_table.go +++ b/roomserver/storage/sqlite3/state_block_table.go @@ -81,10 +81,10 @@ type stateBlockStatements struct { bulkSelectFilteredStateBlockEntriesStmt *sql.Stmt } -func NewSqliteStateBlockTable(db *sql.DB) (tables.StateBlock, error) { +func NewSqliteStateBlockTable(db *sql.DB, writer *sqlutil.TransactionWriter) (tables.StateBlock, error) { s := &stateBlockStatements{ db: db, - writer: sqlutil.NewTransactionWriter(), + writer: writer, } _, err := db.Exec(stateDataSchema) if err != nil { diff --git a/roomserver/storage/sqlite3/state_snapshot_table.go b/roomserver/storage/sqlite3/state_snapshot_table.go index 48f1210be..799904ff6 100644 --- a/roomserver/storage/sqlite3/state_snapshot_table.go +++ b/roomserver/storage/sqlite3/state_snapshot_table.go @@ -55,10 +55,10 @@ type stateSnapshotStatements struct { bulkSelectStateBlockNIDsStmt *sql.Stmt } -func NewSqliteStateSnapshotTable(db *sql.DB) (tables.StateSnapshot, error) { +func NewSqliteStateSnapshotTable(db *sql.DB, writer *sqlutil.TransactionWriter) (tables.StateSnapshot, error) { s := &stateSnapshotStatements{ db: db, - writer: sqlutil.NewTransactionWriter(), + writer: writer, } _, err := db.Exec(stateSnapshotSchema) if err != nil { diff --git a/roomserver/storage/sqlite3/storage.go b/roomserver/storage/sqlite3/storage.go index 048de1928..ae3140d7d 100644 --- a/roomserver/storage/sqlite3/storage.go +++ b/roomserver/storage/sqlite3/storage.go @@ -51,6 +51,7 @@ func Open(dbProperties *config.DatabaseOptions) (*Database, error) { if d.db, err = sqlutil.Open(dbProperties); err != nil { return nil, err } + writer := sqlutil.NewTransactionWriter() //d.db.Exec("PRAGMA journal_mode=WAL;") //d.db.Exec("PRAGMA read_uncommitted = true;") @@ -60,59 +61,59 @@ func Open(dbProperties *config.DatabaseOptions) (*Database, error) { // which it will never obtain. d.db.SetMaxOpenConns(20) - d.eventStateKeys, err = NewSqliteEventStateKeysTable(d.db) + d.eventStateKeys, err = NewSqliteEventStateKeysTable(d.db, writer) if err != nil { return nil, err } - d.eventTypes, err = NewSqliteEventTypesTable(d.db) + d.eventTypes, err = NewSqliteEventTypesTable(d.db, writer) if err != nil { return nil, err } - d.eventJSON, err = NewSqliteEventJSONTable(d.db) + d.eventJSON, err = NewSqliteEventJSONTable(d.db, writer) if err != nil { return nil, err } - d.events, err = NewSqliteEventsTable(d.db) + d.events, err = NewSqliteEventsTable(d.db, writer) if err != nil { return nil, err } - d.rooms, err = NewSqliteRoomsTable(d.db) + d.rooms, err = NewSqliteRoomsTable(d.db, writer) if err != nil { return nil, err } - d.transactions, err = NewSqliteTransactionsTable(d.db) + d.transactions, err = NewSqliteTransactionsTable(d.db, writer) if err != nil { return nil, err } - stateBlock, err := NewSqliteStateBlockTable(d.db) + stateBlock, err := NewSqliteStateBlockTable(d.db, writer) if err != nil { return nil, err } - stateSnapshot, err := NewSqliteStateSnapshotTable(d.db) + stateSnapshot, err := NewSqliteStateSnapshotTable(d.db, writer) if err != nil { return nil, err } - d.prevEvents, err = NewSqlitePrevEventsTable(d.db) + d.prevEvents, err = NewSqlitePrevEventsTable(d.db, writer) if err != nil { return nil, err } - roomAliases, err := NewSqliteRoomAliasesTable(d.db) + roomAliases, err := NewSqliteRoomAliasesTable(d.db, writer) if err != nil { return nil, err } - d.invites, err = NewSqliteInvitesTable(d.db) + d.invites, err = NewSqliteInvitesTable(d.db, writer) if err != nil { return nil, err } - d.membership, err = NewSqliteMembershipTable(d.db) + d.membership, err = NewSqliteMembershipTable(d.db, writer) if err != nil { return nil, err } - published, err := NewSqlitePublishedTable(d.db) + published, err := NewSqlitePublishedTable(d.db, writer) if err != nil { return nil, err } - redactions, err := NewSqliteRedactionsTable(d.db) + redactions, err := NewSqliteRedactionsTable(d.db, writer) if err != nil { return nil, err } diff --git a/roomserver/storage/sqlite3/transactions_table.go b/roomserver/storage/sqlite3/transactions_table.go index 2f6cff95a..65c18a8a9 100644 --- a/roomserver/storage/sqlite3/transactions_table.go +++ b/roomserver/storage/sqlite3/transactions_table.go @@ -50,10 +50,10 @@ type transactionStatements struct { selectTransactionEventIDStmt *sql.Stmt } -func NewSqliteTransactionsTable(db *sql.DB) (tables.Transactions, error) { +func NewSqliteTransactionsTable(db *sql.DB, writer *sqlutil.TransactionWriter) (tables.Transactions, error) { s := &transactionStatements{ db: db, - writer: sqlutil.NewTransactionWriter(), + writer: writer, } _, err := db.Exec(transactionsSchema) if err != nil { diff --git a/serverkeyapi/storage/sqlite3/keydb.go b/serverkeyapi/storage/sqlite3/keydb.go index 5174ece15..fbd109e9b 100644 --- a/serverkeyapi/storage/sqlite3/keydb.go +++ b/serverkeyapi/storage/sqlite3/keydb.go @@ -17,6 +17,7 @@ package sqlite3 import ( "context" + "database/sql" "golang.org/x/crypto/ed25519" @@ -30,6 +31,8 @@ import ( // A Database implements gomatrixserverlib.KeyDatabase and is used to store // the public keys for other matrix servers. type Database struct { + db *sql.DB + writer *sqlutil.TransactionWriter statements serverKeyStatements } @@ -47,8 +50,11 @@ func NewDatabase( if err != nil { return nil, err } - d := &Database{} - err = d.statements.prepare(db) + d := &Database{ + db: db, + writer: sqlutil.NewTransactionWriter(), + } + err = d.statements.prepare(d.db, d.writer) if err != nil { return nil, err } diff --git a/serverkeyapi/storage/sqlite3/server_key_table.go b/serverkeyapi/storage/sqlite3/server_key_table.go index 423292a54..022ce407e 100644 --- a/serverkeyapi/storage/sqlite3/server_key_table.go +++ b/serverkeyapi/storage/sqlite3/server_key_table.go @@ -68,9 +68,9 @@ type serverKeyStatements struct { upsertServerKeysStmt *sql.Stmt } -func (s *serverKeyStatements) prepare(db *sql.DB) (err error) { +func (s *serverKeyStatements) prepare(db *sql.DB, writer *sqlutil.TransactionWriter) (err error) { s.db = db - s.writer = sqlutil.NewTransactionWriter() + s.writer = writer _, err = db.Exec(serverKeysSchema) if err != nil { return diff --git a/syncapi/storage/sqlite3/account_data_table.go b/syncapi/storage/sqlite3/account_data_table.go index 609cef141..289bd7765 100644 --- a/syncapi/storage/sqlite3/account_data_table.go +++ b/syncapi/storage/sqlite3/account_data_table.go @@ -58,10 +58,10 @@ type accountDataStatements struct { selectAccountDataInRangeStmt *sql.Stmt } -func NewSqliteAccountDataTable(db *sql.DB, streamID *streamIDStatements) (tables.AccountData, error) { +func NewSqliteAccountDataTable(db *sql.DB, writer *sqlutil.TransactionWriter, streamID *streamIDStatements) (tables.AccountData, error) { s := &accountDataStatements{ db: db, - writer: sqlutil.NewTransactionWriter(), + writer: writer, streamIDStatements: streamID, } _, err := db.Exec(accountDataSchema) diff --git a/syncapi/storage/sqlite3/backwards_extremities_table.go b/syncapi/storage/sqlite3/backwards_extremities_table.go index 1aeb041f4..3260b10c1 100644 --- a/syncapi/storage/sqlite3/backwards_extremities_table.go +++ b/syncapi/storage/sqlite3/backwards_extremities_table.go @@ -55,10 +55,10 @@ type backwardExtremitiesStatements struct { deleteBackwardExtremityStmt *sql.Stmt } -func NewSqliteBackwardsExtremitiesTable(db *sql.DB) (tables.BackwardsExtremities, error) { +func NewSqliteBackwardsExtremitiesTable(db *sql.DB, writer *sqlutil.TransactionWriter) (tables.BackwardsExtremities, error) { s := &backwardExtremitiesStatements{ db: db, - writer: sqlutil.NewTransactionWriter(), + writer: writer, } _, err := db.Exec(backwardExtremitiesSchema) if err != nil { diff --git a/syncapi/storage/sqlite3/current_room_state_table.go b/syncapi/storage/sqlite3/current_room_state_table.go index 6edc99aa0..4d70d9d02 100644 --- a/syncapi/storage/sqlite3/current_room_state_table.go +++ b/syncapi/storage/sqlite3/current_room_state_table.go @@ -95,10 +95,10 @@ type currentRoomStateStatements struct { selectStateEventStmt *sql.Stmt } -func NewSqliteCurrentRoomStateTable(db *sql.DB, streamID *streamIDStatements) (tables.CurrentRoomState, error) { +func NewSqliteCurrentRoomStateTable(db *sql.DB, writer *sqlutil.TransactionWriter, streamID *streamIDStatements) (tables.CurrentRoomState, error) { s := ¤tRoomStateStatements{ db: db, - writer: sqlutil.NewTransactionWriter(), + writer: writer, streamIDStatements: streamID, } _, err := db.Exec(currentRoomStateSchema) diff --git a/syncapi/storage/sqlite3/filter_table.go b/syncapi/storage/sqlite3/filter_table.go index 3e8a46551..7ed8b9118 100644 --- a/syncapi/storage/sqlite3/filter_table.go +++ b/syncapi/storage/sqlite3/filter_table.go @@ -58,14 +58,14 @@ type filterStatements struct { insertFilterStmt *sql.Stmt } -func NewSqliteFilterTable(db *sql.DB) (tables.Filter, error) { +func NewSqliteFilterTable(db *sql.DB, writer *sqlutil.TransactionWriter) (tables.Filter, error) { _, err := db.Exec(filterSchema) if err != nil { return nil, err } s := &filterStatements{ db: db, - writer: sqlutil.NewTransactionWriter(), + writer: writer, } if s.selectFilterStmt, err = db.Prepare(selectFilterSQL); err != nil { return nil, err diff --git a/syncapi/storage/sqlite3/invites_table.go b/syncapi/storage/sqlite3/invites_table.go index 19e7a7c68..31cf50af9 100644 --- a/syncapi/storage/sqlite3/invites_table.go +++ b/syncapi/storage/sqlite3/invites_table.go @@ -67,10 +67,10 @@ type inviteEventsStatements struct { selectMaxInviteIDStmt *sql.Stmt } -func NewSqliteInvitesTable(db *sql.DB, streamID *streamIDStatements) (tables.Invites, error) { +func NewSqliteInvitesTable(db *sql.DB, writer *sqlutil.TransactionWriter, streamID *streamIDStatements) (tables.Invites, error) { s := &inviteEventsStatements{ db: db, - writer: sqlutil.NewTransactionWriter(), + writer: writer, streamIDStatements: streamID, } _, err := db.Exec(inviteEventsSchema) diff --git a/syncapi/storage/sqlite3/output_room_events_table.go b/syncapi/storage/sqlite3/output_room_events_table.go index 12b4dbabe..33b62671e 100644 --- a/syncapi/storage/sqlite3/output_room_events_table.go +++ b/syncapi/storage/sqlite3/output_room_events_table.go @@ -117,10 +117,10 @@ type outputRoomEventsStatements struct { updateEventJSONStmt *sql.Stmt } -func NewSqliteEventsTable(db *sql.DB, streamID *streamIDStatements) (tables.Events, error) { +func NewSqliteEventsTable(db *sql.DB, writer *sqlutil.TransactionWriter, streamID *streamIDStatements) (tables.Events, error) { s := &outputRoomEventsStatements{ db: db, - writer: sqlutil.NewTransactionWriter(), + writer: writer, streamIDStatements: streamID, } _, err := db.Exec(outputRoomEventsSchema) diff --git a/syncapi/storage/sqlite3/output_room_events_topology_table.go b/syncapi/storage/sqlite3/output_room_events_topology_table.go index 2e71e8f33..ee0141e57 100644 --- a/syncapi/storage/sqlite3/output_room_events_topology_table.go +++ b/syncapi/storage/sqlite3/output_room_events_topology_table.go @@ -75,10 +75,10 @@ type outputRoomEventsTopologyStatements struct { selectMaxPositionInTopologyStmt *sql.Stmt } -func NewSqliteTopologyTable(db *sql.DB) (tables.Topology, error) { +func NewSqliteTopologyTable(db *sql.DB, writer *sqlutil.TransactionWriter) (tables.Topology, error) { s := &outputRoomEventsTopologyStatements{ db: db, - writer: sqlutil.NewTransactionWriter(), + writer: writer, } _, err := db.Exec(outputRoomEventsTopologySchema) if err != nil { diff --git a/syncapi/storage/sqlite3/send_to_device_table.go b/syncapi/storage/sqlite3/send_to_device_table.go index 88b319fb3..74c8893c5 100644 --- a/syncapi/storage/sqlite3/send_to_device_table.go +++ b/syncapi/storage/sqlite3/send_to_device_table.go @@ -79,10 +79,10 @@ type sendToDeviceStatements struct { countSendToDeviceMessagesStmt *sql.Stmt } -func NewSqliteSendToDeviceTable(db *sql.DB) (tables.SendToDevice, error) { +func NewSqliteSendToDeviceTable(db *sql.DB, writer *sqlutil.TransactionWriter) (tables.SendToDevice, error) { s := &sendToDeviceStatements{ db: db, - writer: sqlutil.NewTransactionWriter(), + writer: writer, } _, err := db.Exec(sendToDeviceSchema) if err != nil { diff --git a/syncapi/storage/sqlite3/stream_id_table.go b/syncapi/storage/sqlite3/stream_id_table.go index cf3eed5ba..51d29d0ef 100644 --- a/syncapi/storage/sqlite3/stream_id_table.go +++ b/syncapi/storage/sqlite3/stream_id_table.go @@ -33,9 +33,9 @@ type streamIDStatements struct { selectStreamIDStmt *sql.Stmt } -func (s *streamIDStatements) prepare(db *sql.DB) (err error) { +func (s *streamIDStatements) prepare(db *sql.DB, writer *sqlutil.TransactionWriter) (err error) { s.db = db - s.writer = sqlutil.NewTransactionWriter() + s.writer = writer _, err = db.Exec(streamIDTableSchema) if err != nil { return diff --git a/syncapi/storage/sqlite3/syncserver.go b/syncapi/storage/sqlite3/syncserver.go index 9564a23aa..31c9e6589 100644 --- a/syncapi/storage/sqlite3/syncserver.go +++ b/syncapi/storage/sqlite3/syncserver.go @@ -31,7 +31,8 @@ import ( // both the database for PDUs and caches for EDUs. type SyncServerDatasource struct { shared.Database - db *sql.DB + db *sql.DB + writer *sqlutil.TransactionWriter sqlutil.PartitionOffsetStatements streamID streamIDStatements } @@ -44,6 +45,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e if d.db, err = sqlutil.Open(dbProperties); err != nil { return nil, err } + d.writer = sqlutil.NewTransactionWriter() if err = d.prepare(); err != nil { return nil, err } @@ -54,38 +56,38 @@ func (d *SyncServerDatasource) prepare() (err error) { if err = d.PartitionOffsetStatements.Prepare(d.db, "syncapi"); err != nil { return err } - if err = d.streamID.prepare(d.db); err != nil { + if err = d.streamID.prepare(d.db, d.writer); err != nil { return err } - accountData, err := NewSqliteAccountDataTable(d.db, &d.streamID) + accountData, err := NewSqliteAccountDataTable(d.db, d.writer, &d.streamID) if err != nil { return err } - events, err := NewSqliteEventsTable(d.db, &d.streamID) + events, err := NewSqliteEventsTable(d.db, d.writer, &d.streamID) if err != nil { return err } - roomState, err := NewSqliteCurrentRoomStateTable(d.db, &d.streamID) + roomState, err := NewSqliteCurrentRoomStateTable(d.db, d.writer, &d.streamID) if err != nil { return err } - invites, err := NewSqliteInvitesTable(d.db, &d.streamID) + invites, err := NewSqliteInvitesTable(d.db, d.writer, &d.streamID) if err != nil { return err } - topology, err := NewSqliteTopologyTable(d.db) + topology, err := NewSqliteTopologyTable(d.db, d.writer) if err != nil { return err } - bwExtrem, err := NewSqliteBackwardsExtremitiesTable(d.db) + bwExtrem, err := NewSqliteBackwardsExtremitiesTable(d.db, d.writer) if err != nil { return err } - sendToDevice, err := NewSqliteSendToDeviceTable(d.db) + sendToDevice, err := NewSqliteSendToDeviceTable(d.db, d.writer) if err != nil { return err } - filter, err := NewSqliteFilterTable(d.db) + filter, err := NewSqliteFilterTable(d.db, d.writer) if err != nil { return err } diff --git a/userapi/storage/accounts/sqlite3/account_data_table.go b/userapi/storage/accounts/sqlite3/account_data_table.go index cb54412ab..1a3eb1369 100644 --- a/userapi/storage/accounts/sqlite3/account_data_table.go +++ b/userapi/storage/accounts/sqlite3/account_data_table.go @@ -57,9 +57,9 @@ type accountDataStatements struct { selectAccountDataByTypeStmt *sql.Stmt } -func (s *accountDataStatements) prepare(db *sql.DB) (err error) { +func (s *accountDataStatements) prepare(db *sql.DB, writer *sqlutil.TransactionWriter) (err error) { s.db = db - s.writer = sqlutil.NewTransactionWriter() + s.writer = writer _, err = db.Exec(accountDataSchema) if err != nil { return diff --git a/userapi/storage/accounts/sqlite3/accounts_table.go b/userapi/storage/accounts/sqlite3/accounts_table.go index 27c3d845a..3cb15b32b 100644 --- a/userapi/storage/accounts/sqlite3/accounts_table.go +++ b/userapi/storage/accounts/sqlite3/accounts_table.go @@ -67,9 +67,9 @@ type accountsStatements struct { serverName gomatrixserverlib.ServerName } -func (s *accountsStatements) prepare(db *sql.DB, server gomatrixserverlib.ServerName) (err error) { +func (s *accountsStatements) prepare(db *sql.DB, writer *sqlutil.TransactionWriter, server gomatrixserverlib.ServerName) (err error) { s.db = db - s.writer = sqlutil.NewTransactionWriter() + s.writer = writer _, err = db.Exec(accountsSchema) if err != nil { return diff --git a/userapi/storage/accounts/sqlite3/profile_table.go b/userapi/storage/accounts/sqlite3/profile_table.go index d4c404ca3..c8cca2bab 100644 --- a/userapi/storage/accounts/sqlite3/profile_table.go +++ b/userapi/storage/accounts/sqlite3/profile_table.go @@ -61,9 +61,9 @@ type profilesStatements struct { selectProfilesBySearchStmt *sql.Stmt } -func (s *profilesStatements) prepare(db *sql.DB) (err error) { +func (s *profilesStatements) prepare(db *sql.DB, writer *sqlutil.TransactionWriter) (err error) { s.db = db - s.writer = sqlutil.NewTransactionWriter() + s.writer = writer _, err = db.Exec(profilesSchema) if err != nil { return diff --git a/userapi/storage/accounts/sqlite3/storage.go b/userapi/storage/accounts/sqlite3/storage.go index 4d2c5e51d..16b8f5e6e 100644 --- a/userapi/storage/accounts/sqlite3/storage.go +++ b/userapi/storage/accounts/sqlite3/storage.go @@ -57,20 +57,24 @@ func NewDatabase(dbProperties *config.DatabaseOptions, serverName gomatrixserver if err = partitions.Prepare(db, "account"); err != nil { return nil, err } - a := accountsStatements{} - if err = a.prepare(db, serverName); err != nil { + writer := sqlutil.NewTransactionWriter() + a := accountsStatements{ + db: db, + writer: writer, + } + if err = a.prepare(db, writer, serverName); err != nil { return nil, err } p := profilesStatements{} - if err = p.prepare(db); err != nil { + if err = p.prepare(db, writer); err != nil { return nil, err } ac := accountDataStatements{} - if err = ac.prepare(db); err != nil { + if err = ac.prepare(db, writer); err != nil { return nil, err } t := threepidStatements{} - if err = t.prepare(db); err != nil { + if err = t.prepare(db, writer); err != nil { return nil, err } return &Database{ diff --git a/userapi/storage/accounts/sqlite3/threepid_table.go b/userapi/storage/accounts/sqlite3/threepid_table.go index 0104e8346..ae4de0da1 100644 --- a/userapi/storage/accounts/sqlite3/threepid_table.go +++ b/userapi/storage/accounts/sqlite3/threepid_table.go @@ -61,9 +61,9 @@ type threepidStatements struct { deleteThreePIDStmt *sql.Stmt } -func (s *threepidStatements) prepare(db *sql.DB) (err error) { +func (s *threepidStatements) prepare(db *sql.DB, writer *sqlutil.TransactionWriter) (err error) { s.db = db - s.writer = sqlutil.NewTransactionWriter() + s.writer = writer _, err = db.Exec(threepidSchema) if err != nil { return diff --git a/userapi/storage/devices/sqlite3/devices_table.go b/userapi/storage/devices/sqlite3/devices_table.go index 9b535aab9..903d5219d 100644 --- a/userapi/storage/devices/sqlite3/devices_table.go +++ b/userapi/storage/devices/sqlite3/devices_table.go @@ -91,9 +91,9 @@ type devicesStatements struct { serverName gomatrixserverlib.ServerName } -func (s *devicesStatements) prepare(db *sql.DB, server gomatrixserverlib.ServerName) (err error) { +func (s *devicesStatements) prepare(db *sql.DB, writer *sqlutil.TransactionWriter, server gomatrixserverlib.ServerName) (err error) { s.db = db - s.writer = sqlutil.NewTransactionWriter() + s.writer = writer _, err = db.Exec(devicesSchema) if err != nil { return diff --git a/userapi/storage/devices/sqlite3/storage.go b/userapi/storage/devices/sqlite3/storage.go index 1f2b59f30..42e530a54 100644 --- a/userapi/storage/devices/sqlite3/storage.go +++ b/userapi/storage/devices/sqlite3/storage.go @@ -34,6 +34,7 @@ var deviceIDByteLength = 6 // Database represents a device database. type Database struct { db *sql.DB + writer *sqlutil.TransactionWriter devices devicesStatements } @@ -43,11 +44,12 @@ func NewDatabase(dbProperties *config.DatabaseOptions, serverName gomatrixserver if err != nil { return nil, err } + writer := sqlutil.NewTransactionWriter() d := devicesStatements{} - if err = d.prepare(db, serverName); err != nil { + if err = d.prepare(db, writer, serverName); err != nil { return nil, err } - return &Database{db, d}, nil + return &Database{db, writer, d}, nil } // GetDeviceByAccessToken returns the device matching the given access token.