From b167f3c60873711feffd4ae58ec3f6d42b7ca93a Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 6 May 2020 10:04:12 +0100 Subject: [PATCH] Break out statistics (tracked component-wide), report success and failures from Perform actions --- federationsender/federationsender.go | 7 +- federationsender/internal/api.go | 4 + federationsender/internal/perform.go | 8 ++ federationsender/queue/destinationqueue.go | 94 +++------------- federationsender/queue/queue.go | 16 ++- federationsender/types/statistics.go | 125 +++++++++++++++++++++ 6 files changed, 172 insertions(+), 82 deletions(-) create mode 100644 federationsender/types/statistics.go diff --git a/federationsender/federationsender.go b/federationsender/federationsender.go index cf4395527..9e31699b3 100644 --- a/federationsender/federationsender.go +++ b/federationsender/federationsender.go @@ -24,6 +24,7 @@ import ( "github.com/matrix-org/dendrite/federationsender/producers" "github.com/matrix-org/dendrite/federationsender/queue" "github.com/matrix-org/dendrite/federationsender/storage" + "github.com/matrix-org/dendrite/federationsender/types" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" "github.com/sirupsen/logrus" @@ -44,7 +45,10 @@ func SetupFederationSenderComponent( roomserverProducer := producers.NewRoomserverProducer(rsAPI, base.Cfg.Matrix.ServerName) - queues := queue.NewOutgoingQueues(base.Cfg.Matrix.ServerName, federation, roomserverProducer) + statistics := &types.Statistics{} + queues := queue.NewOutgoingQueues( + base.Cfg.Matrix.ServerName, federation, roomserverProducer, statistics, + ) rsConsumer := consumers.NewOutputRoomEventConsumer( base.Cfg, base.KafkaConsumer, queues, @@ -63,6 +67,7 @@ func SetupFederationSenderComponent( queryAPI := internal.NewFederationSenderInternalAPI( federationSenderDB, base.Cfg, roomserverProducer, federation, keyRing, + statistics, ) queryAPI.SetupHTTP(http.DefaultServeMux) diff --git a/federationsender/internal/api.go b/federationsender/internal/api.go index 89a1fda40..481795220 100644 --- a/federationsender/internal/api.go +++ b/federationsender/internal/api.go @@ -9,6 +9,7 @@ import ( "github.com/matrix-org/dendrite/federationsender/api" "github.com/matrix-org/dendrite/federationsender/producers" "github.com/matrix-org/dendrite/federationsender/storage" + "github.com/matrix-org/dendrite/federationsender/types" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" ) @@ -18,6 +19,7 @@ type FederationSenderInternalAPI struct { api.FederationSenderInternalAPI db storage.Database cfg *config.Dendrite + statistics *types.Statistics producer *producers.RoomserverProducer federation *gomatrixserverlib.FederationClient keyRing *gomatrixserverlib.KeyRing @@ -28,6 +30,7 @@ func NewFederationSenderInternalAPI( producer *producers.RoomserverProducer, federation *gomatrixserverlib.FederationClient, keyRing *gomatrixserverlib.KeyRing, + statistics *types.Statistics, ) *FederationSenderInternalAPI { return &FederationSenderInternalAPI{ db: db, @@ -35,6 +38,7 @@ func NewFederationSenderInternalAPI( producer: producer, federation: federation, keyRing: keyRing, + statistics: statistics, } } diff --git a/federationsender/internal/perform.go b/federationsender/internal/perform.go index ff7f821c1..431b2a2d3 100644 --- a/federationsender/internal/perform.go +++ b/federationsender/internal/perform.go @@ -25,10 +25,12 @@ func (r *FederationSenderInternalAPI) PerformDirectoryLookup( request.RoomAlias, ) if err != nil { + r.statistics.ForServer(request.ServerName).Failure() return err } response.RoomID = dir.RoomID response.ServerNames = dir.Servers + r.statistics.ForServer(request.ServerName).Success() return nil } @@ -61,6 +63,7 @@ func (r *FederationSenderInternalAPI) PerformJoin( ) if err != nil { // TODO: Check if the user was not allowed to join the room. + r.statistics.ForServer(serverName).Failure() return fmt.Errorf("r.federation.MakeJoin: %w", err) } @@ -112,6 +115,7 @@ func (r *FederationSenderInternalAPI) PerformJoin( ) if err != nil { logrus.WithError(err).Warnf("r.federation.SendJoin failed") + r.statistics.ForServer(serverName).Failure() continue } @@ -137,6 +141,7 @@ func (r *FederationSenderInternalAPI) PerformJoin( } // We're all good. + r.statistics.ForServer(serverName).Success() return nil } @@ -170,6 +175,7 @@ func (r *FederationSenderInternalAPI) PerformLeave( if err != nil { // TODO: Check if the user was not allowed to leave the room. logrus.WithError(err).Warnf("r.federation.MakeLeave failed") + r.statistics.ForServer(serverName).Failure() continue } @@ -221,9 +227,11 @@ func (r *FederationSenderInternalAPI) PerformLeave( ) if err != nil { logrus.WithError(err).Warnf("r.federation.SendLeave failed") + r.statistics.ForServer(serverName).Failure() continue } + r.statistics.ForServer(serverName).Success() return nil } diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index 405a7dea2..4fe73be08 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -18,24 +18,17 @@ import ( "context" "encoding/json" "fmt" - "math" "sync" "time" "github.com/matrix-org/dendrite/federationsender/producers" + "github.com/matrix-org/dendrite/federationsender/types" "github.com/matrix-org/gomatrixserverlib" "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus" "go.uber.org/atomic" ) -const ( - // How many times should we tolerate consecutive failures before we - // just blacklist the host altogether? Bear in mind that the backoff - // is exponential, so the max time here to attempt is 2**failures. - FailuresUntilBlacklist = 16 // 16 equates to roughly 18 hours. -) - // destinationQueue is a queue of events for a single destination. // It is responsible for sending the events to the destination and // ensures that only one request is in flight to a given destination @@ -46,12 +39,9 @@ type destinationQueue struct { origin gomatrixserverlib.ServerName // origin of requests destination gomatrixserverlib.ServerName // destination of requests running atomic.Bool // is the queue worker running? - blacklisted atomic.Bool // is the remote side dead? - backoffUntil atomic.Value // time.Time to wait until before sending requests - idleCounter atomic.Uint32 // how many ticks have we done nothing? - failCounter atomic.Uint32 // how many times have we failed? - sentCounter atomic.Uint32 // how many times have we succeeded? wakeup chan bool // wakes up a sleeping worker + statistics *types.ServerStatistics // statistics about this remote server + idleCounter atomic.Uint32 // how many ticks have we done nothing? runningMutex sync.RWMutex // protects the below lastTransactionIDs []gomatrixserverlib.TransactionID // protected by runningMutex pendingPDUs []*gomatrixserverlib.HeaderedEvent // protected by runningMutex @@ -69,58 +59,11 @@ func (oq *destinationQueue) wake() { } } -// Backoff marks a failure and works out when to back off until. It -// returns true if the worker should give up altogether because of -// too many consecutive failures. -func (oq *destinationQueue) backoff() bool { - // Increase the fail counter. - failCounter := oq.failCounter.Load() - failCounter++ - oq.failCounter.Store(failCounter) - - // Check that we haven't failed more times than is acceptable. - if failCounter < FailuresUntilBlacklist { - // We're still under the threshold so work out the exponential - // backoff based on how many times we have failed already. The - // worker goroutine will wait until this time before processing - // anything from the queue. - backoffSeconds := time.Second * time.Duration(math.Exp2(float64(failCounter))) - oq.backoffUntil.Store( - time.Now().Add(backoffSeconds), - ) - logrus.WithField("server_name", oq.destination).Infof("Increasing backoff to %s", backoffSeconds) - return false // Don't give up yet. - } else { - // We've exceeded the maximum amount of times we're willing - // to back off, which is probably in the region of hours by - // now. Just give up - clear the queues and reset the queue - // back to its default state. - oq.blacklisted.Store(true) - oq.runningMutex.Lock() - oq.pendingPDUs = nil - oq.pendingEDUs = nil - oq.pendingInvites = nil - oq.runningMutex.Unlock() - logrus.WithField("server_name", oq.destination).Infof("Blacklisting server due to %d consecutive errors", failCounter) - return true // Give up. - } -} - -func (oq *destinationQueue) success() { - // Reset the idle and fail counters. - oq.idleCounter.Store(0) - oq.failCounter.Store(0) - - // Increase the sent counter. - sentCounter := oq.failCounter.Load() - oq.sentCounter.Store(sentCounter + 1) -} - // Send event adds the event to the pending queue for the destination. // If the queue is empty then it starts a background goroutine to // start sending events to that destination. func (oq *destinationQueue) sendEvent(ev *gomatrixserverlib.HeaderedEvent) { - if oq.blacklisted.Load() { + if oq.statistics.Blacklisted() { // If the destination is blacklisted then drop the event. return } @@ -134,7 +77,7 @@ func (oq *destinationQueue) sendEvent(ev *gomatrixserverlib.HeaderedEvent) { // If the queue is empty then it starts a background goroutine to // start sending events to that destination. func (oq *destinationQueue) sendEDU(ev *gomatrixserverlib.EDU) { - if oq.blacklisted.Load() { + if oq.statistics.Blacklisted() { // If the destination is blacklisted then drop the event. return } @@ -148,7 +91,7 @@ func (oq *destinationQueue) sendEDU(ev *gomatrixserverlib.EDU) { // destination. If the queue is empty then it starts a background // goroutine to start sending events to that destination. func (oq *destinationQueue) sendInvite(ev *gomatrixserverlib.InviteV2Request) { - if oq.blacklisted.Load() { + if oq.statistics.Blacklisted() { // If the destination is blacklisted then drop the event. return } @@ -168,22 +111,17 @@ func (oq *destinationQueue) backgroundSend() { defer close(oq.wakeup) for { - // Wait for our backoff timer. - backoffUntil := time.Now() - if b, ok := oq.backoffUntil.Load().(time.Time); ok { - backoffUntil = b - } - - // If we have a backoff period then sit and wait for it. - if backoffUntil.After(time.Now()) { - <-time.After(time.Until(backoffUntil)) + // If we are backing off this server then wait for the + // backoff duration to complete first. + if backoff, duration := oq.statistics.BackoffDuration(); backoff { + <-time.After(duration) } // Retrieve any waiting things. oq.runningMutex.RLock() pendingPDUs, pendingEDUs := oq.pendingPDUs, oq.pendingEDUs pendingInvites := oq.pendingInvites - idleCounter, sentCounter := oq.idleCounter.Load(), oq.sentCounter.Load() + idleCounter, sentCounter := oq.idleCounter.Load(), oq.statistics.SuccessCount() oq.runningMutex.RUnlock() // If we have pending PDUs or EDUs then construct a transaction. @@ -192,7 +130,7 @@ func (oq *destinationQueue) backgroundSend() { transaction, terr := oq.nextTransaction(pendingPDUs, pendingEDUs, sentCounter) if terr != nil { // We failed to send the transaction. - if giveUp := oq.backoff(); giveUp { + if giveUp := oq.statistics.Failure(); giveUp { // It's been suggested that we should give up because // the backoff has exceeded a maximum allowable value. return @@ -224,7 +162,11 @@ func (oq *destinationQueue) backgroundSend() { if ierr != nil { // We failed to send the transaction so increase the // backoff and give it another go shortly. - oq.backoffUntil.Store(time.Until(backoffUntil) * 2) + if giveUp := oq.statistics.Failure(); giveUp { + // It's been suggested that we should give up because + // the backoff has exceeded a maximum allowable value. + return + } continue } @@ -244,7 +186,7 @@ func (oq *destinationQueue) backgroundSend() { // If everything was fine at this point then we can update // the counters for the transaction IDs. - oq.success() + oq.statistics.Success() // Wait either for a few seconds, or until a new event is // available. diff --git a/federationsender/queue/queue.go b/federationsender/queue/queue.go index cdcd0da81..9e6523414 100644 --- a/federationsender/queue/queue.go +++ b/federationsender/queue/queue.go @@ -19,6 +19,7 @@ import ( "sync" "github.com/matrix-org/dendrite/federationsender/producers" + "github.com/matrix-org/dendrite/federationsender/types" "github.com/matrix-org/gomatrixserverlib" log "github.com/sirupsen/logrus" ) @@ -26,11 +27,11 @@ import ( // OutgoingQueues is a collection of queues for sending transactions to other // matrix servers type OutgoingQueues struct { - rsProducer *producers.RoomserverProducer - origin gomatrixserverlib.ServerName - client *gomatrixserverlib.FederationClient - // The queuesMutex protects queues - queuesMutex sync.RWMutex + rsProducer *producers.RoomserverProducer + origin gomatrixserverlib.ServerName + client *gomatrixserverlib.FederationClient + statistics *types.Statistics + queuesMutex sync.RWMutex // protects the below queues map[gomatrixserverlib.ServerName]*destinationQueue } @@ -39,11 +40,13 @@ func NewOutgoingQueues( origin gomatrixserverlib.ServerName, client *gomatrixserverlib.FederationClient, rsProducer *producers.RoomserverProducer, + statistics *types.Statistics, ) *OutgoingQueues { return &OutgoingQueues{ rsProducer: rsProducer, origin: origin, client: client, + statistics: statistics, queues: map[gomatrixserverlib.ServerName]*destinationQueue{}, } } @@ -78,6 +81,7 @@ func (oqs *OutgoingQueues) SendEvent( origin: oqs.origin, destination: destination, client: oqs.client, + statistics: oqs.statistics.ForServer(destination), } oqs.queuesMutex.Lock() oqs.queues[destination] = oq @@ -125,6 +129,7 @@ func (oqs *OutgoingQueues) SendInvite( origin: oqs.origin, destination: destination, client: oqs.client, + statistics: oqs.statistics.ForServer(destination), } oqs.queuesMutex.Lock() oqs.queues[destination] = oq @@ -168,6 +173,7 @@ func (oqs *OutgoingQueues) SendEDU( origin: oqs.origin, destination: destination, client: oqs.client, + statistics: oqs.statistics.ForServer(destination), } oqs.queuesMutex.Lock() oqs.queues[destination] = oq diff --git a/federationsender/types/statistics.go b/federationsender/types/statistics.go new file mode 100644 index 000000000..959acec89 --- /dev/null +++ b/federationsender/types/statistics.go @@ -0,0 +1,125 @@ +package types + +import ( + "math" + "sync" + "time" + + "github.com/matrix-org/gomatrixserverlib" + "go.uber.org/atomic" +) + +const ( + // How many times should we tolerate consecutive failures before we + // just blacklist the host altogether? Bear in mind that the backoff + // is exponential, so the max time here to attempt is 2**failures. + FailuresUntilBlacklist = 16 // 16 equates to roughly 18 hours. +) + +// Statistics contains information about all of the remote federated +// hosts that we have interacted with. It is basically a threadsafe +// wrapper. +type Statistics struct { + servers map[gomatrixserverlib.ServerName]*ServerStatistics + mutex sync.RWMutex +} + +// ForServer returns server statistics for the given server name. If it +// does not exist, it will create empty statistics and return those. +func (s *Statistics) ForServer(serverName gomatrixserverlib.ServerName) *ServerStatistics { + // If the map hasn't been initialised yet then do that. + if s.servers == nil { + s.mutex.Lock() + s.servers = make(map[gomatrixserverlib.ServerName]*ServerStatistics) + s.mutex.Unlock() + } + // Look up if we have statistics for this server already. + s.mutex.RLock() + server, found := s.servers[serverName] + s.mutex.RUnlock() + // If we don't, then make one. + if !found { + s.mutex.Lock() + server = &ServerStatistics{} + s.servers[serverName] = server + s.mutex.Unlock() + } + return server +} + +// ServerStatistics contains information about our interactions with a +// remote federated host, e.g. how many times we were successful, how +// many times we failed etc. It also manages the backoff time and black- +// listing a remote host if it remains uncooperative. +type ServerStatistics struct { + blacklisted atomic.Bool // is the remote side dead? + backoffUntil atomic.Value // time.Time to wait until before sending requests + failCounter atomic.Uint32 // how many times have we failed? + successCounter atomic.Uint32 // how many times have we succeeded? +} + +// Success updates the server statistics with a new successful +// attempt, which increases the sent counter and resets the idle and +// failure counters. If a host was blacklisted at this point then +// we will unblacklist it. +func (s *ServerStatistics) Success() { + sentCounter := s.failCounter.Load() + s.successCounter.Store(sentCounter + 1) + s.failCounter.Store(0) + s.blacklisted.Store(false) +} + +// Failure marks a failure and works out when to backoff until. It +// returns true if the worker should give up altogether because of +// too many consecutive failures. At this point the host is marked +// as blacklisted. +func (s *ServerStatistics) Failure() bool { + // Increase the fail counter. + failCounter := s.failCounter.Load() + failCounter++ + s.failCounter.Store(failCounter) + + // Check that we haven't failed more times than is acceptable. + if failCounter >= FailuresUntilBlacklist { + // We've exceeded the maximum amount of times we're willing + // to back off, which is probably in the region of hours by + // now. Mark the host as blacklisted and tell the caller to + // give up. + s.blacklisted.Store(true) + return true + } + + // We're still under the threshold so work out the exponential + // backoff based on how many times we have failed already. The + // worker goroutine will wait until this time before processing + // anything from the queue. + backoffSeconds := time.Second * time.Duration(math.Exp2(float64(failCounter))) + s.backoffUntil.Store( + time.Now().Add(backoffSeconds), + ) + return false +} + +// WaitUntil returns both a bool stating whether to wait, and then +// if true, a duration to wait for. +func (s *ServerStatistics) BackoffDuration() (bool, time.Duration) { + backoff, until := false, time.Second + if b, ok := s.backoffUntil.Load().(time.Time); ok { + if b.After(time.Now()) { + backoff, until = true, time.Until(b) + } + } + return backoff, until +} + +// Blacklisted returns true if the server is blacklisted and false +// otherwise. +func (s *ServerStatistics) Blacklisted() bool { + return s.blacklisted.Load() +} + +// SuccessCount returns the number of successful requests. This is +// usually useful in constructing transaction IDs. +func (s *ServerStatistics) SuccessCount() uint32 { + return s.successCounter.Load() +}