diff --git a/build/gobind-pinecone/monolith.go b/build/gobind-pinecone/monolith.go index c15707e56..09af80f6c 100644 --- a/build/gobind-pinecone/monolith.go +++ b/build/gobind-pinecone/monolith.go @@ -10,7 +10,6 @@ import ( "io" "io/ioutil" "log" - "math" "net" "net/http" "os" @@ -37,15 +36,14 @@ import ( userapiAPI "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" "github.com/sirupsen/logrus" - "go.uber.org/atomic" "golang.org/x/net/http2" "golang.org/x/net/http2/h2c" pineconeMulticast "github.com/matrix-org/pinecone/multicast" + "github.com/matrix-org/pinecone/router" pineconeRouter "github.com/matrix-org/pinecone/router" pineconeSessions "github.com/matrix-org/pinecone/sessions" "github.com/matrix-org/pinecone/types" - pineconeTypes "github.com/matrix-org/pinecone/types" _ "golang.org/x/mobile/bind" ) @@ -57,19 +55,19 @@ const ( ) type DendriteMonolith struct { - logger logrus.Logger - PineconeRouter *pineconeRouter.Router - PineconeMulticast *pineconeMulticast.Multicast - PineconeQUIC *pineconeSessions.Sessions - StorageDirectory string - CacheDirectory string - staticPeerURI string - staticPeerMutex sync.RWMutex - staticPeerAttempts atomic.Uint32 - listener net.Listener - httpServer *http.Server - processContext *process.ProcessContext - userAPI userapiAPI.UserInternalAPI + logger logrus.Logger + PineconeRouter *pineconeRouter.Router + PineconeMulticast *pineconeMulticast.Multicast + PineconeQUIC *pineconeSessions.Sessions + StorageDirectory string + CacheDirectory string + staticPeerURI string + staticPeerMutex sync.RWMutex + staticPeerAttempt chan struct{} + listener net.Listener + httpServer *http.Server + processContext *process.ProcessContext + userAPI userapiAPI.UserInternalAPI } func (m *DendriteMonolith) BaseURL() string { @@ -99,7 +97,9 @@ func (m *DendriteMonolith) SetStaticPeer(uri string) { m.staticPeerMutex.Unlock() m.DisconnectType(pineconeRouter.PeerTypeRemote) if uri != "" { - go m.staticPeerConnect() + go func() { + m.staticPeerAttempt <- struct{}{} + }() } } @@ -195,17 +195,27 @@ func (m *DendriteMonolith) RegisterDevice(localpart, deviceID string) (string, e } func (m *DendriteMonolith) staticPeerConnect() { - m.staticPeerMutex.RLock() - uri := m.staticPeerURI - m.staticPeerMutex.RUnlock() - if uri == "" { - return + attempt := func() { + if m.PineconeRouter.PeerCount(router.PeerTypeRemote) == 0 { + m.staticPeerMutex.RLock() + uri := m.staticPeerURI + m.staticPeerMutex.RUnlock() + if uri == "" { + return + } + if err := conn.ConnectToPeer(m.PineconeRouter, uri); err != nil { + logrus.WithError(err).Error("Failed to connect to static peer") + } + } } - if err := conn.ConnectToPeer(m.PineconeRouter, uri); err != nil { - exp := time.Second * time.Duration(math.Exp2(float64(m.staticPeerAttempts.Inc()))) - time.AfterFunc(exp, m.staticPeerConnect) - } else { - m.staticPeerAttempts.Store(0) + for { + select { + case <-m.processContext.Context().Done(): + case <-m.staticPeerAttempt: + attempt() + case <-time.After(time.Second * 5): + attempt() + } } } @@ -248,13 +258,6 @@ func (m *DendriteMonolith) Start() { m.PineconeQUIC = pineconeSessions.NewSessions(logger, m.PineconeRouter) m.PineconeMulticast = pineconeMulticast.NewMulticast(logger, m.PineconeRouter) - m.PineconeRouter.SetDisconnectedCallback(func(port pineconeTypes.SwitchPortID, public pineconeTypes.PublicKey, peertype int, err error) { - if peertype == pineconeRouter.PeerTypeRemote { - m.staticPeerAttempts.Store(0) - time.AfterFunc(time.Second, m.staticPeerConnect) - } - }) - prefix := hex.EncodeToString(pk) cfg := &config.Dendrite{} cfg.Defaults() @@ -359,8 +362,12 @@ func (m *DendriteMonolith) Start() { }, Handler: h2c.NewHandler(pMux, h2s), } + m.processContext = base.ProcessContext + m.staticPeerAttempt = make(chan struct{}, 1) + go m.staticPeerConnect() + go func() { m.logger.Info("Listening on ", cfg.Global.ServerName) m.logger.Fatal(m.httpServer.Serve(m.PineconeQUIC)) diff --git a/cmd/dendrite-demo-pinecone/main.go b/cmd/dendrite-demo-pinecone/main.go index 7851fdb19..f40f9190a 100644 --- a/cmd/dendrite-demo-pinecone/main.go +++ b/cmd/dendrite-demo-pinecone/main.go @@ -23,7 +23,6 @@ import ( "fmt" "io/ioutil" "log" - "math" "net" "net/http" "os" @@ -48,12 +47,11 @@ import ( "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/userapi" "github.com/matrix-org/gomatrixserverlib" - "go.uber.org/atomic" pineconeMulticast "github.com/matrix-org/pinecone/multicast" + "github.com/matrix-org/pinecone/router" pineconeRouter "github.com/matrix-org/pinecone/router" pineconeSessions "github.com/matrix-org/pinecone/sessions" - pineconeTypes "github.com/matrix-org/pinecone/types" "github.com/sirupsen/logrus" ) @@ -123,27 +121,23 @@ func main() { pMulticast := pineconeMulticast.NewMulticast(logger, pRouter) pMulticast.Start() - var staticPeerAttempts atomic.Uint32 - var connectToStaticPeer func() - connectToStaticPeer = func() { - uri := *instancePeer - if uri == "" { - return + connectToStaticPeer := func() { + attempt := func() { + if pRouter.PeerCount(router.PeerTypeRemote) == 0 { + uri := *instancePeer + if uri == "" { + return + } + if err := conn.ConnectToPeer(pRouter, uri); err != nil { + logrus.WithError(err).Error("Failed to connect to static peer") + } + } } - if err := conn.ConnectToPeer(pRouter, uri); err != nil { - exp := time.Second * time.Duration(math.Exp2(float64(staticPeerAttempts.Inc()))) - time.AfterFunc(exp, connectToStaticPeer) - } else { - staticPeerAttempts.Store(0) + for { + attempt() + time.Sleep(time.Second * 5) } } - pRouter.SetDisconnectedCallback(func(port pineconeTypes.SwitchPortID, public pineconeTypes.PublicKey, peertype int, err error) { - if peertype == pineconeRouter.PeerTypeRemote && err != nil { - staticPeerAttempts.Store(0) - time.AfterFunc(time.Second, connectToStaticPeer) - } - }) - go connectToStaticPeer() cfg := &config.Dendrite{} cfg.Defaults() @@ -257,6 +251,7 @@ func main() { Handler: pMux, } + go connectToStaticPeer() go func() { pubkey := pRouter.PublicKey() logrus.Info("Listening on ", hex.EncodeToString(pubkey[:])) diff --git a/go.mod b/go.mod index 7273da647..48c8bbf7d 100644 --- a/go.mod +++ b/go.mod @@ -25,7 +25,7 @@ require ( github.com/matrix-org/gomatrix v0.0.0-20200827122206-7dd5e2a05bcd github.com/matrix-org/gomatrixserverlib v0.0.0-20210525110027-8cb7699aa64a github.com/matrix-org/naffka v0.0.0-20201009174903-d26a3b9cb161 - github.com/matrix-org/pinecone v0.0.0-20210602111459-5cb0e6aa1a6a + github.com/matrix-org/pinecone v0.0.0-20210614112651-5da1fab608c6 github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 github.com/mattn/go-sqlite3 v1.14.7-0.20210414154423-1157a4212dcb github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646 diff --git a/go.sum b/go.sum index eba9a60b1..0e7ee0f6c 100644 --- a/go.sum +++ b/go.sum @@ -706,8 +706,8 @@ github.com/matrix-org/gomatrixserverlib v0.0.0-20210525110027-8cb7699aa64a h1:pV github.com/matrix-org/gomatrixserverlib v0.0.0-20210525110027-8cb7699aa64a/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= github.com/matrix-org/naffka v0.0.0-20201009174903-d26a3b9cb161 h1:h1XVh05pLoC+nJjP3GIpj5wUsuC8WdHP3He0RTkRJTs= github.com/matrix-org/naffka v0.0.0-20201009174903-d26a3b9cb161/go.mod h1:sjyPyRxKM5uw1nD2cJ6O2OxI6GOqyVBfNXqKjBZTBZE= -github.com/matrix-org/pinecone v0.0.0-20210602111459-5cb0e6aa1a6a h1:BE/cfpyHO2ua1BK4Tibr+2oZCV3H1mC9G7g7Yvl1AmM= -github.com/matrix-org/pinecone v0.0.0-20210602111459-5cb0e6aa1a6a/go.mod h1:UQzJS6UVyVwfkr+RLrdvBB1vLyECqe3fLYNcbRxv8SA= +github.com/matrix-org/pinecone v0.0.0-20210614112651-5da1fab608c6 h1:ytVf81AkLmMAs0KeCYW6po0X3foMSKz0HccnipfsOVc= +github.com/matrix-org/pinecone v0.0.0-20210614112651-5da1fab608c6/go.mod h1:UQzJS6UVyVwfkr+RLrdvBB1vLyECqe3fLYNcbRxv8SA= github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U= github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 h1:eCEHXWDv9Rm335MSuB49mFUK44bwZPFSDde3ORE3syk= github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=