This commit is contained in:
Neil Alexander 2020-06-09 12:14:51 +01:00
parent 73274472a5
commit eee22f53f9
6 changed files with 257 additions and 105 deletions

View file

@ -16,23 +16,18 @@ package main
import (
"context"
"crypto/ed25519"
"crypto/tls"
"encoding/hex"
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"log"
"net"
"net/http"
"os"
"strings"
"time"
"github.com/matrix-org/dendrite/appservice"
"github.com/matrix-org/dendrite/clientapi"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/yggconn"
"github.com/matrix-org/dendrite/eduserver"
"github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/federationapi"
@ -50,12 +45,6 @@ import (
"github.com/matrix-org/dendrite/syncapi"
"github.com/matrix-org/gomatrixserverlib"
yggdrasiladmin "github.com/yggdrasil-network/yggdrasil-go/src/admin"
yggdrasilconfig "github.com/yggdrasil-network/yggdrasil-go/src/config"
yggdrasilmulticast "github.com/yggdrasil-network/yggdrasil-go/src/multicast"
"github.com/yggdrasil-network/yggdrasil-go/src/yggdrasil"
gologme "github.com/gologme/log"
"github.com/sirupsen/logrus"
)
@ -64,17 +53,6 @@ var (
instancePort = flag.Int("port", 8080, "the port that the client API will listen on")
)
type node struct {
core yggdrasil.Core
config *yggdrasilconfig.NodeConfig
state *yggdrasilconfig.NodeState
admin *yggdrasiladmin.AdminSocket
multicast *yggdrasilmulticast.Multicast
log *gologme.Logger
listener *yggdrasil.Listener
dialer *yggdrasil.Dialer
}
type yggroundtripper struct {
inner *http.Transport
}
@ -84,74 +62,37 @@ func (y *yggroundtripper) RoundTrip(req *http.Request) (*http.Response, error) {
return y.inner.RoundTrip(req)
}
// nolint:gocyclo
func main() {
flag.Parse()
// Build an Yggdrasil node.
n := node{
config: yggdrasilconfig.GenerateConfig(),
log: gologme.New(os.Stdout, "YGG ", log.Flags()),
}
//n.config.AdminListen = fmt.Sprintf("unix:%s-yggdrasil.sock", *instanceName)
yggfile := fmt.Sprintf("%s-yggdrasil.conf", *instanceName)
if _, err := os.Stat(yggfile); !os.IsNotExist(err) {
yggconf, e := ioutil.ReadFile(yggfile)
if e != nil {
panic(err)
}
if err := json.Unmarshal([]byte(yggconf), &n.config); err != nil {
panic(err)
}
} else {
j, err := json.Marshal(n.config)
if err != nil {
panic(err)
}
if e := ioutil.WriteFile(yggfile, j, 0600); e != nil {
fmt.Printf("Couldn't write private key to file '%s': %s\n", yggfile, e)
}
}
var err error
n.log.EnableLevel("error")
n.log.EnableLevel("warn")
n.log.EnableLevel("info")
n.log.EnableLevel("debug")
n.state, err = n.core.Start(n.config, n.log)
if err != nil {
panic(err)
}
_ = n.admin.Init(&n.core, n.state, n.log, nil)
if err = n.admin.Start(); err != nil {
panic(err)
}
n.admin.SetupAdminHandlers(n.admin)
n.multicast = &yggdrasilmulticast.Multicast{}
if err = n.multicast.Init(&n.core, n.state, n.log, nil); err != nil {
panic(err)
}
n.multicast.SetupAdminHandlers(n.admin)
if err = n.multicast.Start(); err != nil {
panic(err)
}
n.listener, err = n.core.ConnListen()
if err != nil {
panic(err)
}
n.dialer, err = n.core.ConnDialer()
if err != nil {
panic(err)
}
func createFederationClient(
base *basecomponent.BaseDendrite, n *yggconn.Node,
) *gomatrixserverlib.FederationClient {
yggdialer := func(_, address string) (net.Conn, error) {
tokens := strings.Split(address, ":")
return n.dialer.Dial("curve25519", tokens[0])
return n.Dial("curve25519", tokens[0])
}
yggdialerctx := func(ctx context.Context, network, address string) (net.Conn, error) {
return yggdialer(network, address)
}
tr := &http.Transport{
MaxConnsPerHost: 1,
}
tr.RegisterProtocol(
"matrix", &yggroundtripper{
inner: &http.Transport{
DialContext: yggdialerctx,
MaxConnsPerHost: 1,
},
},
)
return gomatrixserverlib.NewFederationClientWithTransport(
base.Cfg.Matrix.ServerName, base.Cfg.Matrix.KeyID, base.Cfg.Matrix.PrivateKey, tr,
)
}
// nolint:gocyclo
func main() {
flag.Parse()
// Build both ends of a HTTP multiplex.
httpServer := &http.Server{
Addr: ":0",
@ -159,22 +100,19 @@ func main() {
ReadTimeout: 5 * time.Second,
WriteTimeout: 10 * time.Second,
IdleTimeout: 15 * time.Second,
}
httpClient := &http.Client{
Transport: &yggroundtripper{
inner: &http.Transport{
DialContext: yggdialerctx,
MaxConnsPerHost: 1,
},
BaseContext: func(_ net.Listener) context.Context {
return context.Background()
},
}
privBytes, _ := hex.DecodeString(n.config.SigningPrivateKey)
privKey := ed25519.PrivateKey(privBytes)
ygg, err := yggconn.Setup(*instanceName)
if err != nil {
panic(err)
}
cfg := &config.Dendrite{}
cfg.Matrix.ServerName = gomatrixserverlib.ServerName(n.core.EncryptionPublicKey())
cfg.Matrix.PrivateKey = privKey
cfg.Matrix.ServerName = gomatrixserverlib.ServerName(ygg.EncryptionPublicKey())
cfg.Matrix.PrivateKey = ygg.SigningPrivateKey()
cfg.Matrix.KeyID = gomatrixserverlib.KeyID("ed25519:auto")
cfg.Kafka.UseNaffka = true
cfg.Kafka.Topics.OutputRoomEvent = "roomserverOutput"
@ -200,12 +138,7 @@ func main() {
accountDB := base.CreateAccountsDB()
deviceDB := base.CreateDeviceDB()
federation := gomatrixserverlib.NewFederationClientWithHTTPClient(
cfg.Matrix.ServerName,
cfg.Matrix.KeyID,
cfg.Matrix.PrivateKey,
httpClient,
)
federation := createFederationClient(base, ygg)
serverKeyAPI := serverkeyapi.NewInternalAPI(
base.Cfg, federation, base.Caches,
@ -276,10 +209,9 @@ func main() {
base.UseHTTPAPIs,
)
// Expose the matrix APIs directly rather than putting them under a /api path.
go func() {
logrus.Info("Listening on ", n.core.EncryptionPublicKey())
logrus.Fatal(httpServer.ListenAndServe())
logrus.Info("Listening on ", ygg.EncryptionPublicKey())
logrus.Fatal(httpServer.Serve(ygg))
}()
go func() {
httpBindAddr := fmt.Sprintf(":%d", *instancePort)
@ -287,6 +219,5 @@ func main() {
logrus.Fatal(http.ListenAndServe(httpBindAddr, nil))
}()
// We want to block forever to let the HTTP and HTTPS handler serve the APIs
select {}
}

View file

@ -0,0 +1,33 @@
package yggconn
import (
"net"
"time"
"github.com/alecthomas/multiplex"
)
type channel struct {
*multiplex.Channel
conn net.Conn
}
func (c *channel) LocalAddr() net.Addr {
return c.conn.LocalAddr()
}
func (c *channel) RemoteAddr() net.Addr {
return c.conn.RemoteAddr()
}
func (c *channel) SetDeadline(t time.Time) error {
return c.conn.SetDeadline(t)
}
func (c *channel) SetReadDeadline(t time.Time) error {
return c.conn.SetReadDeadline(t)
}
func (c *channel) SetWriteDeadline(t time.Time) error {
return c.conn.SetWriteDeadline(t)
}

View file

@ -0,0 +1,77 @@
package yggconn
import (
"context"
"errors"
"net"
"github.com/alecthomas/multiplex"
)
func (n *Node) listenFromYgg() {
for {
conn, err := n.listener.Accept()
if err != nil {
return
}
stream := multiplex.MultiplexedServer(conn)
n.conns.Store(conn.RemoteAddr(), conn)
n.streams.Store(conn.RemoteAddr(), stream)
go n.listenFromYggConn(stream, conn)
}
}
func (n *Node) listenFromYggConn(stream *multiplex.MultiplexedStream, conn net.Conn) {
for {
ch, err := stream.Accept()
if err != nil {
return
}
n.incoming <- &channel{ch, conn}
}
}
func (n *Node) Accept() (net.Conn, error) {
return <-n.incoming, nil
}
func (n *Node) Close() error {
return n.listener.Close()
}
func (n *Node) Addr() net.Addr {
return n.listener.Addr()
}
func (n *Node) Dial(network, address string) (net.Conn, error) {
return n.DialContext(context.TODO(), network, address)
}
func (n *Node) DialContext(ctx context.Context, network, address string) (net.Conn, error) {
conn, err := n.dialer.DialContext(ctx, network, address)
if err != nil {
c, ok := n.conns.Load(address)
if !ok {
return nil, errors.New("conn not found")
}
conn, ok = c.(net.Conn)
if !ok {
return nil, errors.New("conn type assertion error")
}
} else {
n.streams.Store(address, multiplex.MultiplexedClient(conn))
}
s, ok := n.streams.Load(address)
if !ok {
return nil, errors.New("stream not found")
}
stream, ok := s.(*multiplex.MultiplexedStream)
if !ok {
return nil, errors.New("stream type assertion error")
}
ch, err := stream.Dial()
if err != nil {
return nil, err
}
return &channel{ch, conn}, nil
}

View file

@ -0,0 +1,108 @@
package yggconn
import (
"crypto/ed25519"
"encoding/hex"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"os"
"sync"
yggdrasiladmin "github.com/yggdrasil-network/yggdrasil-go/src/admin"
yggdrasilconfig "github.com/yggdrasil-network/yggdrasil-go/src/config"
yggdrasilmulticast "github.com/yggdrasil-network/yggdrasil-go/src/multicast"
"github.com/yggdrasil-network/yggdrasil-go/src/yggdrasil"
gologme "github.com/gologme/log"
)
type Node struct {
core yggdrasil.Core
config *yggdrasilconfig.NodeConfig
state *yggdrasilconfig.NodeState
admin *yggdrasiladmin.AdminSocket
multicast *yggdrasilmulticast.Multicast
log *gologme.Logger
listener *yggdrasil.Listener
dialer *yggdrasil.Dialer
conns sync.Map // string -> yggdrasil.Conn
streams sync.Map // string -> multiplex.MultiplexedStream
incoming chan *channel
}
func Setup(instanceName string) (*Node, error) {
n := Node{
config: yggdrasilconfig.GenerateConfig(),
admin: &yggdrasiladmin.AdminSocket{},
multicast: &yggdrasilmulticast.Multicast{},
log: gologme.New(os.Stdout, "YGG ", log.Flags()),
incoming: make(chan *channel),
}
n.config.AdminListen = fmt.Sprintf("unix://./%s-yggdrasil.sock", instanceName)
yggfile := fmt.Sprintf("%s-yggdrasil.conf", instanceName)
if _, err := os.Stat(yggfile); !os.IsNotExist(err) {
yggconf, e := ioutil.ReadFile(yggfile)
if e != nil {
panic(err)
}
if err := json.Unmarshal([]byte(yggconf), &n.config); err != nil {
panic(err)
}
} else {
j, err := json.Marshal(n.config)
if err != nil {
panic(err)
}
if e := ioutil.WriteFile(yggfile, j, 0600); e != nil {
fmt.Printf("Couldn't write private key to file '%s': %s\n", yggfile, e)
}
}
var err error
n.log.EnableLevel("error")
n.log.EnableLevel("warn")
n.log.EnableLevel("info")
n.state, err = n.core.Start(n.config, n.log)
if err != nil {
panic(err)
}
n.core.UpdateConfig(n.config)
if err = n.admin.Init(&n.core, n.state, n.log, nil); err != nil {
panic(err)
}
if err = n.admin.Start(); err != nil {
panic(err)
}
if err = n.multicast.Init(&n.core, n.state, n.log, nil); err != nil {
panic(err)
}
if err = n.multicast.Start(); err != nil {
panic(err)
}
n.admin.SetupAdminHandlers(n.admin)
n.multicast.SetupAdminHandlers(n.admin)
n.listener, err = n.core.ConnListen()
if err != nil {
panic(err)
}
n.dialer, err = n.core.ConnDialer()
if err != nil {
panic(err)
}
go n.listenFromYgg()
return &n, nil
}
func (n *Node) EncryptionPublicKey() string {
return n.core.EncryptionPublicKey()
}
func (n *Node) SigningPrivateKey() ed25519.PrivateKey {
privBytes, _ := hex.DecodeString(n.config.SigningPrivateKey)
return ed25519.PrivateKey(privBytes)
}

1
go.mod
View file

@ -2,6 +2,7 @@ module github.com/matrix-org/dendrite
require (
github.com/Shopify/sarama v1.26.1
github.com/alecthomas/multiplex v0.0.0-20140815001232-e565e6fcbcd0
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd // indirect
github.com/gologme/log v1.2.0
github.com/gorilla/mux v1.7.3

2
go.sum
View file

@ -11,6 +11,8 @@ github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWso
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA=
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
github.com/alecthomas/multiplex v0.0.0-20140815001232-e565e6fcbcd0 h1:QY9/zSgVTSpDvS+Pm3/E3vS4UyB0Tv15244y1oPy6AY=
github.com/alecthomas/multiplex v0.0.0-20140815001232-e565e6fcbcd0/go.mod h1:4TKC8jw6Fbi8xrRAul7JpNEbXyJWvNdNOLV79IobTKM=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafoB+tBA3gMyHYHrpOtNuDiK/uB5uXxq5wM=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=