Optionally use naffka in the monolithic server

This commit is contained in:
Mark Haines 2017-08-15 15:58:59 +01:00
parent 5ca806263a
commit f428f8dbf0

View file

@ -27,6 +27,7 @@ import (
"github.com/matrix-org/dendrite/common/config" "github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/common/keydb" "github.com/matrix-org/dendrite/common/keydb"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/naffka"
mediaapi_routing "github.com/matrix-org/dendrite/mediaapi/routing" mediaapi_routing "github.com/matrix-org/dendrite/mediaapi/routing"
mediaapi_storage "github.com/matrix-org/dendrite/mediaapi/storage" mediaapi_storage "github.com/matrix-org/dendrite/mediaapi/storage"
@ -187,19 +188,30 @@ func (m *monolith) setupFederation() {
func (m *monolith) setupKafka() { func (m *monolith) setupKafka() {
var err error var err error
m.kafkaConsumer, err = sarama.NewConsumer(m.cfg.Kafka.Addresses, nil) if m.cfg.Kafka.UseNaffka {
if err != nil { naff, err := naffka.New(&naffka.MemoryDatabase{})
log.WithFields(log.Fields{ if err != nil {
log.ErrorKey: err, log.WithFields(log.Fields{
"addresses": m.cfg.Kafka.Addresses, log.ErrorKey: err,
}).Panic("Failed to setup kafka consumers") }).Panic("Failed to setup naffka")
} }
m.kafkaProducer, err = sarama.NewSyncProducer(m.cfg.Kafka.Addresses, nil) m.kafkaConsumer = naff
if err != nil { m.kafkaProducer = naff
log.WithFields(log.Fields{ } else {
log.ErrorKey: err, m.kafkaConsumer, err = sarama.NewConsumer(m.cfg.Kafka.Addresses, nil)
"addresses": m.cfg.Kafka.Addresses, if err != nil {
}).Panic("Failed to setup kafka producers") log.WithFields(log.Fields{
log.ErrorKey: err,
"addresses": m.cfg.Kafka.Addresses,
}).Panic("Failed to setup kafka consumers")
}
m.kafkaProducer, err = sarama.NewSyncProducer(m.cfg.Kafka.Addresses, nil)
if err != nil {
log.WithFields(log.Fields{
log.ErrorKey: err,
"addresses": m.cfg.Kafka.Addresses,
}).Panic("Failed to setup kafka producers")
}
} }
} }