dendrite/setup/kafka/kafka.go
alexfca e7b9129146
Implement Naffka storage in CosmosDB (#12)
* - Implement Naffka storage in CosmosDB
- Add Topic SEQ
- Add Topic tables in Cosmos
- Update the Yaml for Naffka to use a CosmosDB dsn

* - Fix TableName for Messages
2021-05-31 13:20:25 +10:00

73 lines
2.3 KiB
Go

package kafka
import (
"github.com/Shopify/sarama"
"github.com/matrix-org/dendrite/internal/naffka/naffkacosmosdb"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/naffka"
naffkaStorage "github.com/matrix-org/naffka/storage"
"github.com/sirupsen/logrus"
)
func SetupConsumerProducer(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) {
if cfg.UseNaffka {
return setupNaffka(cfg)
}
return setupKafka(cfg)
}
// setupKafka creates kafka consumer/producer pair from the config.
func setupKafka(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) {
sCfg := sarama.NewConfig()
sCfg.Producer.MaxMessageBytes = *cfg.MaxMessageBytes
sCfg.Producer.Return.Successes = true
sCfg.Consumer.Fetch.Default = int32(*cfg.MaxMessageBytes)
consumer, err := sarama.NewConsumer(cfg.Addresses, sCfg)
if err != nil {
logrus.WithError(err).Panic("failed to start kafka consumer")
}
producer, err := sarama.NewSyncProducer(cfg.Addresses, sCfg)
if err != nil {
logrus.WithError(err).Panic("failed to setup kafka producers")
}
return consumer, producer
}
// In monolith mode with Naffka, we don't have the same constraints about
// consuming the same topic from more than one place like we do with Kafka.
// Therefore, we will only open one Naffka connection in case Naffka is
// running on SQLite.
var naffkaInstance *naffka.Naffka
// setupNaffka creates kafka consumer/producer pair from the config.
func setupNaffka(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) {
if naffkaInstance != nil {
return naffkaInstance, naffkaInstance
}
if cfg.Database.ConnectionString.IsCosmosDB() {
//TODO: What do we do for Nafka
naffkaDB, err := naffkacosmosdb.NewDatabase(string(cfg.Database.ConnectionString))
if err != nil {
logrus.WithError(err).Panic("Failed to setup naffka database for Cosmos")
}
naffkaInstance, err = naffka.New(naffkaDB)
if err != nil {
logrus.WithError(err).Panic("Failed to setup naffka for Cosmos")
}
} else {
naffkaDB, err := naffkaStorage.NewDatabase(string(cfg.Database.ConnectionString))
if err != nil {
logrus.WithError(err).Panic("Failed to setup naffka database")
}
naffkaInstance, err = naffka.New(naffkaDB)
if err != nil {
logrus.WithError(err).Panic("Failed to setup naffka")
}
}
return naffkaInstance, naffkaInstance
}