Use connection manager in Pinecone demos

This commit is contained in:
Neil Alexander 2022-04-08 13:55:46 +01:00
parent 4312b87ab0
commit 6907e42167
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
3 changed files with 16 additions and 101 deletions

View file

@ -52,6 +52,7 @@ import (
"golang.org/x/net/http2" "golang.org/x/net/http2"
"golang.org/x/net/http2/h2c" "golang.org/x/net/http2/h2c"
pineconeConnections "github.com/matrix-org/pinecone/connections"
pineconeMulticast "github.com/matrix-org/pinecone/multicast" pineconeMulticast "github.com/matrix-org/pinecone/multicast"
pineconeRouter "github.com/matrix-org/pinecone/router" pineconeRouter "github.com/matrix-org/pinecone/router"
pineconeSessions "github.com/matrix-org/pinecone/sessions" pineconeSessions "github.com/matrix-org/pinecone/sessions"
@ -71,11 +72,9 @@ type DendriteMonolith struct {
PineconeRouter *pineconeRouter.Router PineconeRouter *pineconeRouter.Router
PineconeMulticast *pineconeMulticast.Multicast PineconeMulticast *pineconeMulticast.Multicast
PineconeQUIC *pineconeSessions.Sessions PineconeQUIC *pineconeSessions.Sessions
PineconeManager *pineconeConnections.ConnectionManager
StorageDirectory string StorageDirectory string
CacheDirectory string CacheDirectory string
staticPeerURI string
staticPeerMutex sync.RWMutex
staticPeerAttempt chan struct{}
listener net.Listener listener net.Listener
httpServer *http.Server httpServer *http.Server
processContext *process.ProcessContext processContext *process.ProcessContext
@ -104,15 +103,8 @@ func (m *DendriteMonolith) SetMulticastEnabled(enabled bool) {
} }
func (m *DendriteMonolith) SetStaticPeer(uri string) { func (m *DendriteMonolith) SetStaticPeer(uri string) {
m.staticPeerMutex.Lock() m.PineconeManager.RemovePeers()
m.staticPeerURI = strings.TrimSpace(uri) m.PineconeManager.AddPeer(strings.TrimSpace(uri))
m.staticPeerMutex.Unlock()
m.DisconnectType(int(pineconeRouter.PeerTypeRemote))
if uri != "" {
go func() {
m.staticPeerAttempt <- struct{}{}
}()
}
} }
func (m *DendriteMonolith) DisconnectType(peertype int) { func (m *DendriteMonolith) DisconnectType(peertype int) {
@ -210,43 +202,6 @@ func (m *DendriteMonolith) RegisterDevice(localpart, deviceID string) (string, e
return loginRes.Device.AccessToken, nil return loginRes.Device.AccessToken, nil
} }
func (m *DendriteMonolith) staticPeerConnect() {
connected := map[string]bool{} // URI -> connected?
attempt := func() {
m.staticPeerMutex.RLock()
uri := m.staticPeerURI
m.staticPeerMutex.RUnlock()
if uri == "" {
return
}
for k := range connected {
delete(connected, k)
}
for _, uri := range strings.Split(uri, ",") {
connected[strings.TrimSpace(uri)] = false
}
for _, info := range m.PineconeRouter.Peers() {
connected[info.URI] = true
}
for k, online := range connected {
if !online {
if err := conn.ConnectToPeer(m.PineconeRouter, k); err != nil {
logrus.WithError(err).Error("Failed to connect to static peer")
}
}
}
}
for {
select {
case <-m.processContext.Context().Done():
case <-m.staticPeerAttempt:
attempt()
case <-time.After(time.Second * 5):
attempt()
}
}
}
// nolint:gocyclo // nolint:gocyclo
func (m *DendriteMonolith) Start() { func (m *DendriteMonolith) Start() {
var err error var err error
@ -284,6 +239,7 @@ func (m *DendriteMonolith) Start() {
m.PineconeRouter = pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk, false) m.PineconeRouter = pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk, false)
m.PineconeQUIC = pineconeSessions.NewSessions(logrus.WithField("pinecone", "sessions"), m.PineconeRouter, []string{"matrix"}) m.PineconeQUIC = pineconeSessions.NewSessions(logrus.WithField("pinecone", "sessions"), m.PineconeRouter, []string{"matrix"})
m.PineconeMulticast = pineconeMulticast.NewMulticast(logrus.WithField("pinecone", "multicast"), m.PineconeRouter) m.PineconeMulticast = pineconeMulticast.NewMulticast(logrus.WithField("pinecone", "multicast"), m.PineconeRouter)
m.PineconeManager = pineconeConnections.NewConnectionManager(m.PineconeRouter)
prefix := hex.EncodeToString(pk) prefix := hex.EncodeToString(pk)
cfg := &config.Dendrite{} cfg := &config.Dendrite{}
@ -392,9 +348,6 @@ func (m *DendriteMonolith) Start() {
m.processContext = base.ProcessContext m.processContext = base.ProcessContext
m.staticPeerAttempt = make(chan struct{}, 1)
go m.staticPeerConnect()
go func() { go func() {
m.logger.Info("Listening on ", cfg.Global.ServerName) m.logger.Info("Listening on ", cfg.Global.ServerName)
m.logger.Fatal(m.httpServer.Serve(m.PineconeQUIC.Protocol("matrix"))) m.logger.Fatal(m.httpServer.Serve(m.PineconeQUIC.Protocol("matrix")))

View file

@ -25,7 +25,6 @@ import (
"net" "net"
"net/http" "net/http"
"os" "os"
"strings"
"time" "time"
"github.com/gorilla/mux" "github.com/gorilla/mux"
@ -47,6 +46,7 @@ import (
"github.com/matrix-org/dendrite/userapi" "github.com/matrix-org/dendrite/userapi"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
pineconeConnections "github.com/matrix-org/pinecone/connections"
pineconeMulticast "github.com/matrix-org/pinecone/multicast" pineconeMulticast "github.com/matrix-org/pinecone/multicast"
pineconeRouter "github.com/matrix-org/pinecone/router" pineconeRouter "github.com/matrix-org/pinecone/router"
pineconeSessions "github.com/matrix-org/pinecone/sessions" pineconeSessions "github.com/matrix-org/pinecone/sessions"
@ -90,6 +90,13 @@ func main() {
} }
pRouter := pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk, false) pRouter := pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk, false)
pQUIC := pineconeSessions.NewSessions(logrus.WithField("pinecone", "sessions"), pRouter, []string{"matrix"})
pMulticast := pineconeMulticast.NewMulticast(logrus.WithField("pinecone", "multicast"), pRouter)
pManager := pineconeConnections.NewConnectionManager(pRouter)
pMulticast.Start()
if instancePeer != nil && *instancePeer != "" {
pManager.AddPeer(*instancePeer)
}
go func() { go func() {
listener, err := net.Listen("tcp", *instanceListen) listener, err := net.Listen("tcp", *instanceListen)
@ -119,36 +126,6 @@ func main() {
} }
}() }()
pQUIC := pineconeSessions.NewSessions(logrus.WithField("pinecone", "sessions"), pRouter, []string{"matrix"})
pMulticast := pineconeMulticast.NewMulticast(logrus.WithField("pinecone", "multicast"), pRouter)
pMulticast.Start()
connectToStaticPeer := func() {
connected := map[string]bool{} // URI -> connected?
for _, uri := range strings.Split(*instancePeer, ",") {
connected[strings.TrimSpace(uri)] = false
}
attempt := func() {
for k := range connected {
connected[k] = false
}
for _, info := range pRouter.Peers() {
connected[info.URI] = true
}
for k, online := range connected {
if !online {
if err := conn.ConnectToPeer(pRouter, k); err != nil {
logrus.WithError(err).Error("Failed to connect to static peer")
}
}
}
}
for {
attempt()
time.Sleep(time.Second * 5)
}
}
cfg := &config.Dendrite{} cfg := &config.Dendrite{}
cfg.Defaults(true) cfg.Defaults(true)
cfg.Global.ServerName = gomatrixserverlib.ServerName(hex.EncodeToString(pk)) cfg.Global.ServerName = gomatrixserverlib.ServerName(hex.EncodeToString(pk))
@ -268,7 +245,6 @@ func main() {
Handler: pMux, Handler: pMux,
} }
go connectToStaticPeer()
go func() { go func() {
pubkey := pRouter.PublicKey() pubkey := pRouter.PublicKey()
logrus.Info("Listening on ", hex.EncodeToString(pubkey[:])) logrus.Info("Listening on ", hex.EncodeToString(pubkey[:]))

View file

@ -22,7 +22,6 @@ import (
"encoding/hex" "encoding/hex"
"fmt" "fmt"
"syscall/js" "syscall/js"
"time"
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/matrix-org/dendrite/appservice" "github.com/matrix-org/dendrite/appservice"
@ -44,6 +43,7 @@ import (
_ "github.com/matrix-org/go-sqlite3-js" _ "github.com/matrix-org/go-sqlite3-js"
pineconeConnections "github.com/matrix-org/pinecone/connections"
pineconeRouter "github.com/matrix-org/pinecone/router" pineconeRouter "github.com/matrix-org/pinecone/router"
pineconeSessions "github.com/matrix-org/pinecone/sessions" pineconeSessions "github.com/matrix-org/pinecone/sessions"
) )
@ -154,6 +154,8 @@ func startup() {
pRouter := pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk, false) pRouter := pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk, false)
pSessions := pineconeSessions.NewSessions(logrus.WithField("pinecone", "sessions"), pRouter, []string{"matrix"}) pSessions := pineconeSessions.NewSessions(logrus.WithField("pinecone", "sessions"), pRouter, []string{"matrix"})
pManager := pineconeConnections.NewConnectionManager(pRouter)
pManager.AddPeer("wss://pinecone.matrix.org/public")
cfg := &config.Dendrite{} cfg := &config.Dendrite{}
cfg.Defaults(true) cfg.Defaults(true)
@ -237,20 +239,4 @@ func startup() {
} }
s.ListenAndServe("fetch") s.ListenAndServe("fetch")
}() }()
// Connect to the static peer
go func() {
for {
if pRouter.PeerCount(pineconeRouter.PeerTypeRemote) == 0 {
if err := conn.ConnectToPeer(pRouter, publicPeer); err != nil {
logrus.WithError(err).Error("Failed to connect to static peer")
}
}
select {
case <-base.ProcessContext.Context().Done():
return
case <-time.After(time.Second * 5):
}
}
}()
} }