From 9bc1c36ff6fbf8de130c7131b67b7a7ce9e31e1e Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Thu, 25 Nov 2021 09:46:26 +0000 Subject: [PATCH] Support connecting to multiple Pinecone static peers in the P2P demos (supply a comma-separated list) --- build/gobind-pinecone/monolith.go | 47 +++++++++++++++-------- cmd/dendrite-demo-pinecone/conn/client.go | 7 +++- cmd/dendrite-demo-pinecone/main.go | 35 +++++++++++------ 3 files changed, 62 insertions(+), 27 deletions(-) diff --git a/build/gobind-pinecone/monolith.go b/build/gobind-pinecone/monolith.go index 47ca20dbd..cd2809e46 100644 --- a/build/gobind-pinecone/monolith.go +++ b/build/gobind-pinecone/monolith.go @@ -13,6 +13,7 @@ import ( "net" "net/http" "os" + "strings" "sync" "time" @@ -75,7 +76,7 @@ func (m *DendriteMonolith) BaseURL() string { } func (m *DendriteMonolith) PeerCount(peertype int) int { - return m.PineconeRouter.PeerCount(peertype) + return m.PineconeRouter.PeerCount(pineconeRouter.ConnectionPeerType(peertype)) } func (m *DendriteMonolith) SessionCount() int { @@ -87,15 +88,15 @@ func (m *DendriteMonolith) SetMulticastEnabled(enabled bool) { m.PineconeMulticast.Start() } else { m.PineconeMulticast.Stop() - m.DisconnectType(pineconeRouter.PeerTypeMulticast) + m.DisconnectType(int(pineconeRouter.PeerTypeMulticast)) } } func (m *DendriteMonolith) SetStaticPeer(uri string) { m.staticPeerMutex.Lock() - m.staticPeerURI = uri + m.staticPeerURI = strings.TrimSpace(uri) m.staticPeerMutex.Unlock() - m.DisconnectType(pineconeRouter.PeerTypeRemote) + m.DisconnectType(int(pineconeRouter.PeerTypeRemote)) if uri != "" { go func() { m.staticPeerAttempt <- struct{}{} @@ -105,7 +106,7 @@ func (m *DendriteMonolith) SetStaticPeer(uri string) { func (m *DendriteMonolith) DisconnectType(peertype int) { for _, p := range m.PineconeRouter.Peers() { - if peertype == p.PeerType { + if int(peertype) == p.PeerType { m.PineconeRouter.Disconnect(types.SwitchPortID(p.Port), nil) } } @@ -133,7 +134,11 @@ func (m *DendriteMonolith) Conduit(zone string, peertype int) (*Conduit, error) for i := 1; i <= 10; i++ { logrus.Errorf("Attempting authenticated connect (attempt %d)", i) var err error - conduit.port, err = m.PineconeRouter.AuthenticatedConnect(l, zone, peertype, true) + conduit.port, err = m.PineconeRouter.Connect( + l, + pineconeRouter.ConnectionZone(zone), + pineconeRouter.ConnectionPeerType(peertype), + ) switch err { case io.ErrClosedPipe: logrus.Errorf("Authenticated connect failed due to closed pipe (attempt %d)", i) @@ -195,16 +200,28 @@ func (m *DendriteMonolith) RegisterDevice(localpart, deviceID string) (string, e } func (m *DendriteMonolith) staticPeerConnect() { + connected := map[string]bool{} // URI -> connected? attempt := func() { - if m.PineconeRouter.PeerCount(pineconeRouter.PeerTypeRemote) == 0 { - m.staticPeerMutex.RLock() - uri := m.staticPeerURI - m.staticPeerMutex.RUnlock() - if uri == "" { - return - } - if err := conn.ConnectToPeer(m.PineconeRouter, uri); err != nil { - logrus.WithError(err).Error("Failed to connect to static peer") + m.staticPeerMutex.RLock() + uri := m.staticPeerURI + m.staticPeerMutex.RUnlock() + if uri == "" { + return + } + for k := range connected { + connected[k] = false + } + for _, uri := range strings.Split(uri, ",") { + connected[strings.TrimSpace(uri)] = false + } + for _, info := range m.PineconeRouter.Peers() { + connected[info.URI] = true + } + for k, online := range connected { + if !online { + if err := conn.ConnectToPeer(m.PineconeRouter, k); err != nil { + logrus.WithError(err).Error("Failed to connect to static peer") + } } } } diff --git a/cmd/dendrite-demo-pinecone/conn/client.go b/cmd/dendrite-demo-pinecone/conn/client.go index 40ccb9c02..14b648a3b 100644 --- a/cmd/dendrite-demo-pinecone/conn/client.go +++ b/cmd/dendrite-demo-pinecone/conn/client.go @@ -34,7 +34,12 @@ func ConnectToPeer(pRouter *pineconeRouter.Router, peer string) error { if parent == nil { return fmt.Errorf("failed to wrap connection") } - _, err := pRouter.AuthenticatedConnect(parent, "static", pineconeRouter.PeerTypeRemote, true) + _, err := pRouter.Connect( + parent, + pineconeRouter.ConnectionZone("static"), + pineconeRouter.PeerTypeRemote, + pineconeRouter.ConnectionURI(peer), + ) return err } diff --git a/cmd/dendrite-demo-pinecone/main.go b/cmd/dendrite-demo-pinecone/main.go index 2b56ef35d..8ed3f349e 100644 --- a/cmd/dendrite-demo-pinecone/main.go +++ b/cmd/dendrite-demo-pinecone/main.go @@ -26,6 +26,7 @@ import ( "net" "net/http" "os" + "strings" "time" "github.com/gorilla/mux" @@ -61,7 +62,7 @@ import ( var ( instanceName = flag.String("name", "dendrite-p2p-pinecone", "the name of this P2P demo instance") instancePort = flag.Int("port", 8008, "the port that the client API will listen on") - instancePeer = flag.String("peer", "", "the static Pinecone peer to connect to") + instancePeer = flag.String("peer", "", "the static Pinecone peers to connect to, comma separated-list") instanceListen = flag.String("listen", ":0", "the port Pinecone peers can connect to") ) @@ -109,9 +110,9 @@ func main() { continue } - port, err := pRouter.AuthenticatedConnect(conn, "", pineconeRouter.PeerTypeRemote, true) + port, err := pRouter.Connect(conn, pineconeRouter.PeerTypeRemote) if err != nil { - logrus.WithError(err).Error("pSwitch.AuthenticatedConnect failed") + logrus.WithError(err).Error("pSwitch.Connect failed") continue } @@ -124,14 +125,22 @@ func main() { pMulticast.Start() connectToStaticPeer := func() { + connected := map[string]bool{} // URI -> connected? + for _, uri := range strings.Split(*instancePeer, ",") { + connected[strings.TrimSpace(uri)] = false + } attempt := func() { - if pRouter.PeerCount(pineconeRouter.PeerTypeRemote) == 0 { - uri := *instancePeer - if uri == "" { - return - } - if err := conn.ConnectToPeer(pRouter, uri); err != nil { - logrus.WithError(err).Error("Failed to connect to static peer") + for k := range connected { + connected[k] = false + } + for _, info := range pRouter.Peers() { + connected[info.URI] = true + } + for k, online := range connected { + if !online { + if err := conn.ConnectToPeer(pRouter, k); err != nil { + logrus.WithError(err).Error("Failed to connect to static peer") + } } } } @@ -230,7 +239,11 @@ func main() { return } conn := conn.WrapWebSocketConn(c) - if _, err = pRouter.AuthenticatedConnect(conn, "websocket", pineconeRouter.PeerTypeRemote, true); err != nil { + if _, err = pRouter.Connect( + conn, + pineconeRouter.ConnectionZone("websocket"), + pineconeRouter.PeerTypeRemote, + ); err != nil { logrus.WithError(err).Error("Failed to connect WebSocket peer to Pinecone switch") } })