mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-21 13:53:09 -06:00
Refactor TransactionWriter in sync API
This commit is contained in:
parent
e551c035c6
commit
66d0134e2a
|
|
@ -58,10 +58,10 @@ type accountDataStatements struct {
|
||||||
selectAccountDataInRangeStmt *sql.Stmt
|
selectAccountDataInRangeStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSqliteAccountDataTable(db *sql.DB, streamID *streamIDStatements) (tables.AccountData, error) {
|
func NewSqliteAccountDataTable(db *sql.DB, writer sqlutil.TransactionWriter, streamID *streamIDStatements) (tables.AccountData, error) {
|
||||||
s := &accountDataStatements{
|
s := &accountDataStatements{
|
||||||
db: db,
|
db: db,
|
||||||
writer: sqlutil.NewTransactionWriter(),
|
writer: writer,
|
||||||
streamIDStatements: streamID,
|
streamIDStatements: streamID,
|
||||||
}
|
}
|
||||||
_, err := db.Exec(accountDataSchema)
|
_, err := db.Exec(accountDataSchema)
|
||||||
|
|
|
||||||
|
|
@ -55,10 +55,10 @@ type backwardExtremitiesStatements struct {
|
||||||
deleteBackwardExtremityStmt *sql.Stmt
|
deleteBackwardExtremityStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSqliteBackwardsExtremitiesTable(db *sql.DB) (tables.BackwardsExtremities, error) {
|
func NewSqliteBackwardsExtremitiesTable(db *sql.DB, writer sqlutil.TransactionWriter) (tables.BackwardsExtremities, error) {
|
||||||
s := &backwardExtremitiesStatements{
|
s := &backwardExtremitiesStatements{
|
||||||
db: db,
|
db: db,
|
||||||
writer: sqlutil.NewTransactionWriter(),
|
writer: writer,
|
||||||
}
|
}
|
||||||
_, err := db.Exec(backwardExtremitiesSchema)
|
_, err := db.Exec(backwardExtremitiesSchema)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
||||||
|
|
@ -95,10 +95,10 @@ type currentRoomStateStatements struct {
|
||||||
selectStateEventStmt *sql.Stmt
|
selectStateEventStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSqliteCurrentRoomStateTable(db *sql.DB, streamID *streamIDStatements) (tables.CurrentRoomState, error) {
|
func NewSqliteCurrentRoomStateTable(db *sql.DB, writer sqlutil.TransactionWriter, streamID *streamIDStatements) (tables.CurrentRoomState, error) {
|
||||||
s := ¤tRoomStateStatements{
|
s := ¤tRoomStateStatements{
|
||||||
db: db,
|
db: db,
|
||||||
writer: sqlutil.NewTransactionWriter(),
|
writer: writer,
|
||||||
streamIDStatements: streamID,
|
streamIDStatements: streamID,
|
||||||
}
|
}
|
||||||
_, err := db.Exec(currentRoomStateSchema)
|
_, err := db.Exec(currentRoomStateSchema)
|
||||||
|
|
|
||||||
|
|
@ -58,14 +58,14 @@ type filterStatements struct {
|
||||||
insertFilterStmt *sql.Stmt
|
insertFilterStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSqliteFilterTable(db *sql.DB) (tables.Filter, error) {
|
func NewSqliteFilterTable(db *sql.DB, writer sqlutil.TransactionWriter) (tables.Filter, error) {
|
||||||
_, err := db.Exec(filterSchema)
|
_, err := db.Exec(filterSchema)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
s := &filterStatements{
|
s := &filterStatements{
|
||||||
db: db,
|
db: db,
|
||||||
writer: sqlutil.NewTransactionWriter(),
|
writer: writer,
|
||||||
}
|
}
|
||||||
if s.selectFilterStmt, err = db.Prepare(selectFilterSQL); err != nil {
|
if s.selectFilterStmt, err = db.Prepare(selectFilterSQL); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
|
||||||
|
|
@ -67,10 +67,10 @@ type inviteEventsStatements struct {
|
||||||
selectMaxInviteIDStmt *sql.Stmt
|
selectMaxInviteIDStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSqliteInvitesTable(db *sql.DB, streamID *streamIDStatements) (tables.Invites, error) {
|
func NewSqliteInvitesTable(db *sql.DB, writer sqlutil.TransactionWriter, streamID *streamIDStatements) (tables.Invites, error) {
|
||||||
s := &inviteEventsStatements{
|
s := &inviteEventsStatements{
|
||||||
db: db,
|
db: db,
|
||||||
writer: sqlutil.NewTransactionWriter(),
|
writer: writer,
|
||||||
streamIDStatements: streamID,
|
streamIDStatements: streamID,
|
||||||
}
|
}
|
||||||
_, err := db.Exec(inviteEventsSchema)
|
_, err := db.Exec(inviteEventsSchema)
|
||||||
|
|
|
||||||
|
|
@ -117,10 +117,10 @@ type outputRoomEventsStatements struct {
|
||||||
updateEventJSONStmt *sql.Stmt
|
updateEventJSONStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSqliteEventsTable(db *sql.DB, streamID *streamIDStatements) (tables.Events, error) {
|
func NewSqliteEventsTable(db *sql.DB, writer sqlutil.TransactionWriter, streamID *streamIDStatements) (tables.Events, error) {
|
||||||
s := &outputRoomEventsStatements{
|
s := &outputRoomEventsStatements{
|
||||||
db: db,
|
db: db,
|
||||||
writer: sqlutil.NewTransactionWriter(),
|
writer: writer,
|
||||||
streamIDStatements: streamID,
|
streamIDStatements: streamID,
|
||||||
}
|
}
|
||||||
_, err := db.Exec(outputRoomEventsSchema)
|
_, err := db.Exec(outputRoomEventsSchema)
|
||||||
|
|
|
||||||
|
|
@ -75,10 +75,10 @@ type outputRoomEventsTopologyStatements struct {
|
||||||
selectMaxPositionInTopologyStmt *sql.Stmt
|
selectMaxPositionInTopologyStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSqliteTopologyTable(db *sql.DB) (tables.Topology, error) {
|
func NewSqliteTopologyTable(db *sql.DB, writer sqlutil.TransactionWriter) (tables.Topology, error) {
|
||||||
s := &outputRoomEventsTopologyStatements{
|
s := &outputRoomEventsTopologyStatements{
|
||||||
db: db,
|
db: db,
|
||||||
writer: sqlutil.NewTransactionWriter(),
|
writer: writer,
|
||||||
}
|
}
|
||||||
_, err := db.Exec(outputRoomEventsTopologySchema)
|
_, err := db.Exec(outputRoomEventsTopologySchema)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
||||||
|
|
@ -79,10 +79,10 @@ type sendToDeviceStatements struct {
|
||||||
countSendToDeviceMessagesStmt *sql.Stmt
|
countSendToDeviceMessagesStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSqliteSendToDeviceTable(db *sql.DB) (tables.SendToDevice, error) {
|
func NewSqliteSendToDeviceTable(db *sql.DB, writer sqlutil.TransactionWriter) (tables.SendToDevice, error) {
|
||||||
s := &sendToDeviceStatements{
|
s := &sendToDeviceStatements{
|
||||||
db: db,
|
db: db,
|
||||||
writer: sqlutil.NewTransactionWriter(),
|
writer: writer,
|
||||||
}
|
}
|
||||||
_, err := db.Exec(sendToDeviceSchema)
|
_, err := db.Exec(sendToDeviceSchema)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
||||||
|
|
@ -33,9 +33,9 @@ type streamIDStatements struct {
|
||||||
selectStreamIDStmt *sql.Stmt
|
selectStreamIDStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *streamIDStatements) prepare(db *sql.DB) (err error) {
|
func (s *streamIDStatements) prepare(db *sql.DB, writer sqlutil.TransactionWriter) (err error) {
|
||||||
s.db = db
|
s.db = db
|
||||||
s.writer = sqlutil.NewTransactionWriter()
|
s.writer = writer
|
||||||
_, err = db.Exec(streamIDTableSchema)
|
_, err = db.Exec(streamIDTableSchema)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
|
|
|
||||||
|
|
@ -56,38 +56,38 @@ func (d *SyncServerDatasource) prepare() (err error) {
|
||||||
if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "syncapi"); err != nil {
|
if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "syncapi"); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err = d.streamID.prepare(d.db); err != nil {
|
if err = d.streamID.prepare(d.db, d.writer); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
accountData, err := NewSqliteAccountDataTable(d.db, &d.streamID)
|
accountData, err := NewSqliteAccountDataTable(d.db, d.writer, &d.streamID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
events, err := NewSqliteEventsTable(d.db, &d.streamID)
|
events, err := NewSqliteEventsTable(d.db, d.writer, &d.streamID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
roomState, err := NewSqliteCurrentRoomStateTable(d.db, &d.streamID)
|
roomState, err := NewSqliteCurrentRoomStateTable(d.db, d.writer, &d.streamID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
invites, err := NewSqliteInvitesTable(d.db, &d.streamID)
|
invites, err := NewSqliteInvitesTable(d.db, d.writer, &d.streamID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
topology, err := NewSqliteTopologyTable(d.db)
|
topology, err := NewSqliteTopologyTable(d.db, d.writer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
bwExtrem, err := NewSqliteBackwardsExtremitiesTable(d.db)
|
bwExtrem, err := NewSqliteBackwardsExtremitiesTable(d.db, d.writer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
sendToDevice, err := NewSqliteSendToDeviceTable(d.db)
|
sendToDevice, err := NewSqliteSendToDeviceTable(d.db, d.writer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
filter, err := NewSqliteFilterTable(d.db)
|
filter, err := NewSqliteFilterTable(d.db, d.writer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue