This commit is contained in:
Neil Alexander 2020-06-09 12:22:58 +01:00
parent eee22f53f9
commit 23d2341811
4 changed files with 62 additions and 55 deletions

View file

@ -1,33 +0,0 @@
package yggconn
import (
"net"
"time"
"github.com/alecthomas/multiplex"
)
type channel struct {
*multiplex.Channel
conn net.Conn
}
func (c *channel) LocalAddr() net.Addr {
return c.conn.LocalAddr()
}
func (c *channel) RemoteAddr() net.Addr {
return c.conn.RemoteAddr()
}
func (c *channel) SetDeadline(t time.Time) error {
return c.conn.SetDeadline(t)
}
func (c *channel) SetReadDeadline(t time.Time) error {
return c.conn.SetReadDeadline(t)
}
func (c *channel) SetWriteDeadline(t time.Time) error {
return c.conn.SetWriteDeadline(t)
}

View file

@ -28,8 +28,8 @@ type Node struct {
listener *yggdrasil.Listener
dialer *yggdrasil.Dialer
conns sync.Map // string -> yggdrasil.Conn
streams sync.Map // string -> multiplex.MultiplexedStream
incoming chan *channel
sessions sync.Map // string -> yamux.Session
incoming chan *stream
}
func Setup(instanceName string) (*Node, error) {
@ -38,7 +38,7 @@ func Setup(instanceName string) (*Node, error) {
admin: &yggdrasiladmin.AdminSocket{},
multicast: &yggdrasilmulticast.Multicast{},
log: gologme.New(os.Stdout, "YGG ", log.Flags()),
incoming: make(chan *channel),
incoming: make(chan *stream),
}
n.config.AdminListen = fmt.Sprintf("unix://./%s-yggdrasil.sock", instanceName)

View file

@ -5,7 +5,7 @@ import (
"errors"
"net"
"github.com/alecthomas/multiplex"
"github.com/libp2p/go-yamux"
)
func (n *Node) listenFromYgg() {
@ -14,20 +14,23 @@ func (n *Node) listenFromYgg() {
if err != nil {
return
}
stream := multiplex.MultiplexedServer(conn)
n.conns.Store(conn.RemoteAddr(), conn)
n.streams.Store(conn.RemoteAddr(), stream)
go n.listenFromYggConn(stream, conn)
}
}
func (n *Node) listenFromYggConn(stream *multiplex.MultiplexedStream, conn net.Conn) {
for {
ch, err := stream.Accept()
session, err := yamux.Server(conn, nil)
if err != nil {
return
}
n.incoming <- &channel{ch, conn}
n.conns.Store(conn.RemoteAddr(), conn)
n.sessions.Store(conn.RemoteAddr(), session)
go n.listenFromYggConn(session, conn)
}
}
func (n *Node) listenFromYggConn(session *yamux.Session, conn net.Conn) {
for {
st, err := session.AcceptStream()
if err != nil {
return
}
n.incoming <- &stream{st, conn}
}
}
@ -59,19 +62,23 @@ func (n *Node) DialContext(ctx context.Context, network, address string) (net.Co
return nil, errors.New("conn type assertion error")
}
} else {
n.streams.Store(address, multiplex.MultiplexedClient(conn))
client, cerr := yamux.Client(conn, nil)
if cerr != nil {
return nil, cerr
}
n.sessions.Store(address, client)
}
s, ok := n.streams.Load(address)
s, ok := n.sessions.Load(address)
if !ok {
return nil, errors.New("stream not found")
return nil, errors.New("session not found")
}
stream, ok := s.(*multiplex.MultiplexedStream)
session, ok := s.(*yamux.Session)
if !ok {
return nil, errors.New("stream type assertion error")
return nil, errors.New("session type assertion error")
}
ch, err := stream.Dial()
ch, err := session.OpenStream()
if err != nil {
return nil, err
}
return &channel{ch, conn}, nil
return &stream{ch, conn}, nil
}

View file

@ -0,0 +1,33 @@
package yggconn
import (
"net"
"time"
"github.com/libp2p/go-yamux"
)
type stream struct {
*yamux.Stream
conn net.Conn
}
func (c *stream) LocalAddr() net.Addr {
return c.conn.LocalAddr()
}
func (c *stream) RemoteAddr() net.Addr {
return c.conn.RemoteAddr()
}
func (c *stream) SetDeadline(t time.Time) error {
return c.conn.SetDeadline(t)
}
func (c *stream) SetReadDeadline(t time.Time) error {
return c.conn.SetReadDeadline(t)
}
func (c *stream) SetWriteDeadline(t time.Time) error {
return c.conn.SetWriteDeadline(t)
}