diff --git a/federationsender/consumers/keychange.go b/federationsender/consumers/keychange.go index d33a6e070..8060125e7 100644 --- a/federationsender/consumers/keychange.go +++ b/federationsender/consumers/keychange.go @@ -41,7 +41,7 @@ type KeyChangeConsumer struct { // NewKeyChangeConsumer creates a new KeyChangeConsumer. Call Start() to begin consuming from key servers. func NewKeyChangeConsumer( - cfg *config.Dendrite, + cfg *config.KeyServer, kafkaConsumer sarama.Consumer, queues *queue.OutgoingQueues, store storage.Database, @@ -49,7 +49,7 @@ func NewKeyChangeConsumer( ) *KeyChangeConsumer { c := &KeyChangeConsumer{ consumer: &internal.ContinualConsumer{ - Topic: string(cfg.Kafka.Topics.OutputKeyChangeEvent), + Topic: string(cfg.Matrix.Kafka.Topics.OutputKeyChangeEvent), Consumer: kafkaConsumer, PartitionStore: store, }, diff --git a/federationsender/federationsender.go b/federationsender/federationsender.go index f13d2aead..b02686fe7 100644 --- a/federationsender/federationsender.go +++ b/federationsender/federationsender.go @@ -81,7 +81,7 @@ func NewInternalAPI( logrus.WithError(err).Panic("failed to start typing server consumer") } keyConsumer := consumers.NewKeyChangeConsumer( - base.Cfg, base.KafkaConsumer, queues, federationSenderDB, stateAPI, + &base.Cfg.KeyServer, base.KafkaConsumer, queues, federationSenderDB, stateAPI, ) if err := keyConsumer.Start(); err != nil { logrus.WithError(err).Panic("failed to start key server consumer")