diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go index 9abd584bb..fc6db0225 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go @@ -27,6 +27,7 @@ import ( "github.com/matrix-org/dendrite/common/config" "github.com/matrix-org/dendrite/common/keydb" "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/naffka" mediaapi_routing "github.com/matrix-org/dendrite/mediaapi/routing" mediaapi_storage "github.com/matrix-org/dendrite/mediaapi/storage" @@ -187,19 +188,30 @@ func (m *monolith) setupFederation() { func (m *monolith) setupKafka() { var err error - m.kafkaConsumer, err = sarama.NewConsumer(m.cfg.Kafka.Addresses, nil) - if err != nil { - log.WithFields(log.Fields{ - log.ErrorKey: err, - "addresses": m.cfg.Kafka.Addresses, - }).Panic("Failed to setup kafka consumers") - } - m.kafkaProducer, err = sarama.NewSyncProducer(m.cfg.Kafka.Addresses, nil) - if err != nil { - log.WithFields(log.Fields{ - log.ErrorKey: err, - "addresses": m.cfg.Kafka.Addresses, - }).Panic("Failed to setup kafka producers") + if m.cfg.Kafka.UseNaffka { + naff, err := naffka.New(&naffka.MemoryDatabase{}) + if err != nil { + log.WithFields(log.Fields{ + log.ErrorKey: err, + }).Panic("Failed to setup naffka") + } + m.kafkaConsumer = naff + m.kafkaProducer = naff + } else { + m.kafkaConsumer, err = sarama.NewConsumer(m.cfg.Kafka.Addresses, nil) + if err != nil { + log.WithFields(log.Fields{ + log.ErrorKey: err, + "addresses": m.cfg.Kafka.Addresses, + }).Panic("Failed to setup kafka consumers") + } + m.kafkaProducer, err = sarama.NewSyncProducer(m.cfg.Kafka.Addresses, nil) + if err != nil { + log.WithFields(log.Fields{ + log.ErrorKey: err, + "addresses": m.cfg.Kafka.Addresses, + }).Panic("Failed to setup kafka producers") + } } }