Stay connected to static peers more stickily

This commit is contained in:
Neil Alexander 2021-03-18 14:33:52 +00:00
parent f2b10d854e
commit 9f2b61fb37
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
3 changed files with 50 additions and 26 deletions

View file

@ -11,6 +11,7 @@ import (
"io" "io"
"io/ioutil" "io/ioutil"
"log" "log"
"math"
"net" "net"
"net/http" "net/http"
"os" "os"
@ -94,12 +95,12 @@ func (m *DendriteMonolith) SetMulticastEnabled(enabled bool) {
} }
func (m *DendriteMonolith) SetStaticPeer(uri string) { func (m *DendriteMonolith) SetStaticPeer(uri string) {
m.staticPeerMutex.Lock()
m.staticPeerURI = uri
m.staticPeerMutex.Unlock()
m.DisconnectType(pineconeRouter.PeerTypeRemote) m.DisconnectType(pineconeRouter.PeerTypeRemote)
if uri != "" { if uri != "" {
m.staticPeerMutex.Lock() m.staticPeerConnect()
m.staticPeerURI = uri
m.staticPeerMutex.Unlock()
go conn.ConnectToPeer(m.PineconeRouter, uri)
} }
} }
@ -194,6 +195,21 @@ func (m *DendriteMonolith) RegisterDevice(localpart, deviceID string) (string, e
return loginRes.Device.AccessToken, nil return loginRes.Device.AccessToken, nil
} }
func (m *DendriteMonolith) staticPeerConnect() {
m.staticPeerMutex.RLock()
uri := m.staticPeerURI
m.staticPeerMutex.RUnlock()
if uri == "" {
return
}
if err := conn.ConnectToPeer(m.PineconeRouter, uri); err != nil {
exp := time.Second * time.Duration(math.Exp2(float64(m.staticPeerAttempts.Inc())))
time.AfterFunc(exp, m.staticPeerConnect)
} else {
m.staticPeerAttempts.Store(0)
}
}
// nolint:gocyclo // nolint:gocyclo
func (m *DendriteMonolith) Start() { func (m *DendriteMonolith) Start() {
m.config = yggdrasilConfig.GenerateConfig() m.config = yggdrasilConfig.GenerateConfig()
@ -256,11 +272,9 @@ func (m *DendriteMonolith) Start() {
m.PineconeMulticast = pineconeMulticast.NewMulticast(logger, m.PineconeRouter) m.PineconeMulticast = pineconeMulticast.NewMulticast(logger, m.PineconeRouter)
m.PineconeRouter.SetDisconnectedCallback(func(port pineconeTypes.SwitchPortID, public pineconeTypes.PublicKey, peertype int, err error) { m.PineconeRouter.SetDisconnectedCallback(func(port pineconeTypes.SwitchPortID, public pineconeTypes.PublicKey, peertype int, err error) {
m.staticPeerMutex.RLock() if peertype == pineconeRouter.PeerTypeRemote && err != nil {
uri := m.staticPeerURI m.staticPeerAttempts.Store(0)
m.staticPeerMutex.RUnlock() time.AfterFunc(time.Second, m.staticPeerConnect)
if peertype == pineconeRouter.PeerTypeRemote && uri != "" && err != nil {
conn.ConnectToPeer(m.PineconeRouter, uri)
} }
}) })

View file

@ -1,6 +1,7 @@
package conn package conn
import ( import (
"fmt"
"net" "net"
"net/http" "net/http"
"strings" "strings"
@ -8,35 +9,31 @@ import (
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"github.com/matrix-org/dendrite/setup" "github.com/matrix-org/dendrite/setup"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
pineconeRouter "github.com/matrix-org/pinecone/router" pineconeRouter "github.com/matrix-org/pinecone/router"
pineconeSessions "github.com/matrix-org/pinecone/sessions" pineconeSessions "github.com/matrix-org/pinecone/sessions"
) )
func ConnectToPeer(pRouter *pineconeRouter.Router, peer string) { func ConnectToPeer(pRouter *pineconeRouter.Router, peer string) error {
var parent net.Conn var parent net.Conn
if strings.HasPrefix(peer, "ws://") || strings.HasPrefix(peer, "wss://") { if strings.HasPrefix(peer, "ws://") || strings.HasPrefix(peer, "wss://") {
c, _, err := websocket.DefaultDialer.Dial(peer, nil) c, _, err := websocket.DefaultDialer.Dial(peer, nil)
if err != nil { if err != nil {
logrus.WithError(err).Errorf("Failed to connect to Pinecone static peer %q via WebSockets", peer) return fmt.Errorf("websocket.DefaultDialer.Dial: %w", err)
return
} }
parent = WrapWebSocketConn(c) parent = WrapWebSocketConn(c)
} else { } else {
var err error var err error
parent, err = net.Dial("tcp", peer) parent, err = net.Dial("tcp", peer)
if err != nil { if err != nil {
logrus.WithError(err).Errorf("Failed to connect to Pinecone static peer %q via TCP", peer) return fmt.Errorf("net.Dial: %w", err)
return
} }
} }
if parent == nil { if parent == nil {
return return fmt.Errorf("failed to wrap connection")
}
if _, err := pRouter.AuthenticatedConnect(parent, "static", pineconeRouter.PeerTypeRemote); err != nil {
logrus.WithError(err).Errorf("Failed to connect Pinecone static peer to switch")
} }
_, err := pRouter.AuthenticatedConnect(parent, "static", pineconeRouter.PeerTypeRemote)
return err
} }
type RoundTripper struct { type RoundTripper struct {

View file

@ -23,6 +23,7 @@ import (
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"log" "log"
"math"
"net" "net"
"net/http" "net/http"
"os" "os"
@ -47,6 +48,7 @@ import (
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/userapi" "github.com/matrix-org/dendrite/userapi"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"go.uber.org/atomic"
pineconeMulticast "github.com/matrix-org/pinecone/multicast" pineconeMulticast "github.com/matrix-org/pinecone/multicast"
pineconeRouter "github.com/matrix-org/pinecone/router" pineconeRouter "github.com/matrix-org/pinecone/router"
@ -92,10 +94,6 @@ func main() {
logger := log.New(os.Stdout, "", 0) logger := log.New(os.Stdout, "", 0)
pRouter := pineconeRouter.NewRouter(logger, "dendrite", sk, pk, nil) pRouter := pineconeRouter.NewRouter(logger, "dendrite", sk, pk, nil)
if instancePeer != nil && *instancePeer != "" {
go conn.ConnectToPeer(pRouter, *instancePeer)
}
go func() { go func() {
listener, err := net.Listen("tcp", *instanceListen) listener, err := net.Listen("tcp", *instanceListen)
if err != nil { if err != nil {
@ -125,12 +123,27 @@ func main() {
pMulticast := pineconeMulticast.NewMulticast(logger, pRouter) pMulticast := pineconeMulticast.NewMulticast(logger, pRouter)
pMulticast.Start() pMulticast.Start()
pRouter.SetDisconnectedCallback(func(port pineconeTypes.SwitchPortID, public pineconeTypes.PublicKey, peertype int, err error) { var staticPeerAttempts atomic.Uint32
var connectToStaticPeer func()
connectToStaticPeer = func() {
uri := *instancePeer uri := *instancePeer
if peertype == pineconeRouter.PeerTypeRemote && uri != "" && err != nil { if uri == "" {
conn.ConnectToPeer(pRouter, uri) return
}
if err := conn.ConnectToPeer(pRouter, uri); err != nil {
exp := time.Second * time.Duration(math.Exp2(float64(staticPeerAttempts.Inc())))
time.AfterFunc(exp, connectToStaticPeer)
} else {
staticPeerAttempts.Store(0)
}
}
pRouter.SetDisconnectedCallback(func(port pineconeTypes.SwitchPortID, public pineconeTypes.PublicKey, peertype int, err error) {
if peertype == pineconeRouter.PeerTypeRemote && err != nil {
staticPeerAttempts.Store(0)
time.AfterFunc(time.Second, connectToStaticPeer)
} }
}) })
go connectToStaticPeer()
cfg := &config.Dendrite{} cfg := &config.Dendrite{}
cfg.Defaults() cfg.Defaults()