mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-17 03:43:11 -06:00
Set base.go to upstream/master
This commit is contained in:
parent
4454d5593d
commit
68436952a9
|
|
@ -19,6 +19,7 @@ import (
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
|
"time"
|
||||||
|
|
||||||
"golang.org/x/crypto/ed25519"
|
"golang.org/x/crypto/ed25519"
|
||||||
|
|
||||||
|
|
@ -70,13 +71,22 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string) *BaseDendrite {
|
||||||
logrus.WithError(err).Panicf("failed to start opentracing")
|
logrus.WithError(err).Panicf("failed to start opentracing")
|
||||||
}
|
}
|
||||||
|
|
||||||
kafkaConsumer, kafkaProducer := setupKafka(cfg)
|
var kafkaConsumer sarama.Consumer
|
||||||
|
var kafkaProducer sarama.SyncProducer
|
||||||
|
if cfg.Kafka.UseNaffka {
|
||||||
|
kafkaConsumer, kafkaProducer = setupNaffka(cfg)
|
||||||
|
} else {
|
||||||
|
kafkaConsumer, kafkaProducer = setupKafka(cfg)
|
||||||
|
}
|
||||||
|
|
||||||
|
const defaultHTTPTimeout = 30 * time.Second
|
||||||
|
|
||||||
return &BaseDendrite{
|
return &BaseDendrite{
|
||||||
componentName: componentName,
|
componentName: componentName,
|
||||||
tracerCloser: closer,
|
tracerCloser: closer,
|
||||||
Cfg: cfg,
|
Cfg: cfg,
|
||||||
APIMux: mux.NewRouter().UseEncodedPath(),
|
APIMux: mux.NewRouter().UseEncodedPath(),
|
||||||
|
httpClient: &http.Client{Timeout: defaultHTTPTimeout},
|
||||||
KafkaConsumer: kafkaConsumer,
|
KafkaConsumer: kafkaConsumer,
|
||||||
KafkaProducer: kafkaProducer,
|
KafkaProducer: kafkaProducer,
|
||||||
}
|
}
|
||||||
|
|
@ -210,13 +220,8 @@ func (b *BaseDendrite) SetupAndServeHTTP(bindaddr string, listenaddr string) {
|
||||||
logrus.Infof("Stopped %s server on %s", b.componentName, addr)
|
logrus.Infof("Stopped %s server on %s", b.componentName, addr)
|
||||||
}
|
}
|
||||||
|
|
||||||
// setupKafka creates kafka consumer/producer pair from the config. Checks if
|
// setupKafka creates kafka consumer/producer pair from the config.
|
||||||
// should use naffka.
|
|
||||||
func setupKafka(cfg *config.Dendrite) (sarama.Consumer, sarama.SyncProducer) {
|
func setupKafka(cfg *config.Dendrite) (sarama.Consumer, sarama.SyncProducer) {
|
||||||
if cfg.Kafka.UseNaffka {
|
|
||||||
return setupNaffka(cfg)
|
|
||||||
}
|
|
||||||
|
|
||||||
consumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil)
|
consumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithError(err).Panic("failed to start kafka consumer")
|
logrus.WithError(err).Panic("failed to start kafka consumer")
|
||||||
|
|
@ -232,37 +237,37 @@ func setupKafka(cfg *config.Dendrite) (sarama.Consumer, sarama.SyncProducer) {
|
||||||
|
|
||||||
// setupNaffka creates kafka consumer/producer pair from the config.
|
// setupNaffka creates kafka consumer/producer pair from the config.
|
||||||
func setupNaffka(cfg *config.Dendrite) (sarama.Consumer, sarama.SyncProducer) {
|
func setupNaffka(cfg *config.Dendrite) (sarama.Consumer, sarama.SyncProducer) {
|
||||||
var sqlDB *sql.DB
|
var err error
|
||||||
|
var db *sql.DB
|
||||||
var naffkaDB *naffka.DatabaseImpl
|
var naffkaDB *naffka.DatabaseImpl
|
||||||
|
|
||||||
uri, err := url.Parse(string(cfg.Database.Naffka))
|
uri, err := url.Parse(string(cfg.Database.Naffka))
|
||||||
if err != nil {
|
if err != nil || uri.Scheme == "file" {
|
||||||
panic(err)
|
db, err = sql.Open(common.SQLiteDriverName(), string(cfg.Database.Naffka))
|
||||||
}
|
|
||||||
switch uri.Scheme {
|
|
||||||
case "file":
|
|
||||||
sqlDB, err = sql.Open(common.SQLiteDriverName(), string(cfg.Database.Naffka))
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithError(err).Panic("Failed to open naffka database")
|
logrus.WithError(err).Panic("Failed to open naffka database")
|
||||||
}
|
}
|
||||||
|
|
||||||
naffkaDB, err = naffka.NewSqliteDatabase(sqlDB)
|
naffkaDB, err = naffka.NewSqliteDatabase(db)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithError(err).Panic("Failed to setup naffka database")
|
logrus.WithError(err).Panic("Failed to setup naffka database")
|
||||||
}
|
}
|
||||||
case "postgres":
|
} else {
|
||||||
fallthrough
|
db, err = sql.Open("postgres", string(cfg.Database.Naffka))
|
||||||
default:
|
|
||||||
sqlDB, err = sql.Open("postgres", string(cfg.Database.Naffka))
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithError(err).Panic("Failed to open naffka database")
|
logrus.WithError(err).Panic("Failed to open naffka database")
|
||||||
}
|
}
|
||||||
|
|
||||||
naffkaDB, err = naffka.NewPostgresqlDatabase(sqlDB)
|
naffkaDB, err = naffka.NewPostgresqlDatabase(db)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithError(err).Panic("Failed to setup naffka database")
|
logrus.WithError(err).Panic("Failed to setup naffka database")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if naffkaDB == nil {
|
||||||
|
panic("naffka connection string not understood")
|
||||||
|
}
|
||||||
|
|
||||||
naff, err := naffka.New(naffkaDB)
|
naff, err := naffka.New(naffkaDB)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithError(err).Panic("Failed to setup naffka")
|
logrus.WithError(err).Panic("Failed to setup naffka")
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue