mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-21 05:43:09 -06:00
Initial QUIC work
This commit is contained in:
parent
55bc82c439
commit
318f681cd1
|
|
@ -17,6 +17,7 @@ package yggconn
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"crypto/ed25519"
|
"crypto/ed25519"
|
||||||
|
"crypto/tls"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
@ -27,9 +28,9 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
"github.com/lucas-clemente/quic-go"
|
||||||
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/convert"
|
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/convert"
|
||||||
|
|
||||||
"github.com/libp2p/go-yamux"
|
|
||||||
yggdrasiladmin "github.com/yggdrasil-network/yggdrasil-go/src/admin"
|
yggdrasiladmin "github.com/yggdrasil-network/yggdrasil-go/src/admin"
|
||||||
yggdrasilconfig "github.com/yggdrasil-network/yggdrasil-go/src/config"
|
yggdrasilconfig "github.com/yggdrasil-network/yggdrasil-go/src/config"
|
||||||
yggdrasilmulticast "github.com/yggdrasil-network/yggdrasil-go/src/multicast"
|
yggdrasilmulticast "github.com/yggdrasil-network/yggdrasil-go/src/multicast"
|
||||||
|
|
@ -39,16 +40,18 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type Node struct {
|
type Node struct {
|
||||||
core *yggdrasil.Core
|
core *yggdrasil.Core
|
||||||
config *yggdrasilconfig.NodeConfig
|
config *yggdrasilconfig.NodeConfig
|
||||||
state *yggdrasilconfig.NodeState
|
state *yggdrasilconfig.NodeState
|
||||||
admin *yggdrasiladmin.AdminSocket
|
admin *yggdrasiladmin.AdminSocket
|
||||||
multicast *yggdrasilmulticast.Multicast
|
multicast *yggdrasilmulticast.Multicast
|
||||||
log *gologme.Logger
|
log *gologme.Logger
|
||||||
listener *yggdrasil.Listener
|
packetConn *yggdrasil.PacketConn
|
||||||
dialer *yggdrasil.Dialer
|
listener quic.Listener
|
||||||
sessions sync.Map // string -> yamux.Session
|
tlsConfig *tls.Config
|
||||||
incoming chan *yamux.Stream
|
quicConfig *quic.Config
|
||||||
|
sessions sync.Map // string -> quic.Session
|
||||||
|
incoming chan QUICStream
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Node) Dialer(_, address string) (net.Conn, error) {
|
func (n *Node) Dialer(_, address string) (net.Conn, error) {
|
||||||
|
|
@ -74,7 +77,7 @@ func Setup(instanceName, instancePeer, storageDirectory string) (*Node, error) {
|
||||||
admin: &yggdrasiladmin.AdminSocket{},
|
admin: &yggdrasiladmin.AdminSocket{},
|
||||||
multicast: &yggdrasilmulticast.Multicast{},
|
multicast: &yggdrasilmulticast.Multicast{},
|
||||||
log: gologme.New(os.Stdout, "YGG ", log.Flags()),
|
log: gologme.New(os.Stdout, "YGG ", log.Flags()),
|
||||||
incoming: make(chan *yamux.Stream),
|
incoming: make(chan QUICStream),
|
||||||
}
|
}
|
||||||
|
|
||||||
yggfile := fmt.Sprintf("%s/%s-yggdrasil.conf", storageDirectory, instanceName)
|
yggfile := fmt.Sprintf("%s/%s-yggdrasil.conf", storageDirectory, instanceName)
|
||||||
|
|
@ -114,30 +117,15 @@ func Setup(instanceName, instancePeer, storageDirectory string) (*Node, error) {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
/*
|
|
||||||
if err = n.admin.Init(n.core, n.state, n.log, nil); err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
if err = n.admin.Start(); err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
if err = n.multicast.Init(n.core, n.state, n.log, nil); err != nil {
|
if err = n.multicast.Init(n.core, n.state, n.log, nil); err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
if err = n.multicast.Start(); err != nil {
|
if err = n.multicast.Start(); err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
//n.admin.SetupAdminHandlers(n.admin)
|
|
||||||
//n.multicast.SetupAdminHandlers(n.admin)
|
n.packetConn = n.core.PacketConn()
|
||||||
n.listener, err = n.core.ConnListen()
|
n.tlsConfig = n.generateTLSConfig()
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
n.dialer, err = n.core.ConnDialer()
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
n.log.Println("Public curve25519:", n.core.EncryptionPublicKey())
|
n.log.Println("Public curve25519:", n.core.EncryptionPublicKey())
|
||||||
n.log.Println("Public ed25519:", n.core.SigningPublicKey())
|
n.log.Println("Public ed25519:", n.core.SigningPublicKey())
|
||||||
|
|
|
||||||
|
|
@ -16,60 +16,53 @@ package yggconn
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"crypto/rand"
|
||||||
|
"crypto/rsa"
|
||||||
|
"crypto/tls"
|
||||||
|
"crypto/x509"
|
||||||
|
"encoding/hex"
|
||||||
|
"encoding/pem"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"math/big"
|
||||||
"net"
|
"net"
|
||||||
"strings"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/libp2p/go-yamux"
|
"github.com/lucas-clemente/quic-go"
|
||||||
|
"github.com/yggdrasil-network/yggdrasil-go/src/crypto"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (n *Node) yamuxConfig() *yamux.Config {
|
|
||||||
cfg := yamux.DefaultConfig()
|
|
||||||
cfg.EnableKeepAlive = false
|
|
||||||
cfg.ConnectionWriteTimeout = time.Second * 15
|
|
||||||
cfg.MaxMessageSize = 65535
|
|
||||||
cfg.ReadBufSize = 655350
|
|
||||||
return cfg
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n *Node) listenFromYgg() {
|
func (n *Node) listenFromYgg() {
|
||||||
|
var err error
|
||||||
|
n.listener, err = quic.Listen(
|
||||||
|
n.packetConn, // yggdrasil.PacketConn
|
||||||
|
n.tlsConfig, // TLS config
|
||||||
|
n.quicConfig, // QUIC config
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
for {
|
for {
|
||||||
conn, err := n.listener.Accept()
|
session, err := n.listener.Accept(context.TODO())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
n.log.Println("n.listener.Accept:", err)
|
n.log.Println("n.listener.Accept:", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
var session *yamux.Session
|
go n.listenFromQUIC(session)
|
||||||
// If the remote address is lower than ours then we'll be the
|
|
||||||
// server. Otherwse we'll be the client.
|
|
||||||
if strings.Compare(conn.RemoteAddr().String(), n.DerivedSessionName()) < 0 {
|
|
||||||
session, err = yamux.Server(conn, n.yamuxConfig())
|
|
||||||
} else {
|
|
||||||
session, err = yamux.Client(conn, n.yamuxConfig())
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
go n.listenFromYggConn(session)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Node) listenFromYggConn(session *yamux.Session) {
|
func (n *Node) listenFromQUIC(session quic.Session) {
|
||||||
n.sessions.Store(session.RemoteAddr().String(), session)
|
n.sessions.Store(session.RemoteAddr().String(), session)
|
||||||
defer n.sessions.Delete(session.RemoteAddr())
|
defer n.sessions.Delete(session.RemoteAddr())
|
||||||
defer func() {
|
|
||||||
if err := session.Close(); err != nil {
|
|
||||||
n.log.Println("session.Close:", err)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
for {
|
for {
|
||||||
st, err := session.AcceptStream()
|
st, err := session.AcceptStream(context.TODO())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
n.log.Println("session.AcceptStream:", err)
|
n.log.Println("session.AcceptStream:", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
n.incoming <- st
|
n.incoming <- QUICStream{st, session}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -96,29 +89,74 @@ func (n *Node) Dial(network, address string) (net.Conn, error) {
|
||||||
// Implements http.Transport.DialContext
|
// Implements http.Transport.DialContext
|
||||||
func (n *Node) DialContext(ctx context.Context, network, address string) (net.Conn, error) {
|
func (n *Node) DialContext(ctx context.Context, network, address string) (net.Conn, error) {
|
||||||
s, ok1 := n.sessions.Load(address)
|
s, ok1 := n.sessions.Load(address)
|
||||||
session, ok2 := s.(*yamux.Session)
|
session, ok2 := s.(quic.Session)
|
||||||
if !ok1 || !ok2 || (ok1 && ok2 && session.IsClosed()) {
|
if !ok1 || !ok2 || (ok1 && ok2 && session.ConnectionState().HandshakeComplete) {
|
||||||
conn, err := n.dialer.DialContext(ctx, network, address)
|
dest, err := hex.DecodeString(address)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if len(dest) != crypto.BoxPubKeyLen {
|
||||||
|
return nil, errors.New("invalid key length supplied")
|
||||||
|
}
|
||||||
|
var pubKey crypto.BoxPubKey
|
||||||
|
copy(pubKey[:], dest)
|
||||||
|
|
||||||
|
nodeID := crypto.GetNodeID(&pubKey)
|
||||||
|
var nodeMask crypto.NodeID
|
||||||
|
for i := range nodeMask {
|
||||||
|
nodeMask[i] = 0xFF
|
||||||
|
}
|
||||||
|
if _, pk, rerr := n.core.Resolve(nodeID, &nodeMask); err != nil {
|
||||||
|
return nil, fmt.Errorf("n.core.Resolve: %w", rerr)
|
||||||
|
} else if pk != nil {
|
||||||
|
pubKey = *pk
|
||||||
|
}
|
||||||
|
|
||||||
|
session, err = quic.Dial(
|
||||||
|
n.packetConn, // yggdrasil.PacketConn
|
||||||
|
&pubKey, // dial address
|
||||||
|
address, // dial SNI
|
||||||
|
n.tlsConfig, // TLS config
|
||||||
|
n.quicConfig, // QUIC config
|
||||||
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
n.log.Println("n.dialer.DialContext:", err)
|
n.log.Println("n.dialer.DialContext:", err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
// If the remote address is lower than ours then we will be the
|
go n.listenFromQUIC(session)
|
||||||
// server. Otherwise we'll be the client.
|
|
||||||
if strings.Compare(conn.RemoteAddr().String(), n.DerivedSessionName()) < 0 {
|
|
||||||
session, err = yamux.Server(conn, n.yamuxConfig())
|
|
||||||
} else {
|
|
||||||
session, err = yamux.Client(conn, n.yamuxConfig())
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
go n.listenFromYggConn(session)
|
|
||||||
}
|
}
|
||||||
st, err := session.OpenStream()
|
st, err := session.OpenStream()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
n.log.Println("session.OpenStream:", err)
|
n.log.Println("session.OpenStream:", err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return st, nil
|
return QUICStream{st, session}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *Node) generateTLSConfig() *tls.Config {
|
||||||
|
key, err := rsa.GenerateKey(rand.Reader, 1024)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
template := x509.Certificate{
|
||||||
|
SerialNumber: big.NewInt(1),
|
||||||
|
NotAfter: time.Now().Add(time.Hour * 24 * 365),
|
||||||
|
DNSNames: []string{n.DerivedSessionName()},
|
||||||
|
}
|
||||||
|
certDER, err := x509.CreateCertificate(rand.Reader, &template, &template, &key.PublicKey, key)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
keyPEM := pem.EncodeToMemory(&pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(key)})
|
||||||
|
certPEM := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: certDER})
|
||||||
|
|
||||||
|
tlsCert, err := tls.X509KeyPair(certPEM, keyPEM)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
return &tls.Config{
|
||||||
|
Certificates: []tls.Certificate{tlsCert},
|
||||||
|
NextProtos: []string{"quic-matrix-ygg"},
|
||||||
|
InsecureSkipVerify: true,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
20
cmd/dendrite-demo-yggdrasil/yggconn/stream.go
Normal file
20
cmd/dendrite-demo-yggdrasil/yggconn/stream.go
Normal file
|
|
@ -0,0 +1,20 @@
|
||||||
|
package yggconn
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net"
|
||||||
|
|
||||||
|
"github.com/lucas-clemente/quic-go"
|
||||||
|
)
|
||||||
|
|
||||||
|
type QUICStream struct {
|
||||||
|
quic.Stream
|
||||||
|
session quic.Session
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s QUICStream) LocalAddr() net.Addr {
|
||||||
|
return s.session.LocalAddr()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s QUICStream) RemoteAddr() net.Addr {
|
||||||
|
return s.session.RemoteAddr()
|
||||||
|
}
|
||||||
Loading…
Reference in a new issue