From 7765ba7cff70a0d45158b7b6f114b10ca666cf38 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 14 Jun 2017 16:26:45 +0100 Subject: [PATCH] Replace room server config with common config --- .../dendrite/cmd/dendrite-room-server/main.go | 38 +++++++++++-------- 1 file changed, 22 insertions(+), 16 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-room-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-room-server/main.go index d06be276d..b3c004d18 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-room-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-room-server/main.go @@ -15,14 +15,16 @@ package main import ( + "flag" "fmt" "net/http" _ "net/http/pprof" "os" "strconv" - "strings" + log "github.com/Sirupsen/logrus" "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/common/config" "github.com/matrix-org/dendrite/roomserver/input" "github.com/matrix-org/dendrite/roomserver/query" "github.com/matrix-org/dendrite/roomserver/storage" @@ -31,42 +33,46 @@ import ( ) var ( - database = os.Getenv("DATABASE") - kafkaURIs = strings.Split(os.Getenv("KAFKA_URIS"), ",") - inputRoomEventTopic = os.Getenv("TOPIC_INPUT_ROOM_EVENT") - outputRoomEventTopic = os.Getenv("TOPIC_OUTPUT_ROOM_EVENT") - bindAddr = os.Getenv("BIND_ADDRESS") - // Shuts the roomserver down after processing a given number of messages. - // This is useful for running benchmarks for seeing how quickly the server - // can process a given number of messages. + logDir = os.Getenv("LOG_DIR") + configPath = flag.String("config", "", "The path to the config file. For more information, see the config file in this repository.") stopProcessingAfter = os.Getenv("STOP_AFTER") ) func main() { - db, err := storage.Open(database) + common.SetupLogging(logDir) + + if *configPath == "" { + log.Fatal("--config must be supplied") + } + cfg, err := config.Load(*configPath) + if err != nil { + log.Fatalf("Invalid config file: %s", err) + } + + db, err := storage.Open(string(cfg.Database.RoomServer)) if err != nil { panic(err) } - kafkaConsumer, err := sarama.NewConsumer(kafkaURIs, nil) + kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil) if err != nil { panic(err) } - kafkaProducer, err := sarama.NewSyncProducer(kafkaURIs, nil) + kafkaProducer, err := sarama.NewSyncProducer(cfg.Kafka.Addresses, nil) if err != nil { panic(err) } consumer := input.Consumer{ ContinualConsumer: common.ContinualConsumer{ - Topic: inputRoomEventTopic, + Topic: string(cfg.Kafka.Topics.InputRoomEvent), Consumer: kafkaConsumer, PartitionStore: db, }, DB: db, Producer: kafkaProducer, - OutputRoomEventTopic: outputRoomEventTopic, + OutputRoomEventTopic: string(cfg.Kafka.Topics.OutputRoomEvent), } if stopProcessingAfter != "" { @@ -93,10 +99,10 @@ func main() { http.DefaultServeMux.Handle("/metrics", prometheus.Handler()) - fmt.Println("Started roomserver") + log.Info("Started room server on ", cfg.Listen.RoomServer) // TODO: Implement clean shutdown. - if err := http.ListenAndServe(bindAddr, nil); err != nil { + if err := http.ListenAndServe(string(cfg.Listen.RoomServer), nil); err != nil { panic(err) } }