diff --git a/common/basecomponent/base.go b/common/basecomponent/base.go index f7a02a304..7d6ef9721 100644 --- a/common/basecomponent/base.go +++ b/common/basecomponent/base.go @@ -18,6 +18,7 @@ import ( "database/sql" "io" "net/http" + "net/url" "golang.org/x/crypto/ed25519" @@ -68,7 +69,13 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string) *BaseDendrite { logrus.WithError(err).Panicf("failed to start opentracing") } - kafkaConsumer, kafkaProducer := setupKafka(cfg) + var kafkaConsumer sarama.Consumer + var kafkaProducer sarama.SyncProducer + if cfg.Kafka.UseNaffka { + kafkaConsumer, kafkaProducer = setupNaffka(cfg) + } else { + kafkaConsumer, kafkaProducer = setupKafka(cfg) + } return &BaseDendrite{ componentName: componentName, @@ -186,28 +193,8 @@ func (b *BaseDendrite) SetupAndServeHTTP(bindaddr string, listenaddr string) { logrus.Infof("Stopped %s server on %s", b.componentName, addr) } -// setupKafka creates kafka consumer/producer pair from the config. Checks if -// should use naffka. +// setupKafka creates kafka consumer/producer pair from the config. func setupKafka(cfg *config.Dendrite) (sarama.Consumer, sarama.SyncProducer) { - if cfg.Kafka.UseNaffka { - db, err := sql.Open("postgres", string(cfg.Database.Naffka)) - if err != nil { - logrus.WithError(err).Panic("Failed to open naffka database") - } - - naffkaDB, err := naffka.NewPostgresqlDatabase(db) - if err != nil { - logrus.WithError(err).Panic("Failed to setup naffka database") - } - - naff, err := naffka.New(naffkaDB) - if err != nil { - logrus.WithError(err).Panic("Failed to setup naffka") - } - - return naff, naff - } - consumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil) if err != nil { logrus.WithError(err).Panic("failed to start kafka consumer") @@ -220,3 +207,44 @@ func setupKafka(cfg *config.Dendrite) (sarama.Consumer, sarama.SyncProducer) { return consumer, producer } + +// setupNaffka creates kafka consumer/producer pair from the config. +func setupNaffka(cfg *config.Dendrite) (sarama.Consumer, sarama.SyncProducer) { + var err error + var db *sql.DB + var naffkaDB *naffka.DatabaseImpl + + uri, err := url.Parse(string(cfg.Database.Naffka)) + if err != nil || uri.Scheme == "postgres" { + db, err = sql.Open("postgres", string(cfg.Database.Naffka)) + if err != nil { + logrus.WithError(err).Panic("Failed to open naffka database") + } + + naffkaDB, err = naffka.NewPostgresqlDatabase(db) + if err != nil { + logrus.WithError(err).Panic("Failed to setup naffka database") + } + } else if uri.Scheme == "file" { + db, err = sql.Open("sqlite3", string(cfg.Database.Naffka)) + if err != nil { + logrus.WithError(err).Panic("Failed to open naffka database") + } + + naffkaDB, err = naffka.NewSqliteDatabase(db) + if err != nil { + logrus.WithError(err).Panic("Failed to setup naffka database") + } + } + + if naffkaDB == nil { + panic("naffka connection string not understood") + } + + naff, err := naffka.New(naffkaDB) + if err != nil { + logrus.WithError(err).Panic("Failed to setup naffka") + } + + return naff, naff +} diff --git a/go.mod b/go.mod index f362819ef..2a45fc316 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,7 @@ require ( github.com/matrix-org/dugong v0.0.0-20171220115018-ea0a4690a0d5 github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 github.com/matrix-org/gomatrixserverlib v0.0.0-20200124100636-0c2ec91d1df5 - github.com/matrix-org/naffka v0.0.0-20200127181638-7bc0d2da4cbf + github.com/matrix-org/naffka v0.0.0-20200127193556-abd5119028e1 github.com/matrix-org/util v0.0.0-20171127121716-2e2df66af2f5 github.com/mattn/go-sqlite3 v2.0.2+incompatible github.com/miekg/dns v1.1.12 // indirect @@ -39,3 +39,5 @@ require ( gopkg.in/h2non/bimg.v1 v1.0.18 gopkg.in/yaml.v2 v2.2.2 ) + +go 1.13 diff --git a/go.sum b/go.sum index 829066b85..c41e20720 100644 --- a/go.sum +++ b/go.sum @@ -73,6 +73,12 @@ github.com/matrix-org/gomatrixserverlib v0.0.0-20200124100636-0c2ec91d1df5 h1:km github.com/matrix-org/gomatrixserverlib v0.0.0-20200124100636-0c2ec91d1df5/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI= github.com/matrix-org/naffka v0.0.0-20200127181638-7bc0d2da4cbf h1:yI/mCJkuBCNmYfTxrh65yMv1cZ1Sv3pRupBcQgawAOc= github.com/matrix-org/naffka v0.0.0-20200127181638-7bc0d2da4cbf/go.mod h1:cXoYQIENbdWIQHt1SyCo6Bl3C3raHwJ0wgVrXHSqf+A= +github.com/matrix-org/naffka v0.0.0-20200127184217-922c1d71368e h1:/L+cpenI6bCYTo2Ljp/o+0027MfxQOm33OvxgX0uoCc= +github.com/matrix-org/naffka v0.0.0-20200127184217-922c1d71368e/go.mod h1:cXoYQIENbdWIQHt1SyCo6Bl3C3raHwJ0wgVrXHSqf+A= +github.com/matrix-org/naffka v0.0.0-20200127185919-42bbc99f1dae h1:UAZ91pICTaJKQCTEUeICHvZ1Ttfsi7+Pxx5mkB+1uwA= +github.com/matrix-org/naffka v0.0.0-20200127185919-42bbc99f1dae/go.mod h1:cXoYQIENbdWIQHt1SyCo6Bl3C3raHwJ0wgVrXHSqf+A= +github.com/matrix-org/naffka v0.0.0-20200127193556-abd5119028e1 h1:JgIX6fkFwEzVMWE8hf71blUeE67PBuv0M3gR7JqnYUY= +github.com/matrix-org/naffka v0.0.0-20200127193556-abd5119028e1/go.mod h1:cXoYQIENbdWIQHt1SyCo6Bl3C3raHwJ0wgVrXHSqf+A= github.com/matrix-org/util v0.0.0-20171127121716-2e2df66af2f5 h1:W7l5CP4V7wPyPb4tYE11dbmeAOwtFQBTW0rf4OonOS8= github.com/matrix-org/util v0.0.0-20171127121716-2e2df66af2f5/go.mod h1:lePuOiXLNDott7NZfnQvJk0lAZ5HgvIuWGhel6J+RLA= github.com/mattn/go-sqlite3 v2.0.2+incompatible h1:qzw9c2GNT8UFrgWNDhCTqRqYUSmu/Dav/9Z58LGpk7U=