Add component to monolith

This commit is contained in:
Brendan Abolivier 2017-08-16 16:28:33 +01:00
parent 40d31b7e73
commit 1740e54ac9
No known key found for this signature in database
GPG key ID: 8EF1500759F70623
3 changed files with 34 additions and 9 deletions

View file

@ -52,6 +52,10 @@ import (
"github.com/matrix-org/dendrite/federationsender/queue" "github.com/matrix-org/dendrite/federationsender/queue"
federationsender_storage "github.com/matrix-org/dendrite/federationsender/storage" federationsender_storage "github.com/matrix-org/dendrite/federationsender/storage"
publicroomsapi_consumers "github.com/matrix-org/dendrite/publicroomsapi/consumers"
publicroomsapi_routing "github.com/matrix-org/dendrite/publicroomsapi/routing"
publicroomsapi_storage "github.com/matrix-org/dendrite/publicroomsapi/storage"
log "github.com/Sirupsen/logrus" log "github.com/Sirupsen/logrus"
sarama "gopkg.in/Shopify/sarama.v1" sarama "gopkg.in/Shopify/sarama.v1"
) )
@ -119,6 +123,7 @@ type monolith struct {
mediaAPIDB *mediaapi_storage.Database mediaAPIDB *mediaapi_storage.Database
syncAPIDB *syncapi_storage.SyncServerDatabase syncAPIDB *syncapi_storage.SyncServerDatabase
federationSenderDB *federationsender_storage.Database federationSenderDB *federationsender_storage.Database
publicRoomsAPIDB *publicroomsapi_storage.PublicRoomsServerDatabase
federation *gomatrixserverlib.FederationClient federation *gomatrixserverlib.FederationClient
keyRing gomatrixserverlib.KeyRing keyRing gomatrixserverlib.KeyRing
@ -171,6 +176,10 @@ func (m *monolith) setupDatabases() {
if err != nil { if err != nil {
log.Panicf("startup: failed to create federation sender database with data source %s : %s", m.cfg.Database.FederationSender, err) log.Panicf("startup: failed to create federation sender database with data source %s : %s", m.cfg.Database.FederationSender, err)
} }
m.publicRoomsAPIDB, err = publicroomsapi_storage.NewPublicRoomsServerDatabase(string(m.cfg.Database.PublicRoomsAPI))
if err != nil {
log.Panicf("startup: failed to setup public rooms api database with data source %s : %s", m.cfg.Database.PublicRoomsAPI, err)
}
} }
func (m *monolith) setupFederation() { func (m *monolith) setupFederation() {
@ -283,6 +292,13 @@ func (m *monolith) setupConsumers() {
log.Panicf("startup: failed to start client API server consumer: %s", err) log.Panicf("startup: failed to start client API server consumer: %s", err)
} }
publicRoomsAPIConsumer := publicroomsapi_consumers.NewOutputRoomEvent(
m.cfg, m.kafkaConsumer, m.publicRoomsAPIDB,
)
if err = publicRoomsAPIConsumer.Start(); err != nil {
log.Panicf("startup: failed to start room server consumer: %s", err)
}
federationSenderQueues := queue.NewOutgoingQueues(m.cfg.Matrix.ServerName, m.federation) federationSenderQueues := queue.NewOutgoingQueues(m.cfg.Matrix.ServerName, m.federation)
federationSenderRoomConsumer := federationsender_consumers.NewOutputRoomEvent( federationSenderRoomConsumer := federationsender_consumers.NewOutputRoomEvent(
@ -311,4 +327,6 @@ func (m *monolith) setupAPIs() {
federationapi_routing.Setup( federationapi_routing.Setup(
m.api, *m.cfg, m.queryAPI, m.roomServerProducer, m.keyRing, m.federation, m.api, *m.cfg, m.queryAPI, m.roomServerProducer, m.keyRing, m.federation,
) )
publicroomsapi_routing.Setup(m.api, m.deviceDB, m.publicRoomsAPIDB)
} }

View file

@ -28,6 +28,7 @@ import (
"github.com/matrix-org/dendrite/publicroomsapi/storage" "github.com/matrix-org/dendrite/publicroomsapi/storage"
log "github.com/Sirupsen/logrus" log "github.com/Sirupsen/logrus"
sarama "gopkg.in/Shopify/sarama.v1"
) )
var configPath = flag.String("config", "dendrite.yaml", "The path to the config file. For more information, see the config file in this repository.") var configPath = flag.String("config", "dendrite.yaml", "The path to the config file. For more information, see the config file in this repository.")
@ -55,7 +56,15 @@ func main() {
log.Panicf("startup: failed to create device database with data source %s : %s", cfg.Database.Device, err) log.Panicf("startup: failed to create device database with data source %s : %s", cfg.Database.Device, err)
} }
roomConsumer, err := consumers.NewOutputRoomEvent(cfg, db) kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil)
if err != nil {
log.WithFields(log.Fields{
log.ErrorKey: err,
"addresses": cfg.Kafka.Addresses,
}).Panic("Failed to setup kafka consumers")
}
roomConsumer := consumers.NewOutputRoomEvent(cfg, kafkaConsumer, db)
if err != nil { if err != nil {
log.Panicf("startup: failed to create room server consumer: %s", err) log.Panicf("startup: failed to create room server consumer: %s", err)
} }

View file

@ -33,12 +33,11 @@ type OutputRoomEvent struct {
} }
// NewOutputRoomEvent creates a new OutputRoomEvent consumer. Call Start() to begin consuming from room servers. // NewOutputRoomEvent creates a new OutputRoomEvent consumer. Call Start() to begin consuming from room servers.
func NewOutputRoomEvent(cfg *config.Dendrite, store *storage.PublicRoomsServerDatabase) (*OutputRoomEvent, error) { func NewOutputRoomEvent(
kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil) cfg *config.Dendrite,
if err != nil { kafkaConsumer sarama.Consumer,
return nil, err store *storage.PublicRoomsServerDatabase,
} ) *OutputRoomEvent {
roomServerURL := cfg.RoomServerURL()
consumer := common.ContinualConsumer{ consumer := common.ContinualConsumer{
Topic: string(cfg.Kafka.Topics.OutputRoomEvent), Topic: string(cfg.Kafka.Topics.OutputRoomEvent),
@ -48,11 +47,10 @@ func NewOutputRoomEvent(cfg *config.Dendrite, store *storage.PublicRoomsServerDa
s := &OutputRoomEvent{ s := &OutputRoomEvent{
roomServerConsumer: &consumer, roomServerConsumer: &consumer,
db: store, db: store,
query: api.NewRoomserverQueryAPIHTTP(roomServerURL, nil),
} }
consumer.ProcessMessage = s.onMessage consumer.ProcessMessage = s.onMessage
return s, nil return s
} }
// Start consuming from room servers // Start consuming from room servers