From 92fcf5c5866154e5371d0e0ada6bcd9cefb4ea46 Mon Sep 17 00:00:00 2001
From: Neil Alexander
Date: Fri, 7 Aug 2020 17:10:54 +0100
Subject: [PATCH] Backoff fixes

---
 federationsender/queue/destinationqueue.go    | 11 +--
 federationsender/statistics/statistics.go     | 93 ++++++++++++++++++------
 .../statistics/statistics_test.go             | 58 +++++++++++++++
 3 files changed, 130 insertions(+), 32 deletions(-)
 create mode 100644 federationsender/statistics/statistics_test.go

diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go
index aedaeab1f..c4b768d6a 100644
--- a/federationsender/queue/destinationqueue.go
+++ b/federationsender/queue/destinationqueue.go
@@ -262,16 +262,7 @@ func (oq *destinationQueue) backgroundSend() {
 		// If we are backing off this server then wait for the
 		// backoff duration to complete first, or until explicitly
 		// told to retry.
-		if backoff, duration := oq.statistics.BackoffDuration(); backoff {
-			log.WithField("duration", duration).Debugf("Backing off %s", oq.destination)
-			oq.backingOff.Store(true)
-			select {
-			case <-time.After(duration):
-			case <-oq.interruptBackoff:
-				log.Debugf("Interrupting backoff for %q", oq.destination)
-			}
-			oq.backingOff.Store(false)
-		}
+		oq.statistics.BackoffIfRequired(&oq.backingOff, oq.interruptBackoff)
 
 		// If we have pending PDUs or EDUs then construct a transaction.
 		if pendingPDUs || pendingEDUs {
diff --git a/federationsender/statistics/statistics.go b/federationsender/statistics/statistics.go
index 17dd896d5..816b70674 100644
--- a/federationsender/statistics/statistics.go
+++ b/federationsender/statistics/statistics.go
@@ -65,7 +65,8 @@ type ServerStatistics struct {
 	statistics     *Statistics                  //
 	serverName     gomatrixserverlib.ServerName //
 	blacklisted    atomic.Bool                  // is the node blacklisted
-	backoffUntil   atomic.Value                 // time.Time to wait until before sending requests
+	backoffStarted atomic.Bool                  // is the backoff started
+	backoffCount   atomic.Uint32                // number of backoffs counted by BackoffIfRequired since the backoff started
 	failCounter    atomic.Uint32                // how many times have we failed?
 	successCounter atomic.Uint32                // how many times have we succeeded?
 }
@@ -77,9 +78,13 @@ type ServerStatistics struct {
 func (s *ServerStatistics) Success() {
 	s.successCounter.Add(1)
 	s.failCounter.Store(0)
+	s.backoffStarted.Store(false)
+	s.backoffCount.Store(0)
 	s.blacklisted.Store(false)
-	if err := s.statistics.DB.RemoveServerFromBlacklist(s.serverName); err != nil {
-		logrus.WithError(err).Errorf("Failed to remove %q from blacklist", s.serverName)
+	if s.statistics.DB != nil {
+		if err := s.statistics.DB.RemoveServerFromBlacklist(s.serverName); err != nil {
+			logrus.WithError(err).Errorf("Failed to remove %q from blacklist", s.serverName)
+		}
 	}
 }
 
@@ -98,33 +103,77 @@ func (s *ServerStatistics) Failure() bool {
 		// now. Mark the host as blacklisted and tell the caller to
 		// give up.
 		s.blacklisted.Store(true)
-		if err := s.statistics.DB.AddServerToBlacklist(s.serverName); err != nil {
-			logrus.WithError(err).Errorf("Failed to add %q to blacklist", s.serverName)
+		if s.statistics.DB != nil {
+			if err := s.statistics.DB.AddServerToBlacklist(s.serverName); err != nil {
+				logrus.WithError(err).Errorf("Failed to add %q to blacklist", s.serverName)
+			}
 		}
 		return true
 	}
 
-	// We're still under the threshold so work out the exponential
-	// backoff based on how many times we have failed already. The
-	// worker goroutine will wait until this time before processing
-	// anything from the queue.
-	backoffSeconds := time.Second * time.Duration(math.Exp2(float64(failCounter)))
-	s.backoffUntil.Store(
-		time.Now().Add(backoffSeconds),
-	)
+	// We're still under the threshold, so start a new run of backoffs
+	// (counting from zero) unless one is already in progress.
+	if s.backoffStarted.CAS(false, true) {
+		s.backoffCount.Store(0)
+	}
+
 	return false
 }
 
-// BackoffDuration returns both a bool stating whether to wait,
-// and then if true, a duration to wait for.
-func (s *ServerStatistics) BackoffDuration() (bool, time.Duration) {
-	backoff, until := false, time.Second
-	if b, ok := s.backoffUntil.Load().(time.Time); ok {
-		if b.After(time.Now()) {
-			backoff, until = true, time.Until(b)
-		}
+// BackoffIfRequired blocks for the next backoff period if a backoff is
+// in progress for this server and returns the length of that period.
+// It returns 0 without waiting if no backoff has been started, or if
+// the number of backoffs performed has caught up with the failure
+// counter, in which case the backoff is marked as finished. While
+// waiting, backingOff is set to true and is reset to false on return.
+// A value received on interrupt, or interrupt being closed, ends the
+// wait early; the full backoff period is still returned in that case.
+func (s *ServerStatistics) BackoffIfRequired(backingOff *atomic.Bool, interrupt <-chan bool) time.Duration {
+	if started := s.backoffStarted.Load(); !started {
+		return 0
 	}
-	return backoff, until
+
+	// Work out how many times we've backed off so far. If we
+	// have passed the failure counter then we can stop backing
+	// off - we've done our time.
+	count := s.backoffCount.Inc()
+	if count >= s.failCounter.Load() {
+		s.backoffStarted.Store(false)
+		return 0
+	}
+
+	// Notify the destination queue that we're backing off now.
+	backingOff.Store(true)
+	defer backingOff.Store(false)
+
+	// Work out how long we should be backing off for.
+	duration := time.Second * time.Duration(math.Exp2(float64(count)))
+	logrus.Debugf("Backing off %q for %s", s.serverName, duration)
+
+	// Wait for either an interruption or for the backoff to
+	// complete.
+	select {
+	case <-interrupt:
+		logrus.Infof("Interrupting backoff for %q", s.serverName)
+	case <-time.After(duration):
+	}
+
+	// Drain any interrupts that are still queued so that they can't
+	// cut the next backoff short. This must not block: ranging over
+	// the channel would only return once the channel is closed, so
+	// stop as soon as it is either empty or closed.
+	for {
+		select {
+		case _, ok := <-interrupt:
+			if !ok {
+				// Closed channel, nothing more will arrive.
+				return duration
+			}
+		default:
+			// Nothing left in the queue.
+			return duration
+		}
+	}
 }
 
 // Blacklisted returns true if the server is blacklisted and false
diff --git a/federationsender/statistics/statistics_test.go b/federationsender/statistics/statistics_test.go
new file mode 100644
index 000000000..483593156
--- /dev/null
+++ b/federationsender/statistics/statistics_test.go
@@ -0,0 +1,58 @@
+package statistics
+
+import (
+	"math"
+	"testing"
+	"time"
+
+	"go.uber.org/atomic"
+)
+
+// TestBackoff fails a server until it is blacklisted and then checks
+// that successive backoffs double in length until the backoff count
+// catches up with the failure count, after which no backoff is given.
+func TestBackoff(t *testing.T) {
+	stats := Statistics{
+		FailuresUntilBlacklist: 6,
+	}
+	server := ServerStatistics{
+		statistics: &stats,
+		serverName: "test.com",
+	}
+
+	server.Success()
+	if successes := server.SuccessCount(); successes != 1 {
+		t.Fatalf("Expected success count 1, got %d", successes)
+	}
+
+	for i := uint32(1); i <= stats.FailuresUntilBlacklist; i++ {
+		if server.Failure() == (i < stats.FailuresUntilBlacklist) {
+			t.Fatalf("Failure %d resulted in blacklist too soon", i)
+		}
+	}
+
+	t.Logf("Failure counter: %d", server.failCounter.Load())
+	t.Logf("Backoff counter: %d", server.backoffCount.Load())
+
+	backingOff := atomic.Bool{}
+
+	for i := uint32(1); i <= 10; i++ {
+		// A closed channel is always ready to receive from, so the
+		// backoff returns immediately instead of sleeping.
+		interrupt := make(chan bool, 1)
+		close(interrupt)
+
+		duration := server.BackoffIfRequired(&backingOff, interrupt)
+		t.Logf("Backoff %d is for %s", i, duration)
+
+		if i < stats.FailuresUntilBlacklist {
+			if wanted := time.Second * time.Duration(math.Exp2(float64(i))); duration != wanted {
+				t.Fatalf("Backoff should have been %s but was %s", wanted, duration)
+			}
+		} else {
+			if duration != 0 {
+				t.Fatalf("Backoff should have been zero but was %s", duration)
+			}
+		}
+	}
+}