diff --git a/syncapi/storage/sqlite3/account_data_table.go b/syncapi/storage/sqlite3/account_data_table.go index 248ec9267..85eb3b21a 100644 --- a/syncapi/storage/sqlite3/account_data_table.go +++ b/syncapi/storage/sqlite3/account_data_table.go @@ -58,10 +58,10 @@ type accountDataStatements struct { selectAccountDataInRangeStmt *sql.Stmt } -func NewSqliteAccountDataTable(db *sql.DB, streamID *streamIDStatements) (tables.AccountData, error) { +func NewSqliteAccountDataTable(db *sql.DB, writer sqlutil.TransactionWriter, streamID *streamIDStatements) (tables.AccountData, error) { s := &accountDataStatements{ db: db, - writer: sqlutil.NewTransactionWriter(), + writer: writer, streamIDStatements: streamID, } _, err := db.Exec(accountDataSchema) diff --git a/syncapi/storage/sqlite3/backwards_extremities_table.go b/syncapi/storage/sqlite3/backwards_extremities_table.go index d96f2fe57..f489b0148 100644 --- a/syncapi/storage/sqlite3/backwards_extremities_table.go +++ b/syncapi/storage/sqlite3/backwards_extremities_table.go @@ -55,10 +55,10 @@ type backwardExtremitiesStatements struct { deleteBackwardExtremityStmt *sql.Stmt } -func NewSqliteBackwardsExtremitiesTable(db *sql.DB) (tables.BackwardsExtremities, error) { +func NewSqliteBackwardsExtremitiesTable(db *sql.DB, writer sqlutil.TransactionWriter) (tables.BackwardsExtremities, error) { s := &backwardExtremitiesStatements{ db: db, - writer: sqlutil.NewTransactionWriter(), + writer: writer, } _, err := db.Exec(backwardExtremitiesSchema) if err != nil { diff --git a/syncapi/storage/sqlite3/current_room_state_table.go b/syncapi/storage/sqlite3/current_room_state_table.go index 77a21543f..f8c8151ee 100644 --- a/syncapi/storage/sqlite3/current_room_state_table.go +++ b/syncapi/storage/sqlite3/current_room_state_table.go @@ -95,10 +95,10 @@ type currentRoomStateStatements struct { selectStateEventStmt *sql.Stmt } -func NewSqliteCurrentRoomStateTable(db *sql.DB, streamID *streamIDStatements) (tables.CurrentRoomState, error) { +func NewSqliteCurrentRoomStateTable(db *sql.DB, writer sqlutil.TransactionWriter, streamID *streamIDStatements) (tables.CurrentRoomState, error) { s := ¤tRoomStateStatements{ db: db, - writer: sqlutil.NewTransactionWriter(), + writer: writer, streamIDStatements: streamID, } _, err := db.Exec(currentRoomStateSchema) diff --git a/syncapi/storage/sqlite3/filter_table.go b/syncapi/storage/sqlite3/filter_table.go index 338b0b500..67ba5e54b 100644 --- a/syncapi/storage/sqlite3/filter_table.go +++ b/syncapi/storage/sqlite3/filter_table.go @@ -58,14 +58,14 @@ type filterStatements struct { insertFilterStmt *sql.Stmt } -func NewSqliteFilterTable(db *sql.DB) (tables.Filter, error) { +func NewSqliteFilterTable(db *sql.DB, writer sqlutil.TransactionWriter) (tables.Filter, error) { _, err := db.Exec(filterSchema) if err != nil { return nil, err } s := &filterStatements{ db: db, - writer: sqlutil.NewTransactionWriter(), + writer: writer, } if s.selectFilterStmt, err = db.Prepare(selectFilterSQL); err != nil { return nil, err diff --git a/syncapi/storage/sqlite3/invites_table.go b/syncapi/storage/sqlite3/invites_table.go index 0bbd79f77..8951f01b2 100644 --- a/syncapi/storage/sqlite3/invites_table.go +++ b/syncapi/storage/sqlite3/invites_table.go @@ -67,10 +67,10 @@ type inviteEventsStatements struct { selectMaxInviteIDStmt *sql.Stmt } -func NewSqliteInvitesTable(db *sql.DB, streamID *streamIDStatements) (tables.Invites, error) { +func NewSqliteInvitesTable(db *sql.DB, writer sqlutil.TransactionWriter, streamID *streamIDStatements) (tables.Invites, error) { s := &inviteEventsStatements{ db: db, - writer: sqlutil.NewTransactionWriter(), + writer: writer, streamIDStatements: streamID, } _, err := db.Exec(inviteEventsSchema) diff --git a/syncapi/storage/sqlite3/output_room_events_table.go b/syncapi/storage/sqlite3/output_room_events_table.go index 0d1546507..84e567dcf 100644 --- a/syncapi/storage/sqlite3/output_room_events_table.go +++ b/syncapi/storage/sqlite3/output_room_events_table.go @@ -117,10 +117,10 @@ type outputRoomEventsStatements struct { updateEventJSONStmt *sql.Stmt } -func NewSqliteEventsTable(db *sql.DB, streamID *streamIDStatements) (tables.Events, error) { +func NewSqliteEventsTable(db *sql.DB, writer sqlutil.TransactionWriter, streamID *streamIDStatements) (tables.Events, error) { s := &outputRoomEventsStatements{ db: db, - writer: sqlutil.NewTransactionWriter(), + writer: writer, streamIDStatements: streamID, } _, err := db.Exec(outputRoomEventsSchema) diff --git a/syncapi/storage/sqlite3/output_room_events_topology_table.go b/syncapi/storage/sqlite3/output_room_events_topology_table.go index 5c4ab005f..16e2260b7 100644 --- a/syncapi/storage/sqlite3/output_room_events_topology_table.go +++ b/syncapi/storage/sqlite3/output_room_events_topology_table.go @@ -75,10 +75,10 @@ type outputRoomEventsTopologyStatements struct { selectMaxPositionInTopologyStmt *sql.Stmt } -func NewSqliteTopologyTable(db *sql.DB) (tables.Topology, error) { +func NewSqliteTopologyTable(db *sql.DB, writer sqlutil.TransactionWriter) (tables.Topology, error) { s := &outputRoomEventsTopologyStatements{ db: db, - writer: sqlutil.NewTransactionWriter(), + writer: writer, } _, err := db.Exec(outputRoomEventsTopologySchema) if err != nil { diff --git a/syncapi/storage/sqlite3/send_to_device_table.go b/syncapi/storage/sqlite3/send_to_device_table.go index 53786589c..e19a532e9 100644 --- a/syncapi/storage/sqlite3/send_to_device_table.go +++ b/syncapi/storage/sqlite3/send_to_device_table.go @@ -79,10 +79,10 @@ type sendToDeviceStatements struct { countSendToDeviceMessagesStmt *sql.Stmt } -func NewSqliteSendToDeviceTable(db *sql.DB) (tables.SendToDevice, error) { +func NewSqliteSendToDeviceTable(db *sql.DB, writer sqlutil.TransactionWriter) (tables.SendToDevice, error) { s := &sendToDeviceStatements{ db: db, - writer: sqlutil.NewTransactionWriter(), + writer: writer, } _, err := db.Exec(sendToDeviceSchema) if err != nil { diff --git a/syncapi/storage/sqlite3/stream_id_table.go b/syncapi/storage/sqlite3/stream_id_table.go index 1971e7f3b..e76f1dcc3 100644 --- a/syncapi/storage/sqlite3/stream_id_table.go +++ b/syncapi/storage/sqlite3/stream_id_table.go @@ -33,9 +33,9 @@ type streamIDStatements struct { selectStreamIDStmt *sql.Stmt } -func (s *streamIDStatements) prepare(db *sql.DB) (err error) { +func (s *streamIDStatements) prepare(db *sql.DB, writer sqlutil.TransactionWriter) (err error) { s.db = db - s.writer = sqlutil.NewTransactionWriter() + s.writer = writer _, err = db.Exec(streamIDTableSchema) if err != nil { return diff --git a/syncapi/storage/sqlite3/syncserver.go b/syncapi/storage/sqlite3/syncserver.go index 23e2186f4..3fa7602c6 100644 --- a/syncapi/storage/sqlite3/syncserver.go +++ b/syncapi/storage/sqlite3/syncserver.go @@ -56,38 +56,38 @@ func (d *SyncServerDatasource) prepare() (err error) { if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "syncapi"); err != nil { return err } - if err = d.streamID.prepare(d.db); err != nil { + if err = d.streamID.prepare(d.db, d.writer); err != nil { return err } - accountData, err := NewSqliteAccountDataTable(d.db, &d.streamID) + accountData, err := NewSqliteAccountDataTable(d.db, d.writer, &d.streamID) if err != nil { return err } - events, err := NewSqliteEventsTable(d.db, &d.streamID) + events, err := NewSqliteEventsTable(d.db, d.writer, &d.streamID) if err != nil { return err } - roomState, err := NewSqliteCurrentRoomStateTable(d.db, &d.streamID) + roomState, err := NewSqliteCurrentRoomStateTable(d.db, d.writer, &d.streamID) if err != nil { return err } - invites, err := NewSqliteInvitesTable(d.db, &d.streamID) + invites, err := NewSqliteInvitesTable(d.db, d.writer, &d.streamID) if err != nil { return err } - topology, err := NewSqliteTopologyTable(d.db) + topology, err := NewSqliteTopologyTable(d.db, d.writer) if err != nil { return err } - bwExtrem, err := NewSqliteBackwardsExtremitiesTable(d.db) + bwExtrem, err := NewSqliteBackwardsExtremitiesTable(d.db, d.writer) if err != nil { return err } - sendToDevice, err := NewSqliteSendToDeviceTable(d.db) + sendToDevice, err := NewSqliteSendToDeviceTable(d.db, d.writer) if err != nil { return err } - filter, err := NewSqliteFilterTable(d.db) + filter, err := NewSqliteFilterTable(d.db, d.writer) if err != nil { return err }