From eee22f53f9fa6ad93d4b554ff63724b36131f494 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Tue, 9 Jun 2020 12:14:51 +0100 Subject: [PATCH] Muxing? --- cmd/dendrite-demo-yggdrasil/main.go | 141 +++++------------- .../yggconn/channel.go | 33 ++++ .../yggconn/multiplex.go | 77 ++++++++++ cmd/dendrite-demo-yggdrasil/yggconn/node.go | 108 ++++++++++++++ go.mod | 1 + go.sum | 2 + 6 files changed, 257 insertions(+), 105 deletions(-) create mode 100644 cmd/dendrite-demo-yggdrasil/yggconn/channel.go create mode 100644 cmd/dendrite-demo-yggdrasil/yggconn/multiplex.go create mode 100644 cmd/dendrite-demo-yggdrasil/yggconn/node.go diff --git a/cmd/dendrite-demo-yggdrasil/main.go b/cmd/dendrite-demo-yggdrasil/main.go index 843a15c53..873c70dba 100644 --- a/cmd/dendrite-demo-yggdrasil/main.go +++ b/cmd/dendrite-demo-yggdrasil/main.go @@ -16,23 +16,18 @@ package main import ( "context" - "crypto/ed25519" "crypto/tls" - "encoding/hex" - "encoding/json" "flag" "fmt" - "io/ioutil" - "log" "net" "net/http" - "os" "strings" "time" "github.com/matrix-org/dendrite/appservice" "github.com/matrix-org/dendrite/clientapi" "github.com/matrix-org/dendrite/clientapi/producers" + "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/yggconn" "github.com/matrix-org/dendrite/eduserver" "github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/federationapi" @@ -50,12 +45,6 @@ import ( "github.com/matrix-org/dendrite/syncapi" "github.com/matrix-org/gomatrixserverlib" - yggdrasiladmin "github.com/yggdrasil-network/yggdrasil-go/src/admin" - yggdrasilconfig "github.com/yggdrasil-network/yggdrasil-go/src/config" - yggdrasilmulticast "github.com/yggdrasil-network/yggdrasil-go/src/multicast" - "github.com/yggdrasil-network/yggdrasil-go/src/yggdrasil" - - gologme "github.com/gologme/log" "github.com/sirupsen/logrus" ) @@ -64,17 +53,6 @@ var ( instancePort = flag.Int("port", 8080, "the port that the client API will listen on") ) -type node struct { - core yggdrasil.Core - config *yggdrasilconfig.NodeConfig - state *yggdrasilconfig.NodeState - admin *yggdrasiladmin.AdminSocket - multicast *yggdrasilmulticast.Multicast - log *gologme.Logger - listener *yggdrasil.Listener - dialer *yggdrasil.Dialer -} - type yggroundtripper struct { inner *http.Transport } @@ -84,74 +62,37 @@ func (y *yggroundtripper) RoundTrip(req *http.Request) (*http.Response, error) { return y.inner.RoundTrip(req) } -// nolint:gocyclo -func main() { - flag.Parse() - - // Build an Yggdrasil node. - n := node{ - config: yggdrasilconfig.GenerateConfig(), - log: gologme.New(os.Stdout, "YGG ", log.Flags()), - } - //n.config.AdminListen = fmt.Sprintf("unix:%s-yggdrasil.sock", *instanceName) - - yggfile := fmt.Sprintf("%s-yggdrasil.conf", *instanceName) - if _, err := os.Stat(yggfile); !os.IsNotExist(err) { - yggconf, e := ioutil.ReadFile(yggfile) - if e != nil { - panic(err) - } - if err := json.Unmarshal([]byte(yggconf), &n.config); err != nil { - panic(err) - } - } else { - j, err := json.Marshal(n.config) - if err != nil { - panic(err) - } - if e := ioutil.WriteFile(yggfile, j, 0600); e != nil { - fmt.Printf("Couldn't write private key to file '%s': %s\n", yggfile, e) - } - } - - var err error - n.log.EnableLevel("error") - n.log.EnableLevel("warn") - n.log.EnableLevel("info") - n.log.EnableLevel("debug") - n.state, err = n.core.Start(n.config, n.log) - if err != nil { - panic(err) - } - _ = n.admin.Init(&n.core, n.state, n.log, nil) - if err = n.admin.Start(); err != nil { - panic(err) - } - n.admin.SetupAdminHandlers(n.admin) - n.multicast = &yggdrasilmulticast.Multicast{} - if err = n.multicast.Init(&n.core, n.state, n.log, nil); err != nil { - panic(err) - } - n.multicast.SetupAdminHandlers(n.admin) - if err = n.multicast.Start(); err != nil { - panic(err) - } - n.listener, err = n.core.ConnListen() - if err != nil { - panic(err) - } - n.dialer, err = n.core.ConnDialer() - if err != nil { - panic(err) - } +func createFederationClient( + base *basecomponent.BaseDendrite, n *yggconn.Node, +) *gomatrixserverlib.FederationClient { yggdialer := func(_, address string) (net.Conn, error) { tokens := strings.Split(address, ":") - return n.dialer.Dial("curve25519", tokens[0]) + return n.Dial("curve25519", tokens[0]) } yggdialerctx := func(ctx context.Context, network, address string) (net.Conn, error) { return yggdialer(network, address) } + tr := &http.Transport{ + MaxConnsPerHost: 1, + } + tr.RegisterProtocol( + "matrix", &yggroundtripper{ + inner: &http.Transport{ + DialContext: yggdialerctx, + MaxConnsPerHost: 1, + }, + }, + ) + return gomatrixserverlib.NewFederationClientWithTransport( + base.Cfg.Matrix.ServerName, base.Cfg.Matrix.KeyID, base.Cfg.Matrix.PrivateKey, tr, + ) +} + +// nolint:gocyclo +func main() { + flag.Parse() + // Build both ends of a HTTP multiplex. httpServer := &http.Server{ Addr: ":0", @@ -159,22 +100,19 @@ func main() { ReadTimeout: 5 * time.Second, WriteTimeout: 10 * time.Second, IdleTimeout: 15 * time.Second, - } - httpClient := &http.Client{ - Transport: &yggroundtripper{ - inner: &http.Transport{ - DialContext: yggdialerctx, - MaxConnsPerHost: 1, - }, + BaseContext: func(_ net.Listener) context.Context { + return context.Background() }, } - privBytes, _ := hex.DecodeString(n.config.SigningPrivateKey) - privKey := ed25519.PrivateKey(privBytes) + ygg, err := yggconn.Setup(*instanceName) + if err != nil { + panic(err) + } cfg := &config.Dendrite{} - cfg.Matrix.ServerName = gomatrixserverlib.ServerName(n.core.EncryptionPublicKey()) - cfg.Matrix.PrivateKey = privKey + cfg.Matrix.ServerName = gomatrixserverlib.ServerName(ygg.EncryptionPublicKey()) + cfg.Matrix.PrivateKey = ygg.SigningPrivateKey() cfg.Matrix.KeyID = gomatrixserverlib.KeyID("ed25519:auto") cfg.Kafka.UseNaffka = true cfg.Kafka.Topics.OutputRoomEvent = "roomserverOutput" @@ -200,12 +138,7 @@ func main() { accountDB := base.CreateAccountsDB() deviceDB := base.CreateDeviceDB() - federation := gomatrixserverlib.NewFederationClientWithHTTPClient( - cfg.Matrix.ServerName, - cfg.Matrix.KeyID, - cfg.Matrix.PrivateKey, - httpClient, - ) + federation := createFederationClient(base, ygg) serverKeyAPI := serverkeyapi.NewInternalAPI( base.Cfg, federation, base.Caches, @@ -276,10 +209,9 @@ func main() { base.UseHTTPAPIs, ) - // Expose the matrix APIs directly rather than putting them under a /api path. go func() { - logrus.Info("Listening on ", n.core.EncryptionPublicKey()) - logrus.Fatal(httpServer.ListenAndServe()) + logrus.Info("Listening on ", ygg.EncryptionPublicKey()) + logrus.Fatal(httpServer.Serve(ygg)) }() go func() { httpBindAddr := fmt.Sprintf(":%d", *instancePort) @@ -287,6 +219,5 @@ func main() { logrus.Fatal(http.ListenAndServe(httpBindAddr, nil)) }() - // We want to block forever to let the HTTP and HTTPS handler serve the APIs select {} } diff --git a/cmd/dendrite-demo-yggdrasil/yggconn/channel.go b/cmd/dendrite-demo-yggdrasil/yggconn/channel.go new file mode 100644 index 000000000..6a16a2b12 --- /dev/null +++ b/cmd/dendrite-demo-yggdrasil/yggconn/channel.go @@ -0,0 +1,33 @@ +package yggconn + +import ( + "net" + "time" + + "github.com/alecthomas/multiplex" +) + +type channel struct { + *multiplex.Channel + conn net.Conn +} + +func (c *channel) LocalAddr() net.Addr { + return c.conn.LocalAddr() +} + +func (c *channel) RemoteAddr() net.Addr { + return c.conn.RemoteAddr() +} + +func (c *channel) SetDeadline(t time.Time) error { + return c.conn.SetDeadline(t) +} + +func (c *channel) SetReadDeadline(t time.Time) error { + return c.conn.SetReadDeadline(t) +} + +func (c *channel) SetWriteDeadline(t time.Time) error { + return c.conn.SetWriteDeadline(t) +} diff --git a/cmd/dendrite-demo-yggdrasil/yggconn/multiplex.go b/cmd/dendrite-demo-yggdrasil/yggconn/multiplex.go new file mode 100644 index 000000000..c50934d1f --- /dev/null +++ b/cmd/dendrite-demo-yggdrasil/yggconn/multiplex.go @@ -0,0 +1,77 @@ +package yggconn + +import ( + "context" + "errors" + "net" + + "github.com/alecthomas/multiplex" +) + +func (n *Node) listenFromYgg() { + for { + conn, err := n.listener.Accept() + if err != nil { + return + } + stream := multiplex.MultiplexedServer(conn) + n.conns.Store(conn.RemoteAddr(), conn) + n.streams.Store(conn.RemoteAddr(), stream) + go n.listenFromYggConn(stream, conn) + } +} + +func (n *Node) listenFromYggConn(stream *multiplex.MultiplexedStream, conn net.Conn) { + for { + ch, err := stream.Accept() + if err != nil { + return + } + n.incoming <- &channel{ch, conn} + } +} + +func (n *Node) Accept() (net.Conn, error) { + return <-n.incoming, nil +} + +func (n *Node) Close() error { + return n.listener.Close() +} + +func (n *Node) Addr() net.Addr { + return n.listener.Addr() +} + +func (n *Node) Dial(network, address string) (net.Conn, error) { + return n.DialContext(context.TODO(), network, address) +} + +func (n *Node) DialContext(ctx context.Context, network, address string) (net.Conn, error) { + conn, err := n.dialer.DialContext(ctx, network, address) + if err != nil { + c, ok := n.conns.Load(address) + if !ok { + return nil, errors.New("conn not found") + } + conn, ok = c.(net.Conn) + if !ok { + return nil, errors.New("conn type assertion error") + } + } else { + n.streams.Store(address, multiplex.MultiplexedClient(conn)) + } + s, ok := n.streams.Load(address) + if !ok { + return nil, errors.New("stream not found") + } + stream, ok := s.(*multiplex.MultiplexedStream) + if !ok { + return nil, errors.New("stream type assertion error") + } + ch, err := stream.Dial() + if err != nil { + return nil, err + } + return &channel{ch, conn}, nil +} diff --git a/cmd/dendrite-demo-yggdrasil/yggconn/node.go b/cmd/dendrite-demo-yggdrasil/yggconn/node.go new file mode 100644 index 000000000..36dac38b0 --- /dev/null +++ b/cmd/dendrite-demo-yggdrasil/yggconn/node.go @@ -0,0 +1,108 @@ +package yggconn + +import ( + "crypto/ed25519" + "encoding/hex" + "encoding/json" + "fmt" + "io/ioutil" + "log" + "os" + "sync" + + yggdrasiladmin "github.com/yggdrasil-network/yggdrasil-go/src/admin" + yggdrasilconfig "github.com/yggdrasil-network/yggdrasil-go/src/config" + yggdrasilmulticast "github.com/yggdrasil-network/yggdrasil-go/src/multicast" + "github.com/yggdrasil-network/yggdrasil-go/src/yggdrasil" + + gologme "github.com/gologme/log" +) + +type Node struct { + core yggdrasil.Core + config *yggdrasilconfig.NodeConfig + state *yggdrasilconfig.NodeState + admin *yggdrasiladmin.AdminSocket + multicast *yggdrasilmulticast.Multicast + log *gologme.Logger + listener *yggdrasil.Listener + dialer *yggdrasil.Dialer + conns sync.Map // string -> yggdrasil.Conn + streams sync.Map // string -> multiplex.MultiplexedStream + incoming chan *channel +} + +func Setup(instanceName string) (*Node, error) { + n := Node{ + config: yggdrasilconfig.GenerateConfig(), + admin: &yggdrasiladmin.AdminSocket{}, + multicast: &yggdrasilmulticast.Multicast{}, + log: gologme.New(os.Stdout, "YGG ", log.Flags()), + incoming: make(chan *channel), + } + n.config.AdminListen = fmt.Sprintf("unix://./%s-yggdrasil.sock", instanceName) + + yggfile := fmt.Sprintf("%s-yggdrasil.conf", instanceName) + if _, err := os.Stat(yggfile); !os.IsNotExist(err) { + yggconf, e := ioutil.ReadFile(yggfile) + if e != nil { + panic(err) + } + if err := json.Unmarshal([]byte(yggconf), &n.config); err != nil { + panic(err) + } + } else { + j, err := json.Marshal(n.config) + if err != nil { + panic(err) + } + if e := ioutil.WriteFile(yggfile, j, 0600); e != nil { + fmt.Printf("Couldn't write private key to file '%s': %s\n", yggfile, e) + } + } + + var err error + n.log.EnableLevel("error") + n.log.EnableLevel("warn") + n.log.EnableLevel("info") + n.state, err = n.core.Start(n.config, n.log) + if err != nil { + panic(err) + } + n.core.UpdateConfig(n.config) + if err = n.admin.Init(&n.core, n.state, n.log, nil); err != nil { + panic(err) + } + if err = n.admin.Start(); err != nil { + panic(err) + } + if err = n.multicast.Init(&n.core, n.state, n.log, nil); err != nil { + panic(err) + } + if err = n.multicast.Start(); err != nil { + panic(err) + } + n.admin.SetupAdminHandlers(n.admin) + n.multicast.SetupAdminHandlers(n.admin) + n.listener, err = n.core.ConnListen() + if err != nil { + panic(err) + } + n.dialer, err = n.core.ConnDialer() + if err != nil { + panic(err) + } + + go n.listenFromYgg() + + return &n, nil +} + +func (n *Node) EncryptionPublicKey() string { + return n.core.EncryptionPublicKey() +} + +func (n *Node) SigningPrivateKey() ed25519.PrivateKey { + privBytes, _ := hex.DecodeString(n.config.SigningPrivateKey) + return ed25519.PrivateKey(privBytes) +} diff --git a/go.mod b/go.mod index d962067d3..42fc25f05 100644 --- a/go.mod +++ b/go.mod @@ -2,6 +2,7 @@ module github.com/matrix-org/dendrite require ( github.com/Shopify/sarama v1.26.1 + github.com/alecthomas/multiplex v0.0.0-20140815001232-e565e6fcbcd0 github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd // indirect github.com/gologme/log v1.2.0 github.com/gorilla/mux v1.7.3 diff --git a/go.sum b/go.sum index 31bb1bc8a..bd515949c 100644 --- a/go.sum +++ b/go.sum @@ -11,6 +11,8 @@ github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWso github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= +github.com/alecthomas/multiplex v0.0.0-20140815001232-e565e6fcbcd0 h1:QY9/zSgVTSpDvS+Pm3/E3vS4UyB0Tv15244y1oPy6AY= +github.com/alecthomas/multiplex v0.0.0-20140815001232-e565e6fcbcd0/go.mod h1:4TKC8jw6Fbi8xrRAul7JpNEbXyJWvNdNOLV79IobTKM= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafoB+tBA3gMyHYHrpOtNuDiK/uB5uXxq5wM= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=