Add configuration for max_message_bytes for sarama

This commit is contained in:
Till Faelligen 2020-10-24 19:54:33 +02:00
parent 59428cdde3
commit 253a709c72
2 changed files with 12 additions and 2 deletions

View file

@ -24,6 +24,8 @@ type Kafka struct {
UseNaffka bool `yaml:"use_naffka"` UseNaffka bool `yaml:"use_naffka"`
// The Naffka database is used internally by the naffka library, if used. // The Naffka database is used internally by the naffka library, if used.
Database DatabaseOptions `yaml:"naffka_database"` Database DatabaseOptions `yaml:"naffka_database"`
// The max size a message can have
MaxMessageBytes *int `yaml:"max_message_bytes"`
} }
func (k *Kafka) TopicFor(name string) string { func (k *Kafka) TopicFor(name string) string {
@ -36,6 +38,9 @@ func (c *Kafka) Defaults() {
c.Addresses = []string{"localhost:2181"} c.Addresses = []string{"localhost:2181"}
c.Database.ConnectionString = DataSource("file:naffka.db") c.Database.ConnectionString = DataSource("file:naffka.db")
c.TopicPrefix = "Dendrite" c.TopicPrefix = "Dendrite"
maxBytes := 2 << 22 // about 8MB
c.MaxMessageBytes = &maxBytes
} }
func (c *Kafka) Verify(configErrs *ConfigErrors, isMonolith bool) { func (c *Kafka) Verify(configErrs *ConfigErrors, isMonolith bool) {
@ -50,4 +55,5 @@ func (c *Kafka) Verify(configErrs *ConfigErrors, isMonolith bool) {
checkNotZero(configErrs, "global.kafka.addresses", int64(len(c.Addresses))) checkNotZero(configErrs, "global.kafka.addresses", int64(len(c.Addresses)))
} }
checkNotEmpty(configErrs, "global.kafka.topic_prefix", string(c.TopicPrefix)) checkNotEmpty(configErrs, "global.kafka.topic_prefix", string(c.TopicPrefix))
checkPositive(configErrs, "global.kafka.max_message_bytes", int64(*c.MaxMessageBytes))
} }

View file

@ -17,12 +17,16 @@ func SetupConsumerProducer(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProdu
// setupKafka creates kafka consumer/producer pair from the config. // setupKafka creates kafka consumer/producer pair from the config.
func setupKafka(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) { func setupKafka(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) {
consumer, err := sarama.NewConsumer(cfg.Addresses, nil) sCfg := sarama.NewConfig()
sCfg.Producer.MaxMessageBytes = *cfg.MaxMessageBytes
sCfg.Producer.Return.Successes = true
consumer, err := sarama.NewConsumer(cfg.Addresses, sCfg)
if err != nil { if err != nil {
logrus.WithError(err).Panic("failed to start kafka consumer") logrus.WithError(err).Panic("failed to start kafka consumer")
} }
producer, err := sarama.NewSyncProducer(cfg.Addresses, nil) producer, err := sarama.NewSyncProducer(cfg.Addresses, sCfg)
if err != nil { if err != nil {
logrus.WithError(err).Panic("failed to setup kafka producers") logrus.WithError(err).Panic("failed to setup kafka producers")
} }