diff --git a/internal/config/config_kafka.go b/internal/config/config_kafka.go index e2bd6538e..319f179d4 100644 --- a/internal/config/config_kafka.go +++ b/internal/config/config_kafka.go @@ -24,6 +24,8 @@ type Kafka struct { UseNaffka bool `yaml:"use_naffka"` // The Naffka database is used internally by the naffka library, if used. Database DatabaseOptions `yaml:"naffka_database"` + // The max size a message can have + MaxMessageBytes *int `yaml:"max_message_bytes"` } func (k *Kafka) TopicFor(name string) string { @@ -36,6 +38,9 @@ func (c *Kafka) Defaults() { c.Addresses = []string{"localhost:2181"} c.Database.ConnectionString = DataSource("file:naffka.db") c.TopicPrefix = "Dendrite" + + maxBytes := 2 << 22 // about 8MB + c.MaxMessageBytes = &maxBytes } func (c *Kafka) Verify(configErrs *ConfigErrors, isMonolith bool) { @@ -50,4 +55,5 @@ func (c *Kafka) Verify(configErrs *ConfigErrors, isMonolith bool) { checkNotZero(configErrs, "global.kafka.addresses", int64(len(c.Addresses))) } checkNotEmpty(configErrs, "global.kafka.topic_prefix", string(c.TopicPrefix)) + checkPositive(configErrs, "global.kafka.max_message_bytes", int64(*c.MaxMessageBytes)) } diff --git a/internal/setup/kafka/kafka.go b/internal/setup/kafka/kafka.go index 9855ae156..011a97d30 100644 --- a/internal/setup/kafka/kafka.go +++ b/internal/setup/kafka/kafka.go @@ -17,12 +17,16 @@ func SetupConsumerProducer(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProdu // setupKafka creates kafka consumer/producer pair from the config. func setupKafka(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) { - consumer, err := sarama.NewConsumer(cfg.Addresses, nil) + sCfg := sarama.NewConfig() + sCfg.Producer.MaxMessageBytes = *cfg.MaxMessageBytes + sCfg.Producer.Return.Successes = true + + consumer, err := sarama.NewConsumer(cfg.Addresses, sCfg) if err != nil { logrus.WithError(err).Panic("failed to start kafka consumer") } - producer, err := sarama.NewSyncProducer(cfg.Addresses, nil) + producer, err := sarama.NewSyncProducer(cfg.Addresses, sCfg) if err != nil { logrus.WithError(err).Panic("failed to setup kafka producers") }