From 1d673f3b62512a14709ccb9f91f2fe665677d8d1 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 8 Apr 2020 16:55:29 +0100 Subject: [PATCH] Fix some linter errors, update some libp2p packages/calls, other tidying up --- cmd/dendrite-p2p-demo/main.go | 37 ++++---- common/basecomponent/base.go | 95 ++++++------------- common/basecomponent/libp2p.go | 2 +- go.mod | 4 - go.sum | 2 - .../storage/postgreswithpubsub/storage.go | 8 +- publicroomsapi/storage/storage.go | 15 +-- 7 files changed, 63 insertions(+), 100 deletions(-) diff --git a/cmd/dendrite-p2p-demo/main.go b/cmd/dendrite-p2p-demo/main.go index cb3d64cf5..aeb136921 100644 --- a/cmd/dendrite-p2p-demo/main.go +++ b/cmd/dendrite-p2p-demo/main.go @@ -16,11 +16,11 @@ package main import ( "crypto/ed25519" + "flag" "fmt" "io/ioutil" "net/http" "os" - "os/user" gostream "github.com/libp2p/go-libp2p-gostream" "github.com/matrix-org/dendrite/appservice" @@ -45,19 +45,16 @@ import ( "github.com/sirupsen/logrus" ) -const PrivateKeyFileName = ".dendrite-p2p-private" - func main() { - filename := PrivateKeyFileName - if u, err := user.Current(); err == nil { - filename = fmt.Sprintf("%s/%s", u.HomeDir, PrivateKeyFileName) - } + instanceName := flag.String("name", "dendrite-p2p", "the name of this P2P demo instance") + flag.Parse() + filename := fmt.Sprintf("%s-private.key", *instanceName) _, err := os.Stat(filename) var privKey ed25519.PrivateKey if os.IsNotExist(err) { _, privKey, _ = ed25519.GenerateKey(nil) - if err := ioutil.WriteFile(filename, privKey, 0600); err != nil { + if err = ioutil.WriteFile(filename, privKey, 0600); err != nil { fmt.Printf("Couldn't write private key to file '%s': %s\n", filename, err) } } else { @@ -77,17 +74,19 @@ func main() { cfg.Kafka.Topics.OutputClientData = "clientapiOutput" cfg.Kafka.Topics.OutputTypingEvent = "typingServerOutput" cfg.Kafka.Topics.UserUpdates = "userUpdates" - cfg.Database.Account = config.DataSource("file:account.db") - cfg.Database.Device = config.DataSource("file:device.db") - cfg.Database.MediaAPI = config.DataSource("file:media_api.db") - cfg.Database.SyncAPI = config.DataSource("file:sync_api.db") - cfg.Database.RoomServer = config.DataSource("file:room_server.db") - cfg.Database.ServerKey = config.DataSource("file:server_key.db") - cfg.Database.FederationSender = config.DataSource("file:federation_sender.db") - cfg.Database.AppService = config.DataSource("file:app_service.db") - cfg.Database.PublicRoomsAPI = config.DataSource("file:public_rooms_api.db") - cfg.Database.Naffka = config.DataSource("file:naffka.db") - cfg.Derive() + cfg.Database.Account = config.DataSource(fmt.Sprintf("file:%s-account.db", *instanceName)) + cfg.Database.Device = config.DataSource(fmt.Sprintf("file:%s-device.db", *instanceName)) + cfg.Database.MediaAPI = config.DataSource(fmt.Sprintf("file:%s-mediaapi.db", *instanceName)) + cfg.Database.SyncAPI = config.DataSource(fmt.Sprintf("file:%s-syncapi.db", *instanceName)) + cfg.Database.RoomServer = config.DataSource(fmt.Sprintf("file:%s-roomserver.db", *instanceName)) + cfg.Database.ServerKey = config.DataSource(fmt.Sprintf("file:%s-serverkey.db", *instanceName)) + cfg.Database.FederationSender = config.DataSource(fmt.Sprintf("file:%s-federationsender.db", *instanceName)) + cfg.Database.AppService = config.DataSource(fmt.Sprintf("file:%s-appservice.db", *instanceName)) + cfg.Database.PublicRoomsAPI = config.DataSource(fmt.Sprintf("file:%s-publicroomsa.db", *instanceName)) + cfg.Database.Naffka = config.DataSource(fmt.Sprintf("file:%s-naffka.db", *instanceName)) + if err := cfg.Derive(); err != nil { + panic(err) + } base := basecomponent.NewBaseDendrite(&cfg, "Monolith") defer base.Close() // nolint: errcheck diff --git a/common/basecomponent/base.go b/common/basecomponent/base.go index 75e360260..6617dd441 100644 --- a/common/basecomponent/base.go +++ b/common/basecomponent/base.go @@ -28,11 +28,11 @@ import ( "github.com/libp2p/go-libp2p" circuit "github.com/libp2p/go-libp2p-circuit" + crypto "github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/peer" - crypto "github.com/libp2p/go-libp2p-crypto" - routing "github.com/libp2p/go-libp2p-routing" + routing "github.com/libp2p/go-libp2p-core/routing" - host "github.com/libp2p/go-libp2p-host" + host "github.com/libp2p/go-libp2p-core/host" p2phttp "github.com/libp2p/go-libp2p-http" dht "github.com/libp2p/go-libp2p-kad-dht" pubsub "github.com/libp2p/go-libp2p-pubsub" @@ -326,44 +326,7 @@ func (b *BaseDendrite) SetupAndServeHTTP(bindaddr string, listenaddr string) { // should use naffka. func setupKafka(cfg *config.Dendrite) (sarama.Consumer, sarama.SyncProducer) { if cfg.Kafka.UseNaffka { - var naffkaDB *naffka.DatabaseImpl - uri, err := url.Parse(string(cfg.Database.Naffka)) - if err != nil { - panic(err) - } - switch uri.Scheme { - case "file": - db, err := sql.Open(common.SQLiteDriverName(), string(cfg.Database.Naffka)) - if err != nil { - logrus.WithError(err).Panic("Failed to open naffka database") - } - - naffkaDB, err = naffka.NewSqliteDatabase(db) - if err != nil { - logrus.WithError(err).Panic("Failed to setup naffka database") - } - case "postgres": - fallthrough - default: - db, err := sql.Open("postgres", string(cfg.Database.Naffka)) - if err != nil { - logrus.WithError(err).Panic("Failed to open naffka database") - } - - naffkaDB, err = naffka.NewPostgresqlDatabase(db) - if err != nil { - logrus.WithError(err).Panic("Failed to setup naffka database") - } - } - - naff, err := naffka.New(naffkaDB) - if err != nil { - panic(err) - } - - //logrus.WithError(err).Panic("Failed to setup naffka") - - return naff, naff + return setupNaffka(cfg) } consumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil) @@ -381,35 +344,35 @@ func setupKafka(cfg *config.Dendrite) (sarama.Consumer, sarama.SyncProducer) { // setupNaffka creates kafka consumer/producer pair from the config. func setupNaffka(cfg *config.Dendrite) (sarama.Consumer, sarama.SyncProducer) { - var err error - var db *sql.DB + var sqlDB *sql.DB var naffkaDB *naffka.DatabaseImpl - uri, err := url.Parse(string(cfg.Database.Naffka)) - if err != nil || uri.Scheme == "file" { - db, err = sql.Open(common.SQLiteDriverName(), string(cfg.Database.Naffka)) - if err != nil { - logrus.WithError(err).Panic("Failed to open naffka database") - } - - naffkaDB, err = naffka.NewSqliteDatabase(db) - if err != nil { - logrus.WithError(err).Panic("Failed to setup naffka database") - } - } else { - db, err = sql.Open("postgres", string(cfg.Database.Naffka)) - if err != nil { - logrus.WithError(err).Panic("Failed to open naffka database") - } - - naffkaDB, err = naffka.NewPostgresqlDatabase(db) - if err != nil { - logrus.WithError(err).Panic("Failed to setup naffka database") - } + if err != nil { + panic(err) } + switch uri.Scheme { + case "file": + sqlDB, err = sql.Open(common.SQLiteDriverName(), string(cfg.Database.Naffka)) + if err != nil { + logrus.WithError(err).Panic("Failed to open naffka database") + } - if naffkaDB == nil { - panic("naffka connection string not understood") + naffkaDB, err = naffka.NewSqliteDatabase(sqlDB) + if err != nil { + logrus.WithError(err).Panic("Failed to setup naffka database") + } + case "postgres": + fallthrough + default: + sqlDB, err = sql.Open("postgres", string(cfg.Database.Naffka)) + if err != nil { + logrus.WithError(err).Panic("Failed to open naffka database") + } + + naffkaDB, err = naffka.NewPostgresqlDatabase(sqlDB) + if err != nil { + logrus.WithError(err).Panic("Failed to setup naffka database") + } } naff, err := naffka.New(naffkaDB) diff --git a/common/basecomponent/libp2p.go b/common/basecomponent/libp2p.go index d643d94df..f25913af9 100644 --- a/common/basecomponent/libp2p.go +++ b/common/basecomponent/libp2p.go @@ -3,7 +3,7 @@ package basecomponent import ( "errors" - pstore "github.com/libp2p/go-libp2p-peerstore" + pstore "github.com/libp2p/go-libp2p-core/peerstore" record "github.com/libp2p/go-libp2p-record" ) diff --git a/go.mod b/go.mod index 07fd48eb3..736508470 100644 --- a/go.mod +++ b/go.mod @@ -7,15 +7,11 @@ require ( github.com/libp2p/go-libp2p v0.6.0 github.com/libp2p/go-libp2p-circuit v0.1.4 github.com/libp2p/go-libp2p-core v0.5.0 - github.com/libp2p/go-libp2p-crypto v0.1.0 github.com/libp2p/go-libp2p-gostream v0.2.1 - github.com/libp2p/go-libp2p-host v0.1.0 github.com/libp2p/go-libp2p-http v0.1.5 github.com/libp2p/go-libp2p-kad-dht v0.5.0 - github.com/libp2p/go-libp2p-peerstore v0.2.0 github.com/libp2p/go-libp2p-pubsub v0.2.5 github.com/libp2p/go-libp2p-record v0.1.2 - github.com/libp2p/go-libp2p-routing v0.1.0 github.com/matrix-org/dugong v0.0.0-20171220115018-ea0a4690a0d5 github.com/matrix-org/go-http-js-libp2p v0.0.0-20200318135427-31631a9ef51f github.com/matrix-org/go-sqlite3-js v0.0.0-20200325174927-327088cdef10 diff --git a/go.sum b/go.sum index fd14409c3..4ccf7cbe4 100644 --- a/go.sum +++ b/go.sum @@ -248,8 +248,6 @@ github.com/libp2p/go-libp2p-discovery v0.2.0 h1:1p3YSOq7VsgaL+xVHPi8XAmtGyas6D2J github.com/libp2p/go-libp2p-discovery v0.2.0/go.mod h1:s4VGaxYMbw4+4+tsoQTqh7wfxg97AEdo4GYBt6BadWg= github.com/libp2p/go-libp2p-gostream v0.2.1 h1:JjA9roGokaR2BgWmaI/3HQu1/+jSbVVDLatQGnVdGjI= github.com/libp2p/go-libp2p-gostream v0.2.1/go.mod h1:1Mjp3LDmkqICe5tH9yLVNCqFaRTy6OwBvuJV6j1b9Nk= -github.com/libp2p/go-libp2p-host v0.1.0 h1:OZwENiFm6JOK3YR5PZJxkXlJE8a5u8g4YvAUrEV2MjM= -github.com/libp2p/go-libp2p-host v0.1.0/go.mod h1:5+fWuLbDn8OxoxPN3CV0vsLe1hAKScSMbT84qRfxum8= github.com/libp2p/go-libp2p-http v0.1.5 h1:FfLnzjlEzV4/6UCXCpPXRYZNoGCfogqCFjd7eF0Jbm8= github.com/libp2p/go-libp2p-http v0.1.5/go.mod h1:2YfPjsQxUlBGFQl2u461unkQ7ukwiSs7NX2eSslOJiU= github.com/libp2p/go-libp2p-kad-dht v0.5.0 h1:kDMtCftpQOL2s84/dZmw5z4NmBe6ByeDLKpcn6TcyxU= diff --git a/publicroomsapi/storage/postgreswithpubsub/storage.go b/publicroomsapi/storage/postgreswithpubsub/storage.go index 0eaac5ca6..5a4dcda18 100644 --- a/publicroomsapi/storage/postgreswithpubsub/storage.go +++ b/publicroomsapi/storage/postgreswithpubsub/storage.go @@ -58,7 +58,9 @@ func NewPublicRoomsServerDatabase(dataSourceName string, pubsub *pubsub.PubSub) PublicRoomsServerDatabase: *pg, foundRooms: make(map[string]discoveredRoom), } - if sub, err := pubsub.Subscribe("/matrix/publicRooms"); err == nil { + if topic, err := pubsub.Join("/matrix/publicRooms"); err != nil { + return nil, err + } else if sub, err := topic.Subscribe(); err == nil { provider.subscription = sub go provider.MaintenanceTimer() go provider.FindRooms() @@ -144,7 +146,9 @@ func (d *PublicRoomsServerDatabase) AdvertiseRooms() error { advertised := 0 for _, room := range ourRooms { if j, err := json.Marshal(room); err == nil { - if err := d.pubsub.Publish("/matrix/publicRooms", j); err != nil { + if topic, err := d.pubsub.Join("/matrix/publicRooms"); err != nil { + fmt.Println("Failed to subscribe to topic:", err) + } else if err := topic.Publish(context.TODO(), j); err != nil { fmt.Println("Failed to publish public room:", err) } else { advertised++ diff --git a/publicroomsapi/storage/storage.go b/publicroomsapi/storage/storage.go index 43755b413..43ef9628a 100644 --- a/publicroomsapi/storage/storage.go +++ b/publicroomsapi/storage/storage.go @@ -27,6 +27,9 @@ import ( "github.com/matrix-org/dendrite/publicroomsapi/storage/sqlite3" ) +const schemePostgres = "postgres" +const schemeFile = "file" + // NewPublicRoomsServerDatabase opens a database connection. func NewPublicRoomsServerDatabase(dataSourceName string) (Database, error) { uri, err := url.Parse(dataSourceName) @@ -34,9 +37,9 @@ func NewPublicRoomsServerDatabase(dataSourceName string) (Database, error) { return postgres.NewPublicRoomsServerDatabase(dataSourceName) } switch uri.Scheme { - case "postgres": + case schemePostgres: return postgres.NewPublicRoomsServerDatabase(dataSourceName) - case "file": + case schemeFile: return sqlite3.NewPublicRoomsServerDatabase(dataSourceName) default: return postgres.NewPublicRoomsServerDatabase(dataSourceName) @@ -50,9 +53,9 @@ func NewPublicRoomsServerDatabaseWithDHT(dataSourceName string, dht *dht.IpfsDHT return postgreswithdht.NewPublicRoomsServerDatabase(dataSourceName, dht) } switch uri.Scheme { - case "postgres": + case schemePostgres: return postgreswithdht.NewPublicRoomsServerDatabase(dataSourceName, dht) - case "file": + case schemeFile: return sqlite3.NewPublicRoomsServerDatabase(dataSourceName) default: return postgreswithdht.NewPublicRoomsServerDatabase(dataSourceName, dht) @@ -66,9 +69,9 @@ func NewPublicRoomsServerDatabaseWithPubSub(dataSourceName string, pubsub *pubsu return postgreswithpubsub.NewPublicRoomsServerDatabase(dataSourceName, pubsub) } switch uri.Scheme { - case "postgres": + case schemePostgres: return postgreswithpubsub.NewPublicRoomsServerDatabase(dataSourceName, pubsub) - case "file": + case schemeFile: return sqlite3.NewPublicRoomsServerDatabase(dataSourceName) default: return postgreswithpubsub.NewPublicRoomsServerDatabase(dataSourceName, pubsub)