zerolog: apps #2

This commit is contained in:
KuhnChris 2023-07-03 23:43:12 +02:00
parent e064ee5058
commit 447746e214
7 changed files with 74 additions and 65 deletions

View file

@ -34,7 +34,8 @@ import (
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/gomatrixserverlib/spec"
"github.com/sirupsen/logrus"
log "github.com/rs/zerolog/log"
pineconeRouter "github.com/matrix-org/pinecone/router"
)
@ -112,7 +113,7 @@ func main() {
for {
conn, err := listener.Accept()
if err != nil {
logrus.WithError(err).Error("listener.Accept failed")
log.Error().Err(err).Msg("listener.Accept failed")
continue
}
@ -121,7 +122,7 @@ func main() {
pineconeRouter.ConnectionPeerType(pineconeRouter.PeerTypeRemote),
)
if err != nil {
logrus.WithError(err).Error("pSwitch.Connect failed")
log.Error().Err(err).Msg("pSwitch.Connect failed")
continue
}

View file

@ -54,6 +54,8 @@ import (
"github.com/matrix-org/gomatrixserverlib/spec"
"github.com/sirupsen/logrus"
log "github.com/rs/zerolog/log"
pineconeConnections "github.com/matrix-org/pinecone/connections"
pineconeMulticast "github.com/matrix-org/pinecone/multicast"
pineconeRouter "github.com/matrix-org/pinecone/router"
@ -115,6 +117,8 @@ func GenerateDefaultConfig(sk ed25519.PrivateKey, storageDir string, cacheDir st
func (p *P2PMonolith) SetupPinecone(sk ed25519.PrivateKey) {
p.EventChannel = make(chan pineconeEvents.Event)
// log.Logger.With().Str("pinecone", "router").Logger()
// not compatible with pinecone types.logger => Printf missing
p.Router = pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk)
p.Router.EnableHopLimiting()
p.Router.EnableWakeupBroadcasts()
@ -166,7 +170,7 @@ func (p *P2PMonolith) SetupDendrite(
UserAPI: userAPI,
}
relayAPI := relayapi.NewRelayInternalAPI(cfg, cm, federation, rsAPI, keyRing, producer, enableRelaying, caches)
logrus.Infof("Relaying enabled: %v", relayAPI.RelayingEnabled())
log.Info().Msgf("Relaying enabled: %v", relayAPI.RelayingEnabled())
p.dendrite = setup.Monolith{
Config: cfg,
@ -206,10 +210,10 @@ func (p *P2PMonolith) StartMonolith() {
}
func (p *P2PMonolith) Stop() {
logrus.Info("Stopping monolith")
log.Info().Msg("Stopping monolith")
p.ProcessCtx.ShutdownDendrite()
p.WaitForShutdown()
logrus.Info("Stopped monolith")
log.Info().Msg("Stopped monolith")
}
func (p *P2PMonolith) WaitForShutdown() {
@ -218,7 +222,7 @@ func (p *P2PMonolith) WaitForShutdown() {
}
func (p *P2PMonolith) closeAllResources() {
logrus.Info("Closing monolith resources")
log.Info().Msg("Closing monolith resources")
p.httpServerMu.Lock()
if p.httpServer != nil {
_ = p.httpServer.Shutdown(context.Background())
@ -245,7 +249,7 @@ func (p *P2PMonolith) closeAllResources() {
if p.Router != nil {
_ = p.Router.Close()
}
logrus.Info("Monolith resources closed")
log.Info().Msg("Monolith resources closed")
}
func (p *P2PMonolith) Addr() string {
@ -268,7 +272,7 @@ func (p *P2PMonolith) setupHttpServers(userProvider *users.PineconeUserProvider,
p.httpMux.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
c, err := wsUpgrader.Upgrade(w, r, nil)
if err != nil {
logrus.WithError(err).Error("Failed to upgrade WebSocket connection")
log.Error().Err(err).Msg("Failed to upgrade WebSocket connection")
return
}
conn := conn.WrapWebSocketConn(c)
@ -277,7 +281,7 @@ func (p *P2PMonolith) setupHttpServers(userProvider *users.PineconeUserProvider,
pineconeRouter.ConnectionZone("websocket"),
pineconeRouter.ConnectionPeerType(pineconeRouter.PeerTypeRemote),
); err != nil {
logrus.WithError(err).Error("Failed to connect WebSocket peer to Pinecone switch")
log.Error().Err(err).Msg("Failed to connect WebSocket peer to Pinecone switch")
}
})
}
@ -317,34 +321,35 @@ func (p *P2PMonolith) startHTTPServers() {
p.httpServerMu.Unlock()
pubkey := p.Router.PublicKey()
pubkeyString := hex.EncodeToString(pubkey[:])
logrus.Info("Listening on ", pubkeyString)
log.Info().Msgf("Listening on %s", pubkeyString)
switch p.httpServer.Serve(p.Sessions.Protocol(SessionProtocol)) {
case net.ErrClosed, http.ErrServerClosed:
logrus.Info("Stopped listening on ", pubkeyString)
log.Info().Msgf("Stopped listening on %s", pubkeyString)
default:
logrus.Error("Stopped listening on ", pubkeyString)
log.Error().Msgf("Stopped listening on %s", pubkeyString)
}
logrus.Info("Stopped goroutine listening on ", pubkeyString)
log.Info().Msgf("Stopped goroutine listening on %s", pubkeyString)
}()
p.httpListenAddr = fmt.Sprintf(":%d", p.port)
go func() {
logrus.Info("Listening on ", p.httpListenAddr)
log.Info().Msgf("Listening on %s", p.httpListenAddr)
switch http.ListenAndServe(p.httpListenAddr, p.httpMux) {
case net.ErrClosed, http.ErrServerClosed:
logrus.Info("Stopped listening on ", p.httpListenAddr)
log.Info().Msgf("Stopped listening on %s", p.httpListenAddr)
default:
logrus.Error("Stopped listening on ", p.httpListenAddr)
log.Error().Msgf("Stopped listening on %s", p.httpListenAddr)
}
logrus.Info("Stopped goroutine listening on ", p.httpListenAddr)
log.Info().Msgf("Stopped goroutine listening on %s", p.httpListenAddr)
}()
}
func (p *P2PMonolith) startEventHandler() {
p.stopHandlingEvents = make(chan bool)
stopRelayServerSync := make(chan bool)
eLog := logrus.WithField("pinecone", "events")
logger := log.Logger.With().Str("pinecone", "events").Logger()
//eLog := logrus.WithField("pinecone", "events")
p.RelayRetriever = relay.NewRelayServerRetriever(
context.Background(),
spec.ServerName(p.Router.PublicKey().String()),
@ -352,7 +357,7 @@ func (p *P2PMonolith) startEventHandler() {
p.dendrite.RelayAPI,
stopRelayServerSync,
)
p.RelayRetriever.InitializeRelayServers(eLog)
p.RelayRetriever.InitializeRelayServers(&logger)
go func(ch <-chan pineconeEvents.Event) {
for {
@ -377,17 +382,17 @@ func (p *P2PMonolith) startEventHandler() {
}
res := &federationAPI.PerformWakeupServersResponse{}
if err := p.dendrite.FederationAPI.PerformWakeupServers(p.ProcessCtx.Context(), req, res); err != nil {
eLog.WithError(err).Error("Failed to wakeup destination", e.PeerID)
log.Error().Err(err).Msgf("Failed to wakeup destination %s", e.PeerID)
}
}
case <-p.stopHandlingEvents:
logrus.Info("Stopping processing pinecone events")
log.Info().Msg("Stopping processing pinecone events")
// NOTE: Don't block on channel
select {
case stopRelayServerSync <- true:
default:
}
logrus.Info("Stopped processing pinecone events")
log.Info().Msg("Stopped processing pinecone events")
return
}
}

View file

@ -22,7 +22,8 @@ import (
federationAPI "github.com/matrix-org/dendrite/federationapi/api"
relayServerAPI "github.com/matrix-org/dendrite/relayapi/api"
"github.com/matrix-org/gomatrixserverlib/spec"
"github.com/sirupsen/logrus"
"github.com/rs/zerolog"
log "github.com/rs/zerolog/log"
"go.uber.org/atomic"
)
@ -59,12 +60,12 @@ func NewRelayServerRetriever(
}
}
func (r *RelayServerRetriever) InitializeRelayServers(eLog *logrus.Entry) {
func (r *RelayServerRetriever) InitializeRelayServers(eLog *zerolog.Logger) {
request := federationAPI.P2PQueryRelayServersRequest{Server: spec.ServerName(r.serverName)}
response := federationAPI.P2PQueryRelayServersResponse{}
err := r.federationAPI.P2PQueryRelayServers(r.ctx, &request, &response)
if err != nil {
eLog.Warnf("Failed obtaining list of this node's relay servers: %s", err.Error())
eLog.Warn().Msgf("Failed obtaining list of this node's relay servers: %s", err.Error())
}
r.queriedServersMutex.Lock()
@ -73,7 +74,7 @@ func (r *RelayServerRetriever) InitializeRelayServers(eLog *logrus.Entry) {
r.relayServersQueried[server] = false
}
eLog.Infof("Registered relay servers: %v", response.RelayServers)
eLog.Info().Msgf("Registered relay servers: %v", response.RelayServers)
}
func (r *RelayServerRetriever) SetRelayServers(servers []spec.ServerName) {
@ -114,7 +115,7 @@ func (r *RelayServerRetriever) GetQueriedServerStatus() map[spec.ServerName]bool
func (r *RelayServerRetriever) StartSync() {
if !r.running.Load() {
logrus.Info("Starting relay server sync")
log.Info().Msg("Starting relay server sync")
go r.SyncRelayServers(r.quit)
}
}
@ -140,7 +141,7 @@ func (r *RelayServerRetriever) SyncRelayServers(stop <-chan bool) {
}()
if len(relayServersToQuery) == 0 {
// All relay servers have been synced.
logrus.Info("Finished syncing with all known relays")
log.Info().Msg("Finished syncing with all known relays")
return
}
r.queryRelayServers(relayServersToQuery)
@ -151,7 +152,7 @@ func (r *RelayServerRetriever) SyncRelayServers(stop <-chan bool) {
if !t.Stop() {
<-t.C
}
logrus.Info("Stopped relay server retriever")
log.Info().Msg("Stopped relay server retriever")
return
case <-t.C:
}
@ -159,14 +160,14 @@ func (r *RelayServerRetriever) SyncRelayServers(stop <-chan bool) {
}
func (r *RelayServerRetriever) queryRelayServers(relayServers []spec.ServerName) {
logrus.Info("Querying relay servers for any available transactions")
log.Info().Msg("Querying relay servers for any available transactions")
for _, server := range relayServers {
userID, err := spec.NewUserID("@user:"+string(r.serverName), false)
if err != nil {
return
}
logrus.Infof("Syncing with relay: %s", string(server))
log.Info().Msgf("Syncing with relay: %s", string(server))
err = r.relayAPI.PerformRelayServerSync(context.Background(), *userID, server)
if err == nil {
func() {
@ -181,7 +182,7 @@ func (r *RelayServerRetriever) queryRelayServers(relayServers []spec.ServerName)
// What if you miss this message?
// Maybe you should try querying them again after a certain period of time as a backup?
} else {
logrus.Errorf("Failed querying relay server: %s", err.Error())
log.Error().Msgf("Failed querying relay server: %s", err.Error())
}
}
}
@ -197,7 +198,7 @@ func UpdateNodeRelayServers(
response := federationAPI.P2PQueryRelayServersResponse{}
err := fedAPI.P2PQueryRelayServers(ctx, &request, &response)
if err != nil {
logrus.Warnf("Failed obtaining list of relay servers for %s: %s", node, err.Error())
log.Warn().Msgf("Failed obtaining list of relay servers for %s: %s", node, err.Error())
}
// Remove old, non-matching relays
@ -222,7 +223,7 @@ func UpdateNodeRelayServers(
removeResponse := federationAPI.P2PRemoveRelayServersResponse{}
err = fedAPI.P2PRemoveRelayServers(ctx, &removeRequest, &removeResponse)
if err != nil {
logrus.Warnf("Failed removing old relay servers for %s: %s", node, err.Error())
log.Warn().Msgf("Failed removing old relay servers for %s: %s", node, err.Error())
}
// Add new relays
@ -233,6 +234,6 @@ func UpdateNodeRelayServers(
addResponse := federationAPI.P2PAddRelayServersResponse{}
err = fedAPI.P2PAddRelayServers(ctx, &addRequest, &addResponse)
if err != nil {
logrus.Warnf("Failed adding relay servers for %s: %s", node, err.Error())
log.Warn().Msgf("Failed adding relay servers for %s: %s", node, err.Error())
}
}

View file

@ -22,7 +22,8 @@ import (
federationAPI "github.com/matrix-org/dendrite/federationapi/api"
relayServerAPI "github.com/matrix-org/dendrite/relayapi/api"
"github.com/matrix-org/gomatrixserverlib/spec"
"github.com/sirupsen/logrus"
log "github.com/rs/zerolog/log"
"github.com/stretchr/testify/assert"
"gotest.tools/v3/poll"
)
@ -63,7 +64,8 @@ func TestRelayRetrieverInitialization(t *testing.T) {
make(chan bool),
)
retriever.InitializeRelayServers(logrus.WithField("test", "relay"))
logger := log.Logger.With().Str("test", "relay").Logger()
retriever.InitializeRelayServers(&logger)
relayServers := retriever.GetQueriedServerStatus()
assert.Equal(t, 2, len(relayServers))
}
@ -77,7 +79,8 @@ func TestRelayRetrieverSync(t *testing.T) {
make(chan bool),
)
retriever.InitializeRelayServers(logrus.WithField("test", "relay"))
logger := log.Logger.With().Str("test", "relay").Logger()
retriever.InitializeRelayServers(&logger)
relayServers := retriever.GetQueriedServerStatus()
assert.Equal(t, 2, len(relayServers))

View file

@ -52,7 +52,8 @@ import (
"github.com/matrix-org/dendrite/setup/mscs"
"github.com/matrix-org/dendrite/test"
"github.com/matrix-org/dendrite/userapi"
"github.com/sirupsen/logrus"
log "github.com/rs/zerolog/log"
)
var (
@ -153,29 +154,29 @@ func main() {
cfg.Verify(configErrors)
if len(*configErrors) > 0 {
for _, err := range *configErrors {
logrus.Errorf("Configuration error: %s", err)
log.Error().Msgf("Configuration error: %s", err)
}
logrus.Fatalf("Failed to start due to configuration errors")
log.Fatal().Msg("Failed to start due to configuration errors")
}
internal.SetupStdLogging()
internal.SetupHookLogging(cfg.Logging)
internal.SetupPprof()
logrus.Infof("Dendrite version %s", internal.VersionString())
log.Info().Msgf("Dendrite version %s", internal.VersionString())
if !cfg.ClientAPI.RegistrationDisabled && cfg.ClientAPI.OpenRegistrationWithoutVerificationEnabled {
logrus.Warn("Open registration is enabled")
log.Warn().Msg("Open registration is enabled")
}
closer, err := cfg.SetupTracing()
if err != nil {
logrus.WithError(err).Panicf("failed to start opentracing")
log.Panic().Err(err).Msgf("failed to start opentracing")
}
defer closer.Close() // nolint: errcheck
if cfg.Global.Sentry.Enabled {
logrus.Info("Setting up Sentry for debugging...")
log.Info().Msg("Setting up Sentry for debugging...")
err = sentry.Init(sentry.ClientOptions{
Dsn: cfg.Global.Sentry.DSN,
Environment: cfg.Global.Sentry.Environment,
@ -185,7 +186,7 @@ func main() {
AttachStacktrace: true,
})
if err != nil {
logrus.WithError(err).Panic("failed to start Sentry")
log.Panic().Err(err).Msg("failed to start Sentry")
}
}
@ -239,7 +240,7 @@ func main() {
}
monolith.AddAllPublicRoutes(processCtx, cfg, routers, cm, &natsInstance, caches, caching.EnableMetrics)
if err := mscs.Enable(cfg, cm, routers, &monolith, caches); err != nil {
logrus.WithError(err).Fatalf("Failed to enable MSCs")
log.Fatal().Err(err).Msgf("Failed to enable MSCs")
}
httpRouter := mux.NewRouter().SkipClean(true).UseEncodedPath()
@ -267,20 +268,20 @@ func main() {
}
go func() {
logrus.Info("Listening on ", ygg.DerivedServerName())
logrus.Fatal(httpServer.Serve(ygg))
log.Info().Msgf("Listening on %s", ygg.DerivedServerName())
log.Fatal().Err(httpServer.Serve(ygg))
}()
go func() {
httpBindAddr := fmt.Sprintf(":%d", *instancePort)
logrus.Info("Listening on ", httpBindAddr)
logrus.Fatal(http.ListenAndServe(httpBindAddr, httpRouter))
log.Info().Msgf("Listening on %s", httpBindAddr)
log.Fatal().Err(http.ListenAndServe(httpBindAddr, httpRouter))
}()
go func() {
logrus.Info("Sending wake-up message to known nodes")
log.Info().Msg("Sending wake-up message to known nodes")
req := &api.PerformBroadcastEDURequest{}
res := &api.PerformBroadcastEDUResponse{}
if err := fsAPI.PerformBroadcastEDU(context.TODO(), req, res); err != nil {
logrus.WithError(err).Error("Failed to send wake-up message to known nodes")
log.Error().Err(err).Msg("Failed to send wake-up message to known nodes")
}
}()

View file

@ -25,15 +25,13 @@ import (
"github.com/matrix-org/gomatrixserverlib/spec"
"github.com/neilalexander/utp"
"github.com/sirupsen/logrus"
ironwoodtypes "github.com/Arceliar/ironwood/types"
"github.com/yggdrasil-network/yggdrasil-go/src/core"
yggdrasilcore "github.com/yggdrasil-network/yggdrasil-go/src/core"
"github.com/yggdrasil-network/yggdrasil-go/src/multicast"
yggdrasilmulticast "github.com/yggdrasil-network/yggdrasil-go/src/multicast"
gologme "github.com/gologme/log"
log "github.com/rs/zerolog/log"
)
type Node struct {
@ -56,8 +54,9 @@ func (n *Node) DialerContext(ctx context.Context, _, address string) (net.Conn,
}
func Setup(sk ed25519.PrivateKey, instanceName, storageDirectory, peerURI, listenURI string) (*Node, error) {
// TODO: gologme v.s. zerolog
n := &Node{
log: gologme.New(logrus.StandardLogger().Writer(), "", 0),
log: gologme.New(log.Logger, "", 0),
incoming: make(chan net.Conn),
}
@ -78,7 +77,7 @@ func Setup(sk ed25519.PrivateKey, instanceName, storageDirectory, peerURI, liste
})
}
}
if n.core, err = core.New(sk[:], n.log, options...); err != nil {
if n.core, err = yggdrasilcore.New(sk[:], n.log, options...); err != nil {
panic(err)
}
n.core.SetLogger(n.log)
@ -91,8 +90,8 @@ func Setup(sk ed25519.PrivateKey, instanceName, storageDirectory, peerURI, liste
// Setup the multicast module.
{
var err error
options := []multicast.SetupOption{
multicast.MulticastInterface{
options := []yggdrasilmulticast.SetupOption{
yggdrasilmulticast.MulticastInterface{
Regex: regexp.MustCompile(".*"),
Beacon: true,
Listen: true,
@ -100,7 +99,7 @@ func Setup(sk ed25519.PrivateKey, instanceName, storageDirectory, peerURI, liste
Priority: 0,
},
}
if n.multicast, err = multicast.New(n.core, n.log, options...); err != nil {
if n.multicast, err = yggdrasilmulticast.New(n.core, n.log, options...); err != nil {
panic(err)
}
}

View file

@ -27,7 +27,6 @@ import (
"sync"
"github.com/matrix-org/util"
"github.com/sirupsen/logrus"
"github.com/rs/zerolog"
log "github.com/rs/zerolog/log"
@ -93,9 +92,9 @@ func callerPrettyfier(f *runtime.Frame) (string, string) {
// simplest, and it gives us the freedom to run pprof on a separate port.
func SetupPprof() {
if hostPort := os.Getenv("PPROFLISTEN"); hostPort != "" {
logrus.Warn("Starting pprof on ", hostPort)
log.Warn().Msgf("Starting pprof on %s", hostPort)
go func() {
logrus.WithError(http.ListenAndServe(hostPort, nil)).Error("Failed to setup pprof listener")
log.Error().Err(http.ListenAndServe(hostPort, nil)).Msg("Failed to setup pprof listener")
}()
}
}