diff --git a/build/gobind-pinecone/monolith.go b/build/gobind-pinecone/monolith.go index 4f61e6936..8ae010111 100644 --- a/build/gobind-pinecone/monolith.go +++ b/build/gobind-pinecone/monolith.go @@ -11,6 +11,7 @@ import ( "io" "io/ioutil" "log" + "math" "net" "net/http" "os" @@ -94,12 +95,12 @@ func (m *DendriteMonolith) SetMulticastEnabled(enabled bool) { } func (m *DendriteMonolith) SetStaticPeer(uri string) { + m.staticPeerMutex.Lock() + m.staticPeerURI = uri + m.staticPeerMutex.Unlock() m.DisconnectType(pineconeRouter.PeerTypeRemote) if uri != "" { - m.staticPeerMutex.Lock() - m.staticPeerURI = uri - m.staticPeerMutex.Unlock() - go conn.ConnectToPeer(m.PineconeRouter, uri) + m.staticPeerConnect() } } @@ -194,6 +195,21 @@ func (m *DendriteMonolith) RegisterDevice(localpart, deviceID string) (string, e return loginRes.Device.AccessToken, nil } +func (m *DendriteMonolith) staticPeerConnect() { + m.staticPeerMutex.RLock() + uri := m.staticPeerURI + m.staticPeerMutex.RUnlock() + if uri == "" { + return + } + if err := conn.ConnectToPeer(m.PineconeRouter, uri); err != nil { + exp := time.Second * time.Duration(math.Exp2(float64(m.staticPeerAttempts.Inc()))) + time.AfterFunc(exp, m.staticPeerConnect) + } else { + m.staticPeerAttempts.Store(0) + } +} + // nolint:gocyclo func (m *DendriteMonolith) Start() { m.config = yggdrasilConfig.GenerateConfig() @@ -256,11 +272,9 @@ func (m *DendriteMonolith) Start() { m.PineconeMulticast = pineconeMulticast.NewMulticast(logger, m.PineconeRouter) m.PineconeRouter.SetDisconnectedCallback(func(port pineconeTypes.SwitchPortID, public pineconeTypes.PublicKey, peertype int, err error) { - m.staticPeerMutex.RLock() - uri := m.staticPeerURI - m.staticPeerMutex.RUnlock() - if peertype == pineconeRouter.PeerTypeRemote && uri != "" && err != nil { - conn.ConnectToPeer(m.PineconeRouter, uri) + if peertype == pineconeRouter.PeerTypeRemote && err != nil { + m.staticPeerAttempts.Store(0) + time.AfterFunc(time.Second, m.staticPeerConnect) } }) diff --git a/cmd/dendrite-demo-pinecone/conn/client.go b/cmd/dendrite-demo-pinecone/conn/client.go index 9b51c27c0..bf23085d0 100644 --- a/cmd/dendrite-demo-pinecone/conn/client.go +++ b/cmd/dendrite-demo-pinecone/conn/client.go @@ -1,6 +1,7 @@ package conn import ( + "fmt" "net" "net/http" "strings" @@ -8,35 +9,31 @@ import ( "github.com/gorilla/websocket" "github.com/matrix-org/dendrite/setup" "github.com/matrix-org/gomatrixserverlib" - "github.com/sirupsen/logrus" pineconeRouter "github.com/matrix-org/pinecone/router" pineconeSessions "github.com/matrix-org/pinecone/sessions" ) -func ConnectToPeer(pRouter *pineconeRouter.Router, peer string) { +func ConnectToPeer(pRouter *pineconeRouter.Router, peer string) error { var parent net.Conn if strings.HasPrefix(peer, "ws://") || strings.HasPrefix(peer, "wss://") { c, _, err := websocket.DefaultDialer.Dial(peer, nil) if err != nil { - logrus.WithError(err).Errorf("Failed to connect to Pinecone static peer %q via WebSockets", peer) - return + return fmt.Errorf("websocket.DefaultDialer.Dial: %w", err) } parent = WrapWebSocketConn(c) } else { var err error parent, err = net.Dial("tcp", peer) if err != nil { - logrus.WithError(err).Errorf("Failed to connect to Pinecone static peer %q via TCP", peer) - return + return fmt.Errorf("net.Dial: %w", err) } } if parent == nil { - return - } - if _, err := pRouter.AuthenticatedConnect(parent, "static", pineconeRouter.PeerTypeRemote); err != nil { - logrus.WithError(err).Errorf("Failed to connect Pinecone static peer to switch") + return fmt.Errorf("failed to wrap connection") } + _, err := pRouter.AuthenticatedConnect(parent, "static", pineconeRouter.PeerTypeRemote) + return err } type RoundTripper struct { diff --git a/cmd/dendrite-demo-pinecone/main.go b/cmd/dendrite-demo-pinecone/main.go index 6d065886a..074fb0289 100644 --- a/cmd/dendrite-demo-pinecone/main.go +++ b/cmd/dendrite-demo-pinecone/main.go @@ -23,6 +23,7 @@ import ( "fmt" "io/ioutil" "log" + "math" "net" "net/http" "os" @@ -47,6 +48,7 @@ import ( "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/userapi" "github.com/matrix-org/gomatrixserverlib" + "go.uber.org/atomic" pineconeMulticast "github.com/matrix-org/pinecone/multicast" pineconeRouter "github.com/matrix-org/pinecone/router" @@ -92,10 +94,6 @@ func main() { logger := log.New(os.Stdout, "", 0) pRouter := pineconeRouter.NewRouter(logger, "dendrite", sk, pk, nil) - if instancePeer != nil && *instancePeer != "" { - go conn.ConnectToPeer(pRouter, *instancePeer) - } - go func() { listener, err := net.Listen("tcp", *instanceListen) if err != nil { @@ -125,12 +123,27 @@ func main() { pMulticast := pineconeMulticast.NewMulticast(logger, pRouter) pMulticast.Start() - pRouter.SetDisconnectedCallback(func(port pineconeTypes.SwitchPortID, public pineconeTypes.PublicKey, peertype int, err error) { + var staticPeerAttempts atomic.Uint32 + var connectToStaticPeer func() + connectToStaticPeer = func() { uri := *instancePeer - if peertype == pineconeRouter.PeerTypeRemote && uri != "" && err != nil { - conn.ConnectToPeer(pRouter, uri) + if uri == "" { + return + } + if err := conn.ConnectToPeer(pRouter, uri); err != nil { + exp := time.Second * time.Duration(math.Exp2(float64(staticPeerAttempts.Inc()))) + time.AfterFunc(exp, connectToStaticPeer) + } else { + staticPeerAttempts.Store(0) + } + } + pRouter.SetDisconnectedCallback(func(port pineconeTypes.SwitchPortID, public pineconeTypes.PublicKey, peertype int, err error) { + if peertype == pineconeRouter.PeerTypeRemote && err != nil { + staticPeerAttempts.Store(0) + time.AfterFunc(time.Second, connectToStaticPeer) } }) + go connectToStaticPeer() cfg := &config.Dendrite{} cfg.Defaults()