From 81943b63f3d26141bc3e34094a71158190c8ee7d Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 24 Mar 2017 17:15:16 +0000 Subject: [PATCH] Glue up RoomserverConsumer to a database backend --- .../dendrite/clientapi/config/config.go | 2 + .../dendrite/clientapi/storage/sync/sync.go | 37 +++++++++++++++++++ .../dendrite/cmd/dendrite-sync-server/main.go | 11 +++++- 3 files changed, 48 insertions(+), 2 deletions(-) create mode 100644 src/github.com/matrix-org/dendrite/clientapi/storage/sync/sync.go diff --git a/src/github.com/matrix-org/dendrite/clientapi/config/config.go b/src/github.com/matrix-org/dendrite/clientapi/config/config.go index 4f431bee3..08e1f4d14 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/config/config.go +++ b/src/github.com/matrix-org/dendrite/clientapi/config/config.go @@ -25,4 +25,6 @@ type Sync struct { RoomserverOutputTopic string // A list of URIs to consume events from. These kafka logs should be produced by a Room Server. KafkaConsumerURIs []string + // The postgres connection config for connecting to the database e.g a postgres:// URI + DataSource string } diff --git a/src/github.com/matrix-org/dendrite/clientapi/storage/sync/sync.go b/src/github.com/matrix-org/dendrite/clientapi/storage/sync/sync.go new file mode 100644 index 000000000..23cf8c066 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/clientapi/storage/sync/sync.go @@ -0,0 +1,37 @@ +package sync + +import ( + "database/sql" + + "github.com/matrix-org/dendrite/common" +) + +// Database represents a sync server database +type Database struct { + db *sql.DB + partitions common.PartitionOffsetStatements +} + +// NewDatabase creates a new sync server database +func NewDatabase(dataSourceName string) (*Database, error) { + var db *sql.DB + var err error + if db, err = sql.Open("postgres", dataSourceName); err != nil { + return nil, err + } + partitions := common.PartitionOffsetStatements{} + if err := partitions.Prepare(db); err != nil { + return nil, err + } + return &Database{db, partitions}, nil +} + +// PartitionOffsets implements common.PartitionStorer +func (d *Database) PartitionOffsets(topic string) ([]common.PartitionOffset, error) { + return d.partitions.SelectPartitionOffsets(topic) +} + +// SetPartitionOffset implements common.PartitionStorer +func (d *Database) SetPartitionOffset(topic string, partition int32, offset int64) error { + return d.partitions.UpsertPartitionOffset(topic, partition, offset) +} diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-server/main.go index 0ab56c9e4..ebc180f5a 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-server/main.go @@ -8,6 +8,7 @@ import ( "github.com/matrix-org/dendrite/clientapi/config" "github.com/matrix-org/dendrite/clientapi/consumers" "github.com/matrix-org/dendrite/clientapi/routing" + "github.com/matrix-org/dendrite/clientapi/storage/sync" log "github.com/Sirupsen/logrus" "github.com/matrix-org/dugong" @@ -41,13 +42,19 @@ func main() { cfg := config.Sync{ KafkaConsumerURIs: []string{"localhost:9092"}, RoomserverOutputTopic: "roomserverOutput", + DataSource: "", } log.Info("Starting sync server") - consumer, err := consumers.NewRoomserverConsumer(&cfg, nil) // TODO: partition storer + db, err := sync.NewDatabase(cfg.DataSource) if err != nil { - panic(err) + log.Panicf("startup: failed to create database with data source %s : %s", cfg.DataSource, err) + } + + consumer, err := consumers.NewRoomserverConsumer(&cfg, db) + if err != nil { + log.Panicf("startup: failed to create roomserver consumer: %s", err) } routing.SetupSyncServer(http.DefaultServeMux, http.DefaultClient, cfg, consumer)