- Better comments on what MaxMessageBytes is used for

- Also sets the size the consumer may use
This commit is contained in:
Till Faelligen 2020-10-26 12:03:49 +01:00
parent 6f8bd745a7
commit bf1b8def6d
3 changed files with 9 additions and 4 deletions

View file

@ -74,8 +74,11 @@ global:
# Kafka.
use_naffka: true
# The maximal size of messages passed between Kafka producers and consumers
max_message_bytes: 1048576
# The max size a Kafka message is allowed to use.
# You only need to change this value, if you encounter issues with too large messages.
# Must be less than/equal to "max.message.bytes" configured in Kafka.
# Defaults to 8388608 bytes.
# max_message_bytes: 8388608
# Naffka database options. Not required when using Kafka.
naffka_database:

View file

@ -24,7 +24,8 @@ type Kafka struct {
UseNaffka bool `yaml:"use_naffka"`
// The Naffka database is used internally by the naffka library, if used.
Database DatabaseOptions `yaml:"naffka_database"`
// The max size a message can have
// The max size a Kafka message passed between consumer/producer can have
// Equals roughly max.message.bytes / fetch.message.max.bytes in Kafka
MaxMessageBytes *int `yaml:"max_message_bytes"`
}
@ -39,7 +40,7 @@ func (c *Kafka) Defaults() {
c.Database.ConnectionString = DataSource("file:naffka.db")
c.TopicPrefix = "Dendrite"
maxBytes := 2 << 22 // about 8MB
maxBytes := 1024 * 1024 * 8 // about 8MB
c.MaxMessageBytes = &maxBytes
}

View file

@ -20,6 +20,7 @@ func setupKafka(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) {
sCfg := sarama.NewConfig()
sCfg.Producer.MaxMessageBytes = *cfg.MaxMessageBytes
sCfg.Producer.Return.Successes = true
sCfg.Consumer.Fetch.Default = int32(*cfg.MaxMessageBytes)
consumer, err := sarama.NewConsumer(cfg.Addresses, sCfg)
if err != nil {