diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go index ee86469d3..a856a7249 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go @@ -127,7 +127,7 @@ type monolith struct { queryAPI *roomserver_query.RoomserverQueryAPI aliasAPI *roomserver_alias.RoomserverAliasAPI - kafkaConsumer sarama.Consumer + naffka *naffka.Naffka kafkaProducer sarama.SyncProducer roomServerProducer *producers.RoomserverProducer @@ -196,16 +196,9 @@ func (m *monolith) setupKafka() { log.ErrorKey: err, }).Panic("Failed to setup naffka") } - m.kafkaConsumer = naff + m.naffka = naff m.kafkaProducer = naff } else { - m.kafkaConsumer, err = sarama.NewConsumer(m.cfg.Kafka.Addresses, nil) - if err != nil { - log.WithFields(log.Fields{ - log.ErrorKey: err, - "addresses": m.cfg.Kafka.Addresses, - }).Panic("Failed to setup kafka consumers") - } m.kafkaProducer, err = sarama.NewSyncProducer(m.cfg.Kafka.Addresses, nil) if err != nil { log.WithFields(log.Fields{ @@ -216,6 +209,20 @@ func (m *monolith) setupKafka() { } } +func (m *monolith) kafkaConsumer() sarama.Consumer { + if m.cfg.Kafka.UseNaffka { + return m.naffka + } + consumer, err := sarama.NewConsumer(m.cfg.Kafka.Addresses, nil) + if err != nil { + log.WithFields(log.Fields{ + log.ErrorKey: err, + "addresses": m.cfg.Kafka.Addresses, + }).Panic("Failed to setup kafka consumers") + } + return consumer +} + func (m *monolith) setupRoomServer() { m.inputAPI = &roomserver_input.RoomserverInputAPI{ DB: m.roomServerDB, @@ -263,21 +270,21 @@ func (m *monolith) setupConsumers() { var err error clientAPIConsumer := clientapi_consumers.NewOutputRoomEvent( - m.cfg, m.kafkaConsumer, m.accountDB, m.queryAPI, + m.cfg, m.kafkaConsumer(), m.accountDB, m.queryAPI, ) if err = clientAPIConsumer.Start(); err != nil { log.Panicf("startup: failed to start room server consumer") } syncAPIRoomConsumer := syncapi_consumers.NewOutputRoomEvent( - m.cfg, m.kafkaConsumer, m.syncAPINotifier, m.syncAPIDB, m.queryAPI, + m.cfg, m.kafkaConsumer(), m.syncAPINotifier, m.syncAPIDB, m.queryAPI, ) if err = syncAPIRoomConsumer.Start(); err != nil { log.Panicf("startup: failed to start room server consumer: %s", err) } syncAPIClientConsumer := syncapi_consumers.NewOutputClientData( - m.cfg, m.kafkaConsumer, m.syncAPINotifier, m.syncAPIDB, + m.cfg, m.kafkaConsumer(), m.syncAPINotifier, m.syncAPIDB, ) if err = syncAPIClientConsumer.Start(); err != nil { log.Panicf("startup: failed to start client API server consumer: %s", err) @@ -286,7 +293,7 @@ func (m *monolith) setupConsumers() { federationSenderQueues := queue.NewOutgoingQueues(m.cfg.Matrix.ServerName, m.federation) federationSenderRoomConsumer := federationsender_consumers.NewOutputRoomEvent( - m.cfg, m.kafkaConsumer, federationSenderQueues, m.federationSenderDB, m.queryAPI, + m.cfg, m.kafkaConsumer(), federationSenderQueues, m.federationSenderDB, m.queryAPI, ) if err = federationSenderRoomConsumer.Start(); err != nil { log.WithError(err).Panicf("startup: failed to start room server consumer")