mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-23 14:53:10 -06:00
Tweak backoffs
This commit is contained in:
parent
18231f25b4
commit
17f70f4046
|
|
@ -73,6 +73,10 @@ func failBlacklistableError(err error, stats *statistics.ServerStatistics) (unti
|
||||||
if mxerr.Code == 401 { // invalid signature in X-Matrix header
|
if mxerr.Code == 401 { // invalid signature in X-Matrix header
|
||||||
return stats.Failure()
|
return stats.Failure()
|
||||||
}
|
}
|
||||||
|
if mxerr.Code == 404 {
|
||||||
|
// TODO: can any of the endpoints called in this file return genuine 404s?
|
||||||
|
return stats.Failure()
|
||||||
|
}
|
||||||
if mxerr.Code >= 500 && mxerr.Code < 600 { // internal server errors
|
if mxerr.Code >= 500 && mxerr.Code < 600 { // internal server errors
|
||||||
return stats.Failure()
|
return stats.Failure()
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -44,6 +44,7 @@ func (s *Statistics) ForServer(serverName gomatrixserverlib.ServerName) *ServerS
|
||||||
server = &ServerStatistics{
|
server = &ServerStatistics{
|
||||||
statistics: s,
|
statistics: s,
|
||||||
serverName: serverName,
|
serverName: serverName,
|
||||||
|
interrupt: make(chan struct{}),
|
||||||
}
|
}
|
||||||
s.servers[serverName] = server
|
s.servers[serverName] = server
|
||||||
s.mutex.Unlock()
|
s.mutex.Unlock()
|
||||||
|
|
@ -68,6 +69,7 @@ type ServerStatistics struct {
|
||||||
backoffStarted atomic.Bool // is the backoff started
|
backoffStarted atomic.Bool // is the backoff started
|
||||||
backoffUntil atomic.Value // time.Time until this backoff interval ends
|
backoffUntil atomic.Value // time.Time until this backoff interval ends
|
||||||
backoffCount atomic.Uint32 // number of times BackoffDuration has been called
|
backoffCount atomic.Uint32 // number of times BackoffDuration has been called
|
||||||
|
interrupt chan struct{} // interrupts the backoff goroutine
|
||||||
successCounter atomic.Uint32 // how many times have we succeeded?
|
successCounter atomic.Uint32 // how many times have we succeeded?
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -76,15 +78,23 @@ func (s *ServerStatistics) duration(count uint32) time.Duration {
|
||||||
return time.Second * time.Duration(math.Exp2(float64(count)))
|
return time.Second * time.Duration(math.Exp2(float64(count)))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// cancel will interrupt the currently active backoff.
|
||||||
|
func (s *ServerStatistics) cancel() {
|
||||||
|
s.blacklisted.Store(false)
|
||||||
|
select {
|
||||||
|
case s.interrupt <- struct{}{}:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Success updates the server statistics with a new successful
|
// Success updates the server statistics with a new successful
|
||||||
// attempt, which increases the sent counter and resets the idle and
|
// attempt, which increases the sent counter and resets the idle and
|
||||||
// failure counters. If a host was blacklisted at this point then
|
// failure counters. If a host was blacklisted at this point then
|
||||||
// we will unblacklist it.
|
// we will unblacklist it.
|
||||||
func (s *ServerStatistics) Success() {
|
func (s *ServerStatistics) Success() {
|
||||||
s.successCounter.Add(1)
|
s.cancel()
|
||||||
s.backoffStarted.Store(false)
|
s.successCounter.Inc()
|
||||||
s.backoffCount.Store(0)
|
s.backoffCount.Store(0)
|
||||||
s.blacklisted.Store(false)
|
|
||||||
if s.statistics.DB != nil {
|
if s.statistics.DB != nil {
|
||||||
if err := s.statistics.DB.RemoveServerFromBlacklist(s.serverName); err != nil {
|
if err := s.statistics.DB.RemoveServerFromBlacklist(s.serverName); err != nil {
|
||||||
logrus.WithError(err).Errorf("Failed to remove %q from blacklist", s.serverName)
|
logrus.WithError(err).Errorf("Failed to remove %q from blacklist", s.serverName)
|
||||||
|
|
@ -99,10 +109,21 @@ func (s *ServerStatistics) Success() {
|
||||||
// whether we have blacklisted and therefore to give up.
|
// whether we have blacklisted and therefore to give up.
|
||||||
func (s *ServerStatistics) Failure() (time.Time, bool) {
|
func (s *ServerStatistics) Failure() (time.Time, bool) {
|
||||||
// If we aren't already backing off, this call will start
|
// If we aren't already backing off, this call will start
|
||||||
// a new backoff period. Reset the counter to 0 so that
|
// a new backoff period. Increase the failure counter and
|
||||||
// we backoff only for short periods of time to start with.
|
// start a goroutine which will wait out the backoff and
|
||||||
|
// unset the backoffStarted flag when done.
|
||||||
if s.backoffStarted.CAS(false, true) {
|
if s.backoffStarted.CAS(false, true) {
|
||||||
s.backoffCount.Store(0)
|
s.backoffCount.Inc()
|
||||||
|
go func() {
|
||||||
|
until, ok := s.backoffUntil.Load().(time.Time)
|
||||||
|
if ok {
|
||||||
|
select {
|
||||||
|
case <-time.After(time.Until(until)):
|
||||||
|
case <-s.interrupt:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
s.backoffStarted.Store(false)
|
||||||
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if we have blacklisted this node.
|
// Check if we have blacklisted this node.
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue