diff --git a/build/gobind-pinecone/monolith.go b/build/gobind-pinecone/monolith.go index a219621b1..e8b22c067 100644 --- a/build/gobind-pinecone/monolith.go +++ b/build/gobind-pinecone/monolith.go @@ -18,96 +18,63 @@ import ( "context" "crypto/ed25519" "crypto/rand" - "crypto/tls" "encoding/hex" "fmt" - "io" "net" - "net/http" - "os" "path/filepath" "strings" - "sync" - "time" - "go.uber.org/atomic" - - "github.com/gorilla/mux" - "github.com/matrix-org/dendrite/appservice" "github.com/matrix-org/dendrite/clientapi/userutil" - "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/conn" - "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/rooms" - "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/users" + "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/conduit" + "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/monolith" + "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/relay" "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing" - "github.com/matrix-org/dendrite/federationapi" "github.com/matrix-org/dendrite/federationapi/api" - "github.com/matrix-org/dendrite/federationapi/producers" - "github.com/matrix-org/dendrite/internal/httputil" - "github.com/matrix-org/dendrite/keyserver" - "github.com/matrix-org/dendrite/relayapi" - relayServerAPI "github.com/matrix-org/dendrite/relayapi/api" - "github.com/matrix-org/dendrite/roomserver" - "github.com/matrix-org/dendrite/setup" - "github.com/matrix-org/dendrite/setup/base" - "github.com/matrix-org/dendrite/setup/config" - "github.com/matrix-org/dendrite/setup/jetstream" - "github.com/matrix-org/dendrite/test" - "github.com/matrix-org/dendrite/userapi" userapiAPI "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/pinecone/types" "github.com/sirupsen/logrus" - "golang.org/x/net/http2" - "golang.org/x/net/http2/h2c" - pineconeConnections "github.com/matrix-org/pinecone/connections" pineconeMulticast "github.com/matrix-org/pinecone/multicast" pineconeRouter "github.com/matrix-org/pinecone/router" - pineconeEvents "github.com/matrix-org/pinecone/router/events" - pineconeSessions "github.com/matrix-org/pinecone/sessions" - "github.com/matrix-org/pinecone/types" _ "golang.org/x/mobile/bind" ) const ( - PeerTypeRemote = pineconeRouter.PeerTypeRemote - PeerTypeMulticast = pineconeRouter.PeerTypeMulticast - PeerTypeBluetooth = pineconeRouter.PeerTypeBluetooth - PeerTypeBonjour = pineconeRouter.PeerTypeBonjour - relayServerRetryInterval = time.Second * 30 + PeerTypeRemote = pineconeRouter.PeerTypeRemote + PeerTypeMulticast = pineconeRouter.PeerTypeMulticast + PeerTypeBluetooth = pineconeRouter.PeerTypeBluetooth + PeerTypeBonjour = pineconeRouter.PeerTypeBonjour ) +// Re-export Conduit in this package for bindings. +type Conduit struct { + conduit.Conduit +} + type DendriteMonolith struct { - logger logrus.Logger - baseDendrite *base.BaseDendrite - PineconeRouter *pineconeRouter.Router - PineconeMulticast *pineconeMulticast.Multicast - PineconeQUIC *pineconeSessions.Sessions - PineconeManager *pineconeConnections.ConnectionManager - StorageDirectory string - CacheDirectory string - listener net.Listener - httpServer *http.Server - userAPI userapiAPI.UserInternalAPI - federationAPI api.FederationInternalAPI - relayAPI relayServerAPI.RelayInternalAPI - relayRetriever RelayServerRetriever + logger logrus.Logger + p2pMonolith monolith.P2PMonolith + StorageDirectory string + CacheDirectory string + listener net.Listener } func (m *DendriteMonolith) PublicKey() string { - return m.PineconeRouter.PublicKey().String() + return m.p2pMonolith.Router.PublicKey().String() } func (m *DendriteMonolith) BaseURL() string { - return fmt.Sprintf("http://%s", m.listener.Addr().String()) + return fmt.Sprintf("http://%s", m.p2pMonolith.Addr()) } func (m *DendriteMonolith) PeerCount(peertype int) int { - return m.PineconeRouter.PeerCount(peertype) + return m.p2pMonolith.Router.PeerCount(peertype) } func (m *DendriteMonolith) SessionCount() int { - return len(m.PineconeQUIC.Protocol("matrix").Sessions()) + return len(m.p2pMonolith.Sessions.Protocol(monolith.SessionProtocol).Sessions()) } type InterfaceInfo struct { @@ -149,22 +116,22 @@ func (m *DendriteMonolith) RegisterNetworkCallback(intfCallback InterfaceRetriev } return intfs } - m.PineconeMulticast.RegisterNetworkCallback(callback) + m.p2pMonolith.Multicast.RegisterNetworkCallback(callback) } func (m *DendriteMonolith) SetMulticastEnabled(enabled bool) { if enabled { - m.PineconeMulticast.Start() + m.p2pMonolith.Multicast.Start() } else { - m.PineconeMulticast.Stop() + m.p2pMonolith.Multicast.Stop() m.DisconnectType(int(pineconeRouter.PeerTypeMulticast)) } } func (m *DendriteMonolith) SetStaticPeer(uri string) { - m.PineconeManager.RemovePeers() + m.p2pMonolith.ConnManager.RemovePeers() for _, uri := range strings.Split(uri, ",") { - m.PineconeManager.AddPeer(strings.TrimSpace(uri)) + m.p2pMonolith.ConnManager.AddPeer(strings.TrimSpace(uri)) } } @@ -189,57 +156,6 @@ func getServerKeyFromString(nodeID string) (gomatrixserverlib.ServerName, error) return nodeKey, nil } -func updateNodeRelayServers( - node gomatrixserverlib.ServerName, - relays []gomatrixserverlib.ServerName, - ctx context.Context, - fedAPI api.FederationInternalAPI, -) { - // Get the current relay list - request := api.P2PQueryRelayServersRequest{Server: node} - response := api.P2PQueryRelayServersResponse{} - err := fedAPI.P2PQueryRelayServers(ctx, &request, &response) - if err != nil { - logrus.Warnf("Failed obtaining list of relay servers for %s: %s", node, err.Error()) - } - - // Remove old, non-matching relays - var serversToRemove []gomatrixserverlib.ServerName - for _, existingServer := range response.RelayServers { - shouldRemove := true - for _, newServer := range relays { - if newServer == existingServer { - shouldRemove = false - break - } - } - - if shouldRemove { - serversToRemove = append(serversToRemove, existingServer) - } - } - removeRequest := api.P2PRemoveRelayServersRequest{ - Server: node, - RelayServers: serversToRemove, - } - removeResponse := api.P2PRemoveRelayServersResponse{} - err = fedAPI.P2PRemoveRelayServers(ctx, &removeRequest, &removeResponse) - if err != nil { - logrus.Warnf("Failed removing old relay servers for %s: %s", node, err.Error()) - } - - // Add new relays - addRequest := api.P2PAddRelayServersRequest{ - Server: node, - RelayServers: relays, - } - addResponse := api.P2PAddRelayServersResponse{} - err = fedAPI.P2PAddRelayServers(ctx, &addRequest, &addResponse) - if err != nil { - logrus.Warnf("Failed adding relay servers for %s: %s", node, err.Error()) - } -} - func (m *DendriteMonolith) SetRelayServers(nodeID string, uris string) { relays := []gomatrixserverlib.ServerName{} for _, uri := range strings.Split(uris, ",") { @@ -264,13 +180,13 @@ func (m *DendriteMonolith) SetRelayServers(nodeID string, uris string) { if string(nodeKey) == m.PublicKey() { logrus.Infof("Setting own relay servers to: %v", relays) - m.relayRetriever.SetRelayServers(relays) + m.p2pMonolith.RelayRetriever.SetRelayServers(relays) } else { - updateNodeRelayServers( + relay.UpdateNodeRelayServers( gomatrixserverlib.ServerName(nodeKey), relays, - m.baseDendrite.Context(), - m.federationAPI, + m.p2pMonolith.BaseDendrite.Context(), + m.p2pMonolith.GetFederationAPI(), ) } } @@ -284,7 +200,7 @@ func (m *DendriteMonolith) GetRelayServers(nodeID string) string { relaysString := "" if string(nodeKey) == m.PublicKey() { - relays := m.relayRetriever.GetRelayServers() + relays := m.p2pMonolith.RelayRetriever.GetRelayServers() for i, relay := range relays { if i != 0 { @@ -296,7 +212,7 @@ func (m *DendriteMonolith) GetRelayServers(nodeID string) string { } else { request := api.P2PQueryRelayServersRequest{Server: gomatrixserverlib.ServerName(nodeKey)} response := api.P2PQueryRelayServersResponse{} - err := m.federationAPI.P2PQueryRelayServers(m.baseDendrite.Context(), &request, &response) + err := m.p2pMonolith.GetFederationAPI().P2PQueryRelayServers(m.p2pMonolith.BaseDendrite.Context(), &request, &response) if err != nil { logrus.Warnf("Failed obtaining list of this node's relay servers: %s", err.Error()) return "" @@ -315,43 +231,41 @@ func (m *DendriteMonolith) GetRelayServers(nodeID string) string { } func (m *DendriteMonolith) RelayingEnabled() bool { - return m.relayAPI.RelayingEnabled() + return m.p2pMonolith.GetRelayAPI().RelayingEnabled() } func (m *DendriteMonolith) SetRelayingEnabled(enabled bool) { - m.relayAPI.SetRelayingEnabled(enabled) + m.p2pMonolith.GetRelayAPI().SetRelayingEnabled(enabled) } func (m *DendriteMonolith) DisconnectType(peertype int) { - for _, p := range m.PineconeRouter.Peers() { + for _, p := range m.p2pMonolith.Router.Peers() { if int(peertype) == p.PeerType { - m.PineconeRouter.Disconnect(types.SwitchPortID(p.Port), nil) + m.p2pMonolith.Router.Disconnect(types.SwitchPortID(p.Port), nil) } } } func (m *DendriteMonolith) DisconnectZone(zone string) { - for _, p := range m.PineconeRouter.Peers() { + for _, p := range m.p2pMonolith.Router.Peers() { if zone == p.Zone { - m.PineconeRouter.Disconnect(types.SwitchPortID(p.Port), nil) + m.p2pMonolith.Router.Disconnect(types.SwitchPortID(p.Port), nil) } } } func (m *DendriteMonolith) DisconnectPort(port int) { - m.PineconeRouter.Disconnect(types.SwitchPortID(port), nil) + m.p2pMonolith.Router.Disconnect(types.SwitchPortID(port), nil) } func (m *DendriteMonolith) Conduit(zone string, peertype int) (*Conduit, error) { l, r := net.Pipe() - conduit := &Conduit{conn: r, port: 0} + newConduit := Conduit{conduit.NewConduit(r, 0)} go func() { - conduit.portMutex.Lock() - defer conduit.portMutex.Unlock() - logrus.Errorf("Attempting authenticated connect") + var port types.SwitchPortID var err error - if conduit.port, err = m.PineconeRouter.Connect( + if port, err = m.p2pMonolith.Router.Connect( l, pineconeRouter.ConnectionZone(zone), pineconeRouter.ConnectionPeerType(peertype), @@ -359,16 +273,17 @@ func (m *DendriteMonolith) Conduit(zone string, peertype int) (*Conduit, error) logrus.Errorf("Authenticated connect failed: %s", err) _ = l.Close() _ = r.Close() - _ = conduit.Close() + _ = newConduit.Close() return } - logrus.Infof("Authenticated connect succeeded (port %d)", conduit.port) + newConduit.SetPort(port) + logrus.Infof("Authenticated connect succeeded (port %d)", newConduit.Port()) }() - return conduit, nil + return &newConduit, nil } func (m *DendriteMonolith) RegisterUser(localpart, password string) (string, error) { - pubkey := m.PineconeRouter.PublicKey() + pubkey := m.p2pMonolith.Router.PublicKey() userID := userutil.MakeUserID( localpart, gomatrixserverlib.ServerName(hex.EncodeToString(pubkey[:])), @@ -379,7 +294,7 @@ func (m *DendriteMonolith) RegisterUser(localpart, password string) (string, err Password: password, } userRes := &userapiAPI.PerformAccountCreationResponse{} - if err := m.userAPI.PerformAccountCreation(context.Background(), userReq, userRes); err != nil { + if err := m.p2pMonolith.GetUserAPI().PerformAccountCreation(context.Background(), userReq, userRes); err != nil { return userID, fmt.Errorf("userAPI.PerformAccountCreation: %w", err) } return userID, nil @@ -397,7 +312,7 @@ func (m *DendriteMonolith) RegisterDevice(localpart, deviceID string) (string, e AccessToken: hex.EncodeToString(accessTokenBytes[:n]), } loginRes := &userapiAPI.PerformDeviceCreationResponse{} - if err := m.userAPI.PerformDeviceCreation(context.Background(), loginReq, loginRes); err != nil { + if err := m.p2pMonolith.GetUserAPI().PerformDeviceCreation(context.Background(), loginReq, loginRes); err != nil { return "", fmt.Errorf("userAPI.PerformDeviceCreation: %w", err) } if !loginRes.DeviceCreated { @@ -406,51 +321,10 @@ func (m *DendriteMonolith) RegisterDevice(localpart, deviceID string) (string, e return loginRes.Device.AccessToken, nil } -// nolint:gocyclo func (m *DendriteMonolith) Start() { - var sk ed25519.PrivateKey - var pk ed25519.PublicKey - keyfile := filepath.Join(m.StorageDirectory, "p2p.pem") - if _, err := os.Stat(keyfile); os.IsNotExist(err) { - oldkeyfile := filepath.Join(m.StorageDirectory, "p2p.key") - if _, err = os.Stat(oldkeyfile); os.IsNotExist(err) { - if err = test.NewMatrixKey(keyfile); err != nil { - panic("failed to generate a new PEM key: " + err.Error()) - } - if _, sk, err = config.LoadMatrixKey(keyfile, os.ReadFile); err != nil { - panic("failed to load PEM key: " + err.Error()) - } - if len(sk) != ed25519.PrivateKeySize { - panic("the private key is not long enough") - } - } else { - if sk, err = os.ReadFile(oldkeyfile); err != nil { - panic("failed to read the old private key: " + err.Error()) - } - if len(sk) != ed25519.PrivateKeySize { - panic("the private key is not long enough") - } - if err = test.SaveMatrixKey(keyfile, sk); err != nil { - panic("failed to convert the private key to PEM format: " + err.Error()) - } - } - } else { - if _, sk, err = config.LoadMatrixKey(keyfile, os.ReadFile); err != nil { - panic("failed to load PEM key: " + err.Error()) - } - if len(sk) != ed25519.PrivateKeySize { - panic("the private key is not long enough") - } - } - - pk = sk.Public().(ed25519.PublicKey) - - var err error - m.listener, err = net.Listen("tcp", "localhost:65432") - if err != nil { - panic(err) - } + oldKeyfile := filepath.Join(m.StorageDirectory, "p2p.key") + sk, pk := monolith.GetOrCreateKey(keyfile, oldKeyfile) m.logger = logrus.Logger{ Out: BindLogger{}, @@ -458,384 +332,22 @@ func (m *DendriteMonolith) Start() { m.logger.SetOutput(BindLogger{}) logrus.SetOutput(BindLogger{}) - pineconeEventChannel := make(chan pineconeEvents.Event) - m.PineconeRouter = pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk) - m.PineconeRouter.EnableHopLimiting() - m.PineconeRouter.EnableWakeupBroadcasts() - m.PineconeRouter.Subscribe(pineconeEventChannel) - - m.PineconeQUIC = pineconeSessions.NewSessions(logrus.WithField("pinecone", "sessions"), m.PineconeRouter, []string{"matrix"}) - m.PineconeMulticast = pineconeMulticast.NewMulticast(logrus.WithField("pinecone", "multicast"), m.PineconeRouter) - m.PineconeManager = pineconeConnections.NewConnectionManager(m.PineconeRouter, nil) + m.p2pMonolith = monolith.P2PMonolith{} + m.p2pMonolith.SetupPinecone(sk) prefix := hex.EncodeToString(pk) - cfg := &config.Dendrite{} - cfg.Defaults(config.DefaultOpts{ - Generate: true, - Monolithic: true, - }) + cfg := monolith.GenerateDefaultConfig(sk, m.StorageDirectory, m.CacheDirectory, prefix) cfg.Global.ServerName = gomatrixserverlib.ServerName(hex.EncodeToString(pk)) - cfg.Global.PrivateKey = sk cfg.Global.KeyID = gomatrixserverlib.KeyID(signing.KeyID) cfg.Global.JetStream.InMemory = false - cfg.Global.JetStream.StoragePath = config.Path(filepath.Join(m.CacheDirectory, prefix)) - cfg.UserAPI.AccountDatabase.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-account.db", filepath.Join(m.StorageDirectory, prefix))) - cfg.MediaAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-mediaapi.db", filepath.Join(m.StorageDirectory, prefix))) - cfg.SyncAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-syncapi.db", filepath.Join(m.StorageDirectory, prefix))) - cfg.RoomServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-roomserver.db", filepath.Join(m.StorageDirectory, prefix))) - cfg.KeyServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-keyserver.db", filepath.Join(m.StorageDirectory, prefix))) - cfg.FederationAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-federationsender.db", filepath.Join(m.StorageDirectory, prefix))) - cfg.MediaAPI.BasePath = config.Path(filepath.Join(m.CacheDirectory, "media")) - cfg.MediaAPI.AbsBasePath = config.Path(filepath.Join(m.CacheDirectory, "media")) - cfg.RelayAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-relayapi.db", filepath.Join(m.StorageDirectory, prefix))) - cfg.MSCs.MSCs = []string{"msc2836", "msc2946"} - cfg.ClientAPI.RegistrationDisabled = false - cfg.ClientAPI.OpenRegistrationWithoutVerificationEnabled = true - cfg.SyncAPI.Fulltext.Enabled = true - cfg.SyncAPI.Fulltext.IndexPath = config.Path(filepath.Join(m.CacheDirectory, "search")) - if err = cfg.Derive(); err != nil { - panic(err) - } - base := base.NewBaseDendrite(cfg, "Monolith", base.DisableMetrics) - m.baseDendrite = base - base.ConfigureAdminEndpoints() - - federation := conn.CreateFederationClient(base, m.PineconeQUIC) - - serverKeyAPI := &signing.YggdrasilKeys{} - keyRing := serverKeyAPI.KeyRing() - - rsAPI := roomserver.NewInternalAPI(base) - - m.federationAPI = federationapi.NewInternalAPI( - base, federation, rsAPI, base.Caches, keyRing, true, - ) - - keyAPI := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, m.federationAPI, rsAPI) - m.userAPI = userapi.NewInternalAPI(base, &cfg.UserAPI, cfg.Derived.ApplicationServices, keyAPI, rsAPI, base.PushGatewayHTTPClient()) - keyAPI.SetUserAPI(m.userAPI) - - asAPI := appservice.NewInternalAPI(base, m.userAPI, rsAPI) - - // The underlying roomserver implementation needs to be able to call the fedsender. - // This is different to rsAPI which can be the http client which doesn't need this dependency - rsAPI.SetFederationAPI(m.federationAPI, keyRing) - - userProvider := users.NewPineconeUserProvider(m.PineconeRouter, m.PineconeQUIC, m.userAPI, federation) - roomProvider := rooms.NewPineconeRoomProvider(m.PineconeRouter, m.PineconeQUIC, m.federationAPI, federation) - - js, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) - producer := &producers.SyncAPIProducer{ - JetStream: js, - TopicReceiptEvent: base.Cfg.Global.JetStream.Prefixed(jetstream.OutputReceiptEvent), - TopicSendToDeviceEvent: base.Cfg.Global.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent), - TopicTypingEvent: base.Cfg.Global.JetStream.Prefixed(jetstream.OutputTypingEvent), - TopicPresenceEvent: base.Cfg.Global.JetStream.Prefixed(jetstream.OutputPresenceEvent), - TopicDeviceListUpdate: base.Cfg.Global.JetStream.Prefixed(jetstream.InputDeviceListUpdate), - TopicSigningKeyUpdate: base.Cfg.Global.JetStream.Prefixed(jetstream.InputSigningKeyUpdate), - Config: &base.Cfg.FederationAPI, - UserAPI: m.userAPI, - } - m.relayAPI = relayapi.NewRelayInternalAPI(base, federation, rsAPI, keyRing, producer, false) - - monolith := setup.Monolith{ - Config: base.Cfg, - Client: conn.CreateClient(base, m.PineconeQUIC), - FedClient: federation, - KeyRing: keyRing, - - AppserviceAPI: asAPI, - FederationAPI: m.federationAPI, - RoomserverAPI: rsAPI, - UserAPI: m.userAPI, - KeyAPI: keyAPI, - RelayAPI: m.relayAPI, - ExtPublicRoomsProvider: roomProvider, - ExtUserDirectoryProvider: userProvider, - } - monolith.AddAllPublicRoutes(base) - - httpRouter := mux.NewRouter().SkipClean(true).UseEncodedPath() - httpRouter.PathPrefix(httputil.InternalPathPrefix).Handler(base.InternalAPIMux) - httpRouter.PathPrefix(httputil.PublicClientPathPrefix).Handler(base.PublicClientAPIMux) - httpRouter.PathPrefix(httputil.PublicMediaPathPrefix).Handler(base.PublicMediaAPIMux) - httpRouter.PathPrefix(httputil.DendriteAdminPathPrefix).Handler(base.DendriteAdminMux) - httpRouter.PathPrefix(httputil.SynapseAdminPathPrefix).Handler(base.SynapseAdminMux) - httpRouter.HandleFunc("/pinecone", m.PineconeRouter.ManholeHandler) - - pMux := mux.NewRouter().SkipClean(true).UseEncodedPath() - pMux.PathPrefix(users.PublicURL).HandlerFunc(userProvider.FederatedUserProfiles) - pMux.PathPrefix(httputil.PublicFederationPathPrefix).Handler(base.PublicFederationAPIMux) - pMux.PathPrefix(httputil.PublicMediaPathPrefix).Handler(base.PublicMediaAPIMux) - - pHTTP := m.PineconeQUIC.Protocol("matrix").HTTP() - pHTTP.Mux().Handle(users.PublicURL, pMux) - pHTTP.Mux().Handle(httputil.PublicFederationPathPrefix, pMux) - pHTTP.Mux().Handle(httputil.PublicMediaPathPrefix, pMux) - - // Build both ends of a HTTP multiplex. - h2s := &http2.Server{} - m.httpServer = &http.Server{ - Addr: ":0", - TLSNextProto: map[string]func(*http.Server, *tls.Conn, http.Handler){}, - ReadTimeout: 10 * time.Second, - WriteTimeout: 10 * time.Second, - IdleTimeout: 30 * time.Second, - BaseContext: func(_ net.Listener) context.Context { - return context.Background() - }, - Handler: h2c.NewHandler(pMux, h2s), - } - - go func() { - m.logger.Info("Listening on ", cfg.Global.ServerName) - - switch m.httpServer.Serve(m.PineconeQUIC.Protocol("matrix")) { - case net.ErrClosed, http.ErrServerClosed: - m.logger.Info("Stopped listening on ", cfg.Global.ServerName) - default: - m.logger.Error("Stopped listening on ", cfg.Global.ServerName) - } - }() - go func() { - logrus.Info("Listening on ", m.listener.Addr()) - - switch http.Serve(m.listener, httpRouter) { - case net.ErrClosed, http.ErrServerClosed: - m.logger.Info("Stopped listening on ", cfg.Global.ServerName) - default: - m.logger.Error("Stopped listening on ", cfg.Global.ServerName) - } - }() - - stopRelayServerSync := make(chan bool) - - eLog := logrus.WithField("pinecone", "events") - m.relayRetriever = RelayServerRetriever{ - Context: context.Background(), - ServerName: gomatrixserverlib.ServerName(m.PineconeRouter.PublicKey().String()), - FederationAPI: m.federationAPI, - relayServersQueried: make(map[gomatrixserverlib.ServerName]bool), - RelayAPI: monolith.RelayAPI, - running: *atomic.NewBool(false), - quit: stopRelayServerSync, - } - m.relayRetriever.InitializeRelayServers(eLog) - - go func(ch <-chan pineconeEvents.Event) { - - for event := range ch { - switch e := event.(type) { - case pineconeEvents.PeerAdded: - m.relayRetriever.StartSync() - case pineconeEvents.PeerRemoved: - if m.relayRetriever.running.Load() && m.PineconeRouter.TotalPeerCount() == 0 { - stopRelayServerSync <- true - } - case pineconeEvents.BroadcastReceived: - // eLog.Info("Broadcast received from: ", e.PeerID) - - req := &api.PerformWakeupServersRequest{ - ServerNames: []gomatrixserverlib.ServerName{gomatrixserverlib.ServerName(e.PeerID)}, - } - res := &api.PerformWakeupServersResponse{} - if err := m.federationAPI.PerformWakeupServers(base.Context(), req, res); err != nil { - eLog.WithError(err).Error("Failed to wakeup destination", e.PeerID) - } - default: - } - } - }(pineconeEventChannel) + enableRelaying := false + enableMetrics := false + enableWebsockets := false + m.p2pMonolith.SetupDendrite(cfg, 65432, enableRelaying, enableMetrics, enableWebsockets) + m.p2pMonolith.StartMonolith() } func (m *DendriteMonolith) Stop() { - _ = m.baseDendrite.Close() - m.baseDendrite.WaitForShutdown() - _ = m.listener.Close() - m.PineconeMulticast.Stop() - _ = m.PineconeQUIC.Close() - _ = m.PineconeRouter.Close() -} - -type RelayServerRetriever struct { - Context context.Context - ServerName gomatrixserverlib.ServerName - FederationAPI api.FederationInternalAPI - RelayAPI relayServerAPI.RelayInternalAPI - relayServersQueried map[gomatrixserverlib.ServerName]bool - queriedServersMutex sync.Mutex - running atomic.Bool - quit <-chan bool -} - -func (r *RelayServerRetriever) InitializeRelayServers(eLog *logrus.Entry) { - request := api.P2PQueryRelayServersRequest{Server: gomatrixserverlib.ServerName(r.ServerName)} - response := api.P2PQueryRelayServersResponse{} - err := r.FederationAPI.P2PQueryRelayServers(r.Context, &request, &response) - if err != nil { - eLog.Warnf("Failed obtaining list of this node's relay servers: %s", err.Error()) - } - - r.queriedServersMutex.Lock() - defer r.queriedServersMutex.Unlock() - for _, server := range response.RelayServers { - r.relayServersQueried[server] = false - } - - eLog.Infof("Registered relay servers: %v", response.RelayServers) -} - -func (r *RelayServerRetriever) SetRelayServers(servers []gomatrixserverlib.ServerName) { - updateNodeRelayServers(r.ServerName, servers, r.Context, r.FederationAPI) - - // Replace list of servers to sync with and mark them all as unsynced. - r.queriedServersMutex.Lock() - defer r.queriedServersMutex.Unlock() - r.relayServersQueried = make(map[gomatrixserverlib.ServerName]bool) - for _, server := range servers { - r.relayServersQueried[server] = false - } - - r.StartSync() -} - -func (r *RelayServerRetriever) GetRelayServers() []gomatrixserverlib.ServerName { - r.queriedServersMutex.Lock() - defer r.queriedServersMutex.Unlock() - relayServers := []gomatrixserverlib.ServerName{} - for server := range r.relayServersQueried { - relayServers = append(relayServers, server) - } - - return relayServers -} - -func (r *RelayServerRetriever) StartSync() { - if !r.running.Load() { - logrus.Info("Starting relay server sync") - go r.SyncRelayServers(r.quit) - } -} - -func (r *RelayServerRetriever) SyncRelayServers(stop <-chan bool) { - defer r.running.Store(false) - - t := time.NewTimer(relayServerRetryInterval) - for { - relayServersToQuery := []gomatrixserverlib.ServerName{} - func() { - r.queriedServersMutex.Lock() - defer r.queriedServersMutex.Unlock() - for server, complete := range r.relayServersQueried { - if !complete { - relayServersToQuery = append(relayServersToQuery, server) - } - } - }() - if len(relayServersToQuery) == 0 { - // All relay servers have been synced. - logrus.Info("Finished syncing with all known relays") - return - } - r.queryRelayServers(relayServersToQuery) - t.Reset(relayServerRetryInterval) - - select { - case <-stop: - if !t.Stop() { - <-t.C - } - return - case <-t.C: - } - } -} - -func (r *RelayServerRetriever) GetQueriedServerStatus() map[gomatrixserverlib.ServerName]bool { - r.queriedServersMutex.Lock() - defer r.queriedServersMutex.Unlock() - - result := map[gomatrixserverlib.ServerName]bool{} - for server, queried := range r.relayServersQueried { - result[server] = queried - } - return result -} - -func (r *RelayServerRetriever) queryRelayServers(relayServers []gomatrixserverlib.ServerName) { - logrus.Info("Querying relay servers for any available transactions") - for _, server := range relayServers { - userID, err := gomatrixserverlib.NewUserID("@user:"+string(r.ServerName), false) - if err != nil { - return - } - - logrus.Infof("Syncing with relay: %s", string(server)) - err = r.RelayAPI.PerformRelayServerSync(context.Background(), *userID, server) - if err == nil { - func() { - r.queriedServersMutex.Lock() - defer r.queriedServersMutex.Unlock() - r.relayServersQueried[server] = true - }() - // TODO : What happens if your relay receives new messages after this point? - // Should you continue to check with them, or should they try and contact you? - // They could send a "new_async_events" message your way maybe? - // Then you could mark them as needing to be queried again. - // What if you miss this message? - // Maybe you should try querying them again after a certain period of time as a backup? - } else { - logrus.Errorf("Failed querying relay server: %s", err.Error()) - } - } -} - -const MaxFrameSize = types.MaxFrameSize - -type Conduit struct { - closed atomic.Bool - conn net.Conn - port types.SwitchPortID - portMutex sync.Mutex -} - -func (c *Conduit) Port() int { - c.portMutex.Lock() - defer c.portMutex.Unlock() - return int(c.port) -} - -func (c *Conduit) Read(b []byte) (int, error) { - if c.closed.Load() { - return 0, io.EOF - } - return c.conn.Read(b) -} - -func (c *Conduit) ReadCopy() ([]byte, error) { - if c.closed.Load() { - return nil, io.EOF - } - var buf [65535 * 2]byte - n, err := c.conn.Read(buf[:]) - if err != nil { - return nil, err - } - return buf[:n], nil -} - -func (c *Conduit) Write(b []byte) (int, error) { - if c.closed.Load() { - return 0, io.EOF - } - return c.conn.Write(b) -} - -func (c *Conduit) Close() error { - if c.closed.Load() { - return io.ErrClosedPipe - } - c.closed.Store(true) - return c.conn.Close() + m.p2pMonolith.Stop() } diff --git a/build/gobind-pinecone/monolith_test.go b/build/gobind-pinecone/monolith_test.go index 3c8873e09..434e07ef2 100644 --- a/build/gobind-pinecone/monolith_test.go +++ b/build/gobind-pinecone/monolith_test.go @@ -15,182 +15,12 @@ package gobind import ( - "context" - "fmt" - "net" "strings" "testing" - "time" - "github.com/matrix-org/dendrite/federationapi/api" - relayServerAPI "github.com/matrix-org/dendrite/relayapi/api" "github.com/matrix-org/gomatrixserverlib" - "github.com/sirupsen/logrus" - "github.com/stretchr/testify/assert" - "gotest.tools/v3/poll" ) -var TestBuf = []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9} - -type TestNetConn struct { - net.Conn - shouldFail bool -} - -func (t *TestNetConn) Read(b []byte) (int, error) { - if t.shouldFail { - return 0, fmt.Errorf("Failed") - } else { - n := copy(b, TestBuf) - return n, nil - } -} - -func (t *TestNetConn) Write(b []byte) (int, error) { - if t.shouldFail { - return 0, fmt.Errorf("Failed") - } else { - return len(b), nil - } -} - -func (t *TestNetConn) Close() error { - if t.shouldFail { - return fmt.Errorf("Failed") - } else { - return nil - } -} - -func TestConduitStoresPort(t *testing.T) { - conduit := Conduit{port: 7} - assert.Equal(t, 7, conduit.Port()) -} - -func TestConduitRead(t *testing.T) { - conduit := Conduit{conn: &TestNetConn{}} - b := make([]byte, len(TestBuf)) - bytes, err := conduit.Read(b) - assert.NoError(t, err) - assert.Equal(t, len(TestBuf), bytes) - assert.Equal(t, TestBuf, b) -} - -func TestConduitReadCopy(t *testing.T) { - conduit := Conduit{conn: &TestNetConn{}} - result, err := conduit.ReadCopy() - assert.NoError(t, err) - assert.Equal(t, TestBuf, result) -} - -func TestConduitWrite(t *testing.T) { - conduit := Conduit{conn: &TestNetConn{}} - bytes, err := conduit.Write(TestBuf) - assert.NoError(t, err) - assert.Equal(t, len(TestBuf), bytes) -} - -func TestConduitClose(t *testing.T) { - conduit := Conduit{conn: &TestNetConn{}} - err := conduit.Close() - assert.NoError(t, err) - assert.True(t, conduit.closed.Load()) -} - -func TestConduitReadClosed(t *testing.T) { - conduit := Conduit{conn: &TestNetConn{}} - err := conduit.Close() - assert.NoError(t, err) - b := make([]byte, len(TestBuf)) - _, err = conduit.Read(b) - assert.Error(t, err) -} - -func TestConduitReadCopyClosed(t *testing.T) { - conduit := Conduit{conn: &TestNetConn{}} - err := conduit.Close() - assert.NoError(t, err) - _, err = conduit.ReadCopy() - assert.Error(t, err) -} - -func TestConduitWriteClosed(t *testing.T) { - conduit := Conduit{conn: &TestNetConn{}} - err := conduit.Close() - assert.NoError(t, err) - _, err = conduit.Write(TestBuf) - assert.Error(t, err) -} - -func TestConduitReadCopyFails(t *testing.T) { - conduit := Conduit{conn: &TestNetConn{shouldFail: true}} - _, err := conduit.ReadCopy() - assert.Error(t, err) -} - -var testRelayServers = []gomatrixserverlib.ServerName{"relay1", "relay2"} - -type FakeFedAPI struct { - api.FederationInternalAPI -} - -func (f *FakeFedAPI) P2PQueryRelayServers(ctx context.Context, req *api.P2PQueryRelayServersRequest, res *api.P2PQueryRelayServersResponse) error { - res.RelayServers = testRelayServers - return nil -} - -type FakeRelayAPI struct { - relayServerAPI.RelayInternalAPI -} - -func (r *FakeRelayAPI) PerformRelayServerSync(ctx context.Context, userID gomatrixserverlib.UserID, relayServer gomatrixserverlib.ServerName) error { - return nil -} - -func TestRelayRetrieverInitialization(t *testing.T) { - retriever := RelayServerRetriever{ - Context: context.Background(), - ServerName: "server", - relayServersQueried: make(map[gomatrixserverlib.ServerName]bool), - FederationAPI: &FakeFedAPI{}, - RelayAPI: &FakeRelayAPI{}, - } - - retriever.InitializeRelayServers(logrus.WithField("test", "relay")) - relayServers := retriever.GetQueriedServerStatus() - assert.Equal(t, 2, len(relayServers)) -} - -func TestRelayRetrieverSync(t *testing.T) { - retriever := RelayServerRetriever{ - Context: context.Background(), - ServerName: "server", - relayServersQueried: make(map[gomatrixserverlib.ServerName]bool), - FederationAPI: &FakeFedAPI{}, - RelayAPI: &FakeRelayAPI{}, - } - - retriever.InitializeRelayServers(logrus.WithField("test", "relay")) - relayServers := retriever.GetQueriedServerStatus() - assert.Equal(t, 2, len(relayServers)) - - stopRelayServerSync := make(chan bool) - go retriever.SyncRelayServers(stopRelayServerSync) - - check := func(log poll.LogT) poll.Result { - relayServers := retriever.GetQueriedServerStatus() - for _, queried := range relayServers { - if !queried { - return poll.Continue("waiting for all servers to be queried") - } - } - - stopRelayServerSync <- true - return poll.Success() - } - poll.WaitOn(t, check, poll.WithTimeout(5*time.Second), poll.WithDelay(100*time.Millisecond)) -} - func TestMonolithStarts(t *testing.T) { monolith := DendriteMonolith{} monolith.Start() diff --git a/cmd/dendrite-demo-pinecone/conduit/conduit.go b/cmd/dendrite-demo-pinecone/conduit/conduit.go new file mode 100644 index 000000000..be139c19c --- /dev/null +++ b/cmd/dendrite-demo-pinecone/conduit/conduit.go @@ -0,0 +1,84 @@ +// Copyright 2023 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package conduit + +import ( + "io" + "net" + "sync" + + "github.com/matrix-org/pinecone/types" + "go.uber.org/atomic" +) + +type Conduit struct { + closed atomic.Bool + conn net.Conn + portMutex sync.Mutex + port types.SwitchPortID +} + +func NewConduit(conn net.Conn, port int) Conduit { + return Conduit{ + conn: conn, + port: types.SwitchPortID(port), + } +} + +func (c *Conduit) Port() int { + c.portMutex.Lock() + defer c.portMutex.Unlock() + return int(c.port) +} + +func (c *Conduit) SetPort(port types.SwitchPortID) { + c.portMutex.Lock() + defer c.portMutex.Unlock() + c.port = port +} + +func (c *Conduit) Read(b []byte) (int, error) { + if c.closed.Load() { + return 0, io.EOF + } + return c.conn.Read(b) +} + +func (c *Conduit) ReadCopy() ([]byte, error) { + if c.closed.Load() { + return nil, io.EOF + } + var buf [65535 * 2]byte + n, err := c.conn.Read(buf[:]) + if err != nil { + return nil, err + } + return buf[:n], nil +} + +func (c *Conduit) Write(b []byte) (int, error) { + if c.closed.Load() { + return 0, io.EOF + } + return c.conn.Write(b) +} + +func (c *Conduit) Close() error { + if c.closed.Load() { + return io.ErrClosedPipe + } + c.closed.Store(true) + return c.conn.Close() +} diff --git a/cmd/dendrite-demo-pinecone/conduit/conduit_test.go b/cmd/dendrite-demo-pinecone/conduit/conduit_test.go new file mode 100644 index 000000000..d8cd3133f --- /dev/null +++ b/cmd/dendrite-demo-pinecone/conduit/conduit_test.go @@ -0,0 +1,121 @@ +// Copyright 2023 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package conduit + +import ( + "fmt" + "net" + "testing" + + "github.com/stretchr/testify/assert" +) + +var TestBuf = []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9} + +type TestNetConn struct { + net.Conn + shouldFail bool +} + +func (t *TestNetConn) Read(b []byte) (int, error) { + if t.shouldFail { + return 0, fmt.Errorf("Failed") + } else { + n := copy(b, TestBuf) + return n, nil + } +} + +func (t *TestNetConn) Write(b []byte) (int, error) { + if t.shouldFail { + return 0, fmt.Errorf("Failed") + } else { + return len(b), nil + } +} + +func (t *TestNetConn) Close() error { + if t.shouldFail { + return fmt.Errorf("Failed") + } else { + return nil + } +} + +func TestConduitStoresPort(t *testing.T) { + conduit := Conduit{port: 7} + assert.Equal(t, 7, conduit.Port()) +} + +func TestConduitRead(t *testing.T) { + conduit := Conduit{conn: &TestNetConn{}} + b := make([]byte, len(TestBuf)) + bytes, err := conduit.Read(b) + assert.NoError(t, err) + assert.Equal(t, len(TestBuf), bytes) + assert.Equal(t, TestBuf, b) +} + +func TestConduitReadCopy(t *testing.T) { + conduit := Conduit{conn: &TestNetConn{}} + result, err := conduit.ReadCopy() + assert.NoError(t, err) + assert.Equal(t, TestBuf, result) +} + +func TestConduitWrite(t *testing.T) { + conduit := Conduit{conn: &TestNetConn{}} + bytes, err := conduit.Write(TestBuf) + assert.NoError(t, err) + assert.Equal(t, len(TestBuf), bytes) +} + +func TestConduitClose(t *testing.T) { + conduit := Conduit{conn: &TestNetConn{}} + err := conduit.Close() + assert.NoError(t, err) + assert.True(t, conduit.closed.Load()) +} + +func TestConduitReadClosed(t *testing.T) { + conduit := Conduit{conn: &TestNetConn{}} + err := conduit.Close() + assert.NoError(t, err) + b := make([]byte, len(TestBuf)) + _, err = conduit.Read(b) + assert.Error(t, err) +} + +func TestConduitReadCopyClosed(t *testing.T) { + conduit := Conduit{conn: &TestNetConn{}} + err := conduit.Close() + assert.NoError(t, err) + _, err = conduit.ReadCopy() + assert.Error(t, err) +} + +func TestConduitWriteClosed(t *testing.T) { + conduit := Conduit{conn: &TestNetConn{}} + err := conduit.Close() + assert.NoError(t, err) + _, err = conduit.Write(TestBuf) + assert.Error(t, err) +} + +func TestConduitReadCopyFails(t *testing.T) { + conduit := Conduit{conn: &TestNetConn{shouldFail: true}} + _, err := conduit.ReadCopy() + assert.Error(t, err) +} diff --git a/cmd/dendrite-demo-pinecone/main.go b/cmd/dendrite-demo-pinecone/main.go index 3f63ab654..7706a73f5 100644 --- a/cmd/dendrite-demo-pinecone/main.go +++ b/cmd/dendrite-demo-pinecone/main.go @@ -15,52 +15,24 @@ package main import ( - "context" "crypto/ed25519" - "crypto/tls" "encoding/hex" "flag" "fmt" "net" - "net/http" "os" "path/filepath" "strings" - "time" - "github.com/gorilla/mux" - "github.com/gorilla/websocket" - "github.com/matrix-org/dendrite/appservice" - "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/conn" - "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/embed" - "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/rooms" - "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/users" + "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/monolith" "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing" - "github.com/matrix-org/dendrite/federationapi" - "github.com/matrix-org/dendrite/federationapi/api" - "github.com/matrix-org/dendrite/federationapi/producers" "github.com/matrix-org/dendrite/internal" - "github.com/matrix-org/dendrite/internal/httputil" - "github.com/matrix-org/dendrite/keyserver" - "github.com/matrix-org/dendrite/relayapi" - relayServerAPI "github.com/matrix-org/dendrite/relayapi/api" - "github.com/matrix-org/dendrite/roomserver" "github.com/matrix-org/dendrite/setup" - "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" - "github.com/matrix-org/dendrite/setup/jetstream" - "github.com/matrix-org/dendrite/test" - "github.com/matrix-org/dendrite/userapi" "github.com/matrix-org/gomatrixserverlib" - "go.uber.org/atomic" - - pineconeConnections "github.com/matrix-org/pinecone/connections" - pineconeMulticast "github.com/matrix-org/pinecone/multicast" - pineconeRouter "github.com/matrix-org/pinecone/router" - pineconeEvents "github.com/matrix-org/pinecone/router/events" - pineconeSessions "github.com/matrix-org/pinecone/sessions" - "github.com/sirupsen/logrus" + + pineconeRouter "github.com/matrix-org/pinecone/router" ) var ( @@ -72,9 +44,6 @@ var ( instanceRelayingEnabled = flag.Bool("relay", false, "whether to enable store & forward relaying for other nodes") ) -const relayServerRetryInterval = time.Second * 30 - -// nolint:gocyclo func main() { flag.Parse() internal.SetupPprof() @@ -91,7 +60,7 @@ func main() { } } - cfg := &config.Dendrite{} + var cfg *config.Dendrite // use custom config if config flag is set if configFlagSet { @@ -100,89 +69,30 @@ func main() { pk = sk.Public().(ed25519.PublicKey) } else { keyfile := filepath.Join(*instanceDir, *instanceName) + ".pem" - if _, err := os.Stat(keyfile); os.IsNotExist(err) { - oldkeyfile := *instanceName + ".key" - if _, err = os.Stat(oldkeyfile); os.IsNotExist(err) { - if err = test.NewMatrixKey(keyfile); err != nil { - panic("failed to generate a new PEM key: " + err.Error()) - } - if _, sk, err = config.LoadMatrixKey(keyfile, os.ReadFile); err != nil { - panic("failed to load PEM key: " + err.Error()) - } - if len(sk) != ed25519.PrivateKeySize { - panic("the private key is not long enough") - } - } else { - if sk, err = os.ReadFile(oldkeyfile); err != nil { - panic("failed to read the old private key: " + err.Error()) - } - if len(sk) != ed25519.PrivateKeySize { - panic("the private key is not long enough") - } - if err := test.SaveMatrixKey(keyfile, sk); err != nil { - panic("failed to convert the private key to PEM format: " + err.Error()) - } - } - } else { - var err error - if _, sk, err = config.LoadMatrixKey(keyfile, os.ReadFile); err != nil { - panic("failed to load PEM key: " + err.Error()) - } - if len(sk) != ed25519.PrivateKeySize { - panic("the private key is not long enough") - } - } - - pk = sk.Public().(ed25519.PublicKey) - - cfg.Defaults(config.DefaultOpts{ - Generate: true, - Monolithic: true, - }) - cfg.Global.PrivateKey = sk - cfg.Global.JetStream.StoragePath = config.Path(fmt.Sprintf("%s/", filepath.Join(*instanceDir, *instanceName))) - cfg.UserAPI.AccountDatabase.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-account.db", filepath.Join(*instanceDir, *instanceName))) - cfg.MediaAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-mediaapi.db", filepath.Join(*instanceDir, *instanceName))) - cfg.SyncAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-syncapi.db", filepath.Join(*instanceDir, *instanceName))) - cfg.RoomServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-roomserver.db", filepath.Join(*instanceDir, *instanceName))) - cfg.KeyServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-keyserver.db", filepath.Join(*instanceDir, *instanceName))) - cfg.FederationAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-federationapi.db", filepath.Join(*instanceDir, *instanceName))) - cfg.RelayAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-relayapi.db", filepath.Join(*instanceDir, *instanceName))) - cfg.MSCs.MSCs = []string{"msc2836", "msc2946"} - cfg.MSCs.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-mscs.db", filepath.Join(*instanceDir, *instanceName))) - cfg.ClientAPI.RegistrationDisabled = false - cfg.ClientAPI.OpenRegistrationWithoutVerificationEnabled = true - cfg.MediaAPI.BasePath = config.Path(*instanceDir) - cfg.SyncAPI.Fulltext.Enabled = true - cfg.SyncAPI.Fulltext.IndexPath = config.Path(*instanceDir) - if err := cfg.Derive(); err != nil { - panic(err) - } + oldKeyfile := *instanceName + ".key" + sk, pk = monolith.GetOrCreateKey(keyfile, oldKeyfile) + cfg = monolith.GenerateDefaultConfig(sk, *instanceDir, *instanceDir, *instanceName) } cfg.Global.ServerName = gomatrixserverlib.ServerName(hex.EncodeToString(pk)) cfg.Global.KeyID = gomatrixserverlib.KeyID(signing.KeyID) - base := base.NewBaseDendrite(cfg, "Monolith") - base.ConfigureAdminEndpoints() - defer base.Close() // nolint: errcheck + p2pMonolith := monolith.P2PMonolith{} + p2pMonolith.SetupPinecone(sk) + p2pMonolith.Multicast.Start() - pineconeEventChannel := make(chan pineconeEvents.Event) - pRouter := pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk) - pRouter.EnableHopLimiting() - pRouter.EnableWakeupBroadcasts() - pRouter.Subscribe(pineconeEventChannel) - - pQUIC := pineconeSessions.NewSessions(logrus.WithField("pinecone", "sessions"), pRouter, []string{"matrix"}) - pMulticast := pineconeMulticast.NewMulticast(logrus.WithField("pinecone", "multicast"), pRouter) - pManager := pineconeConnections.NewConnectionManager(pRouter, nil) - pMulticast.Start() if instancePeer != nil && *instancePeer != "" { for _, peer := range strings.Split(*instancePeer, ",") { - pManager.AddPeer(strings.Trim(peer, " \t\r\n")) + p2pMonolith.ConnManager.AddPeer(strings.Trim(peer, " \t\r\n")) } } + enableMetrics := true + enableWebsockets := true + p2pMonolith.SetupDendrite(cfg, *instancePort, *instanceRelayingEnabled, enableMetrics, enableWebsockets) + p2pMonolith.StartMonolith() + p2pMonolith.WaitForShutdown() + go func() { listener, err := net.Listen("tcp", *instanceListen) if err != nil { @@ -198,7 +108,7 @@ func main() { continue } - port, err := pRouter.Connect( + port, err := p2pMonolith.Router.Connect( conn, pineconeRouter.ConnectionPeerType(pineconeRouter.PeerTypeRemote), ) @@ -210,237 +120,4 @@ func main() { fmt.Println("Inbound connection", conn.RemoteAddr(), "is connected to port", port) } }() - - federation := conn.CreateFederationClient(base, pQUIC) - - serverKeyAPI := &signing.YggdrasilKeys{} - keyRing := serverKeyAPI.KeyRing() - - rsComponent := roomserver.NewInternalAPI(base) - rsAPI := rsComponent - fsAPI := federationapi.NewInternalAPI( - base, federation, rsAPI, base.Caches, keyRing, true, - ) - - keyAPI := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, fsAPI, rsComponent) - userAPI := userapi.NewInternalAPI(base, &cfg.UserAPI, nil, keyAPI, rsAPI, base.PushGatewayHTTPClient()) - keyAPI.SetUserAPI(userAPI) - - asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI) - - rsComponent.SetFederationAPI(fsAPI, keyRing) - - userProvider := users.NewPineconeUserProvider(pRouter, pQUIC, userAPI, federation) - roomProvider := rooms.NewPineconeRoomProvider(pRouter, pQUIC, fsAPI, federation) - - js, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) - producer := &producers.SyncAPIProducer{ - JetStream: js, - TopicReceiptEvent: base.Cfg.Global.JetStream.Prefixed(jetstream.OutputReceiptEvent), - TopicSendToDeviceEvent: base.Cfg.Global.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent), - TopicTypingEvent: base.Cfg.Global.JetStream.Prefixed(jetstream.OutputTypingEvent), - TopicPresenceEvent: base.Cfg.Global.JetStream.Prefixed(jetstream.OutputPresenceEvent), - TopicDeviceListUpdate: base.Cfg.Global.JetStream.Prefixed(jetstream.InputDeviceListUpdate), - TopicSigningKeyUpdate: base.Cfg.Global.JetStream.Prefixed(jetstream.InputSigningKeyUpdate), - Config: &base.Cfg.FederationAPI, - UserAPI: userAPI, - } - relayAPI := relayapi.NewRelayInternalAPI(base, federation, rsAPI, keyRing, producer, *instanceRelayingEnabled) - logrus.Infof("Relaying enabled: %v", relayAPI.RelayingEnabled()) - - monolith := setup.Monolith{ - Config: base.Cfg, - Client: conn.CreateClient(base, pQUIC), - FedClient: federation, - KeyRing: keyRing, - - AppserviceAPI: asAPI, - FederationAPI: fsAPI, - RoomserverAPI: rsAPI, - UserAPI: userAPI, - KeyAPI: keyAPI, - RelayAPI: relayAPI, - ExtPublicRoomsProvider: roomProvider, - ExtUserDirectoryProvider: userProvider, - } - monolith.AddAllPublicRoutes(base) - - wsUpgrader := websocket.Upgrader{ - CheckOrigin: func(_ *http.Request) bool { - return true - }, - } - httpRouter := mux.NewRouter().SkipClean(true).UseEncodedPath() - httpRouter.PathPrefix(httputil.InternalPathPrefix).Handler(base.InternalAPIMux) - httpRouter.PathPrefix(httputil.PublicClientPathPrefix).Handler(base.PublicClientAPIMux) - httpRouter.PathPrefix(httputil.PublicMediaPathPrefix).Handler(base.PublicMediaAPIMux) - httpRouter.PathPrefix(httputil.DendriteAdminPathPrefix).Handler(base.DendriteAdminMux) - httpRouter.PathPrefix(httputil.SynapseAdminPathPrefix).Handler(base.SynapseAdminMux) - httpRouter.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) { - c, err := wsUpgrader.Upgrade(w, r, nil) - if err != nil { - logrus.WithError(err).Error("Failed to upgrade WebSocket connection") - return - } - conn := conn.WrapWebSocketConn(c) - if _, err = pRouter.Connect( - conn, - pineconeRouter.ConnectionZone("websocket"), - pineconeRouter.ConnectionPeerType(pineconeRouter.PeerTypeRemote), - ); err != nil { - logrus.WithError(err).Error("Failed to connect WebSocket peer to Pinecone switch") - } - }) - httpRouter.HandleFunc("/pinecone", pRouter.ManholeHandler) - embed.Embed(httpRouter, *instancePort, "Pinecone Demo") - - pMux := mux.NewRouter().SkipClean(true).UseEncodedPath() - pMux.PathPrefix(users.PublicURL).HandlerFunc(userProvider.FederatedUserProfiles) - pMux.PathPrefix(httputil.PublicFederationPathPrefix).Handler(base.PublicFederationAPIMux) - pMux.PathPrefix(httputil.PublicMediaPathPrefix).Handler(base.PublicMediaAPIMux) - - pHTTP := pQUIC.Protocol("matrix").HTTP() - pHTTP.Mux().Handle(users.PublicURL, pMux) - pHTTP.Mux().Handle(httputil.PublicFederationPathPrefix, pMux) - pHTTP.Mux().Handle(httputil.PublicMediaPathPrefix, pMux) - - // Build both ends of a HTTP multiplex. - httpServer := &http.Server{ - Addr: ":0", - TLSNextProto: map[string]func(*http.Server, *tls.Conn, http.Handler){}, - ReadTimeout: 10 * time.Second, - WriteTimeout: 10 * time.Second, - IdleTimeout: 60 * time.Second, - BaseContext: func(_ net.Listener) context.Context { - return context.Background() - }, - Handler: pMux, - } - - go func() { - pubkey := pRouter.PublicKey() - logrus.Info("Listening on ", hex.EncodeToString(pubkey[:])) - logrus.Fatal(httpServer.Serve(pQUIC.Protocol("matrix"))) - }() - go func() { - httpBindAddr := fmt.Sprintf(":%d", *instancePort) - logrus.Info("Listening on ", httpBindAddr) - logrus.Fatal(http.ListenAndServe(httpBindAddr, httpRouter)) - }() - - go func(ch <-chan pineconeEvents.Event) { - eLog := logrus.WithField("pinecone", "events") - relayServerSyncRunning := atomic.NewBool(false) - stopRelayServerSync := make(chan bool) - - m := RelayServerRetriever{ - Context: context.Background(), - ServerName: gomatrixserverlib.ServerName(pRouter.PublicKey().String()), - FederationAPI: fsAPI, - RelayServersQueried: make(map[gomatrixserverlib.ServerName]bool), - RelayAPI: monolith.RelayAPI, - } - m.InitializeRelayServers(eLog) - - for event := range ch { - switch e := event.(type) { - case pineconeEvents.PeerAdded: - if !relayServerSyncRunning.Load() { - go m.syncRelayServers(stopRelayServerSync, *relayServerSyncRunning) - } - case pineconeEvents.PeerRemoved: - if relayServerSyncRunning.Load() && pRouter.TotalPeerCount() == 0 { - stopRelayServerSync <- true - } - case pineconeEvents.BroadcastReceived: - // eLog.Info("Broadcast received from: ", e.PeerID) - - req := &api.PerformWakeupServersRequest{ - ServerNames: []gomatrixserverlib.ServerName{gomatrixserverlib.ServerName(e.PeerID)}, - } - res := &api.PerformWakeupServersResponse{} - if err := fsAPI.PerformWakeupServers(base.Context(), req, res); err != nil { - eLog.WithError(err).Error("Failed to wakeup destination", e.PeerID) - } - default: - } - } - }(pineconeEventChannel) - - base.WaitForShutdown() -} - -type RelayServerRetriever struct { - Context context.Context - ServerName gomatrixserverlib.ServerName - FederationAPI api.FederationInternalAPI - RelayServersQueried map[gomatrixserverlib.ServerName]bool - RelayAPI relayServerAPI.RelayInternalAPI -} - -func (m *RelayServerRetriever) InitializeRelayServers(eLog *logrus.Entry) { - request := api.P2PQueryRelayServersRequest{Server: gomatrixserverlib.ServerName(m.ServerName)} - response := api.P2PQueryRelayServersResponse{} - err := m.FederationAPI.P2PQueryRelayServers(m.Context, &request, &response) - if err != nil { - eLog.Warnf("Failed obtaining list of this node's relay servers: %s", err.Error()) - } - for _, server := range response.RelayServers { - m.RelayServersQueried[server] = false - } - - eLog.Infof("Registered relay servers: %v", response.RelayServers) -} - -func (m *RelayServerRetriever) syncRelayServers(stop <-chan bool, running atomic.Bool) { - defer running.Store(false) - - t := time.NewTimer(relayServerRetryInterval) - for { - relayServersToQuery := []gomatrixserverlib.ServerName{} - for server, complete := range m.RelayServersQueried { - if !complete { - relayServersToQuery = append(relayServersToQuery, server) - } - } - if len(relayServersToQuery) == 0 { - // All relay servers have been synced. - return - } - m.queryRelayServers(relayServersToQuery) - t.Reset(relayServerRetryInterval) - - select { - case <-stop: - // We have been asked to stop syncing, drain the timer and return. - if !t.Stop() { - <-t.C - } - return - case <-t.C: - // The timer has expired. Continue to the next loop iteration. - } - } -} - -func (m *RelayServerRetriever) queryRelayServers(relayServers []gomatrixserverlib.ServerName) { - logrus.Info("querying relay servers for any available transactions") - for _, server := range relayServers { - userID, err := gomatrixserverlib.NewUserID("@user:"+string(m.ServerName), false) - if err != nil { - return - } - err = m.RelayAPI.PerformRelayServerSync(context.Background(), *userID, server) - if err == nil { - m.RelayServersQueried[server] = true - // TODO : What happens if your relay receives new messages after this point? - // Should you continue to check with them, or should they try and contact you? - // They could send a "new_async_events" message your way maybe? - // Then you could mark them as needing to be queried again. - // What if you miss this message? - // Maybe you should try querying them again after a certain period of time as a backup? - } else { - logrus.Errorf("Failed querying relay server: %s", err.Error()) - } - } } diff --git a/cmd/dendrite-demo-pinecone/monolith/keys.go b/cmd/dendrite-demo-pinecone/monolith/keys.go new file mode 100644 index 000000000..637f24a43 --- /dev/null +++ b/cmd/dendrite-demo-pinecone/monolith/keys.go @@ -0,0 +1,63 @@ +// Copyright 2023 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package monolith + +import ( + "crypto/ed25519" + "os" + + "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/test" +) + +func GetOrCreateKey(keyfile string, oldKeyfile string) (ed25519.PrivateKey, ed25519.PublicKey) { + var sk ed25519.PrivateKey + var pk ed25519.PublicKey + + if _, err := os.Stat(keyfile); os.IsNotExist(err) { + if _, err = os.Stat(oldKeyfile); os.IsNotExist(err) { + if err = test.NewMatrixKey(keyfile); err != nil { + panic("failed to generate a new PEM key: " + err.Error()) + } + if _, sk, err = config.LoadMatrixKey(keyfile, os.ReadFile); err != nil { + panic("failed to load PEM key: " + err.Error()) + } + if len(sk) != ed25519.PrivateKeySize { + panic("the private key is not long enough") + } + } else { + if sk, err = os.ReadFile(oldKeyfile); err != nil { + panic("failed to read the old private key: " + err.Error()) + } + if len(sk) != ed25519.PrivateKeySize { + panic("the private key is not long enough") + } + if err = test.SaveMatrixKey(keyfile, sk); err != nil { + panic("failed to convert the private key to PEM format: " + err.Error()) + } + } + } else { + if _, sk, err = config.LoadMatrixKey(keyfile, os.ReadFile); err != nil { + panic("failed to load PEM key: " + err.Error()) + } + if len(sk) != ed25519.PrivateKeySize { + panic("the private key is not long enough") + } + } + + pk = sk.Public().(ed25519.PublicKey) + + return sk, pk +} diff --git a/cmd/dendrite-demo-pinecone/monolith/monolith.go b/cmd/dendrite-demo-pinecone/monolith/monolith.go new file mode 100644 index 000000000..6f1c69a78 --- /dev/null +++ b/cmd/dendrite-demo-pinecone/monolith/monolith.go @@ -0,0 +1,359 @@ +// Copyright 2023 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package monolith + +import ( + "context" + "crypto/ed25519" + "crypto/tls" + "encoding/hex" + "fmt" + "net" + "net/http" + "path/filepath" + "time" + + "github.com/gorilla/mux" + "github.com/gorilla/websocket" + "github.com/matrix-org/dendrite/appservice" + "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/conn" + "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/embed" + "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/relay" + "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/rooms" + "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/users" + "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing" + "github.com/matrix-org/dendrite/federationapi" + federationAPI "github.com/matrix-org/dendrite/federationapi/api" + "github.com/matrix-org/dendrite/federationapi/producers" + "github.com/matrix-org/dendrite/internal/httputil" + "github.com/matrix-org/dendrite/keyserver" + "github.com/matrix-org/dendrite/relayapi" + relayAPI "github.com/matrix-org/dendrite/relayapi/api" + "github.com/matrix-org/dendrite/roomserver" + "github.com/matrix-org/dendrite/setup" + "github.com/matrix-org/dendrite/setup/base" + "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" + "github.com/matrix-org/dendrite/userapi" + userAPI "github.com/matrix-org/dendrite/userapi/api" + "github.com/matrix-org/gomatrixserverlib" + "github.com/sirupsen/logrus" + + pineconeConnections "github.com/matrix-org/pinecone/connections" + pineconeMulticast "github.com/matrix-org/pinecone/multicast" + pineconeRouter "github.com/matrix-org/pinecone/router" + pineconeEvents "github.com/matrix-org/pinecone/router/events" + pineconeSessions "github.com/matrix-org/pinecone/sessions" +) + +const SessionProtocol = "matrix" + +type P2PMonolith struct { + BaseDendrite *base.BaseDendrite + Sessions *pineconeSessions.Sessions + Multicast *pineconeMulticast.Multicast + ConnManager *pineconeConnections.ConnectionManager + Router *pineconeRouter.Router + EventChannel chan pineconeEvents.Event + RelayRetriever relay.RelayServerRetriever + + dendrite setup.Monolith + port int + httpMux *mux.Router + pineconeMux *mux.Router + listener net.Listener + httpListenAddr string +} + +func GenerateDefaultConfig(sk ed25519.PrivateKey, storageDir string, cacheDir string, dbPrefix string) *config.Dendrite { + cfg := config.Dendrite{} + cfg.Defaults(config.DefaultOpts{ + Generate: true, + Monolithic: true, + }) + cfg.Global.PrivateKey = sk + cfg.Global.JetStream.StoragePath = config.Path(fmt.Sprintf("%s/", filepath.Join(cacheDir, dbPrefix))) + cfg.UserAPI.AccountDatabase.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-account.db", filepath.Join(storageDir, dbPrefix))) + cfg.MediaAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-mediaapi.db", filepath.Join(storageDir, dbPrefix))) + cfg.SyncAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-syncapi.db", filepath.Join(storageDir, dbPrefix))) + cfg.RoomServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-roomserver.db", filepath.Join(storageDir, dbPrefix))) + cfg.KeyServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-keyserver.db", filepath.Join(storageDir, dbPrefix))) + cfg.FederationAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-federationsender.db", filepath.Join(storageDir, dbPrefix))) + cfg.RelayAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-relayapi.db", filepath.Join(storageDir, dbPrefix))) + cfg.MSCs.MSCs = []string{"msc2836", "msc2946"} + cfg.MSCs.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-mscs.db", filepath.Join(storageDir, dbPrefix))) + cfg.ClientAPI.RegistrationDisabled = false + cfg.ClientAPI.OpenRegistrationWithoutVerificationEnabled = true + cfg.MediaAPI.BasePath = config.Path(filepath.Join(cacheDir, "media")) + cfg.MediaAPI.AbsBasePath = config.Path(filepath.Join(cacheDir, "media")) + cfg.SyncAPI.Fulltext.Enabled = true + cfg.SyncAPI.Fulltext.IndexPath = config.Path(filepath.Join(cacheDir, "search")) + if err := cfg.Derive(); err != nil { + panic(err) + } + + return &cfg +} + +func (p *P2PMonolith) SetupPinecone(sk ed25519.PrivateKey) { + p.EventChannel = make(chan pineconeEvents.Event) + p.Router = pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk) + p.Router.EnableHopLimiting() + p.Router.EnableWakeupBroadcasts() + p.Router.Subscribe(p.EventChannel) + + p.Sessions = pineconeSessions.NewSessions(logrus.WithField("pinecone", "sessions"), p.Router, []string{SessionProtocol}) + p.Multicast = pineconeMulticast.NewMulticast(logrus.WithField("pinecone", "multicast"), p.Router) + p.ConnManager = pineconeConnections.NewConnectionManager(p.Router, nil) +} + +func (p *P2PMonolith) SetupDendrite(cfg *config.Dendrite, port int, enableRelaying bool, enableMetrics bool, enableWebsockets bool) { + if enableMetrics { + p.BaseDendrite = base.NewBaseDendrite(cfg, "Monolith") + } else { + p.BaseDendrite = base.NewBaseDendrite(cfg, "Monolith", base.DisableMetrics) + } + p.port = port + p.BaseDendrite.ConfigureAdminEndpoints() + + federation := conn.CreateFederationClient(p.BaseDendrite, p.Sessions) + + serverKeyAPI := &signing.YggdrasilKeys{} + keyRing := serverKeyAPI.KeyRing() + + rsComponent := roomserver.NewInternalAPI(p.BaseDendrite) + rsAPI := rsComponent + fsAPI := federationapi.NewInternalAPI( + p.BaseDendrite, federation, rsAPI, p.BaseDendrite.Caches, keyRing, true, + ) + + keyAPI := keyserver.NewInternalAPI(p.BaseDendrite, &p.BaseDendrite.Cfg.KeyServer, fsAPI, rsComponent) + userAPI := userapi.NewInternalAPI(p.BaseDendrite, &cfg.UserAPI, nil, keyAPI, rsAPI, p.BaseDendrite.PushGatewayHTTPClient()) + keyAPI.SetUserAPI(userAPI) + + asAPI := appservice.NewInternalAPI(p.BaseDendrite, userAPI, rsAPI) + + rsComponent.SetFederationAPI(fsAPI, keyRing) + + userProvider := users.NewPineconeUserProvider(p.Router, p.Sessions, userAPI, federation) + roomProvider := rooms.NewPineconeRoomProvider(p.Router, p.Sessions, fsAPI, federation) + + js, _ := p.BaseDendrite.NATS.Prepare(p.BaseDendrite.ProcessContext, &p.BaseDendrite.Cfg.Global.JetStream) + producer := &producers.SyncAPIProducer{ + JetStream: js, + TopicReceiptEvent: p.BaseDendrite.Cfg.Global.JetStream.Prefixed(jetstream.OutputReceiptEvent), + TopicSendToDeviceEvent: p.BaseDendrite.Cfg.Global.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent), + TopicTypingEvent: p.BaseDendrite.Cfg.Global.JetStream.Prefixed(jetstream.OutputTypingEvent), + TopicPresenceEvent: p.BaseDendrite.Cfg.Global.JetStream.Prefixed(jetstream.OutputPresenceEvent), + TopicDeviceListUpdate: p.BaseDendrite.Cfg.Global.JetStream.Prefixed(jetstream.InputDeviceListUpdate), + TopicSigningKeyUpdate: p.BaseDendrite.Cfg.Global.JetStream.Prefixed(jetstream.InputSigningKeyUpdate), + Config: &p.BaseDendrite.Cfg.FederationAPI, + UserAPI: userAPI, + } + relayAPI := relayapi.NewRelayInternalAPI(p.BaseDendrite, federation, rsAPI, keyRing, producer, enableRelaying) + logrus.Infof("Relaying enabled: %v", relayAPI.RelayingEnabled()) + + p.dendrite = setup.Monolith{ + Config: p.BaseDendrite.Cfg, + Client: conn.CreateClient(p.BaseDendrite, p.Sessions), + FedClient: federation, + KeyRing: keyRing, + + AppserviceAPI: asAPI, + FederationAPI: fsAPI, + RoomserverAPI: rsAPI, + UserAPI: userAPI, + KeyAPI: keyAPI, + RelayAPI: relayAPI, + ExtPublicRoomsProvider: roomProvider, + ExtUserDirectoryProvider: userProvider, + } + p.dendrite.AddAllPublicRoutes(p.BaseDendrite) + + p.setupHttpServers(userProvider, enableWebsockets) +} + +func (p *P2PMonolith) GetFederationAPI() federationAPI.FederationInternalAPI { + return p.dendrite.FederationAPI +} + +func (p *P2PMonolith) GetRelayAPI() relayAPI.RelayInternalAPI { + return p.dendrite.RelayAPI +} + +func (p *P2PMonolith) GetUserAPI() userAPI.UserInternalAPI { + return p.dendrite.UserAPI +} + +func (p *P2PMonolith) StartMonolith() { + p.startHTTPServers() + p.startEventHandler() +} + +func (p *P2PMonolith) Stop() { + _ = p.BaseDendrite.Close() + p.WaitForShutdown() +} + +func (p *P2PMonolith) WaitForShutdown() { + p.BaseDendrite.WaitForShutdown() + p.closeAllResources() +} + +func (p *P2PMonolith) closeAllResources() { + if p.listener != nil { + _ = p.listener.Close() + } + + if p.Multicast != nil { + p.Multicast.Stop() + } + + if p.Sessions != nil { + _ = p.Sessions.Close() + } + + if p.Router != nil { + _ = p.Router.Close() + } +} + +func (p *P2PMonolith) Addr() string { + return p.httpListenAddr +} + +func (p *P2PMonolith) setupHttpServers(userProvider *users.PineconeUserProvider, enableWebsockets bool) { + p.httpMux = mux.NewRouter().SkipClean(true).UseEncodedPath() + p.httpMux.PathPrefix(httputil.InternalPathPrefix).Handler(p.BaseDendrite.InternalAPIMux) + p.httpMux.PathPrefix(httputil.PublicClientPathPrefix).Handler(p.BaseDendrite.PublicClientAPIMux) + p.httpMux.PathPrefix(httputil.PublicMediaPathPrefix).Handler(p.BaseDendrite.PublicMediaAPIMux) + p.httpMux.PathPrefix(httputil.DendriteAdminPathPrefix).Handler(p.BaseDendrite.DendriteAdminMux) + p.httpMux.PathPrefix(httputil.SynapseAdminPathPrefix).Handler(p.BaseDendrite.SynapseAdminMux) + + if enableWebsockets { + wsUpgrader := websocket.Upgrader{ + CheckOrigin: func(_ *http.Request) bool { + return true + }, + } + p.httpMux.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) { + c, err := wsUpgrader.Upgrade(w, r, nil) + if err != nil { + logrus.WithError(err).Error("Failed to upgrade WebSocket connection") + return + } + conn := conn.WrapWebSocketConn(c) + if _, err = p.Router.Connect( + conn, + pineconeRouter.ConnectionZone("websocket"), + pineconeRouter.ConnectionPeerType(pineconeRouter.PeerTypeRemote), + ); err != nil { + logrus.WithError(err).Error("Failed to connect WebSocket peer to Pinecone switch") + } + }) + } + + p.httpMux.HandleFunc("/pinecone", p.Router.ManholeHandler) + + if enableWebsockets { + embed.Embed(p.httpMux, p.port, "Pinecone Demo") + } + + p.pineconeMux = mux.NewRouter().SkipClean(true).UseEncodedPath() + p.pineconeMux.PathPrefix(users.PublicURL).HandlerFunc(userProvider.FederatedUserProfiles) + p.pineconeMux.PathPrefix(httputil.PublicFederationPathPrefix).Handler(p.BaseDendrite.PublicFederationAPIMux) + p.pineconeMux.PathPrefix(httputil.PublicMediaPathPrefix).Handler(p.BaseDendrite.PublicMediaAPIMux) + + pHTTP := p.Sessions.Protocol(SessionProtocol).HTTP() + pHTTP.Mux().Handle(users.PublicURL, p.pineconeMux) + pHTTP.Mux().Handle(httputil.PublicFederationPathPrefix, p.pineconeMux) + pHTTP.Mux().Handle(httputil.PublicMediaPathPrefix, p.pineconeMux) +} + +func (p *P2PMonolith) startHTTPServers() { + go func() { + // Build both ends of a HTTP multiplex. + httpServer := &http.Server{ + Addr: ":0", + TLSNextProto: map[string]func(*http.Server, *tls.Conn, http.Handler){}, + ReadTimeout: 10 * time.Second, + WriteTimeout: 10 * time.Second, + IdleTimeout: 30 * time.Second, + BaseContext: func(_ net.Listener) context.Context { + return context.Background() + }, + Handler: p.pineconeMux, + } + + pubkey := p.Router.PublicKey() + pubkeyString := hex.EncodeToString(pubkey[:]) + logrus.Info("Listening on ", pubkeyString) + + switch httpServer.Serve(p.Sessions.Protocol(SessionProtocol)) { + case net.ErrClosed, http.ErrServerClosed: + logrus.Info("Stopped listening on ", pubkeyString) + default: + logrus.Error("Stopped listening on ", pubkeyString) + } + }() + + p.httpListenAddr = fmt.Sprintf(":%d", p.port) + go func() { + logrus.Info("Listening on ", p.httpListenAddr) + switch http.ListenAndServe(p.httpListenAddr, p.httpMux) { + case net.ErrClosed, http.ErrServerClosed: + logrus.Info("Stopped listening on ", p.httpListenAddr) + default: + logrus.Error("Stopped listening on ", p.httpListenAddr) + } + }() +} + +func (p *P2PMonolith) startEventHandler() { + stopRelayServerSync := make(chan bool) + eLog := logrus.WithField("pinecone", "events") + p.RelayRetriever = relay.NewRelayServerRetriever( + context.Background(), + gomatrixserverlib.ServerName(p.Router.PublicKey().String()), + p.dendrite.FederationAPI, + p.dendrite.RelayAPI, + stopRelayServerSync, + ) + p.RelayRetriever.InitializeRelayServers(eLog) + + go func(ch <-chan pineconeEvents.Event) { + for event := range ch { + switch e := event.(type) { + case pineconeEvents.PeerAdded: + p.RelayRetriever.StartSync() + case pineconeEvents.PeerRemoved: + if p.RelayRetriever.IsRunning() && p.Router.TotalPeerCount() == 0 { + stopRelayServerSync <- true + } + case pineconeEvents.BroadcastReceived: + // eLog.Info("Broadcast received from: ", e.PeerID) + + req := &federationAPI.PerformWakeupServersRequest{ + ServerNames: []gomatrixserverlib.ServerName{gomatrixserverlib.ServerName(e.PeerID)}, + } + res := &federationAPI.PerformWakeupServersResponse{} + if err := p.dendrite.FederationAPI.PerformWakeupServers(p.BaseDendrite.Context(), req, res); err != nil { + eLog.WithError(err).Error("Failed to wakeup destination", e.PeerID) + } + default: + } + } + }(p.EventChannel) +} diff --git a/cmd/dendrite-demo-pinecone/relay/retriever.go b/cmd/dendrite-demo-pinecone/relay/retriever.go new file mode 100644 index 000000000..1b5c617ef --- /dev/null +++ b/cmd/dendrite-demo-pinecone/relay/retriever.go @@ -0,0 +1,237 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package relay + +import ( + "context" + "sync" + "time" + + federationAPI "github.com/matrix-org/dendrite/federationapi/api" + relayServerAPI "github.com/matrix-org/dendrite/relayapi/api" + "github.com/matrix-org/gomatrixserverlib" + "github.com/sirupsen/logrus" + "go.uber.org/atomic" +) + +const ( + relayServerRetryInterval = time.Second * 30 +) + +type RelayServerRetriever struct { + ctx context.Context + serverName gomatrixserverlib.ServerName + federationAPI federationAPI.FederationInternalAPI + relayAPI relayServerAPI.RelayInternalAPI + relayServersQueried map[gomatrixserverlib.ServerName]bool + queriedServersMutex sync.Mutex + running atomic.Bool + quit <-chan bool +} + +func NewRelayServerRetriever( + ctx context.Context, + serverName gomatrixserverlib.ServerName, + federationAPI federationAPI.FederationInternalAPI, + relayAPI relayServerAPI.RelayInternalAPI, + quit <-chan bool, +) RelayServerRetriever { + return RelayServerRetriever{ + ctx: ctx, + serverName: serverName, + federationAPI: federationAPI, + relayAPI: relayAPI, + relayServersQueried: make(map[gomatrixserverlib.ServerName]bool), + running: *atomic.NewBool(false), + quit: quit, + } +} + +func (r *RelayServerRetriever) InitializeRelayServers(eLog *logrus.Entry) { + request := federationAPI.P2PQueryRelayServersRequest{Server: gomatrixserverlib.ServerName(r.serverName)} + response := federationAPI.P2PQueryRelayServersResponse{} + err := r.federationAPI.P2PQueryRelayServers(r.ctx, &request, &response) + if err != nil { + eLog.Warnf("Failed obtaining list of this node's relay servers: %s", err.Error()) + } + + r.queriedServersMutex.Lock() + defer r.queriedServersMutex.Unlock() + for _, server := range response.RelayServers { + r.relayServersQueried[server] = false + } + + eLog.Infof("Registered relay servers: %v", response.RelayServers) +} + +func (r *RelayServerRetriever) SetRelayServers(servers []gomatrixserverlib.ServerName) { + UpdateNodeRelayServers(r.serverName, servers, r.ctx, r.federationAPI) + + // Replace list of servers to sync with and mark them all as unsynced. + r.queriedServersMutex.Lock() + defer r.queriedServersMutex.Unlock() + r.relayServersQueried = make(map[gomatrixserverlib.ServerName]bool) + for _, server := range servers { + r.relayServersQueried[server] = false + } + + r.StartSync() +} + +func (r *RelayServerRetriever) GetRelayServers() []gomatrixserverlib.ServerName { + r.queriedServersMutex.Lock() + defer r.queriedServersMutex.Unlock() + relayServers := []gomatrixserverlib.ServerName{} + for server := range r.relayServersQueried { + relayServers = append(relayServers, server) + } + + return relayServers +} + +func (r *RelayServerRetriever) GetQueriedServerStatus() map[gomatrixserverlib.ServerName]bool { + r.queriedServersMutex.Lock() + defer r.queriedServersMutex.Unlock() + + result := map[gomatrixserverlib.ServerName]bool{} + for server, queried := range r.relayServersQueried { + result[server] = queried + } + return result +} + +func (r *RelayServerRetriever) StartSync() { + if !r.running.Load() { + logrus.Info("Starting relay server sync") + go r.SyncRelayServers(r.quit) + } +} + +func (r *RelayServerRetriever) IsRunning() bool { + return r.running.Load() +} + +func (r *RelayServerRetriever) SyncRelayServers(stop <-chan bool) { + defer r.running.Store(false) + + t := time.NewTimer(relayServerRetryInterval) + for { + relayServersToQuery := []gomatrixserverlib.ServerName{} + func() { + r.queriedServersMutex.Lock() + defer r.queriedServersMutex.Unlock() + for server, complete := range r.relayServersQueried { + if !complete { + relayServersToQuery = append(relayServersToQuery, server) + } + } + }() + if len(relayServersToQuery) == 0 { + // All relay servers have been synced. + logrus.Info("Finished syncing with all known relays") + return + } + r.queryRelayServers(relayServersToQuery) + t.Reset(relayServerRetryInterval) + + select { + case <-stop: + if !t.Stop() { + <-t.C + } + return + case <-t.C: + } + } +} + +func (r *RelayServerRetriever) queryRelayServers(relayServers []gomatrixserverlib.ServerName) { + logrus.Info("Querying relay servers for any available transactions") + for _, server := range relayServers { + userID, err := gomatrixserverlib.NewUserID("@user:"+string(r.serverName), false) + if err != nil { + return + } + + logrus.Infof("Syncing with relay: %s", string(server)) + err = r.relayAPI.PerformRelayServerSync(context.Background(), *userID, server) + if err == nil { + func() { + r.queriedServersMutex.Lock() + defer r.queriedServersMutex.Unlock() + r.relayServersQueried[server] = true + }() + // TODO : What happens if your relay receives new messages after this point? + // Should you continue to check with them, or should they try and contact you? + // They could send a "new_async_events" message your way maybe? + // Then you could mark them as needing to be queried again. + // What if you miss this message? + // Maybe you should try querying them again after a certain period of time as a backup? + } else { + logrus.Errorf("Failed querying relay server: %s", err.Error()) + } + } +} + +func UpdateNodeRelayServers( + node gomatrixserverlib.ServerName, + relays []gomatrixserverlib.ServerName, + ctx context.Context, + fedAPI federationAPI.FederationInternalAPI, +) { + // Get the current relay list + request := federationAPI.P2PQueryRelayServersRequest{Server: node} + response := federationAPI.P2PQueryRelayServersResponse{} + err := fedAPI.P2PQueryRelayServers(ctx, &request, &response) + if err != nil { + logrus.Warnf("Failed obtaining list of relay servers for %s: %s", node, err.Error()) + } + + // Remove old, non-matching relays + var serversToRemove []gomatrixserverlib.ServerName + for _, existingServer := range response.RelayServers { + shouldRemove := true + for _, newServer := range relays { + if newServer == existingServer { + shouldRemove = false + break + } + } + + if shouldRemove { + serversToRemove = append(serversToRemove, existingServer) + } + } + removeRequest := federationAPI.P2PRemoveRelayServersRequest{ + Server: node, + RelayServers: serversToRemove, + } + removeResponse := federationAPI.P2PRemoveRelayServersResponse{} + err = fedAPI.P2PRemoveRelayServers(ctx, &removeRequest, &removeResponse) + if err != nil { + logrus.Warnf("Failed removing old relay servers for %s: %s", node, err.Error()) + } + + // Add new relays + addRequest := federationAPI.P2PAddRelayServersRequest{ + Server: node, + RelayServers: relays, + } + addResponse := federationAPI.P2PAddRelayServersResponse{} + err = fedAPI.P2PAddRelayServers(ctx, &addRequest, &addResponse) + if err != nil { + logrus.Warnf("Failed adding relay servers for %s: %s", node, err.Error()) + } +} diff --git a/cmd/dendrite-demo-pinecone/relay/retriever_test.go b/cmd/dendrite-demo-pinecone/relay/retriever_test.go new file mode 100644 index 000000000..8f86a3770 --- /dev/null +++ b/cmd/dendrite-demo-pinecone/relay/retriever_test.go @@ -0,0 +1,99 @@ +// Copyright 2023 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package relay + +import ( + "context" + "testing" + "time" + + federationAPI "github.com/matrix-org/dendrite/federationapi/api" + relayServerAPI "github.com/matrix-org/dendrite/relayapi/api" + "github.com/matrix-org/gomatrixserverlib" + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "gotest.tools/v3/poll" +) + +var testRelayServers = []gomatrixserverlib.ServerName{"relay1", "relay2"} + +type FakeFedAPI struct { + federationAPI.FederationInternalAPI +} + +func (f *FakeFedAPI) P2PQueryRelayServers( + ctx context.Context, + req *federationAPI.P2PQueryRelayServersRequest, + res *federationAPI.P2PQueryRelayServersResponse, +) error { + res.RelayServers = testRelayServers + return nil +} + +type FakeRelayAPI struct { + relayServerAPI.RelayInternalAPI +} + +func (r *FakeRelayAPI) PerformRelayServerSync( + ctx context.Context, + userID gomatrixserverlib.UserID, + relayServer gomatrixserverlib.ServerName, +) error { + return nil +} + +func TestRelayRetrieverInitialization(t *testing.T) { + retriever := NewRelayServerRetriever( + context.Background(), + "server", + &FakeFedAPI{}, + &FakeRelayAPI{}, + make(<-chan bool), + ) + + retriever.InitializeRelayServers(logrus.WithField("test", "relay")) + relayServers := retriever.GetQueriedServerStatus() + assert.Equal(t, 2, len(relayServers)) +} + +func TestRelayRetrieverSync(t *testing.T) { + retriever := NewRelayServerRetriever( + context.Background(), + "server", + &FakeFedAPI{}, + &FakeRelayAPI{}, + make(<-chan bool), + ) + + retriever.InitializeRelayServers(logrus.WithField("test", "relay")) + relayServers := retriever.GetQueriedServerStatus() + assert.Equal(t, 2, len(relayServers)) + + stopRelayServerSync := make(chan bool) + go retriever.SyncRelayServers(stopRelayServerSync) + + check := func(log poll.LogT) poll.Result { + relayServers := retriever.GetQueriedServerStatus() + for _, queried := range relayServers { + if !queried { + return poll.Continue("waiting for all servers to be queried") + } + } + + stopRelayServerSync <- true + return poll.Success() + } + poll.WaitOn(t, check, poll.WithTimeout(5*time.Second), poll.WithDelay(100*time.Millisecond)) +} diff --git a/go.mod b/go.mod index 8d5eafcef..25f022707 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( github.com/matrix-org/dugong v0.0.0-20210921133753-66e6b1c67e2e github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91 github.com/matrix-org/gomatrix v0.0.0-20220926102614-ceba4d9f7530 - github.com/matrix-org/gomatrixserverlib v0.0.0-20230119205614-cb888d80b00f + github.com/matrix-org/gomatrixserverlib v0.0.0-20230131183213-122f1e0e3fa1 github.com/matrix-org/pinecone v0.11.1-0.20230111184901-61850f0e63cb github.com/matrix-org/util v0.0.0-20221111132719-399730281e66 github.com/mattn/go-sqlite3 v1.14.15 diff --git a/go.sum b/go.sum index d57960561..38122e63c 100644 --- a/go.sum +++ b/go.sum @@ -327,8 +327,8 @@ github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91 h1:s7fexw github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91/go.mod h1:e+cg2q7C7yE5QnAXgzo512tgFh1RbQLC0+jozuegKgo= github.com/matrix-org/gomatrix v0.0.0-20220926102614-ceba4d9f7530 h1:kHKxCOLcHH8r4Fzarl4+Y3K5hjothkVW5z7T1dUM11U= github.com/matrix-org/gomatrix v0.0.0-20220926102614-ceba4d9f7530/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s= -github.com/matrix-org/gomatrixserverlib v0.0.0-20230119205614-cb888d80b00f h1:niRWEVkeeekpjxwnMhKn8PD0PUloDsNXP8W+Ez/co/M= -github.com/matrix-org/gomatrixserverlib v0.0.0-20230119205614-cb888d80b00f/go.mod h1:Mtifyr8q8htcBeugvlDnkBcNUy5LO8OzUoplAf1+mb4= +github.com/matrix-org/gomatrixserverlib v0.0.0-20230131183213-122f1e0e3fa1 h1:JSw0nmjMrgBmoM2aQsa78LTpI5BnuD9+vOiEQ4Qo0qw= +github.com/matrix-org/gomatrixserverlib v0.0.0-20230131183213-122f1e0e3fa1/go.mod h1:Mtifyr8q8htcBeugvlDnkBcNUy5LO8OzUoplAf1+mb4= github.com/matrix-org/pinecone v0.11.1-0.20230111184901-61850f0e63cb h1:2L+ltfNKab56FoBBqAvbBLjoAbxwwoZie+B8d+Mp3JI= github.com/matrix-org/pinecone v0.11.1-0.20230111184901-61850f0e63cb/go.mod h1:F3GHppRuHCTDeoOmmgjZMeJdbql91+RSGGsATWfC7oc= github.com/matrix-org/util v0.0.0-20221111132719-399730281e66 h1:6z4KxomXSIGWqhHcfzExgkH3Z3UkIXry4ibJS4Aqz2Y= diff --git a/relayapi/internal/perform.go b/relayapi/internal/perform.go index c00d0d0ed..62c7d446e 100644 --- a/relayapi/internal/perform.go +++ b/relayapi/internal/perform.go @@ -52,7 +52,7 @@ func (r *RelayInternalAPI) PerformRelayServerSync( logrus.Errorf("P2PGetTransactionFromRelay: %s", err.Error()) return err } - r.processTransaction(&asyncResponse.Txn) + r.processTransaction(&asyncResponse.Transaction) prevEntry = gomatrixserverlib.RelayEntry{EntryID: asyncResponse.EntryID} for asyncResponse.EntriesQueued { @@ -64,7 +64,7 @@ func (r *RelayInternalAPI) PerformRelayServerSync( logrus.Errorf("P2PGetTransactionFromRelay: %s", err.Error()) return err } - r.processTransaction(&asyncResponse.Txn) + r.processTransaction(&asyncResponse.Transaction) } return nil diff --git a/relayapi/internal/perform_test.go b/relayapi/internal/perform_test.go index 5673b1994..278706a3e 100644 --- a/relayapi/internal/perform_test.go +++ b/relayapi/internal/perform_test.go @@ -46,8 +46,8 @@ func (f *testFedClient) P2PGetTransactionFromRelay( } res = gomatrixserverlib.RespGetRelayTransaction{ - Txn: gomatrixserverlib.Transaction{}, - EntryID: 0, + Transaction: gomatrixserverlib.Transaction{}, + EntryID: 0, } if f.queueDepth > 0 { res.EntriesQueued = true diff --git a/relayapi/routing/relaytxn.go b/relayapi/routing/relaytxn.go index 76241a33b..63b42ec7d 100644 --- a/relayapi/routing/relaytxn.go +++ b/relayapi/routing/relaytxn.go @@ -25,12 +25,6 @@ import ( "github.com/sirupsen/logrus" ) -type RelayTransactionResponse struct { - Transaction gomatrixserverlib.Transaction `json:"transaction"` - EntryID int64 `json:"entry_id,omitempty"` - EntriesQueued bool `json:"entries_queued"` -} - // GetTransactionFromRelay implements GET /_matrix/federation/v1/relay_txn/{userID} // This endpoint can be extracted into a separate relay server service. func GetTransactionFromRelay( @@ -41,7 +35,7 @@ func GetTransactionFromRelay( ) util.JSONResponse { logrus.Infof("Processing relay_txn for %s", userID.Raw()) - previousEntry := gomatrixserverlib.RelayEntry{} + var previousEntry gomatrixserverlib.RelayEntry if err := json.Unmarshal(fedReq.Content(), &previousEntry); err != nil { return util.JSONResponse{ Code: http.StatusInternalServerError, @@ -65,7 +59,7 @@ func GetTransactionFromRelay( return util.JSONResponse{ Code: http.StatusOK, - JSON: RelayTransactionResponse{ + JSON: gomatrixserverlib.RespGetRelayTransaction{ Transaction: response.Transaction, EntryID: response.EntryID, EntriesQueued: response.EntriesQueued, diff --git a/relayapi/routing/relaytxn_test.go b/relayapi/routing/relaytxn_test.go index 3cb4c8adc..4c099a642 100644 --- a/relayapi/routing/relaytxn_test.go +++ b/relayapi/routing/relaytxn_test.go @@ -64,7 +64,7 @@ func TestGetEmptyDatabaseReturnsNothing(t *testing.T) { response := routing.GetTransactionFromRelay(httpReq, &request, relayAPI, *userID) assert.Equal(t, http.StatusOK, response.Code) - jsonResponse := response.JSON.(routing.RelayTransactionResponse) + jsonResponse := response.JSON.(gomatrixserverlib.RespGetRelayTransaction) assert.Equal(t, false, jsonResponse.EntriesQueued) assert.Equal(t, gomatrixserverlib.Transaction{}, jsonResponse.Transaction) @@ -130,7 +130,7 @@ func TestGetReturnsSavedTransaction(t *testing.T) { response := routing.GetTransactionFromRelay(httpReq, &request, relayAPI, *userID) assert.Equal(t, http.StatusOK, response.Code) - jsonResponse := response.JSON.(routing.RelayTransactionResponse) + jsonResponse := response.JSON.(gomatrixserverlib.RespGetRelayTransaction) assert.True(t, jsonResponse.EntriesQueued) assert.Equal(t, transaction, jsonResponse.Transaction) @@ -139,7 +139,7 @@ func TestGetReturnsSavedTransaction(t *testing.T) { response = routing.GetTransactionFromRelay(httpReq, &request, relayAPI, *userID) assert.Equal(t, http.StatusOK, response.Code) - jsonResponse = response.JSON.(routing.RelayTransactionResponse) + jsonResponse = response.JSON.(gomatrixserverlib.RespGetRelayTransaction) assert.False(t, jsonResponse.EntriesQueued) assert.Equal(t, gomatrixserverlib.Transaction{}, jsonResponse.Transaction) @@ -193,7 +193,7 @@ func TestGetReturnsMultipleSavedTransactions(t *testing.T) { response := routing.GetTransactionFromRelay(httpReq, &request, relayAPI, *userID) assert.Equal(t, http.StatusOK, response.Code) - jsonResponse := response.JSON.(routing.RelayTransactionResponse) + jsonResponse := response.JSON.(gomatrixserverlib.RespGetRelayTransaction) assert.True(t, jsonResponse.EntriesQueued) assert.Equal(t, transaction, jsonResponse.Transaction) @@ -201,7 +201,7 @@ func TestGetReturnsMultipleSavedTransactions(t *testing.T) { response = routing.GetTransactionFromRelay(httpReq, &request, relayAPI, *userID) assert.Equal(t, http.StatusOK, response.Code) - jsonResponse = response.JSON.(routing.RelayTransactionResponse) + jsonResponse = response.JSON.(gomatrixserverlib.RespGetRelayTransaction) assert.True(t, jsonResponse.EntriesQueued) assert.Equal(t, transaction2, jsonResponse.Transaction) @@ -210,7 +210,7 @@ func TestGetReturnsMultipleSavedTransactions(t *testing.T) { response = routing.GetTransactionFromRelay(httpReq, &request, relayAPI, *userID) assert.Equal(t, http.StatusOK, response.Code) - jsonResponse = response.JSON.(routing.RelayTransactionResponse) + jsonResponse = response.JSON.(gomatrixserverlib.RespGetRelayTransaction) assert.False(t, jsonResponse.EntriesQueued) assert.Equal(t, gomatrixserverlib.Transaction{}, jsonResponse.Transaction) diff --git a/relayapi/routing/routing.go b/relayapi/routing/routing.go index c908e950a..8ee0743ef 100644 --- a/relayapi/routing/routing.go +++ b/relayapi/routing/routing.go @@ -34,10 +34,6 @@ import ( // The provided publicAPIMux MUST have `UseEncodedPath()` enabled or else routes will incorrectly // path unescape twice (once from the router, once from MakeRelayAPI). We need to have this enabled // so we can decode paths like foo/bar%2Fbaz as [foo, bar/baz] - by default it will decode to [foo, bar, baz] -// -// Due to Setup being used to call many other functions, a gocyclo nolint is -// applied: -// nolint: gocyclo func Setup( fedMux *mux.Router, cfg *config.FederationAPI, diff --git a/relayapi/routing/sendrelay.go b/relayapi/routing/sendrelay.go index 85a722c8a..ce744cb49 100644 --- a/relayapi/routing/sendrelay.go +++ b/relayapi/routing/sendrelay.go @@ -36,11 +36,7 @@ func SendTransactionToRelay( ) util.JSONResponse { logrus.Infof("Processing send_relay for %s", userID.Raw()) - var txnEvents struct { - PDUs []json.RawMessage `json:"pdus"` - EDUs []gomatrixserverlib.EDU `json:"edus"` - } - + var txnEvents gomatrixserverlib.RelayEvents if err := json.Unmarshal(fedReq.Content(), &txnEvents); err != nil { logrus.Info("The request body could not be decoded into valid JSON." + err.Error()) return util.JSONResponse{ diff --git a/roomserver/storage/postgres/deltas/20230131091021_published_appservice_pkey.go b/roomserver/storage/postgres/deltas/20230131091021_published_appservice_pkey.go new file mode 100644 index 000000000..add66446b --- /dev/null +++ b/roomserver/storage/postgres/deltas/20230131091021_published_appservice_pkey.go @@ -0,0 +1,32 @@ +// Copyright 2023 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package deltas + +import ( + "context" + "database/sql" + "fmt" +) + +func UpPulishedAppservicePrimaryKey(ctx context.Context, tx *sql.Tx) error { + _, err := tx.ExecContext(ctx, `ALTER TABLE roomserver_published RENAME CONSTRAINT roomserver_published_pkey TO roomserver_published_pkeyold; +CREATE UNIQUE INDEX roomserver_published_pkey ON roomserver_published (room_id, appservice_id, network_id); +ALTER TABLE roomserver_published DROP CONSTRAINT roomserver_published_pkeyold; +ALTER TABLE roomserver_published ADD PRIMARY KEY USING INDEX roomserver_published_pkey;`) + if err != nil { + return fmt.Errorf("failed to execute upgrade: %w", err) + } + return nil +} diff --git a/roomserver/storage/postgres/published_table.go b/roomserver/storage/postgres/published_table.go index 61caccb0e..eca81d81f 100644 --- a/roomserver/storage/postgres/published_table.go +++ b/roomserver/storage/postgres/published_table.go @@ -65,10 +65,16 @@ func CreatePublishedTable(db *sql.DB) error { return err } m := sqlutil.NewMigrator(db) - m.AddMigrations(sqlutil.Migration{ - Version: "roomserver: published appservice", - Up: deltas.UpPulishedAppservice, - }) + m.AddMigrations([]sqlutil.Migration{ + { + Version: "roomserver: published appservice", + Up: deltas.UpPulishedAppservice, + }, + { + Version: "roomserver: published appservice pkey", + Up: deltas.UpPulishedAppservicePrimaryKey, + }, + }...) return m.Up(context.Background()) }