From 17f70f404669c1767426d01b63374bf2cff45f05 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 21 Sep 2020 11:30:37 +0100 Subject: [PATCH] Tweak backoffs --- federationsender/internal/api.go | 4 +++ federationsender/statistics/statistics.go | 33 ++++++++++++++++++----- 2 files changed, 31 insertions(+), 6 deletions(-) diff --git a/federationsender/internal/api.go b/federationsender/internal/api.go index 2a70f7ed3..0b683abe4 100644 --- a/federationsender/internal/api.go +++ b/federationsender/internal/api.go @@ -73,6 +73,10 @@ func failBlacklistableError(err error, stats *statistics.ServerStatistics) (unti if mxerr.Code == 401 { // invalid signature in X-Matrix header return stats.Failure() } + if mxerr.Code == 404 { + // TODO: can any of the endpoints called in this file return genuine 404s? + return stats.Failure() + } if mxerr.Code >= 500 && mxerr.Code < 600 { // internal server errors return stats.Failure() } diff --git a/federationsender/statistics/statistics.go b/federationsender/statistics/statistics.go index 03ef64e95..9bb088e61 100644 --- a/federationsender/statistics/statistics.go +++ b/federationsender/statistics/statistics.go @@ -44,6 +44,7 @@ func (s *Statistics) ForServer(serverName gomatrixserverlib.ServerName) *ServerS server = &ServerStatistics{ statistics: s, serverName: serverName, + interrupt: make(chan struct{}), } s.servers[serverName] = server s.mutex.Unlock() @@ -68,6 +69,7 @@ type ServerStatistics struct { backoffStarted atomic.Bool // is the backoff started backoffUntil atomic.Value // time.Time until this backoff interval ends backoffCount atomic.Uint32 // number of times BackoffDuration has been called + interrupt chan struct{} // interrupts the backoff goroutine successCounter atomic.Uint32 // how many times have we succeeded? } @@ -76,15 +78,23 @@ func (s *ServerStatistics) duration(count uint32) time.Duration { return time.Second * time.Duration(math.Exp2(float64(count))) } +// cancel will interrupt the currently active backoff. +func (s *ServerStatistics) cancel() { + s.blacklisted.Store(false) + select { + case s.interrupt <- struct{}{}: + default: + } +} + // Success updates the server statistics with a new successful // attempt, which increases the sent counter and resets the idle and // failure counters. If a host was blacklisted at this point then // we will unblacklist it. func (s *ServerStatistics) Success() { - s.successCounter.Add(1) - s.backoffStarted.Store(false) + s.cancel() + s.successCounter.Inc() s.backoffCount.Store(0) - s.blacklisted.Store(false) if s.statistics.DB != nil { if err := s.statistics.DB.RemoveServerFromBlacklist(s.serverName); err != nil { logrus.WithError(err).Errorf("Failed to remove %q from blacklist", s.serverName) @@ -99,10 +109,21 @@ func (s *ServerStatistics) Success() { // whether we have blacklisted and therefore to give up. func (s *ServerStatistics) Failure() (time.Time, bool) { // If we aren't already backing off, this call will start - // a new backoff period. Reset the counter to 0 so that - // we backoff only for short periods of time to start with. + // a new backoff period. Increase the failure counter and + // start a goroutine which will wait out the backoff and + // unset the backoffStarted flag when done. if s.backoffStarted.CAS(false, true) { - s.backoffCount.Store(0) + s.backoffCount.Inc() + go func() { + until, ok := s.backoffUntil.Load().(time.Time) + if ok { + select { + case <-time.After(time.Until(until)): + case <-s.interrupt: + } + } + s.backoffStarted.Store(false) + }() } // Check if we have blacklisted this node.