From bf1b8def6d8726e658e4255faf3ee2b5640ef93d Mon Sep 17 00:00:00 2001 From: Till Faelligen Date: Mon, 26 Oct 2020 12:03:49 +0100 Subject: [PATCH] - Better comments on what MaxMessageBytes is used for - Also sets the size the consumer may use --- dendrite-config.yaml | 7 +++++-- internal/config/config_kafka.go | 5 +++-- internal/setup/kafka/kafka.go | 1 + 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/dendrite-config.yaml b/dendrite-config.yaml index 530784f3e..ba2cbce62 100644 --- a/dendrite-config.yaml +++ b/dendrite-config.yaml @@ -74,8 +74,11 @@ global: # Kafka. use_naffka: true - # The maximal size of messages passed between Kafka producers and consumers - max_message_bytes: 1048576 + # The max size a Kafka message is allowed to use. + # You only need to change this value, if you encounter issues with too large messages. + # Must be less than/equal to "max.message.bytes" configured in Kafka. + # Defaults to 8388608 bytes. + # max_message_bytes: 8388608 # Naffka database options. Not required when using Kafka. naffka_database: diff --git a/internal/config/config_kafka.go b/internal/config/config_kafka.go index 319f179d4..707c92a71 100644 --- a/internal/config/config_kafka.go +++ b/internal/config/config_kafka.go @@ -24,7 +24,8 @@ type Kafka struct { UseNaffka bool `yaml:"use_naffka"` // The Naffka database is used internally by the naffka library, if used. Database DatabaseOptions `yaml:"naffka_database"` - // The max size a message can have + // The max size a Kafka message passed between consumer/producer can have + // Equals roughly max.message.bytes / fetch.message.max.bytes in Kafka MaxMessageBytes *int `yaml:"max_message_bytes"` } @@ -39,7 +40,7 @@ func (c *Kafka) Defaults() { c.Database.ConnectionString = DataSource("file:naffka.db") c.TopicPrefix = "Dendrite" - maxBytes := 2 << 22 // about 8MB + maxBytes := 1024 * 1024 * 8 // about 8MB c.MaxMessageBytes = &maxBytes } diff --git a/internal/setup/kafka/kafka.go b/internal/setup/kafka/kafka.go index 011a97d30..091025ec7 100644 --- a/internal/setup/kafka/kafka.go +++ b/internal/setup/kafka/kafka.go @@ -20,6 +20,7 @@ func setupKafka(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) { sCfg := sarama.NewConfig() sCfg.Producer.MaxMessageBytes = *cfg.MaxMessageBytes sCfg.Producer.Return.Successes = true + sCfg.Consumer.Fetch.Default = int32(*cfg.MaxMessageBytes) consumer, err := sarama.NewConsumer(cfg.Addresses, sCfg) if err != nil {