Somehow this got lost in the rebase..

This commit is contained in:
Kegan Dougal 2017-03-31 10:48:58 +01:00
parent b651ecb592
commit 0b4e2817c3
2 changed files with 26 additions and 1 deletions

View file

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"github.com/lib/pq" "github.com/lib/pq"
"github.com/matrix-org/gomatrixserverlib"
) )
const outputRoomEventsSchema = ` const outputRoomEventsSchema = `

View file

@ -13,6 +13,7 @@ type SyncServerDatabase struct {
db *sql.DB db *sql.DB
partitions common.PartitionOffsetStatements partitions common.PartitionOffsetStatements
events outputRoomEventsStatements events outputRoomEventsStatements
roomstate currentRoomStateStatements
} }
// NewSyncServerDatabase creates a new sync server database // NewSyncServerDatabase creates a new sync server database
@ -30,7 +31,11 @@ func NewSyncServerDatabase(dataSourceName string) (*SyncServerDatabase, error) {
if err = events.prepare(db); err != nil { if err = events.prepare(db); err != nil {
return nil, err return nil, err
} }
return &SyncServerDatabase{db, partitions, events}, nil state := currentRoomStateStatements{}
if err := state.prepare(db); err != nil {
return nil, err
}
return &SyncServerDatabase{db, partitions, events, state}, nil
} }
// WriteEvent into the database. It is not safe to call this function from multiple goroutines, as it would create races // WriteEvent into the database. It is not safe to call this function from multiple goroutines, as it would create races
@ -69,3 +74,22 @@ func (d *SyncServerDatabase) PartitionOffsets(topic string) ([]common.PartitionO
func (d *SyncServerDatabase) SetPartitionOffset(topic string, partition int32, offset int64) error { func (d *SyncServerDatabase) SetPartitionOffset(topic string, partition int32, offset int64) error {
return d.partitions.UpsertPartitionOffset(topic, partition, offset) return d.partitions.UpsertPartitionOffset(topic, partition, offset)
} }
func runTransaction(db *sql.DB, fn func(txn *sql.Tx) error) (err error) {
txn, err := db.Begin()
if err != nil {
return
}
defer func() {
if r := recover(); r != nil {
txn.Rollback()
panic(r)
} else if err != nil {
txn.Rollback()
} else {
err = txn.Commit()
}
}()
err = fn(txn)
return
}