Direct peer lookup, other tweaks

This commit is contained in:
Neil Alexander 2020-08-05 15:41:09 +01:00
parent a662defa8c
commit a0048c3b70
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
3 changed files with 64 additions and 36 deletions

View file

@ -25,7 +25,6 @@ import (
"github.com/matrix-org/dendrite/userapi" "github.com/matrix-org/dendrite/userapi"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"go.uber.org/atomic"
) )
type DendriteMonolith struct { type DendriteMonolith struct {
@ -34,8 +33,6 @@ type DendriteMonolith struct {
StorageDirectory string StorageDirectory string
listener net.Listener listener net.Listener
httpServer *http.Server httpServer *http.Server
httpListening atomic.Bool
yggListening atomic.Bool
} }
func (m *DendriteMonolith) BaseURL() string { func (m *DendriteMonolith) BaseURL() string {
@ -46,6 +43,10 @@ func (m *DendriteMonolith) PeerCount() int {
return m.YggdrasilNode.PeerCount() return m.YggdrasilNode.PeerCount()
} }
func (m *DendriteMonolith) SessionCount() int {
return m.YggdrasilNode.SessionCount()
}
func (m *DendriteMonolith) SetMulticastEnabled(enabled bool) { func (m *DendriteMonolith) SetMulticastEnabled(enabled bool) {
m.YggdrasilNode.SetMulticastEnabled(enabled) m.YggdrasilNode.SetMulticastEnabled(enabled)
} }
@ -86,7 +87,7 @@ func (m *DendriteMonolith) Start() {
cfg.Matrix.ServerName = gomatrixserverlib.ServerName(ygg.DerivedServerName()) cfg.Matrix.ServerName = gomatrixserverlib.ServerName(ygg.DerivedServerName())
cfg.Matrix.PrivateKey = ygg.SigningPrivateKey() cfg.Matrix.PrivateKey = ygg.SigningPrivateKey()
cfg.Matrix.KeyID = gomatrixserverlib.KeyID(signing.KeyID) cfg.Matrix.KeyID = gomatrixserverlib.KeyID(signing.KeyID)
cfg.Matrix.FederationMaxRetries = 6 cfg.Matrix.FederationMaxRetries = 8
cfg.Kafka.UseNaffka = true cfg.Kafka.UseNaffka = true
cfg.Kafka.Topics.OutputRoomEvent = "roomserverOutput" cfg.Kafka.Topics.OutputRoomEvent = "roomserverOutput"
cfg.Kafka.Topics.OutputClientData = "clientapiOutput" cfg.Kafka.Topics.OutputClientData = "clientapiOutput"

View file

@ -32,6 +32,7 @@ import (
"github.com/lucas-clemente/quic-go" "github.com/lucas-clemente/quic-go"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/convert" "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/convert"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"go.uber.org/atomic"
yggdrasilconfig "github.com/yggdrasil-network/yggdrasil-go/src/config" yggdrasilconfig "github.com/yggdrasil-network/yggdrasil-go/src/config"
yggdrasilmulticast "github.com/yggdrasil-network/yggdrasil-go/src/multicast" yggdrasilmulticast "github.com/yggdrasil-network/yggdrasil-go/src/multicast"
@ -50,6 +51,7 @@ type Node struct {
tlsConfig *tls.Config tlsConfig *tls.Config
quicConfig *quic.Config quicConfig *quic.Config
sessions sync.Map // string -> quic.Session sessions sync.Map // string -> quic.Session
sessionCount atomic.Uint32
coords sync.Map // string -> yggdrasil.Coords coords sync.Map // string -> yggdrasil.Coords
incoming chan QUICStream incoming chan QUICStream
NewSession func(remote gomatrixserverlib.ServerName) NewSession func(remote gomatrixserverlib.ServerName)
@ -178,6 +180,10 @@ func (n *Node) PeerCount() int {
return len(n.core.GetPeers()) - 1 return len(n.core.GetPeers()) - 1
} }
func (n *Node) SessionCount() int {
return int(n.sessionCount.Load())
}
func (n *Node) KnownNodes() []gomatrixserverlib.ServerName { func (n *Node) KnownNodes() []gomatrixserverlib.ServerName {
nodemap := map[string]struct{}{ nodemap := map[string]struct{}{
"b5ae50589e50991dd9dd7d59c5c5f7a4521e8da5b603b7f57076272abc58b374": struct{}{}, "b5ae50589e50991dd9dd7d59c5c5f7a4521e8da5b603b7f57076272abc58b374": struct{}{},

View file

@ -63,8 +63,10 @@ func (n *Node) listenFromYgg() {
} }
func (n *Node) listenFromQUIC(session quic.Session, address string) { func (n *Node) listenFromQUIC(session quic.Session, address string) {
n.sessionCount.Inc()
n.sessions.Store(address, session) n.sessions.Store(address, session)
defer n.sessions.Delete(address) defer n.sessions.Delete(address)
defer n.sessionCount.Dec()
for { for {
st, err := session.AcceptStream(context.TODO()) st, err := session.AcceptStream(context.TODO())
if err != nil { if err != nil {
@ -123,10 +125,28 @@ func (n *Node) DialContext(ctx context.Context, network, address string) (net.Co
// We either don't know the coords for the node, or we failed // We either don't know the coords for the node, or we failed
// to dial it before, in which case try to resolve the coords. // to dial it before, in which case try to resolve the coords.
if _, ok := n.coords.Load(address); !ok { if _, ok := n.coords.Load(address); !ok {
var coords yggdrasil.Coords
var err error
// First look and see if the node is something that we already
// know about from our direct switch peers.
for _, peer := range n.core.GetSwitchPeers() {
if peer.PublicKey.String() == address {
fmt.Println("*", peer.PublicKey.String(), address)
coords = peer.Coords
n.log.Infof("%q is a direct peer, coords are %s", address, coords.String())
n.coords.Store(address, coords)
break
}
}
// If it isn' a node that we know directly then try to search
// the network.
if coords == nil {
n.log.Infof("Searching for coords for %q", address) n.log.Infof("Searching for coords for %q", address)
dest, err := hex.DecodeString(address) dest, derr := hex.DecodeString(address)
if err != nil { if derr != nil {
return nil, err return nil, derr
} }
if len(dest) != crypto.BoxPubKeyLen { if len(dest) != crypto.BoxPubKeyLen {
return nil, errors.New("invalid key length supplied") return nil, errors.New("invalid key length supplied")
@ -140,12 +160,13 @@ func (n *Node) DialContext(ctx context.Context, network, address string) (net.Co
} }
fmt.Println("Resolving coords") fmt.Println("Resolving coords")
coords, err := n.core.Resolve(nodeID, nodeMask) coords, err = n.core.Resolve(nodeID, nodeMask)
if err != nil { if err != nil {
return nil, fmt.Errorf("n.core.Resolve: %w", err) return nil, fmt.Errorf("n.core.Resolve: %w", err)
} }
fmt.Println("Found coords:", coords) fmt.Println("Found coords:", coords)
n.coords.Store(address, coords) n.coords.Store(address, coords)
}
// We now know the coords in theory. Let's try dialling the // We now know the coords in theory. Let's try dialling the
// node again. // node again.