From 1740e54ac9d89826ee6586e5382f9c4a8a538134 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Wed, 16 Aug 2017 16:28:33 +0100 Subject: [PATCH] Add component to monolith --- .../cmd/dendrite-monolith-server/main.go | 18 ++++++++++++++++++ .../dendrite-public-rooms-api-server/main.go | 11 ++++++++++- .../publicroomsapi/consumers/roomserver.go | 14 ++++++-------- 3 files changed, 34 insertions(+), 9 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go index ee86469d3..74f2b911c 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go @@ -52,6 +52,10 @@ import ( "github.com/matrix-org/dendrite/federationsender/queue" federationsender_storage "github.com/matrix-org/dendrite/federationsender/storage" + publicroomsapi_consumers "github.com/matrix-org/dendrite/publicroomsapi/consumers" + publicroomsapi_routing "github.com/matrix-org/dendrite/publicroomsapi/routing" + publicroomsapi_storage "github.com/matrix-org/dendrite/publicroomsapi/storage" + log "github.com/Sirupsen/logrus" sarama "gopkg.in/Shopify/sarama.v1" ) @@ -119,6 +123,7 @@ type monolith struct { mediaAPIDB *mediaapi_storage.Database syncAPIDB *syncapi_storage.SyncServerDatabase federationSenderDB *federationsender_storage.Database + publicRoomsAPIDB *publicroomsapi_storage.PublicRoomsServerDatabase federation *gomatrixserverlib.FederationClient keyRing gomatrixserverlib.KeyRing @@ -171,6 +176,10 @@ func (m *monolith) setupDatabases() { if err != nil { log.Panicf("startup: failed to create federation sender database with data source %s : %s", m.cfg.Database.FederationSender, err) } + m.publicRoomsAPIDB, err = publicroomsapi_storage.NewPublicRoomsServerDatabase(string(m.cfg.Database.PublicRoomsAPI)) + if err != nil { + log.Panicf("startup: failed to setup public rooms api database with data source %s : %s", m.cfg.Database.PublicRoomsAPI, err) + } } func (m *monolith) setupFederation() { @@ -283,6 +292,13 @@ func (m *monolith) setupConsumers() { log.Panicf("startup: failed to start client API server consumer: %s", err) } + publicRoomsAPIConsumer := publicroomsapi_consumers.NewOutputRoomEvent( + m.cfg, m.kafkaConsumer, m.publicRoomsAPIDB, + ) + if err = publicRoomsAPIConsumer.Start(); err != nil { + log.Panicf("startup: failed to start room server consumer: %s", err) + } + federationSenderQueues := queue.NewOutgoingQueues(m.cfg.Matrix.ServerName, m.federation) federationSenderRoomConsumer := federationsender_consumers.NewOutputRoomEvent( @@ -311,4 +327,6 @@ func (m *monolith) setupAPIs() { federationapi_routing.Setup( m.api, *m.cfg, m.queryAPI, m.roomServerProducer, m.keyRing, m.federation, ) + + publicroomsapi_routing.Setup(m.api, m.deviceDB, m.publicRoomsAPIDB) } diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-public-rooms-api-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-public-rooms-api-server/main.go index 8502b8751..21f65410e 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-public-rooms-api-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-public-rooms-api-server/main.go @@ -28,6 +28,7 @@ import ( "github.com/matrix-org/dendrite/publicroomsapi/storage" log "github.com/Sirupsen/logrus" + sarama "gopkg.in/Shopify/sarama.v1" ) var configPath = flag.String("config", "dendrite.yaml", "The path to the config file. For more information, see the config file in this repository.") @@ -55,7 +56,15 @@ func main() { log.Panicf("startup: failed to create device database with data source %s : %s", cfg.Database.Device, err) } - roomConsumer, err := consumers.NewOutputRoomEvent(cfg, db) + kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil) + if err != nil { + log.WithFields(log.Fields{ + log.ErrorKey: err, + "addresses": cfg.Kafka.Addresses, + }).Panic("Failed to setup kafka consumers") + } + + roomConsumer := consumers.NewOutputRoomEvent(cfg, kafkaConsumer, db) if err != nil { log.Panicf("startup: failed to create room server consumer: %s", err) } diff --git a/src/github.com/matrix-org/dendrite/publicroomsapi/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/publicroomsapi/consumers/roomserver.go index f4a9793f4..6a308fd21 100644 --- a/src/github.com/matrix-org/dendrite/publicroomsapi/consumers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/publicroomsapi/consumers/roomserver.go @@ -33,12 +33,11 @@ type OutputRoomEvent struct { } // NewOutputRoomEvent creates a new OutputRoomEvent consumer. Call Start() to begin consuming from room servers. -func NewOutputRoomEvent(cfg *config.Dendrite, store *storage.PublicRoomsServerDatabase) (*OutputRoomEvent, error) { - kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil) - if err != nil { - return nil, err - } - roomServerURL := cfg.RoomServerURL() +func NewOutputRoomEvent( + cfg *config.Dendrite, + kafkaConsumer sarama.Consumer, + store *storage.PublicRoomsServerDatabase, +) *OutputRoomEvent { consumer := common.ContinualConsumer{ Topic: string(cfg.Kafka.Topics.OutputRoomEvent), @@ -48,11 +47,10 @@ func NewOutputRoomEvent(cfg *config.Dendrite, store *storage.PublicRoomsServerDa s := &OutputRoomEvent{ roomServerConsumer: &consumer, db: store, - query: api.NewRoomserverQueryAPIHTTP(roomServerURL, nil), } consumer.ProcessMessage = s.onMessage - return s, nil + return s } // Start consuming from room servers