From dbc2869cbd36b069f4a0745557320c5fe1daf0f7 Mon Sep 17 00:00:00 2001 From: Devon Hudson Date: Tue, 31 Jan 2023 17:35:21 -0700 Subject: [PATCH] Refactor pinecone demo to remove duplicate pinecone setup --- build/gobind-pinecone/monolith.go | 125 ++++++------------ cmd/dendrite-demo-pinecone/main.go | 86 ++++-------- .../{keys => monolith}/keys.go | 2 +- .../monolith/monolith.go | 80 +++++++++++ 4 files changed, 153 insertions(+), 140 deletions(-) rename cmd/dendrite-demo-pinecone/{keys => monolith}/keys.go (99%) create mode 100644 cmd/dendrite-demo-pinecone/monolith/monolith.go diff --git a/build/gobind-pinecone/monolith.go b/build/gobind-pinecone/monolith.go index dd4838737..cca92151b 100644 --- a/build/gobind-pinecone/monolith.go +++ b/build/gobind-pinecone/monolith.go @@ -32,7 +32,7 @@ import ( "github.com/matrix-org/dendrite/clientapi/userutil" "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/conduit" "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/conn" - "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/keys" + "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/monolith" "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/relay" "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/rooms" "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/users" @@ -47,21 +47,18 @@ import ( "github.com/matrix-org/dendrite/roomserver" "github.com/matrix-org/dendrite/setup" "github.com/matrix-org/dendrite/setup/base" - "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/userapi" userapiAPI "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/pinecone/types" "github.com/sirupsen/logrus" "golang.org/x/net/http2" "golang.org/x/net/http2/h2c" - pineconeConnections "github.com/matrix-org/pinecone/connections" pineconeMulticast "github.com/matrix-org/pinecone/multicast" pineconeRouter "github.com/matrix-org/pinecone/router" pineconeEvents "github.com/matrix-org/pinecone/router/events" - pineconeSessions "github.com/matrix-org/pinecone/sessions" - "github.com/matrix-org/pinecone/types" _ "golang.org/x/mobile/bind" ) @@ -74,24 +71,20 @@ const ( ) type DendriteMonolith struct { - logger logrus.Logger - baseDendrite *base.BaseDendrite - PineconeRouter *pineconeRouter.Router - PineconeMulticast *pineconeMulticast.Multicast - PineconeQUIC *pineconeSessions.Sessions - PineconeManager *pineconeConnections.ConnectionManager - StorageDirectory string - CacheDirectory string - listener net.Listener - httpServer *http.Server - userAPI userapiAPI.UserInternalAPI - federationAPI api.FederationInternalAPI - relayAPI relayServerAPI.RelayInternalAPI - relayRetriever relay.RelayServerRetriever + logger logrus.Logger + baseDendrite *base.BaseDendrite + p2pMonolith monolith.P2PMonolith + StorageDirectory string + listener net.Listener + httpServer *http.Server + userAPI userapiAPI.UserInternalAPI + federationAPI api.FederationInternalAPI + relayAPI relayServerAPI.RelayInternalAPI + relayRetriever relay.RelayServerRetriever } func (m *DendriteMonolith) PublicKey() string { - return m.PineconeRouter.PublicKey().String() + return m.p2pMonolith.Router.PublicKey().String() } func (m *DendriteMonolith) BaseURL() string { @@ -99,11 +92,11 @@ func (m *DendriteMonolith) BaseURL() string { } func (m *DendriteMonolith) PeerCount(peertype int) int { - return m.PineconeRouter.PeerCount(peertype) + return m.p2pMonolith.Router.PeerCount(peertype) } func (m *DendriteMonolith) SessionCount() int { - return len(m.PineconeQUIC.Protocol("matrix").Sessions()) + return len(m.p2pMonolith.Sessions.Protocol("matrix").Sessions()) } type InterfaceInfo struct { @@ -145,22 +138,22 @@ func (m *DendriteMonolith) RegisterNetworkCallback(intfCallback InterfaceRetriev } return intfs } - m.PineconeMulticast.RegisterNetworkCallback(callback) + m.p2pMonolith.Multicast.RegisterNetworkCallback(callback) } func (m *DendriteMonolith) SetMulticastEnabled(enabled bool) { if enabled { - m.PineconeMulticast.Start() + m.p2pMonolith.Multicast.Start() } else { - m.PineconeMulticast.Stop() + m.p2pMonolith.Multicast.Stop() m.DisconnectType(int(pineconeRouter.PeerTypeMulticast)) } } func (m *DendriteMonolith) SetStaticPeer(uri string) { - m.PineconeManager.RemovePeers() + m.p2pMonolith.ConnManager.RemovePeers() for _, uri := range strings.Split(uri, ",") { - m.PineconeManager.AddPeer(strings.TrimSpace(uri)) + m.p2pMonolith.ConnManager.AddPeer(strings.TrimSpace(uri)) } } @@ -268,23 +261,23 @@ func (m *DendriteMonolith) SetRelayingEnabled(enabled bool) { } func (m *DendriteMonolith) DisconnectType(peertype int) { - for _, p := range m.PineconeRouter.Peers() { + for _, p := range m.p2pMonolith.Router.Peers() { if int(peertype) == p.PeerType { - m.PineconeRouter.Disconnect(types.SwitchPortID(p.Port), nil) + m.p2pMonolith.Router.Disconnect(types.SwitchPortID(p.Port), nil) } } } func (m *DendriteMonolith) DisconnectZone(zone string) { - for _, p := range m.PineconeRouter.Peers() { + for _, p := range m.p2pMonolith.Router.Peers() { if zone == p.Zone { - m.PineconeRouter.Disconnect(types.SwitchPortID(p.Port), nil) + m.p2pMonolith.Router.Disconnect(types.SwitchPortID(p.Port), nil) } } } func (m *DendriteMonolith) DisconnectPort(port int) { - m.PineconeRouter.Disconnect(types.SwitchPortID(port), nil) + m.p2pMonolith.Router.Disconnect(types.SwitchPortID(port), nil) } func (m *DendriteMonolith) Conduit(zone string, peertype int) (*conduit.Conduit, error) { @@ -294,7 +287,7 @@ func (m *DendriteMonolith) Conduit(zone string, peertype int) (*conduit.Conduit, logrus.Errorf("Attempting authenticated connect") var port types.SwitchPortID var err error - if port, err = m.PineconeRouter.Connect( + if port, err = m.p2pMonolith.Router.Connect( l, pineconeRouter.ConnectionZone(zone), pineconeRouter.ConnectionPeerType(peertype), @@ -312,7 +305,7 @@ func (m *DendriteMonolith) Conduit(zone string, peertype int) (*conduit.Conduit, } func (m *DendriteMonolith) RegisterUser(localpart, password string) (string, error) { - pubkey := m.PineconeRouter.PublicKey() + pubkey := m.p2pMonolith.Router.PublicKey() userID := userutil.MakeUserID( localpart, gomatrixserverlib.ServerName(hex.EncodeToString(pubkey[:])), @@ -353,7 +346,7 @@ func (m *DendriteMonolith) RegisterDevice(localpart, deviceID string) (string, e func (m *DendriteMonolith) Start() { keyfile := filepath.Join(m.StorageDirectory, "p2p.pem") oldKeyfile := filepath.Join(m.StorageDirectory, "p2p.key") - sk, pk := keys.GetOrCreateKey(keyfile, oldKeyfile) + sk, pk := monolith.GetOrCreateKey(keyfile, oldKeyfile) var err error m.listener, err = net.Listen("tcp", "localhost:65432") @@ -367,50 +360,20 @@ func (m *DendriteMonolith) Start() { m.logger.SetOutput(BindLogger{}) logrus.SetOutput(BindLogger{}) - pineconeEventChannel := make(chan pineconeEvents.Event) - m.PineconeRouter = pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk) - m.PineconeRouter.EnableHopLimiting() - m.PineconeRouter.EnableWakeupBroadcasts() - m.PineconeRouter.Subscribe(pineconeEventChannel) - - m.PineconeQUIC = pineconeSessions.NewSessions(logrus.WithField("pinecone", "sessions"), m.PineconeRouter, []string{"matrix"}) - m.PineconeMulticast = pineconeMulticast.NewMulticast(logrus.WithField("pinecone", "multicast"), m.PineconeRouter) - m.PineconeManager = pineconeConnections.NewConnectionManager(m.PineconeRouter, nil) + m.p2pMonolith = monolith.P2PMonolith{} + m.p2pMonolith.SetupPinecone(sk) prefix := hex.EncodeToString(pk) - cfg := &config.Dendrite{} - cfg.Defaults(config.DefaultOpts{ - Generate: true, - Monolithic: true, - }) + cfg := monolith.GenerateDefaultConfig(sk, m.StorageDirectory, prefix) cfg.Global.ServerName = gomatrixserverlib.ServerName(hex.EncodeToString(pk)) - cfg.Global.PrivateKey = sk cfg.Global.KeyID = gomatrixserverlib.KeyID(signing.KeyID) cfg.Global.JetStream.InMemory = false - cfg.Global.JetStream.StoragePath = config.Path(filepath.Join(m.CacheDirectory, prefix)) - cfg.UserAPI.AccountDatabase.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-account.db", filepath.Join(m.StorageDirectory, prefix))) - cfg.MediaAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-mediaapi.db", filepath.Join(m.StorageDirectory, prefix))) - cfg.SyncAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-syncapi.db", filepath.Join(m.StorageDirectory, prefix))) - cfg.RoomServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-roomserver.db", filepath.Join(m.StorageDirectory, prefix))) - cfg.KeyServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-keyserver.db", filepath.Join(m.StorageDirectory, prefix))) - cfg.FederationAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-federationsender.db", filepath.Join(m.StorageDirectory, prefix))) - cfg.MediaAPI.BasePath = config.Path(filepath.Join(m.CacheDirectory, "media")) - cfg.MediaAPI.AbsBasePath = config.Path(filepath.Join(m.CacheDirectory, "media")) - cfg.RelayAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-relayapi.db", filepath.Join(m.StorageDirectory, prefix))) - cfg.MSCs.MSCs = []string{"msc2836", "msc2946"} - cfg.ClientAPI.RegistrationDisabled = false - cfg.ClientAPI.OpenRegistrationWithoutVerificationEnabled = true - cfg.SyncAPI.Fulltext.Enabled = true - cfg.SyncAPI.Fulltext.IndexPath = config.Path(filepath.Join(m.CacheDirectory, "search")) - if err = cfg.Derive(); err != nil { - panic(err) - } base := base.NewBaseDendrite(cfg, "Monolith", base.DisableMetrics) m.baseDendrite = base base.ConfigureAdminEndpoints() - federation := conn.CreateFederationClient(base, m.PineconeQUIC) + federation := conn.CreateFederationClient(base, m.p2pMonolith.Sessions) serverKeyAPI := &signing.YggdrasilKeys{} keyRing := serverKeyAPI.KeyRing() @@ -431,8 +394,8 @@ func (m *DendriteMonolith) Start() { // This is different to rsAPI which can be the http client which doesn't need this dependency rsAPI.SetFederationAPI(m.federationAPI, keyRing) - userProvider := users.NewPineconeUserProvider(m.PineconeRouter, m.PineconeQUIC, m.userAPI, federation) - roomProvider := rooms.NewPineconeRoomProvider(m.PineconeRouter, m.PineconeQUIC, m.federationAPI, federation) + userProvider := users.NewPineconeUserProvider(m.p2pMonolith.Router, m.p2pMonolith.Sessions, m.userAPI, federation) + roomProvider := rooms.NewPineconeRoomProvider(m.p2pMonolith.Router, m.p2pMonolith.Sessions, m.federationAPI, federation) js, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) producer := &producers.SyncAPIProducer{ @@ -450,7 +413,7 @@ func (m *DendriteMonolith) Start() { monolith := setup.Monolith{ Config: base.Cfg, - Client: conn.CreateClient(base, m.PineconeQUIC), + Client: conn.CreateClient(base, m.p2pMonolith.Sessions), FedClient: federation, KeyRing: keyRing, @@ -471,14 +434,14 @@ func (m *DendriteMonolith) Start() { httpRouter.PathPrefix(httputil.PublicMediaPathPrefix).Handler(base.PublicMediaAPIMux) httpRouter.PathPrefix(httputil.DendriteAdminPathPrefix).Handler(base.DendriteAdminMux) httpRouter.PathPrefix(httputil.SynapseAdminPathPrefix).Handler(base.SynapseAdminMux) - httpRouter.HandleFunc("/pinecone", m.PineconeRouter.ManholeHandler) + httpRouter.HandleFunc("/pinecone", m.p2pMonolith.Router.ManholeHandler) pMux := mux.NewRouter().SkipClean(true).UseEncodedPath() pMux.PathPrefix(users.PublicURL).HandlerFunc(userProvider.FederatedUserProfiles) pMux.PathPrefix(httputil.PublicFederationPathPrefix).Handler(base.PublicFederationAPIMux) pMux.PathPrefix(httputil.PublicMediaPathPrefix).Handler(base.PublicMediaAPIMux) - pHTTP := m.PineconeQUIC.Protocol("matrix").HTTP() + pHTTP := m.p2pMonolith.Sessions.Protocol("matrix").HTTP() pHTTP.Mux().Handle(users.PublicURL, pMux) pHTTP.Mux().Handle(httputil.PublicFederationPathPrefix, pMux) pHTTP.Mux().Handle(httputil.PublicMediaPathPrefix, pMux) @@ -500,7 +463,7 @@ func (m *DendriteMonolith) Start() { go func() { m.logger.Info("Listening on ", cfg.Global.ServerName) - switch m.httpServer.Serve(m.PineconeQUIC.Protocol("matrix")) { + switch m.httpServer.Serve(m.p2pMonolith.Sessions.Protocol("matrix")) { case net.ErrClosed, http.ErrServerClosed: m.logger.Info("Stopped listening on ", cfg.Global.ServerName) default: @@ -522,7 +485,7 @@ func (m *DendriteMonolith) Start() { eLog := logrus.WithField("pinecone", "events") m.relayRetriever = relay.NewRelayServerRetriever( context.Background(), - gomatrixserverlib.ServerName(m.PineconeRouter.PublicKey().String()), + gomatrixserverlib.ServerName(m.p2pMonolith.Router.PublicKey().String()), m.federationAPI, monolith.RelayAPI, stopRelayServerSync, @@ -535,7 +498,7 @@ func (m *DendriteMonolith) Start() { case pineconeEvents.PeerAdded: m.relayRetriever.StartSync() case pineconeEvents.PeerRemoved: - if m.relayRetriever.IsRunning() && m.PineconeRouter.TotalPeerCount() == 0 { + if m.relayRetriever.IsRunning() && m.p2pMonolith.Router.TotalPeerCount() == 0 { stopRelayServerSync <- true } case pineconeEvents.BroadcastReceived: @@ -551,14 +514,14 @@ func (m *DendriteMonolith) Start() { default: } } - }(pineconeEventChannel) + }(m.p2pMonolith.EventChannel) } func (m *DendriteMonolith) Stop() { _ = m.baseDendrite.Close() m.baseDendrite.WaitForShutdown() _ = m.listener.Close() - m.PineconeMulticast.Stop() - _ = m.PineconeQUIC.Close() - _ = m.PineconeRouter.Close() + m.p2pMonolith.Multicast.Stop() + _ = m.p2pMonolith.Sessions.Close() + _ = m.p2pMonolith.Router.Close() } diff --git a/cmd/dendrite-demo-pinecone/main.go b/cmd/dendrite-demo-pinecone/main.go index 9ee792e4a..ffb7ace19 100644 --- a/cmd/dendrite-demo-pinecone/main.go +++ b/cmd/dendrite-demo-pinecone/main.go @@ -33,7 +33,7 @@ import ( "github.com/matrix-org/dendrite/appservice" "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/conn" "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/embed" - "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/keys" + "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/monolith" "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/relay" "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/rooms" "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/users" @@ -53,11 +53,8 @@ import ( "github.com/matrix-org/dendrite/userapi" "github.com/matrix-org/gomatrixserverlib" - pineconeConnections "github.com/matrix-org/pinecone/connections" - pineconeMulticast "github.com/matrix-org/pinecone/multicast" pineconeRouter "github.com/matrix-org/pinecone/router" pineconeEvents "github.com/matrix-org/pinecone/router/events" - pineconeSessions "github.com/matrix-org/pinecone/sessions" "github.com/sirupsen/logrus" ) @@ -87,7 +84,7 @@ func main() { } } - cfg := &config.Dendrite{} + var cfg *config.Dendrite // use custom config if config flag is set if configFlagSet { @@ -97,31 +94,8 @@ func main() { } else { keyfile := filepath.Join(*instanceDir, *instanceName) + ".pem" oldKeyfile := *instanceName + ".key" - sk, pk = keys.GetOrCreateKey(keyfile, oldKeyfile) - - cfg.Defaults(config.DefaultOpts{ - Generate: true, - Monolithic: true, - }) - cfg.Global.PrivateKey = sk - cfg.Global.JetStream.StoragePath = config.Path(fmt.Sprintf("%s/", filepath.Join(*instanceDir, *instanceName))) - cfg.UserAPI.AccountDatabase.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-account.db", filepath.Join(*instanceDir, *instanceName))) - cfg.MediaAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-mediaapi.db", filepath.Join(*instanceDir, *instanceName))) - cfg.SyncAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-syncapi.db", filepath.Join(*instanceDir, *instanceName))) - cfg.RoomServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-roomserver.db", filepath.Join(*instanceDir, *instanceName))) - cfg.KeyServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-keyserver.db", filepath.Join(*instanceDir, *instanceName))) - cfg.FederationAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-federationapi.db", filepath.Join(*instanceDir, *instanceName))) - cfg.RelayAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-relayapi.db", filepath.Join(*instanceDir, *instanceName))) - cfg.MSCs.MSCs = []string{"msc2836", "msc2946"} - cfg.MSCs.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-mscs.db", filepath.Join(*instanceDir, *instanceName))) - cfg.ClientAPI.RegistrationDisabled = false - cfg.ClientAPI.OpenRegistrationWithoutVerificationEnabled = true - cfg.MediaAPI.BasePath = config.Path(*instanceDir) - cfg.SyncAPI.Fulltext.Enabled = true - cfg.SyncAPI.Fulltext.IndexPath = config.Path(*instanceDir) - if err := cfg.Derive(); err != nil { - panic(err) - } + sk, pk = monolith.GetOrCreateKey(keyfile, oldKeyfile) + cfg = monolith.GenerateDefaultConfig(sk, *instanceDir, *instanceName) } cfg.Global.ServerName = gomatrixserverlib.ServerName(hex.EncodeToString(pk)) @@ -131,19 +105,13 @@ func main() { base.ConfigureAdminEndpoints() defer base.Close() // nolint: errcheck - pineconeEventChannel := make(chan pineconeEvents.Event) - pRouter := pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk) - pRouter.EnableHopLimiting() - pRouter.EnableWakeupBroadcasts() - pRouter.Subscribe(pineconeEventChannel) + p2pMonolith := monolith.P2PMonolith{} + p2pMonolith.SetupPinecone(sk) + p2pMonolith.Multicast.Start() - pQUIC := pineconeSessions.NewSessions(logrus.WithField("pinecone", "sessions"), pRouter, []string{"matrix"}) - pMulticast := pineconeMulticast.NewMulticast(logrus.WithField("pinecone", "multicast"), pRouter) - pManager := pineconeConnections.NewConnectionManager(pRouter, nil) - pMulticast.Start() if instancePeer != nil && *instancePeer != "" { for _, peer := range strings.Split(*instancePeer, ",") { - pManager.AddPeer(strings.Trim(peer, " \t\r\n")) + p2pMonolith.ConnManager.AddPeer(strings.Trim(peer, " \t\r\n")) } } @@ -162,7 +130,7 @@ func main() { continue } - port, err := pRouter.Connect( + port, err := p2pMonolith.Router.Connect( conn, pineconeRouter.ConnectionPeerType(pineconeRouter.PeerTypeRemote), ) @@ -175,7 +143,8 @@ func main() { } }() - federation := conn.CreateFederationClient(base, pQUIC) + // TODO : factor this dendrite setup out to a common place + federation := conn.CreateFederationClient(base, p2pMonolith.Sessions) serverKeyAPI := &signing.YggdrasilKeys{} keyRing := serverKeyAPI.KeyRing() @@ -194,8 +163,8 @@ func main() { rsComponent.SetFederationAPI(fsAPI, keyRing) - userProvider := users.NewPineconeUserProvider(pRouter, pQUIC, userAPI, federation) - roomProvider := rooms.NewPineconeRoomProvider(pRouter, pQUIC, fsAPI, federation) + userProvider := users.NewPineconeUserProvider(p2pMonolith.Router, p2pMonolith.Sessions, userAPI, federation) + roomProvider := rooms.NewPineconeRoomProvider(p2pMonolith.Router, p2pMonolith.Sessions, fsAPI, federation) js, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) producer := &producers.SyncAPIProducer{ @@ -212,9 +181,9 @@ func main() { relayAPI := relayapi.NewRelayInternalAPI(base, federation, rsAPI, keyRing, producer, *instanceRelayingEnabled) logrus.Infof("Relaying enabled: %v", relayAPI.RelayingEnabled()) - monolith := setup.Monolith{ + m := setup.Monolith{ Config: base.Cfg, - Client: conn.CreateClient(base, pQUIC), + Client: conn.CreateClient(base, p2pMonolith.Sessions), FedClient: federation, KeyRing: keyRing, @@ -227,7 +196,7 @@ func main() { ExtPublicRoomsProvider: roomProvider, ExtUserDirectoryProvider: userProvider, } - monolith.AddAllPublicRoutes(base) + m.AddAllPublicRoutes(base) wsUpgrader := websocket.Upgrader{ CheckOrigin: func(_ *http.Request) bool { @@ -247,7 +216,7 @@ func main() { return } conn := conn.WrapWebSocketConn(c) - if _, err = pRouter.Connect( + if _, err = p2pMonolith.Router.Connect( conn, pineconeRouter.ConnectionZone("websocket"), pineconeRouter.ConnectionPeerType(pineconeRouter.PeerTypeRemote), @@ -255,7 +224,7 @@ func main() { logrus.WithError(err).Error("Failed to connect WebSocket peer to Pinecone switch") } }) - httpRouter.HandleFunc("/pinecone", pRouter.ManholeHandler) + httpRouter.HandleFunc("/pinecone", p2pMonolith.Router.ManholeHandler) embed.Embed(httpRouter, *instancePort, "Pinecone Demo") pMux := mux.NewRouter().SkipClean(true).UseEncodedPath() @@ -263,7 +232,7 @@ func main() { pMux.PathPrefix(httputil.PublicFederationPathPrefix).Handler(base.PublicFederationAPIMux) pMux.PathPrefix(httputil.PublicMediaPathPrefix).Handler(base.PublicMediaAPIMux) - pHTTP := pQUIC.Protocol("matrix").HTTP() + pHTTP := p2pMonolith.Sessions.Protocol("matrix").HTTP() pHTTP.Mux().Handle(users.PublicURL, pMux) pHTTP.Mux().Handle(httputil.PublicFederationPathPrefix, pMux) pHTTP.Mux().Handle(httputil.PublicMediaPathPrefix, pMux) @@ -281,10 +250,11 @@ func main() { Handler: pMux, } + // TODO : factor these funcs out to a common place go func() { - pubkey := pRouter.PublicKey() + pubkey := p2pMonolith.Router.PublicKey() logrus.Info("Listening on ", hex.EncodeToString(pubkey[:])) - logrus.Fatal(httpServer.Serve(pQUIC.Protocol("matrix"))) + logrus.Fatal(httpServer.Serve(p2pMonolith.Sessions.Protocol("matrix"))) }() go func() { httpBindAddr := fmt.Sprintf(":%d", *instancePort) @@ -296,9 +266,9 @@ func main() { eLog := logrus.WithField("pinecone", "events") relayRetriever := relay.NewRelayServerRetriever( context.Background(), - gomatrixserverlib.ServerName(pRouter.PublicKey().String()), - monolith.FederationAPI, - monolith.RelayAPI, + gomatrixserverlib.ServerName(p2pMonolith.Router.PublicKey().String()), + m.FederationAPI, + m.RelayAPI, stopRelayServerSync, ) relayRetriever.InitializeRelayServers(eLog) @@ -309,7 +279,7 @@ func main() { case pineconeEvents.PeerAdded: relayRetriever.StartSync() case pineconeEvents.PeerRemoved: - if relayRetriever.IsRunning() && pRouter.TotalPeerCount() == 0 { + if relayRetriever.IsRunning() && p2pMonolith.Router.TotalPeerCount() == 0 { stopRelayServerSync <- true } case pineconeEvents.BroadcastReceived: @@ -319,13 +289,13 @@ func main() { ServerNames: []gomatrixserverlib.ServerName{gomatrixserverlib.ServerName(e.PeerID)}, } res := &api.PerformWakeupServersResponse{} - if err := monolith.FederationAPI.PerformWakeupServers(base.Context(), req, res); err != nil { + if err := m.FederationAPI.PerformWakeupServers(base.Context(), req, res); err != nil { eLog.WithError(err).Error("Failed to wakeup destination", e.PeerID) } default: } } - }(pineconeEventChannel) + }(p2pMonolith.EventChannel) base.WaitForShutdown() } diff --git a/cmd/dendrite-demo-pinecone/keys/keys.go b/cmd/dendrite-demo-pinecone/monolith/keys.go similarity index 99% rename from cmd/dendrite-demo-pinecone/keys/keys.go rename to cmd/dendrite-demo-pinecone/monolith/keys.go index db84d8ec9..637f24a43 100644 --- a/cmd/dendrite-demo-pinecone/keys/keys.go +++ b/cmd/dendrite-demo-pinecone/monolith/keys.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package keys +package monolith import ( "crypto/ed25519" diff --git a/cmd/dendrite-demo-pinecone/monolith/monolith.go b/cmd/dendrite-demo-pinecone/monolith/monolith.go new file mode 100644 index 000000000..d3f0562bc --- /dev/null +++ b/cmd/dendrite-demo-pinecone/monolith/monolith.go @@ -0,0 +1,80 @@ +// Copyright 2023 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package monolith + +import ( + "crypto/ed25519" + "fmt" + "path/filepath" + + "github.com/matrix-org/dendrite/setup/config" + "github.com/sirupsen/logrus" + + pineconeConnections "github.com/matrix-org/pinecone/connections" + pineconeMulticast "github.com/matrix-org/pinecone/multicast" + pineconeRouter "github.com/matrix-org/pinecone/router" + pineconeEvents "github.com/matrix-org/pinecone/router/events" + pineconeSessions "github.com/matrix-org/pinecone/sessions" +) + +type P2PMonolith struct { + Sessions *pineconeSessions.Sessions + Multicast *pineconeMulticast.Multicast + ConnManager *pineconeConnections.ConnectionManager + Router *pineconeRouter.Router + EventChannel chan pineconeEvents.Event +} + +func GenerateDefaultConfig(sk ed25519.PrivateKey, storageDir string, dbPrefix string) *config.Dendrite { + cfg := config.Dendrite{} + cfg.Defaults(config.DefaultOpts{ + Generate: true, + Monolithic: true, + }) + cfg.Global.PrivateKey = sk + cfg.Global.JetStream.StoragePath = config.Path(fmt.Sprintf("%s/", filepath.Join(storageDir, dbPrefix))) + cfg.UserAPI.AccountDatabase.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-account.db", filepath.Join(storageDir, dbPrefix))) + cfg.MediaAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-mediaapi.db", filepath.Join(storageDir, dbPrefix))) + cfg.SyncAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-syncapi.db", filepath.Join(storageDir, dbPrefix))) + cfg.RoomServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-roomserver.db", filepath.Join(storageDir, dbPrefix))) + cfg.KeyServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-keyserver.db", filepath.Join(storageDir, dbPrefix))) + cfg.FederationAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-federationsender.db", filepath.Join(storageDir, dbPrefix))) + cfg.RelayAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-relayapi.db", filepath.Join(storageDir, dbPrefix))) + cfg.MSCs.MSCs = []string{"msc2836", "msc2946"} + cfg.MSCs.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-mscs.db", filepath.Join(storageDir, dbPrefix))) + cfg.ClientAPI.RegistrationDisabled = false + cfg.ClientAPI.OpenRegistrationWithoutVerificationEnabled = true + cfg.MediaAPI.BasePath = config.Path(filepath.Join(storageDir, "media")) + cfg.MediaAPI.AbsBasePath = config.Path(filepath.Join(storageDir, "media")) + cfg.SyncAPI.Fulltext.Enabled = true + cfg.SyncAPI.Fulltext.IndexPath = config.Path(filepath.Join(storageDir, "search")) + if err := cfg.Derive(); err != nil { + panic(err) + } + + return &cfg +} + +func (p *P2PMonolith) SetupPinecone(sk ed25519.PrivateKey) { + p.EventChannel = make(chan pineconeEvents.Event) + p.Router = pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk) + p.Router.EnableHopLimiting() + p.Router.EnableWakeupBroadcasts() + p.Router.Subscribe(p.EventChannel) + + p.Sessions = pineconeSessions.NewSessions(logrus.WithField("pinecone", "sessions"), p.Router, []string{"matrix"}) + p.Multicast = pineconeMulticast.NewMulticast(logrus.WithField("pinecone", "multicast"), p.Router) + p.ConnManager = pineconeConnections.NewConnectionManager(p.Router, nil) +}