From c27d1fdfb40032956ed5d13d767bcf12bfc5a65c Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 16 Aug 2017 13:36:41 +0100 Subject: [PATCH] Optionally use naffka in the monolithic server (#183) * dependency injection for the kafka consumers/producers * Optionally use naffka in the monolithic server * remember to call setupKafka() * tweak imports * fix integration tests * Add use_naffka to the example config * Update comment on the listen APIs --- dendrite-config.yaml | 7 +- .../clientapi/consumers/roomserver.go | 19 ++-- .../dendrite/clientapi/producers/syncapi.go | 12 --- .../clientapi/producers/userupdate.go | 12 --- .../cmd/dendrite-client-api-server/main.go | 38 +++++--- .../dendrite-federation-sender-server/main.go | 17 ++-- .../cmd/dendrite-monolith-server/main.go | 88 +++++++++++-------- .../cmd/dendrite-sync-api-server/main.go | 19 ++-- .../dendrite/common/config/config.go | 62 ++++++++++--- .../dendrite/common/config/config_test.go | 3 +- .../matrix-org/dendrite/common/test/config.go | 1 + .../federationsender/consumers/roomserver.go | 18 ++-- .../dendrite/syncapi/consumers/clientapi.go | 13 +-- .../dendrite/syncapi/consumers/roomserver.go | 17 ++-- 14 files changed, 195 insertions(+), 131 deletions(-) diff --git a/dendrite-config.yaml b/dendrite-config.yaml index d04707a61..b19add182 100644 --- a/dendrite-config.yaml +++ b/dendrite-config.yaml @@ -52,9 +52,13 @@ media: kafka: # Where the kafka servers are running. addresses: ["localhost:9092"] + # Whether to use naffka instead of kafka. + # Naffka can only be used when running dendrite as a single monolithic server. + # Kafka can be used both with a monolithic server and when running the + # components as separate servers. + use_naffka: false # The names of the kafka topics to use. topics: - input_room_event: roomserverInput output_room_event: roomserverOutput output_client_data: clientapiOutput user_updates: userUpdates @@ -71,6 +75,7 @@ database: # The TCP host:port pairs to bind the internal HTTP APIs to. # These shouldn't be exposed to the public internet. +# These aren't needed when running dendrite as a monolithic server. listen: room_server: "localhost:7770" client_api: "localhost:7771" diff --git a/src/github.com/matrix-org/dendrite/clientapi/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/clientapi/consumers/roomserver.go index 1e2320787..314ca42bc 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/consumers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/clientapi/consumers/roomserver.go @@ -17,12 +17,13 @@ package consumers import ( "encoding/json" - log "github.com/Sirupsen/logrus" "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common/config" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" + + log "github.com/Sirupsen/logrus" sarama "gopkg.in/Shopify/sarama.v1" ) @@ -35,12 +36,12 @@ type OutputRoomEvent struct { } // NewOutputRoomEvent creates a new OutputRoomEvent consumer. Call Start() to begin consuming from room servers. -func NewOutputRoomEvent(cfg *config.Dendrite, store *accounts.Database) (*OutputRoomEvent, error) { - kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil) - if err != nil { - return nil, err - } - roomServerURL := cfg.RoomServerURL() +func NewOutputRoomEvent( + cfg *config.Dendrite, + kafkaConsumer sarama.Consumer, + store *accounts.Database, + queryAPI api.RoomserverQueryAPI, +) *OutputRoomEvent { consumer := common.ContinualConsumer{ Topic: string(cfg.Kafka.Topics.OutputRoomEvent), @@ -50,12 +51,12 @@ func NewOutputRoomEvent(cfg *config.Dendrite, store *accounts.Database) (*Output s := &OutputRoomEvent{ roomServerConsumer: &consumer, db: store, - query: api.NewRoomserverQueryAPIHTTP(roomServerURL, nil), + query: queryAPI, serverName: string(cfg.Matrix.ServerName), } consumer.ProcessMessage = s.onMessage - return s, nil + return s } // Start consuming from room servers diff --git a/src/github.com/matrix-org/dendrite/clientapi/producers/syncapi.go b/src/github.com/matrix-org/dendrite/clientapi/producers/syncapi.go index 2597089e3..dba104f90 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/producers/syncapi.go +++ b/src/github.com/matrix-org/dendrite/clientapi/producers/syncapi.go @@ -28,18 +28,6 @@ type SyncAPIProducer struct { Producer sarama.SyncProducer } -// NewSyncAPIProducer creates a new SyncAPIProducer -func NewSyncAPIProducer(kafkaURIs []string, topic string) (*SyncAPIProducer, error) { - producer, err := sarama.NewSyncProducer(kafkaURIs, nil) - if err != nil { - return nil, err - } - return &SyncAPIProducer{ - Topic: topic, - Producer: producer, - }, nil -} - // SendData sends account data to the sync API server func (p *SyncAPIProducer) SendData(userID string, roomID string, dataType string) error { var m sarama.ProducerMessage diff --git a/src/github.com/matrix-org/dendrite/clientapi/producers/userupdate.go b/src/github.com/matrix-org/dendrite/clientapi/producers/userupdate.go index f76be0d75..2f2ed7565 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/producers/userupdate.go +++ b/src/github.com/matrix-org/dendrite/clientapi/producers/userupdate.go @@ -34,18 +34,6 @@ type profileUpdate struct { NewValue string `json:"new_value"` // The attribute's value after the update } -// NewUserUpdateProducer creates a new UserUpdateProducer -func NewUserUpdateProducer(kafkaURIs []string, topic string) (*UserUpdateProducer, error) { - producer, err := sarama.NewSyncProducer(kafkaURIs, nil) - if err != nil { - return nil, err - } - return &UserUpdateProducer{ - Topic: topic, - Producer: producer, - }, nil -} - // SendUpdate sends an update using kafka to notify the roomserver of the // profile update. Returns an error if the update failed to send. func (p *UserUpdateProducer) SendUpdate( diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go index 5d195bee8..53ebdb934 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go @@ -33,6 +33,7 @@ import ( "github.com/matrix-org/gomatrixserverlib" log "github.com/Sirupsen/logrus" + sarama "gopkg.in/Shopify/sarama.v1" ) var ( @@ -50,24 +51,28 @@ func main() { log.Fatalf("Invalid config file: %s", err) } - log.Info("config: ", cfg) - queryAPI := api.NewRoomserverQueryAPIHTTP(cfg.RoomServerURL(), nil) aliasAPI := api.NewRoomserverAliasAPIHTTP(cfg.RoomServerURL(), nil) inputAPI := api.NewRoomserverInputAPIHTTP(cfg.RoomServerURL(), nil) roomserverProducer := producers.NewRoomserverProducer(inputAPI) - userUpdateProducer, err := producers.NewUserUpdateProducer( - cfg.Kafka.Addresses, string(cfg.Kafka.Topics.UserUpdates), - ) + + kafkaProducer, err := sarama.NewSyncProducer(cfg.Kafka.Addresses, nil) if err != nil { - log.Panicf("Failed to setup kafka producers(%q): %s", cfg.Kafka.Addresses, err) + log.WithFields(log.Fields{ + log.ErrorKey: err, + "addresses": cfg.Kafka.Addresses, + }).Panic("Failed to setup kafka producers") } - syncProducer, err := producers.NewSyncAPIProducer( - cfg.Kafka.Addresses, string(cfg.Kafka.Topics.OutputClientData), - ) - if err != nil { - log.Panicf("Failed to setup kafka producers(%q): %s", cfg.Kafka.Addresses, err) + + userUpdateProducer := &producers.UserUpdateProducer{ + Producer: kafkaProducer, + Topic: string(cfg.Kafka.Topics.UserUpdates), + } + + syncProducer := &producers.SyncAPIProducer{ + Producer: kafkaProducer, + Topic: string(cfg.Kafka.Topics.OutputClientData), } federation := gomatrixserverlib.NewFederationClient( @@ -90,15 +95,20 @@ func main() { keyRing := gomatrixserverlib.KeyRing{ KeyFetchers: []gomatrixserverlib.KeyFetcher{ // TODO: Use perspective key fetchers for production. - &gomatrixserverlib.DirectKeyFetcher{federation.Client}, + &gomatrixserverlib.DirectKeyFetcher{Client: federation.Client}, }, KeyDatabase: keyDB, } - consumer, err := consumers.NewOutputRoomEvent(cfg, accountDB) + kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil) if err != nil { - log.Panicf("startup: failed to create room server consumer: %s", err) + log.WithFields(log.Fields{ + log.ErrorKey: err, + "addresses": cfg.Kafka.Addresses, + }).Panic("Failed to setup kafka consumers") } + + consumer := consumers.NewOutputRoomEvent(cfg, kafkaConsumer, accountDB, queryAPI) if err = consumer.Start(); err != nil { log.Panicf("startup: failed to start room server consumer") } diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-federation-sender-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-federation-sender-server/main.go index 9052c3f87..dfc2dc2ff 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-federation-sender-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-federation-sender-server/main.go @@ -25,9 +25,11 @@ import ( "github.com/matrix-org/dendrite/federationsender/consumers" "github.com/matrix-org/dendrite/federationsender/queue" "github.com/matrix-org/dendrite/federationsender/storage" + "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" log "github.com/Sirupsen/logrus" + sarama "gopkg.in/Shopify/sarama.v1" ) var configPath = flag.String("config", "dendrite.yaml", "The path to the config file. For more information, see the config file in this repository.") @@ -45,7 +47,15 @@ func main() { log.Fatalf("Invalid config file: %s", err) } - log.Info("config: ", cfg) + kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil) + if err != nil { + log.WithFields(log.Fields{ + log.ErrorKey: err, + "addresses": cfg.Kafka.Addresses, + }).Panic("Failed to setup kafka consumers") + } + + queryAPI := api.NewRoomserverQueryAPIHTTP(cfg.RoomServerURL(), nil) db, err := storage.NewDatabase(string(cfg.Database.FederationSender)) if err != nil { @@ -58,10 +68,7 @@ func main() { queues := queue.NewOutgoingQueues(cfg.Matrix.ServerName, federation) - consumer, err := consumers.NewOutputRoomEvent(cfg, queues, db) - if err != nil { - log.WithError(err).Panicf("startup: failed to create room server consumer") - } + consumer := consumers.NewOutputRoomEvent(cfg, kafkaConsumer, queues, db, queryAPI) if err = consumer.Start(); err != nil { log.WithError(err).Panicf("startup: failed to start room server consumer") } diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go index f0046d7b1..ee86469d3 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go @@ -27,6 +27,7 @@ import ( "github.com/matrix-org/dendrite/common/config" "github.com/matrix-org/dendrite/common/keydb" "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/naffka" mediaapi_routing "github.com/matrix-org/dendrite/mediaapi/routing" mediaapi_storage "github.com/matrix-org/dendrite/mediaapi/storage" @@ -72,7 +73,7 @@ func main() { if *configPath == "" { log.Fatal("--config must be supplied") } - cfg, err := config.Load(*configPath) + cfg, err := config.LoadMonolithic(*configPath) if err != nil { log.Fatalf("Invalid config file: %s", err) } @@ -80,6 +81,7 @@ func main() { m := newMonolith(cfg) m.setupDatabases() m.setupFederation() + m.setupKafka() m.setupRoomServer() m.setupProducers() m.setupNotifiers() @@ -125,6 +127,9 @@ type monolith struct { queryAPI *roomserver_query.RoomserverQueryAPI aliasAPI *roomserver_alias.RoomserverAliasAPI + kafkaConsumer sarama.Consumer + kafkaProducer sarama.SyncProducer + roomServerProducer *producers.RoomserverProducer userUpdateProducer *producers.UserUpdateProducer syncProducer *producers.SyncAPIProducer @@ -182,15 +187,39 @@ func (m *monolith) setupFederation() { } } -func (m *monolith) setupRoomServer() { - kafkaProducer, err := sarama.NewSyncProducer(m.cfg.Kafka.Addresses, nil) - if err != nil { - panic(err) +func (m *monolith) setupKafka() { + var err error + if m.cfg.Kafka.UseNaffka { + naff, err := naffka.New(&naffka.MemoryDatabase{}) + if err != nil { + log.WithFields(log.Fields{ + log.ErrorKey: err, + }).Panic("Failed to setup naffka") + } + m.kafkaConsumer = naff + m.kafkaProducer = naff + } else { + m.kafkaConsumer, err = sarama.NewConsumer(m.cfg.Kafka.Addresses, nil) + if err != nil { + log.WithFields(log.Fields{ + log.ErrorKey: err, + "addresses": m.cfg.Kafka.Addresses, + }).Panic("Failed to setup kafka consumers") + } + m.kafkaProducer, err = sarama.NewSyncProducer(m.cfg.Kafka.Addresses, nil) + if err != nil { + log.WithFields(log.Fields{ + log.ErrorKey: err, + "addresses": m.cfg.Kafka.Addresses, + }).Panic("Failed to setup kafka producers") + } } +} +func (m *monolith) setupRoomServer() { m.inputAPI = &roomserver_input.RoomserverInputAPI{ DB: m.roomServerDB, - Producer: kafkaProducer, + Producer: m.kafkaProducer, OutputRoomEventTopic: string(m.cfg.Kafka.Topics.OutputRoomEvent), } @@ -207,19 +236,14 @@ func (m *monolith) setupRoomServer() { } func (m *monolith) setupProducers() { - var err error m.roomServerProducer = producers.NewRoomserverProducer(m.inputAPI) - m.userUpdateProducer, err = producers.NewUserUpdateProducer( - m.cfg.Kafka.Addresses, string(m.cfg.Kafka.Topics.UserUpdates), - ) - if err != nil { - log.Panicf("Failed to setup kafka producers(%q): %s", m.cfg.Kafka.Addresses, err) + m.userUpdateProducer = &producers.UserUpdateProducer{ + Producer: m.kafkaProducer, + Topic: string(m.cfg.Kafka.Topics.UserUpdates), } - m.syncProducer, err = producers.NewSyncAPIProducer( - m.cfg.Kafka.Addresses, string(m.cfg.Kafka.Topics.OutputClientData), - ) - if err != nil { - log.Panicf("Failed to setup kafka producers(%q): %s", m.cfg.Kafka.Addresses, err) + m.syncProducer = &producers.SyncAPIProducer{ + Producer: m.kafkaProducer, + Topic: string(m.cfg.Kafka.Topics.OutputClientData), } } @@ -236,42 +260,34 @@ func (m *monolith) setupNotifiers() { } func (m *monolith) setupConsumers() { - clientAPIConsumer, err := clientapi_consumers.NewOutputRoomEvent(m.cfg, m.accountDB) - if err != nil { - log.Panicf("startup: failed to create room server consumer: %s", err) - } + var err error + + clientAPIConsumer := clientapi_consumers.NewOutputRoomEvent( + m.cfg, m.kafkaConsumer, m.accountDB, m.queryAPI, + ) if err = clientAPIConsumer.Start(); err != nil { log.Panicf("startup: failed to start room server consumer") } - syncAPIRoomConsumer, err := syncapi_consumers.NewOutputRoomEvent( - m.cfg, m.syncAPINotifier, m.syncAPIDB, + syncAPIRoomConsumer := syncapi_consumers.NewOutputRoomEvent( + m.cfg, m.kafkaConsumer, m.syncAPINotifier, m.syncAPIDB, m.queryAPI, ) - if err != nil { - log.Panicf("startup: failed to create room server consumer: %s", err) - } if err = syncAPIRoomConsumer.Start(); err != nil { log.Panicf("startup: failed to start room server consumer: %s", err) } - syncAPIClientConsumer, err := syncapi_consumers.NewOutputClientData( - m.cfg, m.syncAPINotifier, m.syncAPIDB, + syncAPIClientConsumer := syncapi_consumers.NewOutputClientData( + m.cfg, m.kafkaConsumer, m.syncAPINotifier, m.syncAPIDB, ) - if err != nil { - log.Panicf("startup: failed to create client API server consumer: %s", err) - } if err = syncAPIClientConsumer.Start(); err != nil { log.Panicf("startup: failed to start client API server consumer: %s", err) } federationSenderQueues := queue.NewOutgoingQueues(m.cfg.Matrix.ServerName, m.federation) - federationSenderRoomConsumer, err := federationsender_consumers.NewOutputRoomEvent( - m.cfg, federationSenderQueues, m.federationSenderDB, + federationSenderRoomConsumer := federationsender_consumers.NewOutputRoomEvent( + m.cfg, m.kafkaConsumer, federationSenderQueues, m.federationSenderDB, m.queryAPI, ) - if err != nil { - log.WithError(err).Panicf("startup: failed to create room server consumer") - } if err = federationSenderRoomConsumer.Start(); err != nil { log.WithError(err).Panicf("startup: failed to start room server consumer") } diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go index 7db4d4ca4..7e9e4c128 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go @@ -24,6 +24,7 @@ import ( "github.com/matrix-org/dendrite/clientapi/auth/storage/devices" "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common/config" + "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/syncapi/consumers" "github.com/matrix-org/dendrite/syncapi/routing" "github.com/matrix-org/dendrite/syncapi/storage" @@ -31,6 +32,7 @@ import ( "github.com/matrix-org/dendrite/syncapi/types" log "github.com/Sirupsen/logrus" + sarama "gopkg.in/Shopify/sarama.v1" ) var configPath = flag.String("config", "dendrite.yaml", "The path to the config file. For more information, see the config file in this repository.") @@ -48,7 +50,7 @@ func main() { log.Fatalf("Invalid config file: %s", err) } - log.Info("config: ", cfg) + queryAPI := api.NewRoomserverQueryAPIHTTP(cfg.RoomServerURL(), nil) db, err := storage.NewSyncServerDatabase(string(cfg.Database.SyncAPI)) if err != nil { @@ -74,17 +76,20 @@ func main() { if err = n.Load(db); err != nil { log.Panicf("startup: failed to set up notifier: %s", err) } - roomConsumer, err := consumers.NewOutputRoomEvent(cfg, n, db) + + kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil) if err != nil { - log.Panicf("startup: failed to create room server consumer: %s", err) + log.WithFields(log.Fields{ + log.ErrorKey: err, + "addresses": cfg.Kafka.Addresses, + }).Panic("Failed to setup kafka consumers") } + + roomConsumer := consumers.NewOutputRoomEvent(cfg, kafkaConsumer, n, db, queryAPI) if err = roomConsumer.Start(); err != nil { log.Panicf("startup: failed to start room server consumer: %s", err) } - clientConsumer, err := consumers.NewOutputClientData(cfg, n, db) - if err != nil { - log.Panicf("startup: failed to create client API server consumer: %s", err) - } + clientConsumer := consumers.NewOutputClientData(cfg, kafkaConsumer, n, db) if err = clientConsumer.Start(); err != nil { log.Panicf("startup: failed to start client API server consumer: %s", err) } diff --git a/src/github.com/matrix-org/dendrite/common/config/config.go b/src/github.com/matrix-org/dendrite/common/config/config.go index 324561f68..ae0fe62cd 100644 --- a/src/github.com/matrix-org/dendrite/common/config/config.go +++ b/src/github.com/matrix-org/dendrite/common/config/config.go @@ -94,6 +94,11 @@ type Dendrite struct { Kafka struct { // A list of kafka addresses to connect to. Addresses []string `yaml:"addresses"` + // Whether to use naffka instead of kafka. + // Naffka can only be used when running dendrite as a single monolithic server. + // Kafka can be used both with a monolithic server and when running the + // components as separate servers. + UseNaffka bool `yaml:"use_naffka,omitempty"` // The names of the topics to use when reading and writing from kafka. Topics struct { // Topic for roomserver/api.OutputRoomEvent events. @@ -169,7 +174,10 @@ type ThumbnailSize struct { ResizeMethod string `yaml:"method,omitempty"` } -// Load a yaml config file +// Load a yaml config file for a server run as multiple processes. +// Checks the config to ensure that it is valid. +// The checks are different if the server is run as a monolithic process instead +// of being split into multiple components func Load(configPath string) (*Dendrite, error) { configData, err := ioutil.ReadFile(configPath) if err != nil { @@ -181,7 +189,27 @@ func Load(configPath string) (*Dendrite, error) { } // Pass the current working directory and ioutil.ReadFile so that they can // be mocked in the tests - return loadConfig(basePath, configData, ioutil.ReadFile) + monolithic := false + return loadConfig(basePath, configData, ioutil.ReadFile, monolithic) +} + +// LoadMonolithic loads a yaml config file for a server run as a single monolith. +// Checks the config to ensure that it is valid. +// The checks are different if the server is run as a monolithic process instead +// of being split into multiple components +func LoadMonolithic(configPath string) (*Dendrite, error) { + configData, err := ioutil.ReadFile(configPath) + if err != nil { + return nil, err + } + basePath, err := filepath.Abs(".") + if err != nil { + return nil, err + } + // Pass the current working directory and ioutil.ReadFile so that they can + // be mocked in the tests + monolithic := true + return loadConfig(basePath, configData, ioutil.ReadFile, monolithic) } // An Error indicates a problem parsing the config. @@ -194,6 +222,7 @@ func loadConfig( basePath string, configData []byte, readFile func(string) ([]byte, error), + monolithic bool, ) (*Dendrite, error) { var config Dendrite var err error @@ -203,7 +232,7 @@ func loadConfig( config.setDefaults() - if err = config.check(); err != nil { + if err = config.check(monolithic); err != nil { return nil, err } @@ -259,7 +288,7 @@ func (e Error) Error() string { ) } -func (config *Dendrite) check() error { +func (config *Dendrite) check(monolithic bool) error { var problems []string if config.Version != Version { @@ -297,21 +326,32 @@ func (config *Dendrite) check() error { checkPositive(fmt.Sprintf("media.thumbnail_sizes[%d].width", i), int64(size.Width)) checkPositive(fmt.Sprintf("media.thumbnail_sizes[%d].height", i), int64(size.Height)) } - - checkNotZero("kafka.addresses", int64(len(config.Kafka.Addresses))) + if config.Kafka.UseNaffka { + if !monolithic { + problems = append(problems, fmt.Sprintf("naffka can only be used in a monolithic server")) + } + } else { + // If we aren't using naffka then we need to have at least one kafka + // server to talk to. + checkNotZero("kafka.addresses", int64(len(config.Kafka.Addresses))) + } checkNotEmpty("kafka.topics.output_room_event", string(config.Kafka.Topics.OutputRoomEvent)) checkNotEmpty("kafka.topics.output_client_data", string(config.Kafka.Topics.OutputClientData)) + checkNotEmpty("kafka.topics.user_updates", string(config.Kafka.Topics.UserUpdates)) checkNotEmpty("database.account", string(config.Database.Account)) checkNotEmpty("database.device", string(config.Database.Device)) checkNotEmpty("database.server_key", string(config.Database.ServerKey)) checkNotEmpty("database.media_api", string(config.Database.MediaAPI)) checkNotEmpty("database.sync_api", string(config.Database.SyncAPI)) checkNotEmpty("database.room_server", string(config.Database.RoomServer)) - checkNotEmpty("listen.media_api", string(config.Listen.MediaAPI)) - checkNotEmpty("listen.client_api", string(config.Listen.ClientAPI)) - checkNotEmpty("listen.federation_api", string(config.Listen.FederationAPI)) - checkNotEmpty("listen.sync_api", string(config.Listen.SyncAPI)) - checkNotEmpty("listen.room_server", string(config.Listen.RoomServer)) + + if !monolithic { + checkNotEmpty("listen.media_api", string(config.Listen.MediaAPI)) + checkNotEmpty("listen.client_api", string(config.Listen.ClientAPI)) + checkNotEmpty("listen.federation_api", string(config.Listen.FederationAPI)) + checkNotEmpty("listen.sync_api", string(config.Listen.SyncAPI)) + checkNotEmpty("listen.room_server", string(config.Listen.RoomServer)) + } if problems != nil { return Error{problems} diff --git a/src/github.com/matrix-org/dendrite/common/config/config_test.go b/src/github.com/matrix-org/dendrite/common/config/config_test.go index 4275e3d41..24b0dfc1f 100644 --- a/src/github.com/matrix-org/dendrite/common/config/config_test.go +++ b/src/github.com/matrix-org/dendrite/common/config/config_test.go @@ -25,6 +25,7 @@ func TestLoadConfigRelative(t *testing.T) { "/my/config/dir/matrix_key.pem": testKey, "/my/config/dir/tls_cert.pem": testCert, }.readFile, + false, ) if err != nil { t.Error("failed to load config:", err) @@ -42,9 +43,9 @@ media: kafka: addresses: ["localhost:9092"] topics: - input_room_event: input.room output_room_event: output.room output_client_data: output.client + user_updates: output.user database: media_api: "postgresql:///media_api" account: "postgresql:///account" diff --git a/src/github.com/matrix-org/dendrite/common/test/config.go b/src/github.com/matrix-org/dendrite/common/test/config.go index a28a08d54..948c60f10 100644 --- a/src/github.com/matrix-org/dendrite/common/test/config.go +++ b/src/github.com/matrix-org/dendrite/common/test/config.go @@ -83,6 +83,7 @@ func MakeConfig(configDir, kafkaURI, database, host string, startPort int) (*con // Make this configurable somehow? cfg.Kafka.Topics.OutputRoomEvent = "test.room.output" cfg.Kafka.Topics.OutputClientData = "test.clientapi.output" + cfg.Kafka.Topics.UserUpdates = "test.user.output" // TODO: Use different databases for the different schemas. // Using the same database for every schema currently works because diff --git a/src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver.go index 7f133d303..4ebc36c77 100644 --- a/src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver.go @@ -38,13 +38,13 @@ type OutputRoomEvent struct { } // NewOutputRoomEvent creates a new OutputRoomEvent consumer. Call Start() to begin consuming from room servers. -func NewOutputRoomEvent(cfg *config.Dendrite, queues *queue.OutgoingQueues, store *storage.Database) (*OutputRoomEvent, error) { - kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil) - if err != nil { - return nil, err - } - roomServerURL := cfg.RoomServerURL() - +func NewOutputRoomEvent( + cfg *config.Dendrite, + kafkaConsumer sarama.Consumer, + queues *queue.OutgoingQueues, + store *storage.Database, + queryAPI api.RoomserverQueryAPI, +) *OutputRoomEvent { consumer := common.ContinualConsumer{ Topic: string(cfg.Kafka.Topics.OutputRoomEvent), Consumer: kafkaConsumer, @@ -54,11 +54,11 @@ func NewOutputRoomEvent(cfg *config.Dendrite, queues *queue.OutgoingQueues, stor roomServerConsumer: &consumer, db: store, queues: queues, - query: api.NewRoomserverQueryAPIHTTP(roomServerURL, nil), + query: queryAPI, } consumer.ProcessMessage = s.onMessage - return s, nil + return s } // Start consuming from room servers diff --git a/src/github.com/matrix-org/dendrite/syncapi/consumers/clientapi.go b/src/github.com/matrix-org/dendrite/syncapi/consumers/clientapi.go index a2a240ff9..7cc38b815 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/consumers/clientapi.go +++ b/src/github.com/matrix-org/dendrite/syncapi/consumers/clientapi.go @@ -33,11 +33,12 @@ type OutputClientData struct { } // NewOutputClientData creates a new OutputClientData consumer. Call Start() to begin consuming from room servers. -func NewOutputClientData(cfg *config.Dendrite, n *sync.Notifier, store *storage.SyncServerDatabase) (*OutputClientData, error) { - kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil) - if err != nil { - return nil, err - } +func NewOutputClientData( + cfg *config.Dendrite, + kafkaConsumer sarama.Consumer, + n *sync.Notifier, + store *storage.SyncServerDatabase, +) *OutputClientData { consumer := common.ContinualConsumer{ Topic: string(cfg.Kafka.Topics.OutputClientData), @@ -51,7 +52,7 @@ func NewOutputClientData(cfg *config.Dendrite, n *sync.Notifier, store *storage. } consumer.ProcessMessage = s.onMessage - return s, nil + return s } // Start consuming from room servers diff --git a/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go index c846705fc..373577894 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go @@ -44,12 +44,13 @@ type prevEventRef struct { } // NewOutputRoomEvent creates a new OutputRoomEvent consumer. Call Start() to begin consuming from room servers. -func NewOutputRoomEvent(cfg *config.Dendrite, n *sync.Notifier, store *storage.SyncServerDatabase) (*OutputRoomEvent, error) { - kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil) - if err != nil { - return nil, err - } - roomServerURL := cfg.RoomServerURL() +func NewOutputRoomEvent( + cfg *config.Dendrite, + kafkaConsumer sarama.Consumer, + n *sync.Notifier, + store *storage.SyncServerDatabase, + queryAPI api.RoomserverQueryAPI, +) *OutputRoomEvent { consumer := common.ContinualConsumer{ Topic: string(cfg.Kafka.Topics.OutputRoomEvent), @@ -60,11 +61,11 @@ func NewOutputRoomEvent(cfg *config.Dendrite, n *sync.Notifier, store *storage.S roomServerConsumer: &consumer, db: store, notifier: n, - query: api.NewRoomserverQueryAPIHTTP(roomServerURL, nil), + query: queryAPI, } consumer.ProcessMessage = s.onMessage - return s, nil + return s } // Start consuming from room servers