Backoff fixes

Neil Alexander 2020-08-07 17:10:54 +01:00
parent 5dd5a41119
commit 92fcf5c586
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
3 changed files with 105 additions and 31 deletions


@@ -262,16 +262,7 @@ func (oq *destinationQueue) backgroundSend() {
// If we are backing off this server then wait for the
// backoff duration to complete first, or until explicitly
// told to retry.
if backoff, duration := oq.statistics.BackoffDuration(); backoff {
log.WithField("duration", duration).Debugf("Backing off %s", oq.destination)
oq.backingOff.Store(true)
select {
case <-time.After(duration):
case <-oq.interruptBackoff:
log.Debugf("Interrupting backoff for %q", oq.destination)
}
oq.backingOff.Store(false)
}
oq.statistics.BackoffIfRequired(oq.backingOff, oq.interruptBackoff)
// If we have pending PDUs or EDUs then construct a transaction.
if pendingPDUs || pendingEDUs {
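
For context on the pattern being moved: the removed block waited on either a timer or the queue's interrupt channel. The following is a self-contained sketch of that select shape only, under illustrative names (waitOrInterrupt, the 100ms producer) that are not taken from this repository:

package main

import (
	"fmt"
	"time"
)

// waitOrInterrupt blocks for the given duration unless a value
// arrives on interrupt first - the same select shape used by the
// queue above and, after this commit, by the statistics package.
func waitOrInterrupt(d time.Duration, interrupt <-chan bool) {
	select {
	case <-time.After(d):
		fmt.Println("backoff completed")
	case <-interrupt:
		fmt.Println("backoff interrupted")
	}
}

func main() {
	interrupt := make(chan bool, 1)
	go func() {
		time.Sleep(100 * time.Millisecond)
		interrupt <- true // simulate new work arriving for the destination
	}()
	waitOrInterrupt(5*time.Second, interrupt)
}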


@@ -65,7 +65,8 @@ type ServerStatistics struct {
statistics *Statistics //
serverName gomatrixserverlib.ServerName //
blacklisted atomic.Bool // is the node blacklisted
backoffUntil atomic.Value // time.Time to wait until before sending requests
backoffStarted atomic.Bool // is the backoff started
backoffCount atomic.Uint32 // number of times we have backed off since the last success
failCounter atomic.Uint32 // how many times have we failed?
successCounter atomic.Uint32 // how many times have we succeeded?
}
@@ -77,11 +78,15 @@ type ServerStatistics struct {
func (s *ServerStatistics) Success() {
s.successCounter.Add(1)
s.failCounter.Store(0)
s.backoffStarted.Store(false)
s.backoffCount.Store(0)
s.blacklisted.Store(false)
if s.statistics.DB != nil {
if err := s.statistics.DB.RemoveServerFromBlacklist(s.serverName); err != nil {
logrus.WithError(err).Errorf("Failed to remove %q from blacklist", s.serverName)
}
}
}
// Failure marks a failure and works out when to backoff until. It
// returns true if the worker should give up altogether because of
@@ -98,33 +103,58 @@ func (s *ServerStatistics) Failure() bool {
// now. Mark the host as blacklisted and tell the caller to
// give up.
s.blacklisted.Store(true)
if s.statistics.DB != nil {
if err := s.statistics.DB.AddServerToBlacklist(s.serverName); err != nil {
logrus.WithError(err).Errorf("Failed to add %q to blacklist", s.serverName)
}
}
return true
}
// We're still under the threshold so work out the exponential
// backoff based on how many times we have failed already. The
// worker goroutine will wait until this time before processing
// anything from the queue.
backoffSeconds := time.Second * time.Duration(math.Exp2(float64(failCounter)))
s.backoffUntil.Store(
time.Now().Add(backoffSeconds),
)
if s.backoffStarted.CAS(false, true) {
s.backoffCount.Store(0)
}
return false
}
// BackoffDuration returns both a bool stating whether to wait,
// BackoffIfRequired waits out the current backoff period, if one
// is in progress, unless it is interrupted first. It returns the
// backoff duration that applied, or zero if no backoff was needed.
func (s *ServerStatistics) BackoffDuration() (bool, time.Duration) {
backoff, until := false, time.Second
if b, ok := s.backoffUntil.Load().(time.Time); ok {
if b.After(time.Now()) {
backoff, until = true, time.Until(b)
func (s *ServerStatistics) BackoffIfRequired(backingOff atomic.Bool, interrupt <-chan bool) time.Duration {
if started := s.backoffStarted.Load(); !started {
return 0
}
// Work out how many times we've backed off so far. If we
// have passed the failure counter then we can stop backing
// off - we've done our time.
count := s.backoffCount.Inc()
if count >= s.failCounter.Load() {
s.backoffStarted.Store(false)
return 0
}
return backoff, until
// Notify the destination queue that we're backing off now.
backingOff.Store(true)
defer backingOff.Store(false)
// Work out how long we should be backing off for.
duration := time.Second * time.Duration(math.Exp2(float64(count)))
logrus.Debugf("Backing off %q for %d", s.serverName, duration)
// Wait for either an interruption or for the backoff to
// complete.
select {
case <-interrupt:
logrus.Infof("Interrupting backoff for %q", s.serverName)
case <-time.After(duration):
}
// Drain a pending interrupt, if there is one, without blocking
// when the channel is empty - helps with tests.
select {
case <-interrupt:
default:
}
return duration
}
// Blacklisted returns true if the server is blacklisted and false
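
The backoff duration above follows 2^count seconds, where count is the number of backoffs completed since the last success. A quick worked example of the schedule, assuming the FailuresUntilBlacklist value of 6 used by the test below:

package main

import (
	"fmt"
	"math"
	"time"
)

func main() {
	// With FailuresUntilBlacklist set to 6, a persistently failing
	// host is backed off for 2s, 4s, 8s, 16s and 32s; once the
	// backoff count catches up with the failure count, backing off
	// stops (and the sixth failure will have blacklisted the host).
	const failuresUntilBlacklist = 6
	for count := uint32(1); count < failuresUntilBlacklist; count++ {
		duration := time.Second * time.Duration(math.Exp2(float64(count)))
		fmt.Printf("backoff %d: %s\n", count, duration)
	}
}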


@@ -0,0 +1,53 @@
package statistics
import (
"math"
"testing"
"time"
"go.uber.org/atomic"
)
func TestBackoff(t *testing.T) {
stats := Statistics{
FailuresUntilBlacklist: 6,
}
server := ServerStatistics{
statistics: &stats,
serverName: "test.com",
}
server.Success()
if successes := server.SuccessCount(); successes != 1 {
t.Fatalf("Expected success count 1, got %d", successes)
}
for i := uint32(1); i <= stats.FailuresUntilBlacklist; i++ {
if server.Failure() == (i < stats.FailuresUntilBlacklist) {
t.Fatalf("Failure %d resulted in blacklist too soon", i)
}
}
t.Logf("Failure counter: %d", server.failCounter)
t.Logf("Backoff counter: %d", server.backoffCount)
backingOff := atomic.Bool{}
for i := uint32(1); i <= 10; i++ {
interrupt := make(chan bool, 1)
close(interrupt)
duration := server.BackoffIfRequired(backingOff, interrupt)
t.Logf("Backoff %d is for %s", i, duration)
if i < stats.FailuresUntilBlacklist {
if wanted := time.Second * time.Duration(math.Exp2(float64(i))); duration != wanted {
t.Fatalf("Backoff should have been %s but was %s", wanted, duration)
}
} else {
if duration != 0 {
t.Fatalf("Backoff should have been zero but was %s", duration)
}
}
}
}
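
A note on why the test closes each interrupt channel before calling BackoffIfRequired: a receive from a closed channel is always ready, so the backoff select returns immediately rather than sleeping for the full duration. A minimal standalone demonstration of that property, independent of the repository's types:

package main

import (
	"fmt"
	"time"
)

func main() {
	interrupt := make(chan bool, 1)
	close(interrupt)

	start := time.Now()
	select {
	case <-time.After(2 * time.Second):
		fmt.Println("timer won - this branch is never taken here")
	case <-interrupt:
		// A closed channel is always ready to receive, so the
		// select fires straight away and the test stays fast.
		fmt.Println("interrupted after", time.Since(start))
	}
}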