Rework statistics offline/blacklist clearing logic to flow better

This commit is contained in:
Devon Hudson 2023-01-12 11:23:08 -07:00
parent f6a9a77ecc
commit 7fb194fc61
No known key found for this signature in database
GPG key ID: CD06B18E77F6A628
4 changed files with 34 additions and 33 deletions

View file

@ -708,15 +708,8 @@ func (r *FederationInternalAPI) PerformWakeupServers(
func (r *FederationInternalAPI) MarkServersAlive(destinations []gomatrixserverlib.ServerName) { func (r *FederationInternalAPI) MarkServersAlive(destinations []gomatrixserverlib.ServerName) {
for _, srv := range destinations { for _, srv := range destinations {
// Check the statistics cache for the blacklist & assumed offline status to prevent hitting wasBlacklisted := r.statistics.ForServer(srv).MarkServerAlive()
// the database unnecessarily. r.queues.RetryServer(srv, wasBlacklisted)
if r.statistics.ForServer(srv).AssumedOffline() {
_ = r.db.RemoveServerAssumedOffline(srv)
}
if r.queues.IsServerBlacklisted(srv) {
_ = r.db.RemoveServerFromBlacklist(srv)
}
r.queues.RetryServer(srv)
} }
} }

View file

@ -374,25 +374,13 @@ func (oqs *OutgoingQueues) SendEDU(
return nil return nil
} }
// IsServerBlacklisted reports the blacklisted flag held in the statistics
// entry for the given server.
func (oqs *OutgoingQueues) IsServerBlacklisted(srv gomatrixserverlib.ServerName) bool {
	stats := oqs.statistics.ForServer(srv)
	return stats.Blacklisted()
}
// RetryServer attempts to resend events to the given server if we had given up. // RetryServer attempts to resend events to the given server if we had given up.
func (oqs *OutgoingQueues) RetryServer(srv gomatrixserverlib.ServerName) { func (oqs *OutgoingQueues) RetryServer(srv gomatrixserverlib.ServerName, wasBlacklisted bool) {
if oqs.disabled { if oqs.disabled {
return return
} }
serverStatistics := oqs.statistics.ForServer(srv)
forceWakeup := serverStatistics.Blacklisted()
serverStatistics.RemoveAssumedOffline()
serverStatistics.RemoveBlacklist()
serverStatistics.ClearBackoff()
if queue := oqs.getQueue(srv); queue != nil { if queue := oqs.getQueue(srv); queue != nil {
queue.wakeQueueIfEventsPending(forceWakeup) queue.wakeQueueIfEventsPending(wasBlacklisted)
} }
} }

View file

@ -528,8 +528,8 @@ func TestRetryServerSendsPDUSuccessfully(t *testing.T) {
poll.WaitOn(t, checkBlacklisted, poll.WithTimeout(5*time.Second), poll.WithDelay(100*time.Millisecond)) poll.WaitOn(t, checkBlacklisted, poll.WithTimeout(5*time.Second), poll.WithDelay(100*time.Millisecond))
fc.shouldTxSucceed = true fc.shouldTxSucceed = true
db.RemoveServerFromBlacklist(destination) wasBlacklisted := dest.statistics.MarkServerAlive()
queues.RetryServer(destination) queues.RetryServer(destination, wasBlacklisted)
checkRetry := func(log poll.LogT) poll.Result { checkRetry := func(log poll.LogT) poll.Result {
data, dbErr := db.GetPendingPDUs(pc.Context(), destination, 100) data, dbErr := db.GetPendingPDUs(pc.Context(), destination, 100)
assert.NoError(t, dbErr) assert.NoError(t, dbErr)
@ -579,8 +579,8 @@ func TestRetryServerSendsEDUSuccessfully(t *testing.T) {
poll.WaitOn(t, checkBlacklisted, poll.WithTimeout(5*time.Second), poll.WithDelay(100*time.Millisecond)) poll.WaitOn(t, checkBlacklisted, poll.WithTimeout(5*time.Second), poll.WithDelay(100*time.Millisecond))
fc.shouldTxSucceed = true fc.shouldTxSucceed = true
db.RemoveServerFromBlacklist(destination) wasBlacklisted := dest.statistics.MarkServerAlive()
queues.RetryServer(destination) queues.RetryServer(destination, wasBlacklisted)
checkRetry := func(log poll.LogT) poll.Result { checkRetry := func(log poll.LogT) poll.Result {
data, dbErr := db.GetPendingEDUs(pc.Context(), destination, 100) data, dbErr := db.GetPendingEDUs(pc.Context(), destination, 100)
assert.NoError(t, dbErr) assert.NoError(t, dbErr)
@ -821,8 +821,8 @@ func TestQueueInteractsWithRealDatabasePDUAndEDU(t *testing.T) {
poll.WaitOn(t, checkBlacklisted, poll.WithTimeout(10*time.Second), poll.WithDelay(100*time.Millisecond)) poll.WaitOn(t, checkBlacklisted, poll.WithTimeout(10*time.Second), poll.WithDelay(100*time.Millisecond))
fc.shouldTxSucceed = true fc.shouldTxSucceed = true
db.RemoveServerFromBlacklist(destination) wasBlacklisted := dest.statistics.MarkServerAlive()
queues.RetryServer(destination) queues.RetryServer(destination, wasBlacklisted)
checkRetry := func(log poll.LogT) poll.Result { checkRetry := func(log poll.LogT) poll.Result {
pduData, dbErrPDU := db.GetPendingPDUs(pc.Context(), destination, 200) pduData, dbErrPDU := db.GetPendingPDUs(pc.Context(), destination, 200)
assert.NoError(t, dbErrPDU) assert.NoError(t, dbErrPDU)

View file

@ -214,6 +214,14 @@ func (s *ServerStatistics) Failure() (time.Time, bool) {
return s.backoffUntil.Load().(time.Time), false return s.backoffUntil.Load().(time.Time), false
} }
// MarkServerAlive clears the assumed-offline status of this server and then
// its blacklisted status. The result is whatever removeBlacklist reports,
// i.e. whether the server was blacklisted at the time of the call.
func (s *ServerStatistics) MarkServerAlive() bool {
	// Assumed-offline is cleared first, matching the original ordering.
	s.removeAssumedOffline()
	return s.removeBlacklist()
}
// ClearBackoff stops the backoff timer for this destination if it is running // ClearBackoff stops the backoff timer for this destination if it is running
// and removes the timer from the backoffTimers map. // and removes the timer from the backoffTimers map.
func (s *ServerStatistics) ClearBackoff() { func (s *ServerStatistics) ClearBackoff() {
@ -262,14 +270,26 @@ func (s *ServerStatistics) AssumedOffline() bool {
return s.assumedOffline.Load() return s.assumedOffline.Load()
} }
// RemoveBlacklist removes the blacklisted status from the server. // removeBlacklist removes the blacklisted status from the server.
func (s *ServerStatistics) RemoveBlacklist() { // Returns whether the server was blacklisted.
func (s *ServerStatistics) removeBlacklist() bool {
var wasBlacklisted bool
if s.Blacklisted() {
wasBlacklisted = true
_ = s.statistics.DB.RemoveServerFromBlacklist(s.serverName)
}
s.cancel() s.cancel()
s.backoffCount.Store(0) s.backoffCount.Store(0)
return wasBlacklisted
} }
// RemoveAssumedOffline removes the assumed offline status from the server. // removeAssumedOffline removes the assumed offline status from the server.
func (s *ServerStatistics) RemoveAssumedOffline() { func (s *ServerStatistics) removeAssumedOffline() {
if s.AssumedOffline() {
_ = s.statistics.DB.RemoveServerAssumedOffline(s.serverName)
}
s.assumedOffline.Store(false) s.assumedOffline.Store(false)
} }