Refactor backoff again (#1431)
* Tweak backoffs * Refactor backoff some more, remove BackoffIfRequired as it adds unnecessary complexity * Ignore 404s
This commit is contained in:
parent
18231f25b4
commit
880b164490
|
@ -231,13 +231,24 @@ func (oq *destinationQueue) backgroundSend() {
|
||||||
// If we are backing off this server then wait for the
|
// If we are backing off this server then wait for the
|
||||||
// backoff duration to complete first, or until explicitly
|
// backoff duration to complete first, or until explicitly
|
||||||
// told to retry.
|
// told to retry.
|
||||||
if _, giveUp := oq.statistics.BackoffIfRequired(oq.backingOff, oq.interruptBackoff); giveUp {
|
until, blacklisted := oq.statistics.BackoffInfo()
|
||||||
|
if blacklisted {
|
||||||
// It's been suggested that we should give up because the backoff
|
// It's been suggested that we should give up because the backoff
|
||||||
// has exceeded a maximum allowable value. Clean up the in-memory
|
// has exceeded a maximum allowable value. Clean up the in-memory
|
||||||
// buffers at this point. The PDU clean-up is already on a defer.
|
// buffers at this point. The PDU clean-up is already on a defer.
|
||||||
log.Warnf("Blacklisting %q due to exceeding backoff threshold", oq.destination)
|
log.Warnf("Blacklisting %q due to exceeding backoff threshold", oq.destination)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if until != nil {
|
||||||
|
// We haven't backed off yet, so wait for the suggested amount of
|
||||||
|
// time.
|
||||||
|
duration := time.Until(*until)
|
||||||
|
log.Warnf("Backing off %q for %s", oq.destination, duration)
|
||||||
|
select {
|
||||||
|
case <-time.After(duration):
|
||||||
|
case <-oq.interruptBackoff:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// If we have pending PDUs or EDUs then construct a transaction.
|
// If we have pending PDUs or EDUs then construct a transaction.
|
||||||
if pendingPDUs || pendingEDUs {
|
if pendingPDUs || pendingEDUs {
|
||||||
|
|
|
@ -44,6 +44,7 @@ func (s *Statistics) ForServer(serverName gomatrixserverlib.ServerName) *ServerS
|
||||||
server = &ServerStatistics{
|
server = &ServerStatistics{
|
||||||
statistics: s,
|
statistics: s,
|
||||||
serverName: serverName,
|
serverName: serverName,
|
||||||
|
interrupt: make(chan struct{}),
|
||||||
}
|
}
|
||||||
s.servers[serverName] = server
|
s.servers[serverName] = server
|
||||||
s.mutex.Unlock()
|
s.mutex.Unlock()
|
||||||
|
@ -68,6 +69,7 @@ type ServerStatistics struct {
|
||||||
backoffStarted atomic.Bool // is the backoff started
|
backoffStarted atomic.Bool // is the backoff started
|
||||||
backoffUntil atomic.Value // time.Time until this backoff interval ends
|
backoffUntil atomic.Value // time.Time until this backoff interval ends
|
||||||
backoffCount atomic.Uint32 // number of times BackoffDuration has been called
|
backoffCount atomic.Uint32 // number of times BackoffDuration has been called
|
||||||
|
interrupt chan struct{} // interrupts the backoff goroutine
|
||||||
successCounter atomic.Uint32 // how many times have we succeeded?
|
successCounter atomic.Uint32 // how many times have we succeeded?
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -76,15 +78,24 @@ func (s *ServerStatistics) duration(count uint32) time.Duration {
|
||||||
return time.Second * time.Duration(math.Exp2(float64(count)))
|
return time.Second * time.Duration(math.Exp2(float64(count)))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// cancel will interrupt the currently active backoff.
|
||||||
|
func (s *ServerStatistics) cancel() {
|
||||||
|
s.blacklisted.Store(false)
|
||||||
|
s.backoffUntil.Store(time.Time{})
|
||||||
|
select {
|
||||||
|
case s.interrupt <- struct{}{}:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Success updates the server statistics with a new successful
|
// Success updates the server statistics with a new successful
|
||||||
// attempt, which increases the sent counter and resets the idle and
|
// attempt, which increases the sent counter and resets the idle and
|
||||||
// failure counters. If a host was blacklisted at this point then
|
// failure counters. If a host was blacklisted at this point then
|
||||||
// we will unblacklist it.
|
// we will unblacklist it.
|
||||||
func (s *ServerStatistics) Success() {
|
func (s *ServerStatistics) Success() {
|
||||||
s.successCounter.Add(1)
|
s.cancel()
|
||||||
s.backoffStarted.Store(false)
|
s.successCounter.Inc()
|
||||||
s.backoffCount.Store(0)
|
s.backoffCount.Store(0)
|
||||||
s.blacklisted.Store(false)
|
|
||||||
if s.statistics.DB != nil {
|
if s.statistics.DB != nil {
|
||||||
if err := s.statistics.DB.RemoveServerFromBlacklist(s.serverName); err != nil {
|
if err := s.statistics.DB.RemoveServerFromBlacklist(s.serverName); err != nil {
|
||||||
logrus.WithError(err).Errorf("Failed to remove %q from blacklist", s.serverName)
|
logrus.WithError(err).Errorf("Failed to remove %q from blacklist", s.serverName)
|
||||||
|
@ -99,10 +110,30 @@ func (s *ServerStatistics) Success() {
|
||||||
// whether we have blacklisted and therefore to give up.
|
// whether we have blacklisted and therefore to give up.
|
||||||
func (s *ServerStatistics) Failure() (time.Time, bool) {
|
func (s *ServerStatistics) Failure() (time.Time, bool) {
|
||||||
// If we aren't already backing off, this call will start
|
// If we aren't already backing off, this call will start
|
||||||
// a new backoff period. Reset the counter to 0 so that
|
// a new backoff period. Increase the failure counter and
|
||||||
// we backoff only for short periods of time to start with.
|
// start a goroutine which will wait out the backoff and
|
||||||
|
// unset the backoffStarted flag when done.
|
||||||
if s.backoffStarted.CAS(false, true) {
|
if s.backoffStarted.CAS(false, true) {
|
||||||
s.backoffCount.Store(0)
|
if s.backoffCount.Inc() >= s.statistics.FailuresUntilBlacklist {
|
||||||
|
s.blacklisted.Store(true)
|
||||||
|
if s.statistics.DB != nil {
|
||||||
|
if err := s.statistics.DB.AddServerToBlacklist(s.serverName); err != nil {
|
||||||
|
logrus.WithError(err).Errorf("Failed to add %q to blacklist", s.serverName)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return time.Time{}, true
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
until, ok := s.backoffUntil.Load().(time.Time)
|
||||||
|
if ok {
|
||||||
|
select {
|
||||||
|
case <-time.After(time.Until(until)):
|
||||||
|
case <-s.interrupt:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
s.backoffStarted.Store(false)
|
||||||
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if we have blacklisted this node.
|
// Check if we have blacklisted this node.
|
||||||
|
@ -136,53 +167,6 @@ func (s *ServerStatistics) BackoffInfo() (*time.Time, bool) {
|
||||||
return nil, s.blacklisted.Load()
|
return nil, s.blacklisted.Load()
|
||||||
}
|
}
|
||||||
|
|
||||||
// BackoffIfRequired will block for as long as the current
|
|
||||||
// backoff requires, if needed. Otherwise it will do nothing.
|
|
||||||
// Returns the amount of time to backoff for and whether to give up or not.
|
|
||||||
func (s *ServerStatistics) BackoffIfRequired(backingOff atomic.Bool, interrupt <-chan bool) (time.Duration, bool) {
|
|
||||||
if started := s.backoffStarted.Load(); !started {
|
|
||||||
return 0, false
|
|
||||||
}
|
|
||||||
|
|
||||||
// Work out if we should be blacklisting at this point.
|
|
||||||
count := s.backoffCount.Inc()
|
|
||||||
if count >= s.statistics.FailuresUntilBlacklist {
|
|
||||||
// We've exceeded the maximum amount of times we're willing
|
|
||||||
// to back off, which is probably in the region of hours by
|
|
||||||
// now. Mark the host as blacklisted and tell the caller to
|
|
||||||
// give up.
|
|
||||||
s.blacklisted.Store(true)
|
|
||||||
if s.statistics.DB != nil {
|
|
||||||
if err := s.statistics.DB.AddServerToBlacklist(s.serverName); err != nil {
|
|
||||||
logrus.WithError(err).Errorf("Failed to add %q to blacklist", s.serverName)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return 0, true
|
|
||||||
}
|
|
||||||
|
|
||||||
// Work out when we should wait until.
|
|
||||||
duration := s.duration(count)
|
|
||||||
until := time.Now().Add(duration)
|
|
||||||
s.backoffUntil.Store(until)
|
|
||||||
|
|
||||||
// Notify the destination queue that we're backing off now.
|
|
||||||
backingOff.Store(true)
|
|
||||||
defer backingOff.Store(false)
|
|
||||||
|
|
||||||
// Work out how long we should be backing off for.
|
|
||||||
logrus.Warnf("Backing off %q for %s", s.serverName, duration)
|
|
||||||
|
|
||||||
// Wait for either an interruption or for the backoff to
|
|
||||||
// complete.
|
|
||||||
select {
|
|
||||||
case <-interrupt:
|
|
||||||
logrus.Debugf("Interrupting backoff for %q", s.serverName)
|
|
||||||
case <-time.After(duration):
|
|
||||||
}
|
|
||||||
|
|
||||||
return duration, false
|
|
||||||
}
|
|
||||||
|
|
||||||
// Blacklisted returns true if the server is blacklisted and false
|
// Blacklisted returns true if the server is blacklisted and false
|
||||||
// otherwise.
|
// otherwise.
|
||||||
func (s *ServerStatistics) Blacklisted() bool {
|
func (s *ServerStatistics) Blacklisted() bool {
|
||||||
|
|
|
@ -4,8 +4,6 @@ import (
|
||||||
"math"
|
"math"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"go.uber.org/atomic"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestBackoff(t *testing.T) {
|
func TestBackoff(t *testing.T) {
|
||||||
|
@ -27,34 +25,30 @@ func TestBackoff(t *testing.T) {
|
||||||
server.Failure()
|
server.Failure()
|
||||||
|
|
||||||
t.Logf("Backoff counter: %d", server.backoffCount.Load())
|
t.Logf("Backoff counter: %d", server.backoffCount.Load())
|
||||||
backingOff := atomic.Bool{}
|
|
||||||
|
|
||||||
// Now we're going to simulate backing off a few times to see
|
// Now we're going to simulate backing off a few times to see
|
||||||
// what happens.
|
// what happens.
|
||||||
for i := uint32(1); i <= 10; i++ {
|
for i := uint32(1); i <= 10; i++ {
|
||||||
// Interrupt the backoff - it doesn't really matter if it
|
|
||||||
// completes but we will find out how long the backoff should
|
|
||||||
// have been.
|
|
||||||
interrupt := make(chan bool, 1)
|
|
||||||
close(interrupt)
|
|
||||||
|
|
||||||
// Get the duration.
|
|
||||||
duration, blacklist := server.BackoffIfRequired(backingOff, interrupt)
|
|
||||||
|
|
||||||
// Register another failure for good measure. This should have no
|
// Register another failure for good measure. This should have no
|
||||||
// side effects since a backoff is already in progress. If it does
|
// side effects since a backoff is already in progress. If it does
|
||||||
// then we'll fail.
|
// then we'll fail.
|
||||||
until, blacklisted := server.Failure()
|
until, blacklisted := server.Failure()
|
||||||
if time.Until(until) > duration {
|
|
||||||
t.Fatal("Failure produced unexpected side effect when it shouldn't have")
|
// Get the duration.
|
||||||
}
|
_, blacklist := server.BackoffInfo()
|
||||||
|
duration := time.Until(until).Round(time.Second)
|
||||||
|
|
||||||
|
// Unset the backoff, or otherwise our next call will think that
|
||||||
|
// there's a backoff in progress and return the same result.
|
||||||
|
server.cancel()
|
||||||
|
server.backoffStarted.Store(false)
|
||||||
|
|
||||||
// Check if we should be blacklisted by now.
|
// Check if we should be blacklisted by now.
|
||||||
if i >= stats.FailuresUntilBlacklist {
|
if i >= stats.FailuresUntilBlacklist {
|
||||||
if !blacklist {
|
if !blacklist {
|
||||||
t.Fatalf("Backoff %d should have resulted in blacklist but didn't", i)
|
t.Fatalf("Backoff %d should have resulted in blacklist but didn't", i)
|
||||||
} else if blacklist != blacklisted {
|
} else if blacklist != blacklisted {
|
||||||
t.Fatalf("BackoffIfRequired and Failure returned different blacklist values")
|
t.Fatalf("BackoffInfo and Failure returned different blacklist values")
|
||||||
} else {
|
} else {
|
||||||
t.Logf("Backoff %d is blacklisted as expected", i)
|
t.Logf("Backoff %d is blacklisted as expected", i)
|
||||||
continue
|
continue
|
||||||
|
|
Loading…
Reference in a new issue