Tweaks to pinecone demo to shutdown more cleanly

This commit is contained in:
Devon Hudson 2023-02-24 15:41:47 -07:00
parent 3d31b131fc
commit b28406c7d0
No known key found for this signature in database
GPG key ID: CD06B18E77F6A628
3 changed files with 62 additions and 28 deletions

View file

@ -68,12 +68,14 @@ type P2PMonolith struct {
EventChannel chan pineconeEvents.Event EventChannel chan pineconeEvents.Event
RelayRetriever relay.RelayServerRetriever RelayRetriever relay.RelayServerRetriever
dendrite setup.Monolith dendrite setup.Monolith
port int port int
httpMux *mux.Router httpMux *mux.Router
pineconeMux *mux.Router pineconeMux *mux.Router
listener net.Listener httpServer *http.Server
httpListenAddr string listener net.Listener
httpListenAddr string
stopHandlingEvents chan bool
} }
func GenerateDefaultConfig(sk ed25519.PrivateKey, storageDir string, cacheDir string, dbPrefix string) *config.Dendrite { func GenerateDefaultConfig(sk ed25519.PrivateKey, storageDir string, cacheDir string, dbPrefix string) *config.Dendrite {
@ -199,8 +201,10 @@ func (p *P2PMonolith) StartMonolith() {
} }
func (p *P2PMonolith) Stop() { func (p *P2PMonolith) Stop() {
logrus.Info("Stopping monolith")
_ = p.BaseDendrite.Close() _ = p.BaseDendrite.Close()
p.WaitForShutdown() p.WaitForShutdown()
logrus.Info("Stopped monolith")
} }
func (p *P2PMonolith) WaitForShutdown() { func (p *P2PMonolith) WaitForShutdown() {
@ -209,6 +213,16 @@ func (p *P2PMonolith) WaitForShutdown() {
} }
func (p *P2PMonolith) closeAllResources() { func (p *P2PMonolith) closeAllResources() {
logrus.Info("Closing monolith resources")
if p.httpServer != nil {
p.httpServer.Shutdown(context.Background())
}
select {
case p.stopHandlingEvents <- true:
default:
}
if p.listener != nil { if p.listener != nil {
_ = p.listener.Close() _ = p.listener.Close()
} }
@ -224,6 +238,7 @@ func (p *P2PMonolith) closeAllResources() {
if p.Router != nil { if p.Router != nil {
_ = p.Router.Close() _ = p.Router.Close()
} }
logrus.Info("Monolith resources closed")
} }
func (p *P2PMonolith) Addr() string { func (p *P2PMonolith) Addr() string {
@ -280,7 +295,7 @@ func (p *P2PMonolith) setupHttpServers(userProvider *users.PineconeUserProvider,
func (p *P2PMonolith) startHTTPServers() { func (p *P2PMonolith) startHTTPServers() {
go func() { go func() {
// Build both ends of a HTTP multiplex. // Build both ends of a HTTP multiplex.
httpServer := &http.Server{ p.httpServer = &http.Server{
Addr: ":0", Addr: ":0",
TLSNextProto: map[string]func(*http.Server, *tls.Conn, http.Handler){}, TLSNextProto: map[string]func(*http.Server, *tls.Conn, http.Handler){},
ReadTimeout: 10 * time.Second, ReadTimeout: 10 * time.Second,
@ -296,12 +311,13 @@ func (p *P2PMonolith) startHTTPServers() {
pubkeyString := hex.EncodeToString(pubkey[:]) pubkeyString := hex.EncodeToString(pubkey[:])
logrus.Info("Listening on ", pubkeyString) logrus.Info("Listening on ", pubkeyString)
switch httpServer.Serve(p.Sessions.Protocol(SessionProtocol)) { switch p.httpServer.Serve(p.Sessions.Protocol(SessionProtocol)) {
case net.ErrClosed, http.ErrServerClosed: case net.ErrClosed, http.ErrServerClosed:
logrus.Info("Stopped listening on ", pubkeyString) logrus.Info("Stopped listening on ", pubkeyString)
default: default:
logrus.Error("Stopped listening on ", pubkeyString) logrus.Error("Stopped listening on ", pubkeyString)
} }
logrus.Info("Stopped goroutine listening on ", pubkeyString)
}() }()
p.httpListenAddr = fmt.Sprintf(":%d", p.port) p.httpListenAddr = fmt.Sprintf(":%d", p.port)
@ -313,10 +329,12 @@ func (p *P2PMonolith) startHTTPServers() {
default: default:
logrus.Error("Stopped listening on ", p.httpListenAddr) logrus.Error("Stopped listening on ", p.httpListenAddr)
} }
logrus.Info("Stopped goroutine listening on ", p.httpListenAddr)
}() }()
} }
func (p *P2PMonolith) startEventHandler() { func (p *P2PMonolith) startEventHandler() {
p.stopHandlingEvents = make(chan bool)
stopRelayServerSync := make(chan bool) stopRelayServerSync := make(chan bool)
eLog := logrus.WithField("pinecone", "events") eLog := logrus.WithField("pinecone", "events")
p.RelayRetriever = relay.NewRelayServerRetriever( p.RelayRetriever = relay.NewRelayServerRetriever(
@ -329,25 +347,40 @@ func (p *P2PMonolith) startEventHandler() {
p.RelayRetriever.InitializeRelayServers(eLog) p.RelayRetriever.InitializeRelayServers(eLog)
go func(ch <-chan pineconeEvents.Event) { go func(ch <-chan pineconeEvents.Event) {
for event := range ch { for {
switch e := event.(type) { select {
case pineconeEvents.PeerAdded: case event := <-ch:
p.RelayRetriever.StartSync() switch e := event.(type) {
case pineconeEvents.PeerRemoved: case pineconeEvents.PeerAdded:
if p.RelayRetriever.IsRunning() && p.Router.TotalPeerCount() == 0 { p.RelayRetriever.StartSync()
stopRelayServerSync <- true case pineconeEvents.PeerRemoved:
} if p.RelayRetriever.IsRunning() && p.Router.TotalPeerCount() == 0 {
case pineconeEvents.BroadcastReceived: // NOTE: Don't block on channel
// eLog.Info("Broadcast received from: ", e.PeerID) select {
case stopRelayServerSync <- true:
default:
}
}
case pineconeEvents.BroadcastReceived:
// eLog.Info("Broadcast received from: ", e.PeerID)
req := &federationAPI.PerformWakeupServersRequest{ req := &federationAPI.PerformWakeupServersRequest{
ServerNames: []gomatrixserverlib.ServerName{gomatrixserverlib.ServerName(e.PeerID)}, ServerNames: []gomatrixserverlib.ServerName{gomatrixserverlib.ServerName(e.PeerID)},
}
res := &federationAPI.PerformWakeupServersResponse{}
if err := p.dendrite.FederationAPI.PerformWakeupServers(p.BaseDendrite.Context(), req, res); err != nil {
eLog.WithError(err).Error("Failed to wakeup destination", e.PeerID)
}
} }
res := &federationAPI.PerformWakeupServersResponse{} case <-p.stopHandlingEvents:
if err := p.dendrite.FederationAPI.PerformWakeupServers(p.BaseDendrite.Context(), req, res); err != nil { logrus.Info("Stopping processing pinecone events")
eLog.WithError(err).Error("Failed to wakeup destination", e.PeerID) // NOTE: Don't block on channel
select {
case stopRelayServerSync <- true:
default:
} }
default: logrus.Info("Stopped processing pinecone events")
return
} }
} }
}(p.EventChannel) }(p.EventChannel)

View file

@ -38,7 +38,7 @@ type RelayServerRetriever struct {
relayServersQueried map[gomatrixserverlib.ServerName]bool relayServersQueried map[gomatrixserverlib.ServerName]bool
queriedServersMutex sync.Mutex queriedServersMutex sync.Mutex
running atomic.Bool running atomic.Bool
quit <-chan bool quit chan bool
} }
func NewRelayServerRetriever( func NewRelayServerRetriever(
@ -46,7 +46,7 @@ func NewRelayServerRetriever(
serverName gomatrixserverlib.ServerName, serverName gomatrixserverlib.ServerName,
federationAPI federationAPI.FederationInternalAPI, federationAPI federationAPI.FederationInternalAPI,
relayAPI relayServerAPI.RelayInternalAPI, relayAPI relayServerAPI.RelayInternalAPI,
quit <-chan bool, quit chan bool,
) RelayServerRetriever { ) RelayServerRetriever {
return RelayServerRetriever{ return RelayServerRetriever{
ctx: ctx, ctx: ctx,
@ -151,6 +151,7 @@ func (r *RelayServerRetriever) SyncRelayServers(stop <-chan bool) {
if !t.Stop() { if !t.Stop() {
<-t.C <-t.C
} }
logrus.Info("Stopped relay server retriever")
return return
case <-t.C: case <-t.C:
} }

View file

@ -60,7 +60,7 @@ func TestRelayRetrieverInitialization(t *testing.T) {
"server", "server",
&FakeFedAPI{}, &FakeFedAPI{},
&FakeRelayAPI{}, &FakeRelayAPI{},
make(<-chan bool), make(chan bool),
) )
retriever.InitializeRelayServers(logrus.WithField("test", "relay")) retriever.InitializeRelayServers(logrus.WithField("test", "relay"))
@ -74,7 +74,7 @@ func TestRelayRetrieverSync(t *testing.T) {
"server", "server",
&FakeFedAPI{}, &FakeFedAPI{},
&FakeRelayAPI{}, &FakeRelayAPI{},
make(<-chan bool), make(chan bool),
) )
retriever.InitializeRelayServers(logrus.WithField("test", "relay")) retriever.InitializeRelayServers(logrus.WithField("test", "relay"))