From 66db5a9f55c299ec823ad27a903eecce9075905a Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 24 Mar 2017 17:38:04 +0000 Subject: [PATCH] Add SyncServer and remove needless abstractions --- .../clientapi/consumers/roomserver.go | 29 ---------------- .../dendrite/clientapi/routing/routing.go | 5 ++- .../storage/{sync/sync.go => syncserver.go} | 18 +++++----- .../dendrite/clientapi/sync/syncserver.go | 34 +++++++++++++++++++ .../dendrite/cmd/dendrite-sync-server/main.go | 19 ++++++----- 5 files changed, 57 insertions(+), 48 deletions(-) delete mode 100644 src/github.com/matrix-org/dendrite/clientapi/consumers/roomserver.go rename src/github.com/matrix-org/dendrite/clientapi/storage/{sync/sync.go => syncserver.go} (52%) create mode 100644 src/github.com/matrix-org/dendrite/clientapi/sync/syncserver.go diff --git a/src/github.com/matrix-org/dendrite/clientapi/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/clientapi/consumers/roomserver.go deleted file mode 100644 index af764c016..000000000 --- a/src/github.com/matrix-org/dendrite/clientapi/consumers/roomserver.go +++ /dev/null @@ -1,29 +0,0 @@ -package consumers - -import ( - "github.com/matrix-org/dendrite/clientapi/config" - "github.com/matrix-org/dendrite/common" - sarama "gopkg.in/Shopify/sarama.v1" -) - -// RoomserverConsumer consumes events from the room server -type RoomserverConsumer struct { - Consumer common.ContinualConsumer -} - -// NewRoomserverConsumer creates a new roomserver consumer -func NewRoomserverConsumer(cfg *config.Sync, store common.PartitionStorer) (*RoomserverConsumer, error) { - kafkaConsumer, err := sarama.NewConsumer(cfg.KafkaConsumerURIs, nil) - if err != nil { - return nil, err - } - - return &RoomserverConsumer{ - Consumer: common.ContinualConsumer{ - Topic: cfg.RoomserverOutputTopic, - Consumer: kafkaConsumer, - PartitionStore: store, - }, - }, nil - -} diff --git a/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go b/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go index a4ff04b31..499881b41 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go +++ b/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go @@ -5,7 +5,6 @@ import ( "github.com/gorilla/mux" "github.com/matrix-org/dendrite/clientapi/config" - "github.com/matrix-org/dendrite/clientapi/consumers" "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/clientapi/readers" "github.com/matrix-org/dendrite/clientapi/writers" @@ -49,8 +48,8 @@ func Setup(servMux *http.ServeMux, httpClient *http.Client, cfg config.ClientAPI servMux.Handle("/api/", http.StripPrefix("/api", apiMux)) } -// SetupSyncServer configures the given mux with sync-server listeners -func SetupSyncServer(servMux *http.ServeMux, httpClient *http.Client, cfg config.Sync, consumer *consumers.RoomserverConsumer) { +// SetupSyncServerListeners configures the given mux with sync-server listeners +func SetupSyncServerListeners(servMux *http.ServeMux, httpClient *http.Client, cfg config.Sync) { apiMux := mux.NewRouter() r0mux := apiMux.PathPrefix(pathPrefixR0).Subrouter() r0mux.Handle("/sync", make("sync", util.NewJSONRequestHandler(func(req *http.Request) util.JSONResponse { diff --git a/src/github.com/matrix-org/dendrite/clientapi/storage/sync/sync.go b/src/github.com/matrix-org/dendrite/clientapi/storage/syncserver.go similarity index 52% rename from src/github.com/matrix-org/dendrite/clientapi/storage/sync/sync.go rename to src/github.com/matrix-org/dendrite/clientapi/storage/syncserver.go index 23cf8c066..21962378e 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/storage/sync/sync.go +++ b/src/github.com/matrix-org/dendrite/clientapi/storage/syncserver.go @@ -1,19 +1,21 @@ -package sync +package storage import ( "database/sql" "github.com/matrix-org/dendrite/common" + // Import the postgres database driver. + _ "github.com/lib/pq" ) -// Database represents a sync server database -type Database struct { +// SyncServerDatabase represents a sync server database +type SyncServerDatabase struct { db *sql.DB partitions common.PartitionOffsetStatements } -// NewDatabase creates a new sync server database -func NewDatabase(dataSourceName string) (*Database, error) { +// NewSyncServerDatabase creates a new sync server database +func NewSyncServerDatabase(dataSourceName string) (*SyncServerDatabase, error) { var db *sql.DB var err error if db, err = sql.Open("postgres", dataSourceName); err != nil { @@ -23,15 +25,15 @@ func NewDatabase(dataSourceName string) (*Database, error) { if err := partitions.Prepare(db); err != nil { return nil, err } - return &Database{db, partitions}, nil + return &SyncServerDatabase{db, partitions}, nil } // PartitionOffsets implements common.PartitionStorer -func (d *Database) PartitionOffsets(topic string) ([]common.PartitionOffset, error) { +func (d *SyncServerDatabase) PartitionOffsets(topic string) ([]common.PartitionOffset, error) { return d.partitions.SelectPartitionOffsets(topic) } // SetPartitionOffset implements common.PartitionStorer -func (d *Database) SetPartitionOffset(topic string, partition int32, offset int64) error { +func (d *SyncServerDatabase) SetPartitionOffset(topic string, partition int32, offset int64) error { return d.partitions.UpsertPartitionOffset(topic, partition, offset) } diff --git a/src/github.com/matrix-org/dendrite/clientapi/sync/syncserver.go b/src/github.com/matrix-org/dendrite/clientapi/sync/syncserver.go new file mode 100644 index 000000000..385db186c --- /dev/null +++ b/src/github.com/matrix-org/dendrite/clientapi/sync/syncserver.go @@ -0,0 +1,34 @@ +package sync + +import ( + "github.com/matrix-org/dendrite/clientapi/config" + "github.com/matrix-org/dendrite/common" + sarama "gopkg.in/Shopify/sarama.v1" +) + +// Server contains all the logic for running a sync server +type Server struct { + roomServerConsumer *common.ContinualConsumer +} + +// NewServer creates a new sync server. Call Start() to begin consuming from room servers. +func NewServer(cfg *config.Sync, store common.PartitionStorer) (*Server, error) { + kafkaConsumer, err := sarama.NewConsumer(cfg.KafkaConsumerURIs, nil) + if err != nil { + return nil, err + } + + return &Server{ + roomServerConsumer: &common.ContinualConsumer{ + Topic: cfg.RoomserverOutputTopic, + Consumer: kafkaConsumer, + PartitionStore: store, + }, + }, nil + +} + +// Start consuming from room servers +func (s *Server) Start() error { + return s.roomServerConsumer.Start() +} diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-server/main.go index ebc180f5a..01415eaaf 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-server/main.go @@ -6,9 +6,9 @@ import ( "path/filepath" "github.com/matrix-org/dendrite/clientapi/config" - "github.com/matrix-org/dendrite/clientapi/consumers" "github.com/matrix-org/dendrite/clientapi/routing" - "github.com/matrix-org/dendrite/clientapi/storage/sync" + "github.com/matrix-org/dendrite/clientapi/storage" + "github.com/matrix-org/dendrite/clientapi/sync" log "github.com/Sirupsen/logrus" "github.com/matrix-org/dugong" @@ -42,21 +42,24 @@ func main() { cfg := config.Sync{ KafkaConsumerURIs: []string{"localhost:9092"}, RoomserverOutputTopic: "roomserverOutput", - DataSource: "", + DataSource: "postgres://user:pass@localhost/dendrite-sync-server?sslmode=disable", } log.Info("Starting sync server") - db, err := sync.NewDatabase(cfg.DataSource) + db, err := storage.NewSyncServerDatabase(cfg.DataSource) if err != nil { - log.Panicf("startup: failed to create database with data source %s : %s", cfg.DataSource, err) + log.Panicf("startup: failed to create sync server database with data source %s : %s", cfg.DataSource, err) } - consumer, err := consumers.NewRoomserverConsumer(&cfg, db) + server, err := sync.NewServer(&cfg, db) if err != nil { - log.Panicf("startup: failed to create roomserver consumer: %s", err) + log.Panicf("startup: failed to create sync server: %s", err) + } + if err = server.Start(); err != nil { + log.Panicf("startup: failed to start sync server") } - routing.SetupSyncServer(http.DefaultServeMux, http.DefaultClient, cfg, consumer) + routing.SetupSyncServerListeners(http.DefaultServeMux, http.DefaultClient, cfg) log.Fatal(http.ListenAndServe(bindAddr, nil)) }