diff --git a/build/gobind-pinecone/monolith.go b/build/gobind-pinecone/monolith.go index 608599498..9cc94d650 100644 --- a/build/gobind-pinecone/monolith.go +++ b/build/gobind-pinecone/monolith.go @@ -52,6 +52,7 @@ import ( "golang.org/x/net/http2" "golang.org/x/net/http2/h2c" + pineconeConnections "github.com/matrix-org/pinecone/connections" pineconeMulticast "github.com/matrix-org/pinecone/multicast" pineconeRouter "github.com/matrix-org/pinecone/router" pineconeSessions "github.com/matrix-org/pinecone/sessions" @@ -71,11 +72,9 @@ type DendriteMonolith struct { PineconeRouter *pineconeRouter.Router PineconeMulticast *pineconeMulticast.Multicast PineconeQUIC *pineconeSessions.Sessions + PineconeManager *pineconeConnections.ConnectionManager StorageDirectory string CacheDirectory string - staticPeerURI string - staticPeerMutex sync.RWMutex - staticPeerAttempt chan struct{} listener net.Listener httpServer *http.Server processContext *process.ProcessContext @@ -104,15 +103,8 @@ func (m *DendriteMonolith) SetMulticastEnabled(enabled bool) { } func (m *DendriteMonolith) SetStaticPeer(uri string) { - m.staticPeerMutex.Lock() - m.staticPeerURI = strings.TrimSpace(uri) - m.staticPeerMutex.Unlock() - m.DisconnectType(int(pineconeRouter.PeerTypeRemote)) - if uri != "" { - go func() { - m.staticPeerAttempt <- struct{}{} - }() - } + m.PineconeManager.RemovePeers() + m.PineconeManager.AddPeer(strings.TrimSpace(uri)) } func (m *DendriteMonolith) DisconnectType(peertype int) { @@ -210,43 +202,6 @@ func (m *DendriteMonolith) RegisterDevice(localpart, deviceID string) (string, e return loginRes.Device.AccessToken, nil } -func (m *DendriteMonolith) staticPeerConnect() { - connected := map[string]bool{} // URI -> connected? - attempt := func() { - m.staticPeerMutex.RLock() - uri := m.staticPeerURI - m.staticPeerMutex.RUnlock() - if uri == "" { - return - } - for k := range connected { - delete(connected, k) - } - for _, uri := range strings.Split(uri, ",") { - connected[strings.TrimSpace(uri)] = false - } - for _, info := range m.PineconeRouter.Peers() { - connected[info.URI] = true - } - for k, online := range connected { - if !online { - if err := conn.ConnectToPeer(m.PineconeRouter, k); err != nil { - logrus.WithError(err).Error("Failed to connect to static peer") - } - } - } - } - for { - select { - case <-m.processContext.Context().Done(): - case <-m.staticPeerAttempt: - attempt() - case <-time.After(time.Second * 5): - attempt() - } - } -} - // nolint:gocyclo func (m *DendriteMonolith) Start() { var err error @@ -284,6 +239,7 @@ func (m *DendriteMonolith) Start() { m.PineconeRouter = pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk, false) m.PineconeQUIC = pineconeSessions.NewSessions(logrus.WithField("pinecone", "sessions"), m.PineconeRouter, []string{"matrix"}) m.PineconeMulticast = pineconeMulticast.NewMulticast(logrus.WithField("pinecone", "multicast"), m.PineconeRouter) + m.PineconeManager = pineconeConnections.NewConnectionManager(m.PineconeRouter) prefix := hex.EncodeToString(pk) cfg := &config.Dendrite{} @@ -392,9 +348,6 @@ func (m *DendriteMonolith) Start() { m.processContext = base.ProcessContext - m.staticPeerAttempt = make(chan struct{}, 1) - go m.staticPeerConnect() - go func() { m.logger.Info("Listening on ", cfg.Global.ServerName) m.logger.Fatal(m.httpServer.Serve(m.PineconeQUIC.Protocol("matrix"))) diff --git a/cmd/dendrite-demo-pinecone/main.go b/cmd/dendrite-demo-pinecone/main.go index a3d3ed175..dd1ab3697 100644 --- a/cmd/dendrite-demo-pinecone/main.go +++ b/cmd/dendrite-demo-pinecone/main.go @@ -25,7 +25,6 @@ import ( "net" "net/http" "os" - "strings" "time" "github.com/gorilla/mux" @@ -47,6 +46,7 @@ import ( "github.com/matrix-org/dendrite/userapi" "github.com/matrix-org/gomatrixserverlib" + pineconeConnections "github.com/matrix-org/pinecone/connections" pineconeMulticast "github.com/matrix-org/pinecone/multicast" pineconeRouter "github.com/matrix-org/pinecone/router" pineconeSessions "github.com/matrix-org/pinecone/sessions" @@ -90,6 +90,13 @@ func main() { } pRouter := pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk, false) + pQUIC := pineconeSessions.NewSessions(logrus.WithField("pinecone", "sessions"), pRouter, []string{"matrix"}) + pMulticast := pineconeMulticast.NewMulticast(logrus.WithField("pinecone", "multicast"), pRouter) + pManager := pineconeConnections.NewConnectionManager(pRouter) + pMulticast.Start() + if instancePeer != nil && *instancePeer != "" { + pManager.AddPeer(*instancePeer) + } go func() { listener, err := net.Listen("tcp", *instanceListen) @@ -119,36 +126,6 @@ func main() { } }() - pQUIC := pineconeSessions.NewSessions(logrus.WithField("pinecone", "sessions"), pRouter, []string{"matrix"}) - pMulticast := pineconeMulticast.NewMulticast(logrus.WithField("pinecone", "multicast"), pRouter) - pMulticast.Start() - - connectToStaticPeer := func() { - connected := map[string]bool{} // URI -> connected? - for _, uri := range strings.Split(*instancePeer, ",") { - connected[strings.TrimSpace(uri)] = false - } - attempt := func() { - for k := range connected { - connected[k] = false - } - for _, info := range pRouter.Peers() { - connected[info.URI] = true - } - for k, online := range connected { - if !online { - if err := conn.ConnectToPeer(pRouter, k); err != nil { - logrus.WithError(err).Error("Failed to connect to static peer") - } - } - } - } - for { - attempt() - time.Sleep(time.Second * 5) - } - } - cfg := &config.Dendrite{} cfg.Defaults(true) cfg.Global.ServerName = gomatrixserverlib.ServerName(hex.EncodeToString(pk)) @@ -268,7 +245,6 @@ func main() { Handler: pMux, } - go connectToStaticPeer() go func() { pubkey := pRouter.PublicKey() logrus.Info("Listening on ", hex.EncodeToString(pubkey[:])) diff --git a/cmd/dendritejs-pinecone/main.go b/cmd/dendritejs-pinecone/main.go index ba9edf230..211b3e131 100644 --- a/cmd/dendritejs-pinecone/main.go +++ b/cmd/dendritejs-pinecone/main.go @@ -22,7 +22,6 @@ import ( "encoding/hex" "fmt" "syscall/js" - "time" "github.com/gorilla/mux" "github.com/matrix-org/dendrite/appservice" @@ -44,6 +43,7 @@ import ( _ "github.com/matrix-org/go-sqlite3-js" + pineconeConnections "github.com/matrix-org/pinecone/connections" pineconeRouter "github.com/matrix-org/pinecone/router" pineconeSessions "github.com/matrix-org/pinecone/sessions" ) @@ -154,6 +154,8 @@ func startup() { pRouter := pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk, false) pSessions := pineconeSessions.NewSessions(logrus.WithField("pinecone", "sessions"), pRouter, []string{"matrix"}) + pManager := pineconeConnections.NewConnectionManager(pRouter) + pManager.AddPeer("wss://pinecone.matrix.org/public") cfg := &config.Dendrite{} cfg.Defaults(true) @@ -237,20 +239,4 @@ func startup() { } s.ListenAndServe("fetch") }() - - // Connect to the static peer - go func() { - for { - if pRouter.PeerCount(pineconeRouter.PeerTypeRemote) == 0 { - if err := conn.ConnectToPeer(pRouter, publicPeer); err != nil { - logrus.WithError(err).Error("Failed to connect to static peer") - } - } - select { - case <-base.ProcessContext.Context().Done(): - return - case <-time.After(time.Second * 5): - } - } - }() }