diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go index 638eba3cd..ae8f66c4c 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go @@ -16,13 +16,12 @@ package main import ( "flag" - "io/ioutil" "net/http" "os" "github.com/matrix-org/dendrite/clientapi/auth/storage/devices" "github.com/matrix-org/dendrite/common" - "github.com/matrix-org/dendrite/syncapi/config" + "github.com/matrix-org/dendrite/common/config" "github.com/matrix-org/dendrite/syncapi/consumers" "github.com/matrix-org/dendrite/syncapi/routing" "github.com/matrix-org/dendrite/syncapi/storage" @@ -30,28 +29,11 @@ import ( "github.com/matrix-org/dendrite/syncapi/types" log "github.com/Sirupsen/logrus" - yaml "gopkg.in/yaml.v2" ) -var configPath = flag.String("config", "sync-server-config.yaml", "The path to the config file. For more information, see the config file in this repository.") +var configPath = flag.String("config", "dendrite.yaml", "The path to the config file. For more information, see the config file in this repository.") var bindAddr = flag.String("listen", ":4200", "The port to listen on.") -func loadConfig(configPath string) (*config.Sync, error) { - contents, err := ioutil.ReadFile(configPath) - if err != nil { - return nil, err - } - var cfg config.Sync - if err = yaml.Unmarshal(contents, &cfg); err != nil { - return nil, err - } - // check required fields - if cfg.ServerName == "" { - log.Fatalf("'server_name' must be supplied in %s", configPath) - } - return &cfg, nil -} - func main() { common.SetupLogging(os.Getenv("LOG_DIR")) @@ -60,7 +42,7 @@ func main() { if *configPath == "" { log.Fatal("--config must be supplied") } - cfg, err := loadConfig(*configPath) + cfg, err := config.Load(*configPath) if err != nil { log.Fatalf("Invalid config file: %s", err) } @@ -71,15 +53,14 @@ func main() { log.Info("sync server config: ", cfg) - db, err := storage.NewSyncServerDatabase(cfg.DataSource) + db, err := storage.NewSyncServerDatabase(string(cfg.Database.SyncServer)) if err != nil { - log.Panicf("startup: failed to create sync server database with data source %s : %s", cfg.DataSource, err) + log.Panicf("startup: failed to create sync server database with data source %s : %s", cfg.Database.SyncServer, err) } - // TODO: DO NOT USE THIS DATA SOURCE (it's the sync one, not devices!) - deviceDB, err := devices.NewDatabase(cfg.DataSource, cfg.ServerName) + deviceDB, err := devices.NewDatabase(string(cfg.Database.Device), cfg.Matrix.ServerName) if err != nil { - log.Panicf("startup: failed to create device database with data source %s : %s", cfg.DataSource, err) + log.Panicf("startup: failed to create device database with data source %s : %s", cfg.Database.Device, err) } pos, err := db.SyncStreamPosition() @@ -88,7 +69,7 @@ func main() { } n := sync.NewNotifier(types.StreamPosition(pos)) - if err := n.Load(db); err != nil { + if err = n.Load(db); err != nil { log.Panicf("startup: failed to set up notifier: %s", err) } consumer, err := consumers.NewOutputRoomEvent(cfg, n, db) @@ -100,6 +81,6 @@ func main() { } log.Info("Starting sync server on ", *bindAddr) - routing.SetupSyncServerListeners(http.DefaultServeMux, http.DefaultClient, *cfg, sync.NewRequestPool(db, n), deviceDB) + routing.SetupSyncServerListeners(http.DefaultServeMux, http.DefaultClient, sync.NewRequestPool(db, n), deviceDB) log.Fatal(http.ListenAndServe(*bindAddr, nil)) } diff --git a/src/github.com/matrix-org/dendrite/common/config/config.go b/src/github.com/matrix-org/dendrite/common/config/config.go index 767cd9780..29d5c539c 100644 --- a/src/github.com/matrix-org/dendrite/common/config/config.go +++ b/src/github.com/matrix-org/dendrite/common/config/config.go @@ -90,13 +90,13 @@ type Dendrite struct { // The configuration for talking to kafka. Kafka struct { // A list of kafka addresses to connect to. - Addresses []Address `yaml:"addresses"` + Addresses []string `yaml:"addresses"` // The names of the topics to use when reading and writing from kafka. Topics struct { // Topic for roomserver/api.InputRoomEvent events. InputRoomEvent Topic `yaml:"input_room_event"` // Topic for roomserver/api.OutputRoomEvent events. - OuputRoomEvent Topic `yaml:"output_room_event"` + OutputRoomEvent Topic `yaml:"output_room_event"` } } `yaml:"kafka"` @@ -104,7 +104,8 @@ type Dendrite struct { Database struct { MediaServer DataSource `yaml:"media_server"` Account DataSource `yaml:"account"` - ServerKeys DataSource `yaml:"server_keys"` + Device DataSource `yaml:"device"` + ServerKey DataSource `yaml:"server_key"` SyncServer DataSource `yaml:"sync_server"` RoomServer DataSource `yaml:"room_server"` } `yaml:"database"` @@ -258,11 +259,13 @@ func (config *Dendrite) check() error { checkNotEmpty("matrix.private_key", string(config.Matrix.PrivateKeyPath)) checkNotZero("matrix.federation_certificates", len(config.Matrix.FederationCertificatePaths)) checkNotEmpty("media.base_path", string(config.Media.BasePath)) + checkNotZero("kafka.addresses", len(config.Kafka.Addresses)) checkNotEmpty("kafka.topics.input_room_event", string(config.Kafka.Topics.InputRoomEvent)) - checkNotEmpty("kafka.topics.output_room_event", string(config.Kafka.Topics.InputRoomEvent)) + checkNotEmpty("kafka.topics.output_room_event", string(config.Kafka.Topics.OutputRoomEvent)) checkNotEmpty("database.media_server", string(config.Database.MediaServer)) checkNotEmpty("database.account", string(config.Database.Account)) - checkNotEmpty("database.server_keys", string(config.Database.ServerKeys)) + checkNotEmpty("database.device", string(config.Database.Device)) + checkNotEmpty("database.server_key", string(config.Database.ServerKey)) checkNotEmpty("database.sync_server", string(config.Database.SyncServer)) checkNotEmpty("database.room_server", string(config.Database.RoomServer)) checkNotEmpty("listen.media_api", string(config.Listen.MediaAPI)) diff --git a/src/github.com/matrix-org/dendrite/common/config/config_test.go b/src/github.com/matrix-org/dendrite/common/config/config_test.go index 18a16ea01..3b1220ba1 100644 --- a/src/github.com/matrix-org/dendrite/common/config/config_test.go +++ b/src/github.com/matrix-org/dendrite/common/config/config_test.go @@ -43,11 +43,12 @@ kafka: addresses: ["localhost:9092"] topics: input_room_event: input.room - output_room_event: output.room + output_room_event: output.room database: media_server: "postgresql:///media_server" account: "postgresql:///account" - server_keys: "postgresql:///server_keys" + device: "postgresql:///device" + server_key: "postgresql:///server_keys" sync_server: "postgresql:///sync_server" room_server: "postgresql:///room_server" listen: diff --git a/src/github.com/matrix-org/dendrite/syncapi/config/config.go b/src/github.com/matrix-org/dendrite/syncapi/config/config.go deleted file mode 100644 index 32f047229..000000000 --- a/src/github.com/matrix-org/dendrite/syncapi/config/config.go +++ /dev/null @@ -1,33 +0,0 @@ -// Copyright 2017 Vector Creations Ltd -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package config - -import ( - "github.com/matrix-org/gomatrixserverlib" -) - -// Sync contains the config information necessary to spin up a sync-server process. -type Sync struct { - // Where the room server is listening for queries. - RoomserverURL string `yaml:"roomserver_url"` - // The topic for events which are written by the room server output log. - RoomserverOutputTopic string `yaml:"roomserver_topic"` - // A list of URIs to consume events from. These kafka logs should be produced by a Room Server. - KafkaConsumerURIs []string `yaml:"consumer_uris"` - // The postgres connection config for connecting to the database e.g a postgres:// URI - DataSource string `yaml:"database"` - // The server_name of the running process e.g "localhost" - ServerName gomatrixserverlib.ServerName `yaml:"server_name"` -} diff --git a/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go index 31c48ac57..9497cb512 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go @@ -20,8 +20,8 @@ import ( log "github.com/Sirupsen/logrus" "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/common/config" "github.com/matrix-org/dendrite/roomserver/api" - "github.com/matrix-org/dendrite/syncapi/config" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/sync" "github.com/matrix-org/dendrite/syncapi/types" @@ -38,14 +38,15 @@ type OutputRoomEvent struct { } // NewOutputRoomEvent creates a new OutputRoomEvent consumer. Call Start() to begin consuming from room servers. -func NewOutputRoomEvent(cfg *config.Sync, n *sync.Notifier, store *storage.SyncServerDatabase) (*OutputRoomEvent, error) { - kafkaConsumer, err := sarama.NewConsumer(cfg.KafkaConsumerURIs, nil) +func NewOutputRoomEvent(cfg *config.Dendrite, n *sync.Notifier, store *storage.SyncServerDatabase) (*OutputRoomEvent, error) { + kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil) if err != nil { return nil, err } + roomserverURL := "http://" + string(cfg.Listen.RoomServer) consumer := common.ContinualConsumer{ - Topic: cfg.RoomserverOutputTopic, + Topic: string(cfg.Kafka.Topics.OutputRoomEvent), Consumer: kafkaConsumer, PartitionStore: store, } @@ -53,7 +54,7 @@ func NewOutputRoomEvent(cfg *config.Sync, n *sync.Notifier, store *storage.SyncS roomServerConsumer: &consumer, db: store, notifier: n, - query: api.NewRoomserverQueryAPIHTTP(cfg.RoomserverURL, nil), + query: api.NewRoomserverQueryAPIHTTP(roomserverURL, nil), } consumer.ProcessMessage = s.onMessage diff --git a/src/github.com/matrix-org/dendrite/syncapi/routing/routing.go b/src/github.com/matrix-org/dendrite/syncapi/routing/routing.go index ad09cd172..0b65a08ac 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/routing/routing.go +++ b/src/github.com/matrix-org/dendrite/syncapi/routing/routing.go @@ -21,7 +21,6 @@ import ( "github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/clientapi/auth/storage/devices" "github.com/matrix-org/dendrite/common" - "github.com/matrix-org/dendrite/syncapi/config" "github.com/matrix-org/dendrite/syncapi/sync" "github.com/matrix-org/util" "github.com/prometheus/client_golang/prometheus" @@ -30,7 +29,7 @@ import ( const pathPrefixR0 = "/_matrix/client/r0" // SetupSyncServerListeners configures the given mux with sync-server listeners -func SetupSyncServerListeners(servMux *http.ServeMux, httpClient *http.Client, cfg config.Sync, srp *sync.RequestPool, deviceDB *devices.Database) { +func SetupSyncServerListeners(servMux *http.ServeMux, httpClient *http.Client, srp *sync.RequestPool, deviceDB *devices.Database) { apiMux := mux.NewRouter() r0mux := apiMux.PathPrefix(pathPrefixR0).Subrouter() r0mux.Handle("/sync", common.MakeAuthAPI("sync", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {