From e2d9f86569937b16a1460824356db1862f5a6c95 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Tue, 9 Jun 2020 13:27:30 +0100 Subject: [PATCH] Updates to yamux --- cmd/dendrite-demo-yggdrasil/yggconn/node.go | 6 +- .../yggconn/session.go | 64 ++++++++++--------- cmd/dendrite-demo-yggdrasil/yggconn/stream.go | 9 ++- go.mod | 1 + go.sum | 2 + 5 files changed, 43 insertions(+), 39 deletions(-) diff --git a/cmd/dendrite-demo-yggdrasil/yggconn/node.go b/cmd/dendrite-demo-yggdrasil/yggconn/node.go index 44581abc8..0066d4966 100644 --- a/cmd/dendrite-demo-yggdrasil/yggconn/node.go +++ b/cmd/dendrite-demo-yggdrasil/yggconn/node.go @@ -10,6 +10,7 @@ import ( "os" "sync" + "github.com/libp2p/go-yamux" yggdrasiladmin "github.com/yggdrasil-network/yggdrasil-go/src/admin" yggdrasilconfig "github.com/yggdrasil-network/yggdrasil-go/src/config" yggdrasilmulticast "github.com/yggdrasil-network/yggdrasil-go/src/multicast" @@ -27,9 +28,8 @@ type Node struct { log *gologme.Logger listener *yggdrasil.Listener dialer *yggdrasil.Dialer - conns sync.Map // string -> yggdrasil.Conn sessions sync.Map // string -> yamux.Session - incoming chan *stream + incoming chan *yamux.Stream } func Setup(instanceName string) (*Node, error) { @@ -38,7 +38,7 @@ func Setup(instanceName string) (*Node, error) { admin: &yggdrasiladmin.AdminSocket{}, multicast: &yggdrasilmulticast.Multicast{}, log: gologme.New(os.Stdout, "YGG ", log.Flags()), - incoming: make(chan *stream), + incoming: make(chan *yamux.Stream), } n.config.AdminListen = fmt.Sprintf("unix://./%s-yggdrasil.sock", instanceName) diff --git a/cmd/dendrite-demo-yggdrasil/yggconn/session.go b/cmd/dendrite-demo-yggdrasil/yggconn/session.go index 9af352602..0f03ec595 100644 --- a/cmd/dendrite-demo-yggdrasil/yggconn/session.go +++ b/cmd/dendrite-demo-yggdrasil/yggconn/session.go @@ -2,35 +2,47 @@ package yggconn import ( "context" - "errors" + "fmt" "net" + "time" "github.com/libp2p/go-yamux" ) +func (n *Node) yamuxConfig() *yamux.Config { + cfg := yamux.DefaultConfig() + cfg.EnableKeepAlive = true + cfg.KeepAliveInterval = time.Second + return cfg +} + func (n *Node) listenFromYgg() { for { conn, err := n.listener.Accept() if err != nil { + fmt.Println("n.listener.Accept:", err) return } - session, err := yamux.Server(conn, nil) + session, err := yamux.Server(conn, n.yamuxConfig()) if err != nil { + fmt.Println("yamux.Server:", err) return } - n.conns.Store(conn.RemoteAddr(), conn) - n.sessions.Store(conn.RemoteAddr(), session) - go n.listenFromYggConn(session, conn) + go n.listenFromYggConn(session) } } -func (n *Node) listenFromYggConn(session *yamux.Session, conn net.Conn) { +func (n *Node) listenFromYggConn(session *yamux.Session) { + n.sessions.Store(session.RemoteAddr().String(), session) + defer n.sessions.Delete(session.RemoteAddr()) + for { st, err := session.AcceptStream() if err != nil { + fmt.Println("session.AcceptStream:", err) return } - n.incoming <- &stream{st, conn} + n.incoming <- st } } @@ -51,34 +63,24 @@ func (n *Node) Dial(network, address string) (net.Conn, error) { } func (n *Node) DialContext(ctx context.Context, network, address string) (net.Conn, error) { - conn, err := n.dialer.DialContext(ctx, network, address) - if err != nil { - c, ok := n.conns.Load(address) - if !ok { - return nil, errors.New("conn not found") + s, ok1 := n.sessions.Load(address) + session, ok2 := s.(*yamux.Session) + if !ok1 || !ok2 { + conn, err := n.dialer.DialContext(ctx, network, address) + if err != nil { + fmt.Println("n.dialer.DialContext:", err) + return nil, err } - conn, ok = c.(net.Conn) - if !ok { - return nil, errors.New("conn type assertion error") + session, err = yamux.Client(conn, n.yamuxConfig()) + if err != nil { + fmt.Println("yamux.Client.AcceptStream:", err) + return nil, err } - } else { - client, cerr := yamux.Client(conn, nil) - if cerr != nil { - return nil, cerr - } - n.sessions.Store(address, client) + go n.listenFromYggConn(session) } - s, ok := n.sessions.Load(address) - if !ok { - return nil, errors.New("session not found") - } - session, ok := s.(*yamux.Session) - if !ok { - return nil, errors.New("session type assertion error") - } - ch, err := session.OpenStream() + st, err := session.OpenStream() if err != nil { return nil, err } - return &stream{ch, conn}, nil + return st, nil } diff --git a/cmd/dendrite-demo-yggdrasil/yggconn/stream.go b/cmd/dendrite-demo-yggdrasil/yggconn/stream.go index c61407385..6c3d06639 100644 --- a/cmd/dendrite-demo-yggdrasil/yggconn/stream.go +++ b/cmd/dendrite-demo-yggdrasil/yggconn/stream.go @@ -1,19 +1,17 @@ package yggconn import ( - "net" - "time" - "github.com/libp2p/go-yamux" ) type stream struct { *yamux.Stream - conn net.Conn + //conn net.Conn } +/* func (c *stream) LocalAddr() net.Addr { - return c.conn.LocalAddr() + return c.LocalAddr() } func (c *stream) RemoteAddr() net.Addr { @@ -31,3 +29,4 @@ func (c *stream) SetReadDeadline(t time.Time) error { func (c *stream) SetWriteDeadline(t time.Time) error { return c.conn.SetWriteDeadline(t) } +*/ diff --git a/go.mod b/go.mod index 42fc25f05..2d77d84a7 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,7 @@ require ( github.com/libp2p/go-libp2p-kad-dht v0.5.0 github.com/libp2p/go-libp2p-pubsub v0.2.5 github.com/libp2p/go-libp2p-record v0.1.2 + github.com/libp2p/go-yamux v1.3.7 github.com/matrix-org/dugong v0.0.0-20171220115018-ea0a4690a0d5 github.com/matrix-org/go-http-js-libp2p v0.0.0-20200518170932-783164aeeda4 github.com/matrix-org/go-sqlite3-js v0.0.0-20200522092705-bc8506ccbcf3 diff --git a/go.sum b/go.sum index bd515949c..644e70f37 100644 --- a/go.sum +++ b/go.sum @@ -356,6 +356,8 @@ github.com/libp2p/go-yamux v1.2.3 h1:xX8A36vpXb59frIzWFdEgptLMsOANMFq2K7fPRlunYI github.com/libp2p/go-yamux v1.2.3/go.mod h1:FGTiPvoV/3DVdgWpX+tM0OW3tsM+W5bSE3gZwqQTcow= github.com/libp2p/go-yamux v1.3.0 h1:FsYzT16Wq2XqUGJsBbOxoz9g+dFklvNi7jN6YFPfl7U= github.com/libp2p/go-yamux v1.3.0/go.mod h1:FGTiPvoV/3DVdgWpX+tM0OW3tsM+W5bSE3gZwqQTcow= +github.com/libp2p/go-yamux v1.3.7 h1:v40A1eSPJDIZwz2AvrV3cxpTZEGDP11QJbukmEhYyQI= +github.com/libp2p/go-yamux v1.3.7/go.mod h1:fr7aVgmdNGJK+N1g+b6DW6VxzbRCjCOejR/hkmpooHE= github.com/lxn/walk v0.0.0-20191128110447-55ccb3a9f5c1/go.mod h1:E23UucZGqpuUANJooIbHWCufXvOcT6E7Stq81gU+CSQ= github.com/lxn/win v0.0.0-20191128105842-2da648fda5b4/go.mod h1:ouWl4wViUNh8tPSIwxTVMuS014WakR1hqvBc2I0bMoA= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=