Glue up RoomserverConsumer to a database backend

This commit is contained in:
Kegan Dougal 2017-03-24 17:15:16 +00:00
parent d2ef1a6a4b
commit 81943b63f3
3 changed files with 48 additions and 2 deletions

View file

@ -25,4 +25,6 @@ type Sync struct {
RoomserverOutputTopic string
// A list of URIs to consume events from. These kafka logs should be produced by a Room Server.
KafkaConsumerURIs []string
// The postgres connection config for connecting to the database e.g a postgres:// URI
DataSource string
}

View file

@ -0,0 +1,37 @@
package sync
import (
"database/sql"
"github.com/matrix-org/dendrite/common"
)
// Database represents a sync server database
type Database struct {
db *sql.DB
partitions common.PartitionOffsetStatements
}
// NewDatabase creates a new sync server database
func NewDatabase(dataSourceName string) (*Database, error) {
var db *sql.DB
var err error
if db, err = sql.Open("postgres", dataSourceName); err != nil {
return nil, err
}
partitions := common.PartitionOffsetStatements{}
if err := partitions.Prepare(db); err != nil {
return nil, err
}
return &Database{db, partitions}, nil
}
// PartitionOffsets implements common.PartitionStorer
func (d *Database) PartitionOffsets(topic string) ([]common.PartitionOffset, error) {
return d.partitions.SelectPartitionOffsets(topic)
}
// SetPartitionOffset implements common.PartitionStorer
func (d *Database) SetPartitionOffset(topic string, partition int32, offset int64) error {
return d.partitions.UpsertPartitionOffset(topic, partition, offset)
}

View file

@ -8,6 +8,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/config"
"github.com/matrix-org/dendrite/clientapi/consumers"
"github.com/matrix-org/dendrite/clientapi/routing"
"github.com/matrix-org/dendrite/clientapi/storage/sync"
log "github.com/Sirupsen/logrus"
"github.com/matrix-org/dugong"
@ -41,13 +42,19 @@ func main() {
cfg := config.Sync{
KafkaConsumerURIs: []string{"localhost:9092"},
RoomserverOutputTopic: "roomserverOutput",
DataSource: "",
}
log.Info("Starting sync server")
consumer, err := consumers.NewRoomserverConsumer(&cfg, nil) // TODO: partition storer
db, err := sync.NewDatabase(cfg.DataSource)
if err != nil {
panic(err)
log.Panicf("startup: failed to create database with data source %s : %s", cfg.DataSource, err)
}
consumer, err := consumers.NewRoomserverConsumer(&cfg, db)
if err != nil {
log.Panicf("startup: failed to create roomserver consumer: %s", err)
}
routing.SetupSyncServer(http.DefaultServeMux, http.DefaultClient, cfg, consumer)