Replace room server config with common config

This commit is contained in:
Mark Haines 2017-06-14 16:26:45 +01:00
parent 928cde17e0
commit 7765ba7cff

View file

@ -15,14 +15,16 @@
package main
import (
"flag"
"fmt"
"net/http"
_ "net/http/pprof"
"os"
"strconv"
"strings"
log "github.com/Sirupsen/logrus"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/roomserver/input"
"github.com/matrix-org/dendrite/roomserver/query"
"github.com/matrix-org/dendrite/roomserver/storage"
@ -31,42 +33,46 @@ import (
)
var (
database = os.Getenv("DATABASE")
kafkaURIs = strings.Split(os.Getenv("KAFKA_URIS"), ",")
inputRoomEventTopic = os.Getenv("TOPIC_INPUT_ROOM_EVENT")
outputRoomEventTopic = os.Getenv("TOPIC_OUTPUT_ROOM_EVENT")
bindAddr = os.Getenv("BIND_ADDRESS")
// Shuts the roomserver down after processing a given number of messages.
// This is useful for running benchmarks for seeing how quickly the server
// can process a given number of messages.
logDir = os.Getenv("LOG_DIR")
configPath = flag.String("config", "", "The path to the config file. For more information, see the config file in this repository.")
stopProcessingAfter = os.Getenv("STOP_AFTER")
)
func main() {
db, err := storage.Open(database)
common.SetupLogging(logDir)
if *configPath == "" {
log.Fatal("--config must be supplied")
}
cfg, err := config.Load(*configPath)
if err != nil {
log.Fatalf("Invalid config file: %s", err)
}
db, err := storage.Open(string(cfg.Database.RoomServer))
if err != nil {
panic(err)
}
kafkaConsumer, err := sarama.NewConsumer(kafkaURIs, nil)
kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil)
if err != nil {
panic(err)
}
kafkaProducer, err := sarama.NewSyncProducer(kafkaURIs, nil)
kafkaProducer, err := sarama.NewSyncProducer(cfg.Kafka.Addresses, nil)
if err != nil {
panic(err)
}
consumer := input.Consumer{
ContinualConsumer: common.ContinualConsumer{
Topic: inputRoomEventTopic,
Topic: string(cfg.Kafka.Topics.InputRoomEvent),
Consumer: kafkaConsumer,
PartitionStore: db,
},
DB: db,
Producer: kafkaProducer,
OutputRoomEventTopic: outputRoomEventTopic,
OutputRoomEventTopic: string(cfg.Kafka.Topics.OutputRoomEvent),
}
if stopProcessingAfter != "" {
@ -93,10 +99,10 @@ func main() {
http.DefaultServeMux.Handle("/metrics", prometheus.Handler())
fmt.Println("Started roomserver")
log.Info("Started room server on ", cfg.Listen.RoomServer)
// TODO: Implement clean shutdown.
if err := http.ListenAndServe(bindAddr, nil); err != nil {
if err := http.ListenAndServe(string(cfg.Listen.RoomServer), nil); err != nil {
panic(err)
}
}