mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-21 05:43:09 -06:00
Reset sessions when coordinates change
This commit is contained in:
parent
71198eb303
commit
f15a96bec0
|
|
@ -50,7 +50,7 @@ type Node struct {
|
||||||
listener quic.Listener
|
listener quic.Listener
|
||||||
tlsConfig *tls.Config
|
tlsConfig *tls.Config
|
||||||
quicConfig *quic.Config
|
quicConfig *quic.Config
|
||||||
sessions sync.Map // string -> quic.Session
|
sessions sync.Map // string -> *session
|
||||||
sessionCount atomic.Uint32
|
sessionCount atomic.Uint32
|
||||||
sessionFunc func(address string)
|
sessionFunc func(address string)
|
||||||
coords sync.Map // string -> yggdrasil.Coords
|
coords sync.Map // string -> yggdrasil.Coords
|
||||||
|
|
@ -94,6 +94,24 @@ func Setup(instanceName, storageDirectory string) (*Node, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
n.core.SetCoordChangeCallback(func(old, new yggdrasil.Coords) {
|
||||||
|
fmt.Println("COORDINATE CHANGE!")
|
||||||
|
fmt.Println("Old:", old)
|
||||||
|
fmt.Println("New:", new)
|
||||||
|
n.coords.Range(func(k, _ interface{}) bool {
|
||||||
|
fmt.Println("Deleting cached coords for", k)
|
||||||
|
n.coords.Delete(k)
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
n.sessions.Range(func(k, v interface{}) bool {
|
||||||
|
if s, ok := v.(*session); ok {
|
||||||
|
fmt.Println("Killing session", k)
|
||||||
|
s.kill()
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
n.config.Peers = []string{}
|
n.config.Peers = []string{}
|
||||||
n.config.AdminListen = "none"
|
n.config.AdminListen = "none"
|
||||||
n.config.MulticastInterfaces = []string{}
|
n.config.MulticastInterfaces = []string{}
|
||||||
|
|
@ -127,7 +145,7 @@ func Setup(instanceName, storageDirectory string) (*Node, error) {
|
||||||
MaxIncomingStreams: 0,
|
MaxIncomingStreams: 0,
|
||||||
MaxIncomingUniStreams: 0,
|
MaxIncomingUniStreams: 0,
|
||||||
KeepAlive: true,
|
KeepAlive: true,
|
||||||
MaxIdleTimeout: time.Minute * 5,
|
MaxIdleTimeout: time.Minute * 30,
|
||||||
HandshakeTimeout: time.Second * 15,
|
HandshakeTimeout: time.Second * 15,
|
||||||
}
|
}
|
||||||
copy(n.quicConfig.StatelessResetKey, n.EncryptionPublicKey())
|
copy(n.quicConfig.StatelessResetKey, n.EncryptionPublicKey())
|
||||||
|
|
|
||||||
|
|
@ -38,20 +38,23 @@ type session struct {
|
||||||
node *Node
|
node *Node
|
||||||
session quic.Session
|
session quic.Session
|
||||||
address string
|
address string
|
||||||
cancel chan struct{}
|
context context.Context
|
||||||
|
cancel context.CancelFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Node) newSession(sess quic.Session, address string) *session {
|
func (n *Node) newSession(sess quic.Session, address string) *session {
|
||||||
|
ctx, cancel := context.WithCancel(context.TODO())
|
||||||
return &session{
|
return &session{
|
||||||
node: n,
|
node: n,
|
||||||
session: sess,
|
session: sess,
|
||||||
address: address,
|
address: address,
|
||||||
cancel: make(chan struct{}),
|
context: ctx,
|
||||||
|
cancel: cancel,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *session) kill() {
|
func (s *session) kill() {
|
||||||
close(s.cancel)
|
s.cancel()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Node) listenFromYgg() {
|
func (n *Node) listenFromYgg() {
|
||||||
|
|
@ -85,30 +88,23 @@ func (n *Node) listenFromYgg() {
|
||||||
|
|
||||||
func (s *session) listenFromQUIC() {
|
func (s *session) listenFromQUIC() {
|
||||||
if existing, ok := s.node.sessions.Load(s.address); ok {
|
if existing, ok := s.node.sessions.Load(s.address); ok {
|
||||||
if existingSession, ok := existing.(session); ok {
|
if existingSession, ok := existing.(*session); ok {
|
||||||
|
fmt.Println("Killing existing session to replace", s.address)
|
||||||
existingSession.kill()
|
existingSession.kill()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
s.node.sessionCount.Inc()
|
s.node.sessionCount.Inc()
|
||||||
s.node.sessions.Store(s.address, s.session)
|
s.node.sessions.Store(s.address, s)
|
||||||
defer s.node.sessions.Delete(s.address)
|
defer s.node.sessions.Delete(s.address)
|
||||||
defer s.node.sessionCount.Dec()
|
defer s.node.sessionCount.Dec()
|
||||||
for {
|
for {
|
||||||
select {
|
st, err := s.session.AcceptStream(s.context)
|
||||||
case <-s.cancel:
|
|
||||||
_ = s.session.CloseWithError(0, "killed")
|
|
||||||
return
|
|
||||||
default:
|
|
||||||
ctx, cancel := context.WithTimeout(context.TODO(), s.node.quicConfig.MaxIdleTimeout)
|
|
||||||
defer cancel()
|
|
||||||
st, err := s.session.AcceptStream(ctx)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.node.log.Println("session.AcceptStream:", err)
|
s.node.log.Println("session.AcceptStream:", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
s.node.incoming <- QUICStream{st, s.session}
|
s.node.incoming <- QUICStream{st, s.session}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Implements net.Listener
|
// Implements net.Listener
|
||||||
|
|
@ -135,7 +131,7 @@ func (n *Node) Dial(network, address string) (net.Conn, error) {
|
||||||
// nolint:gocyclo
|
// nolint:gocyclo
|
||||||
func (n *Node) DialContext(ctx context.Context, network, address string) (net.Conn, error) {
|
func (n *Node) DialContext(ctx context.Context, network, address string) (net.Conn, error) {
|
||||||
s, ok1 := n.sessions.Load(address)
|
s, ok1 := n.sessions.Load(address)
|
||||||
session, ok2 := s.(quic.Session)
|
session, ok2 := s.(*session)
|
||||||
if !ok1 || !ok2 {
|
if !ok1 || !ok2 {
|
||||||
// First of all, check if we think we know the coords of this
|
// First of all, check if we think we know the coords of this
|
||||||
// node. If we do then we'll try to dial to it directly. This
|
// node. If we do then we'll try to dial to it directly. This
|
||||||
|
|
@ -214,17 +210,17 @@ func (n *Node) DialContext(ctx context.Context, network, address string) (net.Co
|
||||||
return nil, fmt.Errorf("should have found session but didn't")
|
return nil, fmt.Errorf("should have found session but didn't")
|
||||||
}
|
}
|
||||||
|
|
||||||
st, err := session.OpenStream()
|
st, err := session.session.OpenStream()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
n.log.Println("session.OpenStream:", err)
|
n.log.Println("session.OpenStream:", err)
|
||||||
_ = session.CloseWithError(0, "expected to be able to open session")
|
_ = session.session.CloseWithError(0, "expected to be able to open session")
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return QUICStream{st, session}, nil
|
return QUICStream{st, session.session}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Node) tryDial(address string, coords yggdrasil.Coords) (quic.Session, error) {
|
func (n *Node) tryDial(address string, coords yggdrasil.Coords) (*session, error) {
|
||||||
session, err := quic.Dial(
|
quicSession, err := quic.Dial(
|
||||||
n.core, // yggdrasil.PacketConn
|
n.core, // yggdrasil.PacketConn
|
||||||
coords, // dial address
|
coords, // dial address
|
||||||
address, // dial SNI
|
address, // dial SNI
|
||||||
|
|
@ -234,19 +230,20 @@ func (n *Node) tryDial(address string, coords yggdrasil.Coords) (quic.Session, e
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if len(session.ConnectionState().PeerCertificates) != 1 {
|
if len(quicSession.ConnectionState().PeerCertificates) != 1 {
|
||||||
_ = session.CloseWithError(0, "expected a peer certificate")
|
_ = quicSession.CloseWithError(0, "expected a peer certificate")
|
||||||
return nil, errors.New("didn't receive a peer certificate")
|
return nil, errors.New("didn't receive a peer certificate")
|
||||||
}
|
}
|
||||||
if len(session.ConnectionState().PeerCertificates[0].DNSNames) != 1 {
|
if len(quicSession.ConnectionState().PeerCertificates[0].DNSNames) != 1 {
|
||||||
_ = session.CloseWithError(0, "expected a DNS name")
|
_ = quicSession.CloseWithError(0, "expected a DNS name")
|
||||||
return nil, errors.New("didn't receive a DNS name")
|
return nil, errors.New("didn't receive a DNS name")
|
||||||
}
|
}
|
||||||
if gotAddress := session.ConnectionState().PeerCertificates[0].DNSNames[0]; address != gotAddress {
|
if gotAddress := quicSession.ConnectionState().PeerCertificates[0].DNSNames[0]; address != gotAddress {
|
||||||
_ = session.CloseWithError(0, "you aren't the host I was hoping for")
|
_ = quicSession.CloseWithError(0, "you aren't the host I was hoping for")
|
||||||
return nil, fmt.Errorf("expected %q but dialled %q", address, gotAddress)
|
return nil, fmt.Errorf("expected %q but dialled %q", address, gotAddress)
|
||||||
}
|
}
|
||||||
go n.newSession(session, address).listenFromQUIC()
|
session := n.newSession(quicSession, address)
|
||||||
|
go session.listenFromQUIC()
|
||||||
go n.sessionFunc(address)
|
go n.sessionFunc(address)
|
||||||
return session, nil
|
return session, nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
2
go.mod
2
go.mod
|
|
@ -36,7 +36,7 @@ require (
|
||||||
github.com/uber-go/atomic v1.3.0 // indirect
|
github.com/uber-go/atomic v1.3.0 // indirect
|
||||||
github.com/uber/jaeger-client-go v2.15.0+incompatible
|
github.com/uber/jaeger-client-go v2.15.0+incompatible
|
||||||
github.com/uber/jaeger-lib v1.5.0
|
github.com/uber/jaeger-lib v1.5.0
|
||||||
github.com/yggdrasil-network/yggdrasil-go v0.3.15-0.20200715104113-1046b00c3be3
|
github.com/yggdrasil-network/yggdrasil-go v0.3.15-0.20200806125501-cd4685a3b4de
|
||||||
go.uber.org/atomic v1.4.0
|
go.uber.org/atomic v1.4.0
|
||||||
golang.org/x/crypto v0.0.0-20200423211502-4bdfaf469ed5
|
golang.org/x/crypto v0.0.0-20200423211502-4bdfaf469ed5
|
||||||
golang.org/x/mobile v0.0.0-20200801112145-973feb4309de // indirect
|
golang.org/x/mobile v0.0.0-20200801112145-973feb4309de // indirect
|
||||||
|
|
|
||||||
4
go.sum
4
go.sum
|
|
@ -655,6 +655,10 @@ github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:
|
||||||
github.com/yggdrasil-network/yggdrasil-extras v0.0.0-20200525205615-6c8a4a2e8855/go.mod h1:xQdsh08Io6nV4WRnOVTe6gI8/2iTvfLDQ0CYa5aMt+I=
|
github.com/yggdrasil-network/yggdrasil-extras v0.0.0-20200525205615-6c8a4a2e8855/go.mod h1:xQdsh08Io6nV4WRnOVTe6gI8/2iTvfLDQ0CYa5aMt+I=
|
||||||
github.com/yggdrasil-network/yggdrasil-go v0.3.15-0.20200715104113-1046b00c3be3 h1:teLoIJgPHysREs8P6GlcS/PgaU9W9+GQndikFCQ1lY0=
|
github.com/yggdrasil-network/yggdrasil-go v0.3.15-0.20200715104113-1046b00c3be3 h1:teLoIJgPHysREs8P6GlcS/PgaU9W9+GQndikFCQ1lY0=
|
||||||
github.com/yggdrasil-network/yggdrasil-go v0.3.15-0.20200715104113-1046b00c3be3/go.mod h1:d+Nz6SPeG6kmeSPFL0cvfWfgwEql75fUnZiAONgvyBE=
|
github.com/yggdrasil-network/yggdrasil-go v0.3.15-0.20200715104113-1046b00c3be3/go.mod h1:d+Nz6SPeG6kmeSPFL0cvfWfgwEql75fUnZiAONgvyBE=
|
||||||
|
github.com/yggdrasil-network/yggdrasil-go v0.3.15-0.20200806124633-bd1bdd6be073 h1:Fg4Bszd2qp6eyz/yDMYfB8g2PC1FfNQphGRgZAyD0VU=
|
||||||
|
github.com/yggdrasil-network/yggdrasil-go v0.3.15-0.20200806124633-bd1bdd6be073/go.mod h1:d+Nz6SPeG6kmeSPFL0cvfWfgwEql75fUnZiAONgvyBE=
|
||||||
|
github.com/yggdrasil-network/yggdrasil-go v0.3.15-0.20200806125501-cd4685a3b4de h1:p91aw0Mvol825U+5bvV9BBPl+HQxIczj7wxIOxZs70M=
|
||||||
|
github.com/yggdrasil-network/yggdrasil-go v0.3.15-0.20200806125501-cd4685a3b4de/go.mod h1:d+Nz6SPeG6kmeSPFL0cvfWfgwEql75fUnZiAONgvyBE=
|
||||||
go.opencensus.io v0.18.0/go.mod h1:vKdFvxhtzZ9onBp9VKHK8z/sRpBMnKAsufL7wlDrCOA=
|
go.opencensus.io v0.18.0/go.mod h1:vKdFvxhtzZ9onBp9VKHK8z/sRpBMnKAsufL7wlDrCOA=
|
||||||
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
|
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
|
||||||
go.opencensus.io v0.22.1/go.mod h1:Ap50jQcDJrx6rB6VgeeFPtuPIf3wMRvRfrfYDO6+BmA=
|
go.opencensus.io v0.22.1/go.mod h1:Ap50jQcDJrx6rB6VgeeFPtuPIf3wMRvRfrfYDO6+BmA=
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue