diff --git a/common/basecomponent/base.go b/common/basecomponent/base.go index 8f074a129..432819a23 100644 --- a/common/basecomponent/base.go +++ b/common/basecomponent/base.go @@ -19,6 +19,7 @@ import ( "io" "net/http" "net/url" + "time" "golang.org/x/crypto/ed25519" @@ -70,13 +71,22 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string) *BaseDendrite { logrus.WithError(err).Panicf("failed to start opentracing") } - kafkaConsumer, kafkaProducer := setupKafka(cfg) + var kafkaConsumer sarama.Consumer + var kafkaProducer sarama.SyncProducer + if cfg.Kafka.UseNaffka { + kafkaConsumer, kafkaProducer = setupNaffka(cfg) + } else { + kafkaConsumer, kafkaProducer = setupKafka(cfg) + } + + const defaultHTTPTimeout = 30 * time.Second return &BaseDendrite{ componentName: componentName, tracerCloser: closer, Cfg: cfg, APIMux: mux.NewRouter().UseEncodedPath(), + httpClient: &http.Client{Timeout: defaultHTTPTimeout}, KafkaConsumer: kafkaConsumer, KafkaProducer: kafkaProducer, } @@ -210,13 +220,8 @@ func (b *BaseDendrite) SetupAndServeHTTP(bindaddr string, listenaddr string) { logrus.Infof("Stopped %s server on %s", b.componentName, addr) } -// setupKafka creates kafka consumer/producer pair from the config. Checks if -// should use naffka. +// setupKafka creates kafka consumer/producer pair from the config. func setupKafka(cfg *config.Dendrite) (sarama.Consumer, sarama.SyncProducer) { - if cfg.Kafka.UseNaffka { - return setupNaffka(cfg) - } - consumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil) if err != nil { logrus.WithError(err).Panic("failed to start kafka consumer") @@ -232,35 +237,35 @@ func setupKafka(cfg *config.Dendrite) (sarama.Consumer, sarama.SyncProducer) { // setupNaffka creates kafka consumer/producer pair from the config. func setupNaffka(cfg *config.Dendrite) (sarama.Consumer, sarama.SyncProducer) { - var sqlDB *sql.DB + var err error + var db *sql.DB var naffkaDB *naffka.DatabaseImpl + uri, err := url.Parse(string(cfg.Database.Naffka)) - if err != nil { - panic(err) + if err != nil || uri.Scheme == "file" { + db, err = sql.Open(common.SQLiteDriverName(), string(cfg.Database.Naffka)) + if err != nil { + logrus.WithError(err).Panic("Failed to open naffka database") + } + + naffkaDB, err = naffka.NewSqliteDatabase(db) + if err != nil { + logrus.WithError(err).Panic("Failed to setup naffka database") + } + } else { + db, err = sql.Open("postgres", string(cfg.Database.Naffka)) + if err != nil { + logrus.WithError(err).Panic("Failed to open naffka database") + } + + naffkaDB, err = naffka.NewPostgresqlDatabase(db) + if err != nil { + logrus.WithError(err).Panic("Failed to setup naffka database") + } } - switch uri.Scheme { - case "file": - sqlDB, err = sql.Open(common.SQLiteDriverName(), string(cfg.Database.Naffka)) - if err != nil { - logrus.WithError(err).Panic("Failed to open naffka database") - } - naffkaDB, err = naffka.NewSqliteDatabase(sqlDB) - if err != nil { - logrus.WithError(err).Panic("Failed to setup naffka database") - } - case "postgres": - fallthrough - default: - sqlDB, err = sql.Open("postgres", string(cfg.Database.Naffka)) - if err != nil { - logrus.WithError(err).Panic("Failed to open naffka database") - } - - naffkaDB, err = naffka.NewPostgresqlDatabase(sqlDB) - if err != nil { - logrus.WithError(err).Panic("Failed to setup naffka database") - } + if naffkaDB == nil { + panic("naffka connection string not understood") } naff, err := naffka.New(naffkaDB)