mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-21 13:53:09 -06:00
Fix Naffka setup
This commit is contained in:
parent
185dc07e1e
commit
fea625ef5c
|
|
@ -34,11 +34,11 @@ import (
|
||||||
"github.com/matrix-org/dendrite/currentstateserver/storage"
|
"github.com/matrix-org/dendrite/currentstateserver/storage"
|
||||||
"github.com/matrix-org/dendrite/internal/config"
|
"github.com/matrix-org/dendrite/internal/config"
|
||||||
"github.com/matrix-org/dendrite/internal/httputil"
|
"github.com/matrix-org/dendrite/internal/httputil"
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
|
||||||
"github.com/matrix-org/dendrite/internal/test"
|
"github.com/matrix-org/dendrite/internal/test"
|
||||||
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
|
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/matrix-org/naffka"
|
"github.com/matrix-org/naffka"
|
||||||
|
naffkaStorage "github.com/matrix-org/naffka/storage"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
|
@ -122,13 +122,7 @@ func MustMakeInternalAPI(t *testing.T) (api.CurrentStateInternalAPI, storage.Dat
|
||||||
cfg.Global.ServerName = "kaer.morhen"
|
cfg.Global.ServerName = "kaer.morhen"
|
||||||
cfg.CurrentStateServer.Database.ConnectionString = config.DataSource("file:" + stateDBName)
|
cfg.CurrentStateServer.Database.ConnectionString = config.DataSource("file:" + stateDBName)
|
||||||
cfg.Global.Kafka.TopicPrefix = kafkaPrefix
|
cfg.Global.Kafka.TopicPrefix = kafkaPrefix
|
||||||
db, err := sqlutil.Open(&config.DatabaseOptions{
|
naffkaDB, err := naffkaStorage.NewDatabase("file:" + naffkaDBName)
|
||||||
ConnectionString: config.DataSource("file:" + naffkaDBName),
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Failed to open naffka database: %s", err)
|
|
||||||
}
|
|
||||||
naffkaDB, err := naffka.NewSqliteDatabase(db)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Failed to setup naffka database: %s", err)
|
t.Fatalf("Failed to setup naffka database: %s", err)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -24,11 +24,12 @@ import (
|
||||||
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
|
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
|
||||||
"github.com/matrix-org/dendrite/internal/caching"
|
"github.com/matrix-org/dendrite/internal/caching"
|
||||||
"github.com/matrix-org/dendrite/internal/httputil"
|
"github.com/matrix-org/dendrite/internal/httputil"
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/matrix-org/naffka"
|
|
||||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||||
|
|
||||||
|
"github.com/matrix-org/naffka"
|
||||||
|
naffkaStorage "github.com/matrix-org/naffka/storage"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/internal"
|
"github.com/matrix-org/dendrite/internal"
|
||||||
"github.com/matrix-org/dendrite/userapi/storage/accounts"
|
"github.com/matrix-org/dendrite/userapi/storage/accounts"
|
||||||
"github.com/matrix-org/dendrite/userapi/storage/devices"
|
"github.com/matrix-org/dendrite/userapi/storage/devices"
|
||||||
|
|
@ -356,36 +357,13 @@ func setupKafka(cfg *config.Dendrite) (sarama.Consumer, sarama.SyncProducer) {
|
||||||
|
|
||||||
// setupNaffka creates kafka consumer/producer pair from the config.
|
// setupNaffka creates kafka consumer/producer pair from the config.
|
||||||
func setupNaffka(cfg *config.Dendrite) (sarama.Consumer, sarama.SyncProducer) {
|
func setupNaffka(cfg *config.Dendrite) (sarama.Consumer, sarama.SyncProducer) {
|
||||||
var naffkaDB *naffka.DatabaseImpl
|
naffkaDB, err := naffkaStorage.NewDatabase(string(cfg.Global.Kafka.Database.ConnectionString))
|
||||||
|
|
||||||
db, err := sqlutil.Open(&cfg.Global.Kafka.Database)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithError(err).Panic("Failed to open naffka database")
|
logrus.WithError(err).Panic("Failed to setup naffka database")
|
||||||
}
|
}
|
||||||
|
|
||||||
switch {
|
|
||||||
case cfg.Global.Kafka.Database.ConnectionString.IsSQLite():
|
|
||||||
naffkaDB, err = naffka.NewSqliteDatabase(db)
|
|
||||||
if err != nil {
|
|
||||||
logrus.WithError(err).Panic("Failed to setup naffka database")
|
|
||||||
}
|
|
||||||
case cfg.Global.Kafka.Database.ConnectionString.IsPostgres():
|
|
||||||
naffkaDB, err = naffka.NewPostgresqlDatabase(db)
|
|
||||||
if err != nil {
|
|
||||||
logrus.WithError(err).Panic("Failed to setup naffka database")
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
panic("unknown naffka database type")
|
|
||||||
}
|
|
||||||
|
|
||||||
if naffkaDB == nil {
|
|
||||||
panic("naffka connection string not understood")
|
|
||||||
}
|
|
||||||
|
|
||||||
naff, err := naffka.New(naffkaDB)
|
naff, err := naffka.New(naffkaDB)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithError(err).Panic("Failed to setup naffka")
|
logrus.WithError(err).Panic("Failed to setup naffka")
|
||||||
}
|
}
|
||||||
|
|
||||||
return naff, naff
|
return naff, naff
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue