diff --git a/currentstateserver/currentstateserver_test.go b/currentstateserver/currentstateserver_test.go index 09b91c276..b83103f13 100644 --- a/currentstateserver/currentstateserver_test.go +++ b/currentstateserver/currentstateserver_test.go @@ -34,11 +34,11 @@ import ( "github.com/matrix-org/dendrite/currentstateserver/storage" "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/httputil" - "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/test" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/naffka" + naffkaStorage "github.com/matrix-org/naffka/storage" ) var ( @@ -122,13 +122,7 @@ func MustMakeInternalAPI(t *testing.T) (api.CurrentStateInternalAPI, storage.Dat cfg.Global.ServerName = "kaer.morhen" cfg.CurrentStateServer.Database.ConnectionString = config.DataSource("file:" + stateDBName) cfg.Global.Kafka.TopicPrefix = kafkaPrefix - db, err := sqlutil.Open(&config.DatabaseOptions{ - ConnectionString: config.DataSource("file:" + naffkaDBName), - }) - if err != nil { - t.Fatalf("Failed to open naffka database: %s", err) - } - naffkaDB, err := naffka.NewSqliteDatabase(db) + naffkaDB, err := naffkaStorage.NewDatabase("file:" + naffkaDBName) if err != nil { t.Fatalf("Failed to setup naffka database: %s", err) } diff --git a/internal/setup/base.go b/internal/setup/base.go index a7b495ebc..fc4083115 100644 --- a/internal/setup/base.go +++ b/internal/setup/base.go @@ -24,11 +24,12 @@ import ( currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api" "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/httputil" - "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/gomatrixserverlib" - "github.com/matrix-org/naffka" "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/matrix-org/naffka" + naffkaStorage "github.com/matrix-org/naffka/storage" + "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/userapi/storage/accounts" "github.com/matrix-org/dendrite/userapi/storage/devices" @@ -356,36 +357,13 @@ func setupKafka(cfg *config.Dendrite) (sarama.Consumer, sarama.SyncProducer) { // setupNaffka creates kafka consumer/producer pair from the config. func setupNaffka(cfg *config.Dendrite) (sarama.Consumer, sarama.SyncProducer) { - var naffkaDB *naffka.DatabaseImpl - - db, err := sqlutil.Open(&cfg.Global.Kafka.Database) + naffkaDB, err := naffkaStorage.NewDatabase(string(cfg.Global.Kafka.Database.ConnectionString)) if err != nil { - logrus.WithError(err).Panic("Failed to open naffka database") + logrus.WithError(err).Panic("Failed to setup naffka database") } - - switch { - case cfg.Global.Kafka.Database.ConnectionString.IsSQLite(): - naffkaDB, err = naffka.NewSqliteDatabase(db) - if err != nil { - logrus.WithError(err).Panic("Failed to setup naffka database") - } - case cfg.Global.Kafka.Database.ConnectionString.IsPostgres(): - naffkaDB, err = naffka.NewPostgresqlDatabase(db) - if err != nil { - logrus.WithError(err).Panic("Failed to setup naffka database") - } - default: - panic("unknown naffka database type") - } - - if naffkaDB == nil { - panic("naffka connection string not understood") - } - naff, err := naffka.New(naffkaDB) if err != nil { logrus.WithError(err).Panic("Failed to setup naffka") } - return naff, naff }