mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-12 01:13:10 -06:00
replace syncserver config with common config
This commit is contained in:
parent
a6e85b0760
commit
e27f22569d
|
|
@ -16,13 +16,12 @@ package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"flag"
|
"flag"
|
||||||
"io/ioutil"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
|
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
|
||||||
"github.com/matrix-org/dendrite/common"
|
"github.com/matrix-org/dendrite/common"
|
||||||
"github.com/matrix-org/dendrite/syncapi/config"
|
"github.com/matrix-org/dendrite/common/config"
|
||||||
"github.com/matrix-org/dendrite/syncapi/consumers"
|
"github.com/matrix-org/dendrite/syncapi/consumers"
|
||||||
"github.com/matrix-org/dendrite/syncapi/routing"
|
"github.com/matrix-org/dendrite/syncapi/routing"
|
||||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||||
|
|
@ -30,28 +29,11 @@ import (
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
|
|
||||||
log "github.com/Sirupsen/logrus"
|
log "github.com/Sirupsen/logrus"
|
||||||
yaml "gopkg.in/yaml.v2"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var configPath = flag.String("config", "sync-server-config.yaml", "The path to the config file. For more information, see the config file in this repository.")
|
var configPath = flag.String("config", "dendrite.yaml", "The path to the config file. For more information, see the config file in this repository.")
|
||||||
var bindAddr = flag.String("listen", ":4200", "The port to listen on.")
|
var bindAddr = flag.String("listen", ":4200", "The port to listen on.")
|
||||||
|
|
||||||
func loadConfig(configPath string) (*config.Sync, error) {
|
|
||||||
contents, err := ioutil.ReadFile(configPath)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
var cfg config.Sync
|
|
||||||
if err = yaml.Unmarshal(contents, &cfg); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
// check required fields
|
|
||||||
if cfg.ServerName == "" {
|
|
||||||
log.Fatalf("'server_name' must be supplied in %s", configPath)
|
|
||||||
}
|
|
||||||
return &cfg, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
common.SetupLogging(os.Getenv("LOG_DIR"))
|
common.SetupLogging(os.Getenv("LOG_DIR"))
|
||||||
|
|
||||||
|
|
@ -60,7 +42,7 @@ func main() {
|
||||||
if *configPath == "" {
|
if *configPath == "" {
|
||||||
log.Fatal("--config must be supplied")
|
log.Fatal("--config must be supplied")
|
||||||
}
|
}
|
||||||
cfg, err := loadConfig(*configPath)
|
cfg, err := config.Load(*configPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("Invalid config file: %s", err)
|
log.Fatalf("Invalid config file: %s", err)
|
||||||
}
|
}
|
||||||
|
|
@ -71,15 +53,14 @@ func main() {
|
||||||
|
|
||||||
log.Info("sync server config: ", cfg)
|
log.Info("sync server config: ", cfg)
|
||||||
|
|
||||||
db, err := storage.NewSyncServerDatabase(cfg.DataSource)
|
db, err := storage.NewSyncServerDatabase(string(cfg.Database.SyncServer))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Panicf("startup: failed to create sync server database with data source %s : %s", cfg.DataSource, err)
|
log.Panicf("startup: failed to create sync server database with data source %s : %s", cfg.Database.SyncServer, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: DO NOT USE THIS DATA SOURCE (it's the sync one, not devices!)
|
deviceDB, err := devices.NewDatabase(string(cfg.Database.Device), cfg.Matrix.ServerName)
|
||||||
deviceDB, err := devices.NewDatabase(cfg.DataSource, cfg.ServerName)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Panicf("startup: failed to create device database with data source %s : %s", cfg.DataSource, err)
|
log.Panicf("startup: failed to create device database with data source %s : %s", cfg.Database.Device, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
pos, err := db.SyncStreamPosition()
|
pos, err := db.SyncStreamPosition()
|
||||||
|
|
@ -88,7 +69,7 @@ func main() {
|
||||||
}
|
}
|
||||||
|
|
||||||
n := sync.NewNotifier(types.StreamPosition(pos))
|
n := sync.NewNotifier(types.StreamPosition(pos))
|
||||||
if err := n.Load(db); err != nil {
|
if err = n.Load(db); err != nil {
|
||||||
log.Panicf("startup: failed to set up notifier: %s", err)
|
log.Panicf("startup: failed to set up notifier: %s", err)
|
||||||
}
|
}
|
||||||
consumer, err := consumers.NewOutputRoomEvent(cfg, n, db)
|
consumer, err := consumers.NewOutputRoomEvent(cfg, n, db)
|
||||||
|
|
@ -100,6 +81,6 @@ func main() {
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Info("Starting sync server on ", *bindAddr)
|
log.Info("Starting sync server on ", *bindAddr)
|
||||||
routing.SetupSyncServerListeners(http.DefaultServeMux, http.DefaultClient, *cfg, sync.NewRequestPool(db, n), deviceDB)
|
routing.SetupSyncServerListeners(http.DefaultServeMux, http.DefaultClient, sync.NewRequestPool(db, n), deviceDB)
|
||||||
log.Fatal(http.ListenAndServe(*bindAddr, nil))
|
log.Fatal(http.ListenAndServe(*bindAddr, nil))
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -90,13 +90,13 @@ type Dendrite struct {
|
||||||
// The configuration for talking to kafka.
|
// The configuration for talking to kafka.
|
||||||
Kafka struct {
|
Kafka struct {
|
||||||
// A list of kafka addresses to connect to.
|
// A list of kafka addresses to connect to.
|
||||||
Addresses []Address `yaml:"addresses"`
|
Addresses []string `yaml:"addresses"`
|
||||||
// The names of the topics to use when reading and writing from kafka.
|
// The names of the topics to use when reading and writing from kafka.
|
||||||
Topics struct {
|
Topics struct {
|
||||||
// Topic for roomserver/api.InputRoomEvent events.
|
// Topic for roomserver/api.InputRoomEvent events.
|
||||||
InputRoomEvent Topic `yaml:"input_room_event"`
|
InputRoomEvent Topic `yaml:"input_room_event"`
|
||||||
// Topic for roomserver/api.OutputRoomEvent events.
|
// Topic for roomserver/api.OutputRoomEvent events.
|
||||||
OuputRoomEvent Topic `yaml:"output_room_event"`
|
OutputRoomEvent Topic `yaml:"output_room_event"`
|
||||||
}
|
}
|
||||||
} `yaml:"kafka"`
|
} `yaml:"kafka"`
|
||||||
|
|
||||||
|
|
@ -104,7 +104,8 @@ type Dendrite struct {
|
||||||
Database struct {
|
Database struct {
|
||||||
MediaServer DataSource `yaml:"media_server"`
|
MediaServer DataSource `yaml:"media_server"`
|
||||||
Account DataSource `yaml:"account"`
|
Account DataSource `yaml:"account"`
|
||||||
ServerKeys DataSource `yaml:"server_keys"`
|
Device DataSource `yaml:"device"`
|
||||||
|
ServerKey DataSource `yaml:"server_key"`
|
||||||
SyncServer DataSource `yaml:"sync_server"`
|
SyncServer DataSource `yaml:"sync_server"`
|
||||||
RoomServer DataSource `yaml:"room_server"`
|
RoomServer DataSource `yaml:"room_server"`
|
||||||
} `yaml:"database"`
|
} `yaml:"database"`
|
||||||
|
|
@ -258,11 +259,13 @@ func (config *Dendrite) check() error {
|
||||||
checkNotEmpty("matrix.private_key", string(config.Matrix.PrivateKeyPath))
|
checkNotEmpty("matrix.private_key", string(config.Matrix.PrivateKeyPath))
|
||||||
checkNotZero("matrix.federation_certificates", len(config.Matrix.FederationCertificatePaths))
|
checkNotZero("matrix.federation_certificates", len(config.Matrix.FederationCertificatePaths))
|
||||||
checkNotEmpty("media.base_path", string(config.Media.BasePath))
|
checkNotEmpty("media.base_path", string(config.Media.BasePath))
|
||||||
|
checkNotZero("kafka.addresses", len(config.Kafka.Addresses))
|
||||||
checkNotEmpty("kafka.topics.input_room_event", string(config.Kafka.Topics.InputRoomEvent))
|
checkNotEmpty("kafka.topics.input_room_event", string(config.Kafka.Topics.InputRoomEvent))
|
||||||
checkNotEmpty("kafka.topics.output_room_event", string(config.Kafka.Topics.InputRoomEvent))
|
checkNotEmpty("kafka.topics.output_room_event", string(config.Kafka.Topics.OutputRoomEvent))
|
||||||
checkNotEmpty("database.media_server", string(config.Database.MediaServer))
|
checkNotEmpty("database.media_server", string(config.Database.MediaServer))
|
||||||
checkNotEmpty("database.account", string(config.Database.Account))
|
checkNotEmpty("database.account", string(config.Database.Account))
|
||||||
checkNotEmpty("database.server_keys", string(config.Database.ServerKeys))
|
checkNotEmpty("database.device", string(config.Database.Device))
|
||||||
|
checkNotEmpty("database.server_key", string(config.Database.ServerKey))
|
||||||
checkNotEmpty("database.sync_server", string(config.Database.SyncServer))
|
checkNotEmpty("database.sync_server", string(config.Database.SyncServer))
|
||||||
checkNotEmpty("database.room_server", string(config.Database.RoomServer))
|
checkNotEmpty("database.room_server", string(config.Database.RoomServer))
|
||||||
checkNotEmpty("listen.media_api", string(config.Listen.MediaAPI))
|
checkNotEmpty("listen.media_api", string(config.Listen.MediaAPI))
|
||||||
|
|
|
||||||
|
|
@ -47,7 +47,8 @@ kafka:
|
||||||
database:
|
database:
|
||||||
media_server: "postgresql:///media_server"
|
media_server: "postgresql:///media_server"
|
||||||
account: "postgresql:///account"
|
account: "postgresql:///account"
|
||||||
server_keys: "postgresql:///server_keys"
|
device: "postgresql:///device"
|
||||||
|
server_key: "postgresql:///server_keys"
|
||||||
sync_server: "postgresql:///sync_server"
|
sync_server: "postgresql:///sync_server"
|
||||||
room_server: "postgresql:///room_server"
|
room_server: "postgresql:///room_server"
|
||||||
listen:
|
listen:
|
||||||
|
|
|
||||||
|
|
@ -1,33 +0,0 @@
|
||||||
// Copyright 2017 Vector Creations Ltd
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package config
|
|
||||||
|
|
||||||
import (
|
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Sync contains the config information necessary to spin up a sync-server process.
|
|
||||||
type Sync struct {
|
|
||||||
// Where the room server is listening for queries.
|
|
||||||
RoomserverURL string `yaml:"roomserver_url"`
|
|
||||||
// The topic for events which are written by the room server output log.
|
|
||||||
RoomserverOutputTopic string `yaml:"roomserver_topic"`
|
|
||||||
// A list of URIs to consume events from. These kafka logs should be produced by a Room Server.
|
|
||||||
KafkaConsumerURIs []string `yaml:"consumer_uris"`
|
|
||||||
// The postgres connection config for connecting to the database e.g a postgres:// URI
|
|
||||||
DataSource string `yaml:"database"`
|
|
||||||
// The server_name of the running process e.g "localhost"
|
|
||||||
ServerName gomatrixserverlib.ServerName `yaml:"server_name"`
|
|
||||||
}
|
|
||||||
|
|
@ -20,8 +20,8 @@ import (
|
||||||
|
|
||||||
log "github.com/Sirupsen/logrus"
|
log "github.com/Sirupsen/logrus"
|
||||||
"github.com/matrix-org/dendrite/common"
|
"github.com/matrix-org/dendrite/common"
|
||||||
|
"github.com/matrix-org/dendrite/common/config"
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/dendrite/syncapi/config"
|
|
||||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||||
"github.com/matrix-org/dendrite/syncapi/sync"
|
"github.com/matrix-org/dendrite/syncapi/sync"
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
|
|
@ -38,14 +38,15 @@ type OutputRoomEvent struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewOutputRoomEvent creates a new OutputRoomEvent consumer. Call Start() to begin consuming from room servers.
|
// NewOutputRoomEvent creates a new OutputRoomEvent consumer. Call Start() to begin consuming from room servers.
|
||||||
func NewOutputRoomEvent(cfg *config.Sync, n *sync.Notifier, store *storage.SyncServerDatabase) (*OutputRoomEvent, error) {
|
func NewOutputRoomEvent(cfg *config.Dendrite, n *sync.Notifier, store *storage.SyncServerDatabase) (*OutputRoomEvent, error) {
|
||||||
kafkaConsumer, err := sarama.NewConsumer(cfg.KafkaConsumerURIs, nil)
|
kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
roomserverURL := "http://" + string(cfg.Listen.RoomServer)
|
||||||
|
|
||||||
consumer := common.ContinualConsumer{
|
consumer := common.ContinualConsumer{
|
||||||
Topic: cfg.RoomserverOutputTopic,
|
Topic: string(cfg.Kafka.Topics.OutputRoomEvent),
|
||||||
Consumer: kafkaConsumer,
|
Consumer: kafkaConsumer,
|
||||||
PartitionStore: store,
|
PartitionStore: store,
|
||||||
}
|
}
|
||||||
|
|
@ -53,7 +54,7 @@ func NewOutputRoomEvent(cfg *config.Sync, n *sync.Notifier, store *storage.SyncS
|
||||||
roomServerConsumer: &consumer,
|
roomServerConsumer: &consumer,
|
||||||
db: store,
|
db: store,
|
||||||
notifier: n,
|
notifier: n,
|
||||||
query: api.NewRoomserverQueryAPIHTTP(cfg.RoomserverURL, nil),
|
query: api.NewRoomserverQueryAPIHTTP(roomserverURL, nil),
|
||||||
}
|
}
|
||||||
consumer.ProcessMessage = s.onMessage
|
consumer.ProcessMessage = s.onMessage
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -21,7 +21,6 @@ import (
|
||||||
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
||||||
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
|
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
|
||||||
"github.com/matrix-org/dendrite/common"
|
"github.com/matrix-org/dendrite/common"
|
||||||
"github.com/matrix-org/dendrite/syncapi/config"
|
|
||||||
"github.com/matrix-org/dendrite/syncapi/sync"
|
"github.com/matrix-org/dendrite/syncapi/sync"
|
||||||
"github.com/matrix-org/util"
|
"github.com/matrix-org/util"
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
|
@ -30,7 +29,7 @@ import (
|
||||||
const pathPrefixR0 = "/_matrix/client/r0"
|
const pathPrefixR0 = "/_matrix/client/r0"
|
||||||
|
|
||||||
// SetupSyncServerListeners configures the given mux with sync-server listeners
|
// SetupSyncServerListeners configures the given mux with sync-server listeners
|
||||||
func SetupSyncServerListeners(servMux *http.ServeMux, httpClient *http.Client, cfg config.Sync, srp *sync.RequestPool, deviceDB *devices.Database) {
|
func SetupSyncServerListeners(servMux *http.ServeMux, httpClient *http.Client, srp *sync.RequestPool, deviceDB *devices.Database) {
|
||||||
apiMux := mux.NewRouter()
|
apiMux := mux.NewRouter()
|
||||||
r0mux := apiMux.PathPrefix(pathPrefixR0).Subrouter()
|
r0mux := apiMux.PathPrefix(pathPrefixR0).Subrouter()
|
||||||
r0mux.Handle("/sync", common.MakeAuthAPI("sync", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
|
r0mux.Handle("/sync", common.MakeAuthAPI("sync", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue