From 219afbbf8ddf9bf7e7174ad8d4107ecc54d61cea Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Fri, 7 Aug 2020 17:51:59 +0100 Subject: [PATCH] Maybe fix that bug after all --- federationsender/queue/destinationqueue.go | 39 +++++--------- federationsender/statistics/statistics.go | 54 ++++++++----------- .../statistics/statistics_test.go | 34 ++++++------ 3 files changed, 53 insertions(+), 74 deletions(-) diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index 0b6f539df..32616a292 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -262,31 +262,22 @@ func (oq *destinationQueue) backgroundSend() { // If we are backing off this server then wait for the // backoff duration to complete first, or until explicitly // told to retry. - oq.statistics.BackoffIfRequired(oq.backingOff, oq.interruptBackoff) + if _, giveUp := oq.statistics.BackoffIfRequired(oq.backingOff, oq.interruptBackoff); giveUp { + // It's been suggested that we should give up because the backoff + // has exceeded a maximum allowable value. Clean up the in-memory + // buffers at this point. The PDU clean-up is already on a defer. + oq.cleanPendingInvites() + log.Warnf("Blacklisting %q due to exceeding backoff threshold") + return + } // If we have pending PDUs or EDUs then construct a transaction. if pendingPDUs || pendingEDUs { // Try sending the next transaction and see what happens. transaction, terr := oq.nextTransaction() if terr != nil { - // We failed to send the transaction. - if giveUp := oq.statistics.Failure(); giveUp { - // It's been suggested that we should give up because the backoff - // has exceeded a maximum allowable value. Clean up the in-memory - // buffers at this point. The PDU clean-up is already on a defer. - oq.cleanPendingInvites() - log.WithError(terr).Warnf("Blacklisting %q due to error", oq.destination) - return - } else { - // We haven't been told to give up terminally yet but we still have - // PDUs waiting to be sent. By sending a message into the wake chan, - // the next loop iteration will try processing these PDUs again, - // subject to the backoff. - select { - case oq.notifyPDUs <- true: - default: - } - } + // We failed to send the transaction. Mark it as a failure. + oq.statistics.Failure() } else if transaction { // If we successfully sent the transaction then clear out // the pending events and EDUs, and wipe our transaction ID. @@ -298,14 +289,8 @@ func (oq *destinationQueue) backgroundSend() { if len(oq.pendingInvites) > 0 { sent, ierr := oq.nextInvites(oq.pendingInvites) if ierr != nil { - // We failed to send the transaction so increase the - // backoff and give it another go shortly. - if giveUp := oq.statistics.Failure(); giveUp { - // It's been suggested that we should give up because - // the backoff has exceeded a maximum allowable value. - log.WithError(ierr).Warnf("Blacklisting %q due to error", oq.destination) - return - } + // We failed to send the transaction. Mark it as a failure. + oq.statistics.Failure() } else if sent > 0 { // If we successfully sent the invites then clear out // the pending invites. diff --git a/federationsender/statistics/statistics.go b/federationsender/statistics/statistics.go index f08fe1ce1..0dd8da200 100644 --- a/federationsender/statistics/statistics.go +++ b/federationsender/statistics/statistics.go @@ -67,7 +67,6 @@ type ServerStatistics struct { blacklisted atomic.Bool // is the node blacklisted backoffStarted atomic.Bool // is the backoff started backoffCount atomic.Uint32 // number of times BackoffDuration has been called - failCounter atomic.Uint32 // how many times have we failed? successCounter atomic.Uint32 // how many times have we succeeded? } @@ -77,7 +76,6 @@ type ServerStatistics struct { // we will unblacklist it. func (s *ServerStatistics) Success() { s.successCounter.Add(1) - s.failCounter.Store(0) s.backoffStarted.Store(false) s.backoffCount.Store(0) s.blacklisted.Store(false) @@ -88,16 +86,28 @@ func (s *ServerStatistics) Success() { } } -// Failure marks a failure and works out when to backoff until. It -// returns true if the worker should give up altogether because of -// too many consecutive failures. At this point the host is marked -// as blacklisted. -func (s *ServerStatistics) Failure() bool { - // Increase the fail counter. - failCounter := s.failCounter.Add(1) +// Failure marks a failure and starts backing off if needed. +// The next call to BackoffIfRequired will do the right thing +// after this. +func (s *ServerStatistics) Failure() { + if s.backoffStarted.CAS(false, true) { + s.backoffCount.Store(0) + } +} - // Check that we haven't failed more times than is acceptable. - if failCounter >= s.statistics.FailuresUntilBlacklist { +// BackoffIfRequired will block for as long as the current +// backoff requires, if needed. Otherwise it will do nothing. +func (s *ServerStatistics) BackoffIfRequired(backingOff atomic.Bool, interrupt <-chan bool) (time.Duration, bool) { + if started := s.backoffStarted.Load(); !started { + return 0, false + } + + // Work out how many times we've backed off so far. + count := s.backoffCount.Inc() + duration := time.Second * time.Duration(math.Exp2(float64(count))) + + // Work out if we should be blacklisting at this point. + if count >= s.statistics.FailuresUntilBlacklist { // We've exceeded the maximum amount of times we're willing // to back off, which is probably in the region of hours by // now. Mark the host as blacklisted and tell the caller to @@ -108,32 +118,14 @@ func (s *ServerStatistics) Failure() bool { logrus.WithError(err).Errorf("Failed to add %q to blacklist", s.serverName) } } - return true + return duration, true } - if s.backoffStarted.CAS(false, true) { - s.backoffCount.Store(0) - } - - return false -} - -// BackoffIfRequired will block for as long as the current -// backoff requires, if needed. Otherwise it will do nothing. -func (s *ServerStatistics) BackoffIfRequired(backingOff atomic.Bool, interrupt <-chan bool) time.Duration { - if started := s.backoffStarted.Load(); !started { - return 0 - } - - // Work out how many times we've backed off so far. - count := s.backoffCount.Inc() - // Notify the destination queue that we're backing off now. backingOff.Store(true) defer backingOff.Store(false) // Work out how long we should be backing off for. - duration := time.Second * time.Duration(math.Exp2(float64(count))) logrus.Warnf("Backing off %q for %s", s.serverName, duration) // Wait for either an interruption or for the backoff to @@ -144,7 +136,7 @@ func (s *ServerStatistics) BackoffIfRequired(backingOff atomic.Bool, interrupt < case <-time.After(duration): } - return duration + return duration, false } // Blacklisted returns true if the server is blacklisted and false diff --git a/federationsender/statistics/statistics_test.go b/federationsender/statistics/statistics_test.go index 6783825c7..9050662ec 100644 --- a/federationsender/statistics/statistics_test.go +++ b/federationsender/statistics/statistics_test.go @@ -10,7 +10,7 @@ import ( func TestBackoff(t *testing.T) { stats := Statistics{ - FailuresUntilBlacklist: 6, + FailuresUntilBlacklist: 5, } server := ServerStatistics{ statistics: &stats, @@ -23,18 +23,10 @@ func TestBackoff(t *testing.T) { t.Fatalf("Expected success count 1, got %d", successes) } - // Now we want to cause a series of failures. We'll do this - // as many times as we need to blacklist. We'll check that we - // were blacklisted at the right time based on the threshold. - failures := stats.FailuresUntilBlacklist - for i := uint32(1); i <= failures; i++ { - if server.Failure() == (i < stats.FailuresUntilBlacklist) { - t.Fatalf("Failure %d resulted in blacklist too soon", i) - } - } + // Register a failure. + server.Failure() - t.Logf("Failure counter: %d", server.failCounter) - t.Logf("Backoff counter: %d", server.backoffCount) + t.Logf("Backoff counter: %d", server.backoffCount.Load()) backingOff := atomic.Bool{} // Now we're going to simulate backing off a few times to see @@ -47,12 +39,22 @@ func TestBackoff(t *testing.T) { close(interrupt) // Get the duration. - duration := server.BackoffIfRequired(backingOff, interrupt) - t.Logf("Backoff %d is for %s", i, duration) + duration, blacklist := server.BackoffIfRequired(backingOff, interrupt) + + // Check if we should be blacklisted by now. + if i > stats.FailuresUntilBlacklist { + if !blacklist { + t.Fatalf("Backoff %d should have resulted in blacklist but didn't", i) + } else { + t.Logf("Backoff %d is blacklisted as expected", i) + continue + } + } // Check if the duration is what we expect. - if wanted := time.Second * time.Duration(math.Exp2(float64(i))); duration != wanted { - t.Fatalf("Backoff should have been %s but was %s", wanted, duration) + t.Logf("Backoff %d is for %s", i, duration) + if wanted := time.Second * time.Duration(math.Exp2(float64(i))); !blacklist && duration != wanted { + t.Fatalf("Backoff %d should have been %s but was %s", i, wanted, duration) } } }