diff --git a/build/gobind/monolith.go b/build/gobind/monolith.go index 430fb0f3f..378cebea1 100644 --- a/build/gobind/monolith.go +++ b/build/gobind/monolith.go @@ -25,7 +25,6 @@ import ( "github.com/matrix-org/dendrite/userapi" "github.com/matrix-org/gomatrixserverlib" "github.com/sirupsen/logrus" - "go.uber.org/atomic" ) type DendriteMonolith struct { @@ -34,8 +33,6 @@ type DendriteMonolith struct { StorageDirectory string listener net.Listener httpServer *http.Server - httpListening atomic.Bool - yggListening atomic.Bool } func (m *DendriteMonolith) BaseURL() string { @@ -46,6 +43,10 @@ func (m *DendriteMonolith) PeerCount() int { return m.YggdrasilNode.PeerCount() } +func (m *DendriteMonolith) SessionCount() int { + return m.YggdrasilNode.SessionCount() +} + func (m *DendriteMonolith) SetMulticastEnabled(enabled bool) { m.YggdrasilNode.SetMulticastEnabled(enabled) } @@ -86,7 +87,7 @@ func (m *DendriteMonolith) Start() { cfg.Matrix.ServerName = gomatrixserverlib.ServerName(ygg.DerivedServerName()) cfg.Matrix.PrivateKey = ygg.SigningPrivateKey() cfg.Matrix.KeyID = gomatrixserverlib.KeyID(signing.KeyID) - cfg.Matrix.FederationMaxRetries = 6 + cfg.Matrix.FederationMaxRetries = 8 cfg.Kafka.UseNaffka = true cfg.Kafka.Topics.OutputRoomEvent = "roomserverOutput" cfg.Kafka.Topics.OutputClientData = "clientapiOutput" diff --git a/cmd/dendrite-demo-yggdrasil/yggconn/node.go b/cmd/dendrite-demo-yggdrasil/yggconn/node.go index 6e413483b..a326dd442 100644 --- a/cmd/dendrite-demo-yggdrasil/yggconn/node.go +++ b/cmd/dendrite-demo-yggdrasil/yggconn/node.go @@ -32,6 +32,7 @@ import ( "github.com/lucas-clemente/quic-go" "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/convert" "github.com/matrix-org/gomatrixserverlib" + "go.uber.org/atomic" yggdrasilconfig "github.com/yggdrasil-network/yggdrasil-go/src/config" yggdrasilmulticast "github.com/yggdrasil-network/yggdrasil-go/src/multicast" @@ -41,18 +42,19 @@ import ( ) type Node struct { - core *yggdrasil.Core - config *yggdrasilconfig.NodeConfig - state *yggdrasilconfig.NodeState - multicast *yggdrasilmulticast.Multicast - log *gologme.Logger - listener quic.Listener - tlsConfig *tls.Config - quicConfig *quic.Config - sessions sync.Map // string -> quic.Session - coords sync.Map // string -> yggdrasil.Coords - incoming chan QUICStream - NewSession func(remote gomatrixserverlib.ServerName) + core *yggdrasil.Core + config *yggdrasilconfig.NodeConfig + state *yggdrasilconfig.NodeState + multicast *yggdrasilmulticast.Multicast + log *gologme.Logger + listener quic.Listener + tlsConfig *tls.Config + quicConfig *quic.Config + sessions sync.Map // string -> quic.Session + sessionCount atomic.Uint32 + coords sync.Map // string -> yggdrasil.Coords + incoming chan QUICStream + NewSession func(remote gomatrixserverlib.ServerName) } func (n *Node) Dialer(_, address string) (net.Conn, error) { @@ -178,6 +180,10 @@ func (n *Node) PeerCount() int { return len(n.core.GetPeers()) - 1 } +func (n *Node) SessionCount() int { + return int(n.sessionCount.Load()) +} + func (n *Node) KnownNodes() []gomatrixserverlib.ServerName { nodemap := map[string]struct{}{ "b5ae50589e50991dd9dd7d59c5c5f7a4521e8da5b603b7f57076272abc58b374": struct{}{}, diff --git a/cmd/dendrite-demo-yggdrasil/yggconn/session.go b/cmd/dendrite-demo-yggdrasil/yggconn/session.go index f3e4751dd..09b0bd22c 100644 --- a/cmd/dendrite-demo-yggdrasil/yggconn/session.go +++ b/cmd/dendrite-demo-yggdrasil/yggconn/session.go @@ -63,8 +63,10 @@ func (n *Node) listenFromYgg() { } func (n *Node) listenFromQUIC(session quic.Session, address string) { + n.sessionCount.Inc() n.sessions.Store(address, session) defer n.sessions.Delete(address) + defer n.sessionCount.Dec() for { st, err := session.AcceptStream(context.TODO()) if err != nil { @@ -123,29 +125,48 @@ func (n *Node) DialContext(ctx context.Context, network, address string) (net.Co // We either don't know the coords for the node, or we failed // to dial it before, in which case try to resolve the coords. if _, ok := n.coords.Load(address); !ok { - n.log.Infof("Searching for coords for %q", address) - dest, err := hex.DecodeString(address) - if err != nil { - return nil, err - } - if len(dest) != crypto.BoxPubKeyLen { - return nil, errors.New("invalid key length supplied") - } - var pubKey crypto.BoxPubKey - copy(pubKey[:], dest) - nodeID := crypto.GetNodeID(&pubKey) - nodeMask := &crypto.NodeID{} - for i := range nodeMask { - nodeMask[i] = 0xFF + var coords yggdrasil.Coords + var err error + + // First look and see if the node is something that we already + // know about from our direct switch peers. + for _, peer := range n.core.GetSwitchPeers() { + if peer.PublicKey.String() == address { + fmt.Println("*", peer.PublicKey.String(), address) + coords = peer.Coords + n.log.Infof("%q is a direct peer, coords are %s", address, coords.String()) + n.coords.Store(address, coords) + break + } } - fmt.Println("Resolving coords") - coords, err := n.core.Resolve(nodeID, nodeMask) - if err != nil { - return nil, fmt.Errorf("n.core.Resolve: %w", err) + // If it isn' a node that we know directly then try to search + // the network. + if coords == nil { + n.log.Infof("Searching for coords for %q", address) + dest, derr := hex.DecodeString(address) + if derr != nil { + return nil, derr + } + if len(dest) != crypto.BoxPubKeyLen { + return nil, errors.New("invalid key length supplied") + } + var pubKey crypto.BoxPubKey + copy(pubKey[:], dest) + nodeID := crypto.GetNodeID(&pubKey) + nodeMask := &crypto.NodeID{} + for i := range nodeMask { + nodeMask[i] = 0xFF + } + + fmt.Println("Resolving coords") + coords, err = n.core.Resolve(nodeID, nodeMask) + if err != nil { + return nil, fmt.Errorf("n.core.Resolve: %w", err) + } + fmt.Println("Found coords:", coords) + n.coords.Store(address, coords) } - fmt.Println("Found coords:", coords) - n.coords.Store(address, coords) // We now know the coords in theory. Let's try dialling the // node again.