mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-11 00:43:10 -06:00
Get latest changes from master
This commit is contained in:
commit
ebea9244d9
|
|
@ -132,7 +132,7 @@ type monolith struct {
|
|||
queryAPI *roomserver_query.RoomserverQueryAPI
|
||||
aliasAPI *roomserver_alias.RoomserverAliasAPI
|
||||
|
||||
kafkaConsumer sarama.Consumer
|
||||
naffka *naffka.Naffka
|
||||
kafkaProducer sarama.SyncProducer
|
||||
|
||||
roomServerProducer *producers.RoomserverProducer
|
||||
|
|
@ -205,16 +205,9 @@ func (m *monolith) setupKafka() {
|
|||
log.ErrorKey: err,
|
||||
}).Panic("Failed to setup naffka")
|
||||
}
|
||||
m.kafkaConsumer = naff
|
||||
m.naffka = naff
|
||||
m.kafkaProducer = naff
|
||||
} else {
|
||||
m.kafkaConsumer, err = sarama.NewConsumer(m.cfg.Kafka.Addresses, nil)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
log.ErrorKey: err,
|
||||
"addresses": m.cfg.Kafka.Addresses,
|
||||
}).Panic("Failed to setup kafka consumers")
|
||||
}
|
||||
m.kafkaProducer, err = sarama.NewSyncProducer(m.cfg.Kafka.Addresses, nil)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
|
|
@ -225,6 +218,20 @@ func (m *monolith) setupKafka() {
|
|||
}
|
||||
}
|
||||
|
||||
func (m *monolith) kafkaConsumer() sarama.Consumer {
|
||||
if m.cfg.Kafka.UseNaffka {
|
||||
return m.naffka
|
||||
}
|
||||
consumer, err := sarama.NewConsumer(m.cfg.Kafka.Addresses, nil)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
log.ErrorKey: err,
|
||||
"addresses": m.cfg.Kafka.Addresses,
|
||||
}).Panic("Failed to setup kafka consumers")
|
||||
}
|
||||
return consumer
|
||||
}
|
||||
|
||||
func (m *monolith) setupRoomServer() {
|
||||
m.inputAPI = &roomserver_input.RoomserverInputAPI{
|
||||
DB: m.roomServerDB,
|
||||
|
|
@ -272,21 +279,21 @@ func (m *monolith) setupConsumers() {
|
|||
var err error
|
||||
|
||||
clientAPIConsumer := clientapi_consumers.NewOutputRoomEvent(
|
||||
m.cfg, m.kafkaConsumer, m.accountDB, m.queryAPI,
|
||||
m.cfg, m.kafkaConsumer(), m.accountDB, m.queryAPI,
|
||||
)
|
||||
if err = clientAPIConsumer.Start(); err != nil {
|
||||
log.Panicf("startup: failed to start room server consumer")
|
||||
}
|
||||
|
||||
syncAPIRoomConsumer := syncapi_consumers.NewOutputRoomEvent(
|
||||
m.cfg, m.kafkaConsumer, m.syncAPINotifier, m.syncAPIDB, m.queryAPI,
|
||||
m.cfg, m.kafkaConsumer(), m.syncAPINotifier, m.syncAPIDB, m.queryAPI,
|
||||
)
|
||||
if err = syncAPIRoomConsumer.Start(); err != nil {
|
||||
log.Panicf("startup: failed to start room server consumer: %s", err)
|
||||
}
|
||||
|
||||
syncAPIClientConsumer := syncapi_consumers.NewOutputClientData(
|
||||
m.cfg, m.kafkaConsumer, m.syncAPINotifier, m.syncAPIDB,
|
||||
m.cfg, m.kafkaConsumer(), m.syncAPINotifier, m.syncAPIDB,
|
||||
)
|
||||
if err = syncAPIClientConsumer.Start(); err != nil {
|
||||
log.Panicf("startup: failed to start client API server consumer: %s", err)
|
||||
|
|
@ -302,7 +309,7 @@ func (m *monolith) setupConsumers() {
|
|||
federationSenderQueues := queue.NewOutgoingQueues(m.cfg.Matrix.ServerName, m.federation)
|
||||
|
||||
federationSenderRoomConsumer := federationsender_consumers.NewOutputRoomEvent(
|
||||
m.cfg, m.kafkaConsumer, federationSenderQueues, m.federationSenderDB, m.queryAPI,
|
||||
m.cfg, m.kafkaConsumer(), federationSenderQueues, m.federationSenderDB, m.queryAPI,
|
||||
)
|
||||
if err = federationSenderRoomConsumer.Start(); err != nil {
|
||||
log.WithError(err).Panicf("startup: failed to start room server consumer")
|
||||
|
|
|
|||
Loading…
Reference in a new issue