From 23d2341811e76298bb31b7c17eb18ffc62c75f86 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Tue, 9 Jun 2020 12:22:58 +0100 Subject: [PATCH] Yamux --- .../yggconn/channel.go | 33 -------------- cmd/dendrite-demo-yggdrasil/yggconn/node.go | 6 +-- .../yggconn/{multiplex.go => session.go} | 45 +++++++++++-------- cmd/dendrite-demo-yggdrasil/yggconn/stream.go | 33 ++++++++++++++ 4 files changed, 62 insertions(+), 55 deletions(-) delete mode 100644 cmd/dendrite-demo-yggdrasil/yggconn/channel.go rename cmd/dendrite-demo-yggdrasil/yggconn/{multiplex.go => session.go} (59%) create mode 100644 cmd/dendrite-demo-yggdrasil/yggconn/stream.go diff --git a/cmd/dendrite-demo-yggdrasil/yggconn/channel.go b/cmd/dendrite-demo-yggdrasil/yggconn/channel.go deleted file mode 100644 index 6a16a2b12..000000000 --- a/cmd/dendrite-demo-yggdrasil/yggconn/channel.go +++ /dev/null @@ -1,33 +0,0 @@ -package yggconn - -import ( - "net" - "time" - - "github.com/alecthomas/multiplex" -) - -type channel struct { - *multiplex.Channel - conn net.Conn -} - -func (c *channel) LocalAddr() net.Addr { - return c.conn.LocalAddr() -} - -func (c *channel) RemoteAddr() net.Addr { - return c.conn.RemoteAddr() -} - -func (c *channel) SetDeadline(t time.Time) error { - return c.conn.SetDeadline(t) -} - -func (c *channel) SetReadDeadline(t time.Time) error { - return c.conn.SetReadDeadline(t) -} - -func (c *channel) SetWriteDeadline(t time.Time) error { - return c.conn.SetWriteDeadline(t) -} diff --git a/cmd/dendrite-demo-yggdrasil/yggconn/node.go b/cmd/dendrite-demo-yggdrasil/yggconn/node.go index 36dac38b0..44581abc8 100644 --- a/cmd/dendrite-demo-yggdrasil/yggconn/node.go +++ b/cmd/dendrite-demo-yggdrasil/yggconn/node.go @@ -28,8 +28,8 @@ type Node struct { listener *yggdrasil.Listener dialer *yggdrasil.Dialer conns sync.Map // string -> yggdrasil.Conn - streams sync.Map // string -> multiplex.MultiplexedStream - incoming chan *channel + sessions sync.Map // string -> yamux.Session + incoming chan *stream } func Setup(instanceName string) (*Node, error) { @@ -38,7 +38,7 @@ func Setup(instanceName string) (*Node, error) { admin: &yggdrasiladmin.AdminSocket{}, multicast: &yggdrasilmulticast.Multicast{}, log: gologme.New(os.Stdout, "YGG ", log.Flags()), - incoming: make(chan *channel), + incoming: make(chan *stream), } n.config.AdminListen = fmt.Sprintf("unix://./%s-yggdrasil.sock", instanceName) diff --git a/cmd/dendrite-demo-yggdrasil/yggconn/multiplex.go b/cmd/dendrite-demo-yggdrasil/yggconn/session.go similarity index 59% rename from cmd/dendrite-demo-yggdrasil/yggconn/multiplex.go rename to cmd/dendrite-demo-yggdrasil/yggconn/session.go index c50934d1f..9af352602 100644 --- a/cmd/dendrite-demo-yggdrasil/yggconn/multiplex.go +++ b/cmd/dendrite-demo-yggdrasil/yggconn/session.go @@ -5,7 +5,7 @@ import ( "errors" "net" - "github.com/alecthomas/multiplex" + "github.com/libp2p/go-yamux" ) func (n *Node) listenFromYgg() { @@ -14,20 +14,23 @@ func (n *Node) listenFromYgg() { if err != nil { return } - stream := multiplex.MultiplexedServer(conn) - n.conns.Store(conn.RemoteAddr(), conn) - n.streams.Store(conn.RemoteAddr(), stream) - go n.listenFromYggConn(stream, conn) - } -} - -func (n *Node) listenFromYggConn(stream *multiplex.MultiplexedStream, conn net.Conn) { - for { - ch, err := stream.Accept() + session, err := yamux.Server(conn, nil) if err != nil { return } - n.incoming <- &channel{ch, conn} + n.conns.Store(conn.RemoteAddr(), conn) + n.sessions.Store(conn.RemoteAddr(), session) + go n.listenFromYggConn(session, conn) + } +} + +func (n *Node) listenFromYggConn(session *yamux.Session, conn net.Conn) { + for { + st, err := session.AcceptStream() + if err != nil { + return + } + n.incoming <- &stream{st, conn} } } @@ -59,19 +62,23 @@ func (n *Node) DialContext(ctx context.Context, network, address string) (net.Co return nil, errors.New("conn type assertion error") } } else { - n.streams.Store(address, multiplex.MultiplexedClient(conn)) + client, cerr := yamux.Client(conn, nil) + if cerr != nil { + return nil, cerr + } + n.sessions.Store(address, client) } - s, ok := n.streams.Load(address) + s, ok := n.sessions.Load(address) if !ok { - return nil, errors.New("stream not found") + return nil, errors.New("session not found") } - stream, ok := s.(*multiplex.MultiplexedStream) + session, ok := s.(*yamux.Session) if !ok { - return nil, errors.New("stream type assertion error") + return nil, errors.New("session type assertion error") } - ch, err := stream.Dial() + ch, err := session.OpenStream() if err != nil { return nil, err } - return &channel{ch, conn}, nil + return &stream{ch, conn}, nil } diff --git a/cmd/dendrite-demo-yggdrasil/yggconn/stream.go b/cmd/dendrite-demo-yggdrasil/yggconn/stream.go new file mode 100644 index 000000000..c61407385 --- /dev/null +++ b/cmd/dendrite-demo-yggdrasil/yggconn/stream.go @@ -0,0 +1,33 @@ +package yggconn + +import ( + "net" + "time" + + "github.com/libp2p/go-yamux" +) + +type stream struct { + *yamux.Stream + conn net.Conn +} + +func (c *stream) LocalAddr() net.Addr { + return c.conn.LocalAddr() +} + +func (c *stream) RemoteAddr() net.Addr { + return c.conn.RemoteAddr() +} + +func (c *stream) SetDeadline(t time.Time) error { + return c.conn.SetDeadline(t) +} + +func (c *stream) SetReadDeadline(t time.Time) error { + return c.conn.SetReadDeadline(t) +} + +func (c *stream) SetWriteDeadline(t time.Time) error { + return c.conn.SetWriteDeadline(t) +}