From a4230089873475f4f8c16963ced4f21cb58dfc35 Mon Sep 17 00:00:00 2001 From: Kegsay Date: Wed, 29 Mar 2017 14:05:43 +0100 Subject: [PATCH 1/2] Read roomserver output log and remember position across restarts (#52) --- .../dendrite/clientapi/config/config.go | 2 + .../dendrite/clientapi/routing/routing.go | 4 +- .../dendrite/clientapi/storage/syncserver.go | 39 +++++++++++++++++ .../dendrite/clientapi/sync/syncserver.go | 43 +++++++++++++++++++ .../dendrite/cmd/dendrite-sync-server/main.go | 18 +++++++- .../dendrite/common/partition_offset_table.go | 1 + 6 files changed, 104 insertions(+), 3 deletions(-) create mode 100644 src/github.com/matrix-org/dendrite/clientapi/storage/syncserver.go create mode 100644 src/github.com/matrix-org/dendrite/clientapi/sync/syncserver.go diff --git a/src/github.com/matrix-org/dendrite/clientapi/config/config.go b/src/github.com/matrix-org/dendrite/clientapi/config/config.go index 4f431bee3..08e1f4d14 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/config/config.go +++ b/src/github.com/matrix-org/dendrite/clientapi/config/config.go @@ -25,4 +25,6 @@ type Sync struct { RoomserverOutputTopic string // A list of URIs to consume events from. These kafka logs should be produced by a Room Server. KafkaConsumerURIs []string + // The postgres connection config for connecting to the database e.g a postgres:// URI + DataSource string } diff --git a/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go b/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go index f1d2dd5fe..499881b41 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go +++ b/src/github.com/matrix-org/dendrite/clientapi/routing/routing.go @@ -48,8 +48,8 @@ func Setup(servMux *http.ServeMux, httpClient *http.Client, cfg config.ClientAPI servMux.Handle("/api/", http.StripPrefix("/api", apiMux)) } -// SetupSyncServer configures the given mux with sync-server listeners -func SetupSyncServer(servMux *http.ServeMux, httpClient *http.Client, cfg config.Sync) { +// SetupSyncServerListeners configures the given mux with sync-server listeners +func SetupSyncServerListeners(servMux *http.ServeMux, httpClient *http.Client, cfg config.Sync) { apiMux := mux.NewRouter() r0mux := apiMux.PathPrefix(pathPrefixR0).Subrouter() r0mux.Handle("/sync", make("sync", util.NewJSONRequestHandler(func(req *http.Request) util.JSONResponse { diff --git a/src/github.com/matrix-org/dendrite/clientapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/clientapi/storage/syncserver.go new file mode 100644 index 000000000..7d8f4f504 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/clientapi/storage/syncserver.go @@ -0,0 +1,39 @@ +package storage + +import ( + "database/sql" + + "github.com/matrix-org/dendrite/common" + // Import the postgres database driver. + _ "github.com/lib/pq" +) + +// SyncServerDatabase represents a sync server database +type SyncServerDatabase struct { + db *sql.DB + partitions common.PartitionOffsetStatements +} + +// NewSyncServerDatabase creates a new sync server database +func NewSyncServerDatabase(dataSourceName string) (*SyncServerDatabase, error) { + var db *sql.DB + var err error + if db, err = sql.Open("postgres", dataSourceName); err != nil { + return nil, err + } + partitions := common.PartitionOffsetStatements{} + if err = partitions.Prepare(db); err != nil { + return nil, err + } + return &SyncServerDatabase{db, partitions}, nil +} + +// PartitionOffsets implements common.PartitionStorer +func (d *SyncServerDatabase) PartitionOffsets(topic string) ([]common.PartitionOffset, error) { + return d.partitions.SelectPartitionOffsets(topic) +} + +// SetPartitionOffset implements common.PartitionStorer +func (d *SyncServerDatabase) SetPartitionOffset(topic string, partition int32, offset int64) error { + return d.partitions.UpsertPartitionOffset(topic, partition, offset) +} diff --git a/src/github.com/matrix-org/dendrite/clientapi/sync/syncserver.go b/src/github.com/matrix-org/dendrite/clientapi/sync/syncserver.go new file mode 100644 index 000000000..71c6ec887 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/clientapi/sync/syncserver.go @@ -0,0 +1,43 @@ +package sync + +import ( + log "github.com/Sirupsen/logrus" + "github.com/matrix-org/dendrite/clientapi/config" + "github.com/matrix-org/dendrite/common" + sarama "gopkg.in/Shopify/sarama.v1" +) + +// Server contains all the logic for running a sync server +type Server struct { + roomServerConsumer *common.ContinualConsumer +} + +// NewServer creates a new sync server. Call Start() to begin consuming from room servers. +func NewServer(cfg *config.Sync, store common.PartitionStorer) (*Server, error) { + kafkaConsumer, err := sarama.NewConsumer(cfg.KafkaConsumerURIs, nil) + if err != nil { + return nil, err + } + + consumer := common.ContinualConsumer{ + Topic: cfg.RoomserverOutputTopic, + Consumer: kafkaConsumer, + PartitionStore: store, + } + s := &Server{ + roomServerConsumer: &consumer, + } + consumer.ProcessMessage = s.onMessage + + return s, nil +} + +// Start consuming from room servers +func (s *Server) Start() error { + return s.roomServerConsumer.Start() +} + +func (s *Server) onMessage(msg *sarama.ConsumerMessage) error { + log.WithField("key", string(msg.Key)).WithField("val", string(msg.Value)).Info("Recv") + return nil +} diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-server/main.go index c92e70961..83ea51c3d 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-server/main.go @@ -7,6 +7,8 @@ import ( "github.com/matrix-org/dendrite/clientapi/config" "github.com/matrix-org/dendrite/clientapi/routing" + "github.com/matrix-org/dendrite/clientapi/storage" + "github.com/matrix-org/dendrite/clientapi/sync" log "github.com/Sirupsen/logrus" "github.com/matrix-org/dugong" @@ -40,10 +42,24 @@ func main() { cfg := config.Sync{ KafkaConsumerURIs: []string{"localhost:9092"}, RoomserverOutputTopic: "roomserverOutput", + DataSource: "postgres://dendrite:itsasecret@localhost/syncserver?sslmode=disable", } log.Info("Starting sync server") - routing.SetupSyncServer(http.DefaultServeMux, http.DefaultClient, cfg) + db, err := storage.NewSyncServerDatabase(cfg.DataSource) + if err != nil { + log.Panicf("startup: failed to create sync server database with data source %s : %s", cfg.DataSource, err) + } + + server, err := sync.NewServer(&cfg, db) + if err != nil { + log.Panicf("startup: failed to create sync server: %s", err) + } + if err = server.Start(); err != nil { + log.Panicf("startup: failed to start sync server") + } + + routing.SetupSyncServerListeners(http.DefaultServeMux, http.DefaultClient, cfg) log.Fatal(http.ListenAndServe(bindAddr, nil)) } diff --git a/src/github.com/matrix-org/dendrite/common/partition_offset_table.go b/src/github.com/matrix-org/dendrite/common/partition_offset_table.go index 904e0be81..5b7a3fd17 100644 --- a/src/github.com/matrix-org/dendrite/common/partition_offset_table.go +++ b/src/github.com/matrix-org/dendrite/common/partition_offset_table.go @@ -57,6 +57,7 @@ func (s *PartitionOffsetStatements) SelectPartitionOffsets(topic string) ([]Part if err := rows.Scan(&offset.Partition, &offset.Offset); err != nil { return nil, err } + results = append(results, offset) } return results, nil } From 2d2c7e7169f7d4e943b85f8e15bed475e5667a8c Mon Sep 17 00:00:00 2001 From: Kegsay Date: Wed, 29 Mar 2017 14:09:27 +0100 Subject: [PATCH 2/2] sync-server config: Read from a YAML file rather than hard-coded variables (#53) --- .../dendrite/clientapi/config/config.go | 6 +-- .../dendrite/cmd/dendrite-sync-server/main.go | 49 ++++++++++++++----- sync-server-config.yaml | 8 +++ 3 files changed, 47 insertions(+), 16 deletions(-) create mode 100644 sync-server-config.yaml diff --git a/src/github.com/matrix-org/dendrite/clientapi/config/config.go b/src/github.com/matrix-org/dendrite/clientapi/config/config.go index 08e1f4d14..c3f18b086 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/config/config.go +++ b/src/github.com/matrix-org/dendrite/clientapi/config/config.go @@ -22,9 +22,9 @@ type ClientAPI struct { // Sync contains the config information necessary to spin up a sync-server process. type Sync struct { // The topic for events which are written by the room server output log. - RoomserverOutputTopic string + RoomserverOutputTopic string `yaml:"roomserver_topic"` // A list of URIs to consume events from. These kafka logs should be produced by a Room Server. - KafkaConsumerURIs []string + KafkaConsumerURIs []string `yaml:"consumer_uris"` // The postgres connection config for connecting to the database e.g a postgres:// URI - DataSource string + DataSource string `yaml:"database"` } diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-server/main.go index 83ea51c3d..0c11448ad 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-server/main.go @@ -1,6 +1,8 @@ package main import ( + "flag" + "io/ioutil" "net/http" "os" "path/filepath" @@ -12,8 +14,12 @@ import ( log "github.com/Sirupsen/logrus" "github.com/matrix-org/dugong" + yaml "gopkg.in/yaml.v2" ) +var configPath = flag.String("config", "sync-server-config.yaml", "The path to the config file. For more information, see the config file in this repository.") +var bindAddr = flag.String("listen", ":4200", "The port to listen on.") + func setupLogging(logDir string) { _ = os.Mkdir(logDir, os.ModePerm) log.AddHook(dugong.NewFSHook( @@ -29,30 +35,46 @@ func setupLogging(logDir string) { )) } +func loadConfig(configPath string) (*config.Sync, error) { + contents, err := ioutil.ReadFile(configPath) + if err != nil { + return nil, err + } + var cfg config.Sync + if err = yaml.Unmarshal(contents, &cfg); err != nil { + return nil, err + } + // check required fields + return &cfg, nil +} + func main() { - bindAddr := os.Getenv("BIND_ADDRESS") - if bindAddr == "" { - log.Panic("No BIND_ADDRESS environment variable found.") + flag.Parse() + + if *configPath == "" { + log.Fatal("--config must be supplied") + } + cfg, err := loadConfig(*configPath) + if err != nil { + log.Fatalf("Invalid config file: %s", err) + } + + if *bindAddr == "" { + log.Fatal("--listen must be supplied") } logDir := os.Getenv("LOG_DIR") if logDir != "" { setupLogging(logDir) } - cfg := config.Sync{ - KafkaConsumerURIs: []string{"localhost:9092"}, - RoomserverOutputTopic: "roomserverOutput", - DataSource: "postgres://dendrite:itsasecret@localhost/syncserver?sslmode=disable", - } - - log.Info("Starting sync server") + log.Info("sync server config: ", cfg) db, err := storage.NewSyncServerDatabase(cfg.DataSource) if err != nil { log.Panicf("startup: failed to create sync server database with data source %s : %s", cfg.DataSource, err) } - server, err := sync.NewServer(&cfg, db) + server, err := sync.NewServer(cfg, db) if err != nil { log.Panicf("startup: failed to create sync server: %s", err) } @@ -60,6 +82,7 @@ func main() { log.Panicf("startup: failed to start sync server") } - routing.SetupSyncServerListeners(http.DefaultServeMux, http.DefaultClient, cfg) - log.Fatal(http.ListenAndServe(bindAddr, nil)) + log.Info("Starting sync server on ", *bindAddr) + routing.SetupSyncServerListeners(http.DefaultServeMux, http.DefaultClient, *cfg) + log.Fatal(http.ListenAndServe(*bindAddr, nil)) } diff --git a/sync-server-config.yaml b/sync-server-config.yaml new file mode 100644 index 000000000..cbe452b35 --- /dev/null +++ b/sync-server-config.yaml @@ -0,0 +1,8 @@ +# A list of URIs which host Kafka logs. +consumer_uris: ["localhost:9092"] + +# The name of the topic which the sync server will consume events from. +roomserver_topic: "roomserverOutput" + +# The database URI to store sync server information. +database: "postgres://dendrite:itsasecret@localhost/syncserver?sslmode=disable"