mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-20 21:33:19 -06:00
Maybe fix that bug after all
This commit is contained in:
parent
5788fa500e
commit
219afbbf8d
|
|
@ -262,31 +262,22 @@ func (oq *destinationQueue) backgroundSend() {
|
|||
// If we are backing off this server then wait for the
|
||||
// backoff duration to complete first, or until explicitly
|
||||
// told to retry.
|
||||
oq.statistics.BackoffIfRequired(oq.backingOff, oq.interruptBackoff)
|
||||
if _, giveUp := oq.statistics.BackoffIfRequired(oq.backingOff, oq.interruptBackoff); giveUp {
|
||||
// It's been suggested that we should give up because the backoff
|
||||
// has exceeded a maximum allowable value. Clean up the in-memory
|
||||
// buffers at this point. The PDU clean-up is already on a defer.
|
||||
oq.cleanPendingInvites()
|
||||
log.Warnf("Blacklisting %q due to exceeding backoff threshold")
|
||||
return
|
||||
}
|
||||
|
||||
// If we have pending PDUs or EDUs then construct a transaction.
|
||||
if pendingPDUs || pendingEDUs {
|
||||
// Try sending the next transaction and see what happens.
|
||||
transaction, terr := oq.nextTransaction()
|
||||
if terr != nil {
|
||||
// We failed to send the transaction.
|
||||
if giveUp := oq.statistics.Failure(); giveUp {
|
||||
// It's been suggested that we should give up because the backoff
|
||||
// has exceeded a maximum allowable value. Clean up the in-memory
|
||||
// buffers at this point. The PDU clean-up is already on a defer.
|
||||
oq.cleanPendingInvites()
|
||||
log.WithError(terr).Warnf("Blacklisting %q due to error", oq.destination)
|
||||
return
|
||||
} else {
|
||||
// We haven't been told to give up terminally yet but we still have
|
||||
// PDUs waiting to be sent. By sending a message into the wake chan,
|
||||
// the next loop iteration will try processing these PDUs again,
|
||||
// subject to the backoff.
|
||||
select {
|
||||
case oq.notifyPDUs <- true:
|
||||
default:
|
||||
}
|
||||
}
|
||||
// We failed to send the transaction. Mark it as a failure.
|
||||
oq.statistics.Failure()
|
||||
} else if transaction {
|
||||
// If we successfully sent the transaction then clear out
|
||||
// the pending events and EDUs, and wipe our transaction ID.
|
||||
|
|
@ -298,14 +289,8 @@ func (oq *destinationQueue) backgroundSend() {
|
|||
if len(oq.pendingInvites) > 0 {
|
||||
sent, ierr := oq.nextInvites(oq.pendingInvites)
|
||||
if ierr != nil {
|
||||
// We failed to send the transaction so increase the
|
||||
// backoff and give it another go shortly.
|
||||
if giveUp := oq.statistics.Failure(); giveUp {
|
||||
// It's been suggested that we should give up because
|
||||
// the backoff has exceeded a maximum allowable value.
|
||||
log.WithError(ierr).Warnf("Blacklisting %q due to error", oq.destination)
|
||||
return
|
||||
}
|
||||
// We failed to send the transaction. Mark it as a failure.
|
||||
oq.statistics.Failure()
|
||||
} else if sent > 0 {
|
||||
// If we successfully sent the invites then clear out
|
||||
// the pending invites.
|
||||
|
|
|
|||
|
|
@ -67,7 +67,6 @@ type ServerStatistics struct {
|
|||
blacklisted atomic.Bool // is the node blacklisted
|
||||
backoffStarted atomic.Bool // is the backoff started
|
||||
backoffCount atomic.Uint32 // number of times BackoffDuration has been called
|
||||
failCounter atomic.Uint32 // how many times have we failed?
|
||||
successCounter atomic.Uint32 // how many times have we succeeded?
|
||||
}
|
||||
|
||||
|
|
@ -77,7 +76,6 @@ type ServerStatistics struct {
|
|||
// we will unblacklist it.
|
||||
func (s *ServerStatistics) Success() {
|
||||
s.successCounter.Add(1)
|
||||
s.failCounter.Store(0)
|
||||
s.backoffStarted.Store(false)
|
||||
s.backoffCount.Store(0)
|
||||
s.blacklisted.Store(false)
|
||||
|
|
@ -88,16 +86,28 @@ func (s *ServerStatistics) Success() {
|
|||
}
|
||||
}
|
||||
|
||||
// Failure marks a failure and works out when to backoff until. It
|
||||
// returns true if the worker should give up altogether because of
|
||||
// too many consecutive failures. At this point the host is marked
|
||||
// as blacklisted.
|
||||
func (s *ServerStatistics) Failure() bool {
|
||||
// Increase the fail counter.
|
||||
failCounter := s.failCounter.Add(1)
|
||||
// Failure marks a failure and starts backing off if needed.
|
||||
// The next call to BackoffIfRequired will do the right thing
|
||||
// after this.
|
||||
func (s *ServerStatistics) Failure() {
|
||||
if s.backoffStarted.CAS(false, true) {
|
||||
s.backoffCount.Store(0)
|
||||
}
|
||||
}
|
||||
|
||||
// Check that we haven't failed more times than is acceptable.
|
||||
if failCounter >= s.statistics.FailuresUntilBlacklist {
|
||||
// BackoffIfRequired will block for as long as the current
|
||||
// backoff requires, if needed. Otherwise it will do nothing.
|
||||
func (s *ServerStatistics) BackoffIfRequired(backingOff atomic.Bool, interrupt <-chan bool) (time.Duration, bool) {
|
||||
if started := s.backoffStarted.Load(); !started {
|
||||
return 0, false
|
||||
}
|
||||
|
||||
// Work out how many times we've backed off so far.
|
||||
count := s.backoffCount.Inc()
|
||||
duration := time.Second * time.Duration(math.Exp2(float64(count)))
|
||||
|
||||
// Work out if we should be blacklisting at this point.
|
||||
if count >= s.statistics.FailuresUntilBlacklist {
|
||||
// We've exceeded the maximum amount of times we're willing
|
||||
// to back off, which is probably in the region of hours by
|
||||
// now. Mark the host as blacklisted and tell the caller to
|
||||
|
|
@ -108,32 +118,14 @@ func (s *ServerStatistics) Failure() bool {
|
|||
logrus.WithError(err).Errorf("Failed to add %q to blacklist", s.serverName)
|
||||
}
|
||||
}
|
||||
return true
|
||||
return duration, true
|
||||
}
|
||||
|
||||
if s.backoffStarted.CAS(false, true) {
|
||||
s.backoffCount.Store(0)
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// BackoffIfRequired will block for as long as the current
|
||||
// backoff requires, if needed. Otherwise it will do nothing.
|
||||
func (s *ServerStatistics) BackoffIfRequired(backingOff atomic.Bool, interrupt <-chan bool) time.Duration {
|
||||
if started := s.backoffStarted.Load(); !started {
|
||||
return 0
|
||||
}
|
||||
|
||||
// Work out how many times we've backed off so far.
|
||||
count := s.backoffCount.Inc()
|
||||
|
||||
// Notify the destination queue that we're backing off now.
|
||||
backingOff.Store(true)
|
||||
defer backingOff.Store(false)
|
||||
|
||||
// Work out how long we should be backing off for.
|
||||
duration := time.Second * time.Duration(math.Exp2(float64(count)))
|
||||
logrus.Warnf("Backing off %q for %s", s.serverName, duration)
|
||||
|
||||
// Wait for either an interruption or for the backoff to
|
||||
|
|
@ -144,7 +136,7 @@ func (s *ServerStatistics) BackoffIfRequired(backingOff atomic.Bool, interrupt <
|
|||
case <-time.After(duration):
|
||||
}
|
||||
|
||||
return duration
|
||||
return duration, false
|
||||
}
|
||||
|
||||
// Blacklisted returns true if the server is blacklisted and false
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ import (
|
|||
|
||||
func TestBackoff(t *testing.T) {
|
||||
stats := Statistics{
|
||||
FailuresUntilBlacklist: 6,
|
||||
FailuresUntilBlacklist: 5,
|
||||
}
|
||||
server := ServerStatistics{
|
||||
statistics: &stats,
|
||||
|
|
@ -23,18 +23,10 @@ func TestBackoff(t *testing.T) {
|
|||
t.Fatalf("Expected success count 1, got %d", successes)
|
||||
}
|
||||
|
||||
// Now we want to cause a series of failures. We'll do this
|
||||
// as many times as we need to blacklist. We'll check that we
|
||||
// were blacklisted at the right time based on the threshold.
|
||||
failures := stats.FailuresUntilBlacklist
|
||||
for i := uint32(1); i <= failures; i++ {
|
||||
if server.Failure() == (i < stats.FailuresUntilBlacklist) {
|
||||
t.Fatalf("Failure %d resulted in blacklist too soon", i)
|
||||
}
|
||||
}
|
||||
// Register a failure.
|
||||
server.Failure()
|
||||
|
||||
t.Logf("Failure counter: %d", server.failCounter)
|
||||
t.Logf("Backoff counter: %d", server.backoffCount)
|
||||
t.Logf("Backoff counter: %d", server.backoffCount.Load())
|
||||
backingOff := atomic.Bool{}
|
||||
|
||||
// Now we're going to simulate backing off a few times to see
|
||||
|
|
@ -47,12 +39,22 @@ func TestBackoff(t *testing.T) {
|
|||
close(interrupt)
|
||||
|
||||
// Get the duration.
|
||||
duration := server.BackoffIfRequired(backingOff, interrupt)
|
||||
t.Logf("Backoff %d is for %s", i, duration)
|
||||
duration, blacklist := server.BackoffIfRequired(backingOff, interrupt)
|
||||
|
||||
// Check if we should be blacklisted by now.
|
||||
if i > stats.FailuresUntilBlacklist {
|
||||
if !blacklist {
|
||||
t.Fatalf("Backoff %d should have resulted in blacklist but didn't", i)
|
||||
} else {
|
||||
t.Logf("Backoff %d is blacklisted as expected", i)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// Check if the duration is what we expect.
|
||||
if wanted := time.Second * time.Duration(math.Exp2(float64(i))); duration != wanted {
|
||||
t.Fatalf("Backoff should have been %s but was %s", wanted, duration)
|
||||
t.Logf("Backoff %d is for %s", i, duration)
|
||||
if wanted := time.Second * time.Duration(math.Exp2(float64(i))); !blacklist && duration != wanted {
|
||||
t.Fatalf("Backoff %d should have been %s but was %s", i, wanted, duration)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue