Updates to yamux

This commit is contained in:
Neil Alexander 2020-06-09 13:27:30 +01:00
parent 23d2341811
commit e2d9f86569
5 changed files with 43 additions and 39 deletions

View file

@ -10,6 +10,7 @@ import (
"os"
"sync"
"github.com/libp2p/go-yamux"
yggdrasiladmin "github.com/yggdrasil-network/yggdrasil-go/src/admin"
yggdrasilconfig "github.com/yggdrasil-network/yggdrasil-go/src/config"
yggdrasilmulticast "github.com/yggdrasil-network/yggdrasil-go/src/multicast"
@ -27,9 +28,8 @@ type Node struct {
log *gologme.Logger
listener *yggdrasil.Listener
dialer *yggdrasil.Dialer
conns sync.Map // string -> yggdrasil.Conn
sessions sync.Map // string -> yamux.Session
incoming chan *stream
incoming chan *yamux.Stream
}
func Setup(instanceName string) (*Node, error) {
@ -38,7 +38,7 @@ func Setup(instanceName string) (*Node, error) {
admin: &yggdrasiladmin.AdminSocket{},
multicast: &yggdrasilmulticast.Multicast{},
log: gologme.New(os.Stdout, "YGG ", log.Flags()),
incoming: make(chan *stream),
incoming: make(chan *yamux.Stream),
}
n.config.AdminListen = fmt.Sprintf("unix://./%s-yggdrasil.sock", instanceName)

View file

@ -2,35 +2,47 @@ package yggconn
import (
"context"
"errors"
"fmt"
"net"
"time"
"github.com/libp2p/go-yamux"
)
func (n *Node) yamuxConfig() *yamux.Config {
cfg := yamux.DefaultConfig()
cfg.EnableKeepAlive = true
cfg.KeepAliveInterval = time.Second
return cfg
}
func (n *Node) listenFromYgg() {
for {
conn, err := n.listener.Accept()
if err != nil {
fmt.Println("n.listener.Accept:", err)
return
}
session, err := yamux.Server(conn, nil)
session, err := yamux.Server(conn, n.yamuxConfig())
if err != nil {
fmt.Println("yamux.Server:", err)
return
}
n.conns.Store(conn.RemoteAddr(), conn)
n.sessions.Store(conn.RemoteAddr(), session)
go n.listenFromYggConn(session, conn)
go n.listenFromYggConn(session)
}
}
func (n *Node) listenFromYggConn(session *yamux.Session, conn net.Conn) {
func (n *Node) listenFromYggConn(session *yamux.Session) {
n.sessions.Store(session.RemoteAddr().String(), session)
defer n.sessions.Delete(session.RemoteAddr())
for {
st, err := session.AcceptStream()
if err != nil {
fmt.Println("session.AcceptStream:", err)
return
}
n.incoming <- &stream{st, conn}
n.incoming <- st
}
}
@ -51,34 +63,24 @@ func (n *Node) Dial(network, address string) (net.Conn, error) {
}
func (n *Node) DialContext(ctx context.Context, network, address string) (net.Conn, error) {
conn, err := n.dialer.DialContext(ctx, network, address)
if err != nil {
c, ok := n.conns.Load(address)
if !ok {
return nil, errors.New("conn not found")
s, ok1 := n.sessions.Load(address)
session, ok2 := s.(*yamux.Session)
if !ok1 || !ok2 {
conn, err := n.dialer.DialContext(ctx, network, address)
if err != nil {
fmt.Println("n.dialer.DialContext:", err)
return nil, err
}
conn, ok = c.(net.Conn)
if !ok {
return nil, errors.New("conn type assertion error")
session, err = yamux.Client(conn, n.yamuxConfig())
if err != nil {
fmt.Println("yamux.Client.AcceptStream:", err)
return nil, err
}
} else {
client, cerr := yamux.Client(conn, nil)
if cerr != nil {
return nil, cerr
}
n.sessions.Store(address, client)
go n.listenFromYggConn(session)
}
s, ok := n.sessions.Load(address)
if !ok {
return nil, errors.New("session not found")
}
session, ok := s.(*yamux.Session)
if !ok {
return nil, errors.New("session type assertion error")
}
ch, err := session.OpenStream()
st, err := session.OpenStream()
if err != nil {
return nil, err
}
return &stream{ch, conn}, nil
return st, nil
}

View file

@ -1,19 +1,17 @@
package yggconn
import (
"net"
"time"
"github.com/libp2p/go-yamux"
)
type stream struct {
*yamux.Stream
conn net.Conn
//conn net.Conn
}
/*
func (c *stream) LocalAddr() net.Addr {
return c.conn.LocalAddr()
return c.LocalAddr()
}
func (c *stream) RemoteAddr() net.Addr {
@ -31,3 +29,4 @@ func (c *stream) SetReadDeadline(t time.Time) error {
func (c *stream) SetWriteDeadline(t time.Time) error {
return c.conn.SetWriteDeadline(t)
}
*/

1
go.mod
View file

@ -16,6 +16,7 @@ require (
github.com/libp2p/go-libp2p-kad-dht v0.5.0
github.com/libp2p/go-libp2p-pubsub v0.2.5
github.com/libp2p/go-libp2p-record v0.1.2
github.com/libp2p/go-yamux v1.3.7
github.com/matrix-org/dugong v0.0.0-20171220115018-ea0a4690a0d5
github.com/matrix-org/go-http-js-libp2p v0.0.0-20200518170932-783164aeeda4
github.com/matrix-org/go-sqlite3-js v0.0.0-20200522092705-bc8506ccbcf3

2
go.sum
View file

@ -356,6 +356,8 @@ github.com/libp2p/go-yamux v1.2.3 h1:xX8A36vpXb59frIzWFdEgptLMsOANMFq2K7fPRlunYI
github.com/libp2p/go-yamux v1.2.3/go.mod h1:FGTiPvoV/3DVdgWpX+tM0OW3tsM+W5bSE3gZwqQTcow=
github.com/libp2p/go-yamux v1.3.0 h1:FsYzT16Wq2XqUGJsBbOxoz9g+dFklvNi7jN6YFPfl7U=
github.com/libp2p/go-yamux v1.3.0/go.mod h1:FGTiPvoV/3DVdgWpX+tM0OW3tsM+W5bSE3gZwqQTcow=
github.com/libp2p/go-yamux v1.3.7 h1:v40A1eSPJDIZwz2AvrV3cxpTZEGDP11QJbukmEhYyQI=
github.com/libp2p/go-yamux v1.3.7/go.mod h1:fr7aVgmdNGJK+N1g+b6DW6VxzbRCjCOejR/hkmpooHE=
github.com/lxn/walk v0.0.0-20191128110447-55ccb3a9f5c1/go.mod h1:E23UucZGqpuUANJooIbHWCufXvOcT6E7Stq81gU+CSQ=
github.com/lxn/win v0.0.0-20191128105842-2da648fda5b4/go.mod h1:ouWl4wViUNh8tPSIwxTVMuS014WakR1hqvBc2I0bMoA=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=