From a4083e07cd8112dbfedcfeeb05422bb3c05edf8e Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Thu, 20 Aug 2020 15:33:41 +0100 Subject: [PATCH] Rejig backoff --- federationsender/internal/api.go | 104 +++++++++++----------- federationsender/statistics/statistics.go | 10 +++ keyserver/internal/device_list_update.go | 4 +- 3 files changed, 64 insertions(+), 54 deletions(-) diff --git a/federationsender/internal/api.go b/federationsender/internal/api.go index 386149a33..c71d4209f 100644 --- a/federationsender/internal/api.go +++ b/federationsender/internal/api.go @@ -12,7 +12,7 @@ import ( roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrix" "github.com/matrix-org/gomatrixserverlib" - "go.uber.org/atomic" + "github.com/matrix-org/util" ) // FederationSenderInternalAPI is an implementation of api.FederationSenderInternalAPI @@ -47,95 +47,95 @@ func NewFederationSenderInternalAPI( func (a *FederationSenderInternalAPI) isBlacklistedOrBackingOff(s gomatrixserverlib.ServerName) (*statistics.ServerStatistics, error) { stats := a.statistics.ForServer(s) - if stats.Blacklisted() { + until, blacklisted := stats.BackoffInfo() + if blacklisted { return stats, &api.FederationClientError{ Blacklisted: true, } } - // Call BackoffIfRequired with a closed channel to make it return immediately. - // It will return the duration to backoff for. - var duration time.Duration - interrupt := make(chan bool) - close(interrupt) - var bo atomic.Bool - duration, _ = stats.BackoffIfRequired(bo, interrupt) - if duration > 0 { + now := time.Now() + if until != nil && now.Before(*until) { return stats, &api.FederationClientError{ - RetryAfter: duration, + RetryAfter: until.Sub(now), } } return stats, nil } -func failBlacklistableError(err error, stats *statistics.ServerStatistics) { +func failBlacklistableError(err error, stats *statistics.ServerStatistics) (until time.Time, blacklisted bool) { if err == nil { return } mxerr, ok := err.(gomatrix.HTTPError) if !ok { - stats.Failure() - return + return stats.Failure() } if mxerr.Code >= 500 || mxerr.Code < 600 { - stats.Failure() - return + return stats.Failure() } + return +} + +func (a *FederationSenderInternalAPI) doRequest( + ctx context.Context, s gomatrixserverlib.ServerName, request func() (interface{}, error), +) (interface{}, error) { + stats, err := a.isBlacklistedOrBackingOff(s) + if err != nil { + util.GetLogger(ctx).Infof("isBlacklistedOrBackingOff %v", err) + return nil, err + } + res, err := request() + if err != nil { + until, blacklisted := failBlacklistableError(err, stats) + now := time.Now() + var retryAfter time.Duration + if until.After(now) { + retryAfter = until.Sub(now) + } + return res, &api.FederationClientError{ + Err: err.Error(), + Blacklisted: blacklisted, + RetryAfter: retryAfter, + } + } + stats.Success() + return res, nil } func (a *FederationSenderInternalAPI) GetUserDevices( ctx context.Context, s gomatrixserverlib.ServerName, userID string, ) (gomatrixserverlib.RespUserDevices, error) { - var res gomatrixserverlib.RespUserDevices - stats, err := a.isBlacklistedOrBackingOff(s) + ires, err := a.doRequest(ctx, s, func() (interface{}, error) { + util.GetLogger(ctx).Infof("GetUserDevices being called now") + return a.federation.GetUserDevices(ctx, s, userID) + }) if err != nil { - return res, err + return gomatrixserverlib.RespUserDevices{}, err } - res, err = a.federation.GetUserDevices(ctx, s, userID) - if err != nil { - failBlacklistableError(err, stats) - return res, &api.FederationClientError{ - Err: err.Error(), - } - } - stats.Success() - return res, nil + return ires.(gomatrixserverlib.RespUserDevices), nil } func (a *FederationSenderInternalAPI) ClaimKeys( ctx context.Context, s gomatrixserverlib.ServerName, oneTimeKeys map[string]map[string]string, ) (gomatrixserverlib.RespClaimKeys, error) { - var res gomatrixserverlib.RespClaimKeys - stats, err := a.isBlacklistedOrBackingOff(s) + ires, err := a.doRequest(ctx, s, func() (interface{}, error) { + return a.federation.ClaimKeys(ctx, s, oneTimeKeys) + }) if err != nil { - return res, err + return gomatrixserverlib.RespClaimKeys{}, err } - res, err = a.federation.ClaimKeys(ctx, s, oneTimeKeys) - if err != nil { - failBlacklistableError(err, stats) - return res, &api.FederationClientError{ - Err: err.Error(), - } - } - stats.Success() - return res, nil + return ires.(gomatrixserverlib.RespClaimKeys), nil } func (a *FederationSenderInternalAPI) QueryKeys( ctx context.Context, s gomatrixserverlib.ServerName, keys map[string][]string, ) (gomatrixserverlib.RespQueryKeys, error) { - var res gomatrixserverlib.RespQueryKeys - stats, err := a.isBlacklistedOrBackingOff(s) + ires, err := a.doRequest(ctx, s, func() (interface{}, error) { + return a.federation.QueryKeys(ctx, s, keys) + }) if err != nil { - return res, err + return gomatrixserverlib.RespQueryKeys{}, err } - res, err = a.federation.QueryKeys(ctx, s, keys) - if err != nil { - failBlacklistableError(err, stats) - return res, &api.FederationClientError{ - Err: err.Error(), - } - } - stats.Success() - return res, nil + return ires.(gomatrixserverlib.RespQueryKeys), nil } diff --git a/federationsender/statistics/statistics.go b/federationsender/statistics/statistics.go index 19151cb34..03ef64e95 100644 --- a/federationsender/statistics/statistics.go +++ b/federationsender/statistics/statistics.go @@ -126,6 +126,16 @@ func (s *ServerStatistics) Failure() (time.Time, bool) { return until, false } +// BackoffInfo returns information about the current or previous backoff. +// Returns the last backoffUntil time and whether the server is currently blacklisted or not. +func (s *ServerStatistics) BackoffInfo() (*time.Time, bool) { + until, ok := s.backoffUntil.Load().(time.Time) + if ok { + return &until, s.blacklisted.Load() + } + return nil, s.blacklisted.Load() +} + // BackoffIfRequired will block for as long as the current // backoff requires, if needed. Otherwise it will do nothing. // Returns the amount of time to backoff for and whether to give up or not. diff --git a/keyserver/internal/device_list_update.go b/keyserver/internal/device_list_update.go index 7ef5c3ee6..36918256c 100644 --- a/keyserver/internal/device_list_update.go +++ b/keyserver/internal/device_list_update.go @@ -305,7 +305,7 @@ func (u *DeviceListUpdater) worker(ch chan gomatrixserverlib.ServerName) { continue } else { scheduledRetries[serverName] = time.Now().Add(cooloffPeriod) - go inject(serverName, cooloffPeriod) // TODO: Backoff? + go inject(serverName, cooloffPeriod) continue } } @@ -313,7 +313,7 @@ func (u *DeviceListUpdater) worker(ch chan gomatrixserverlib.ServerName) { shouldRetry := u.processServer(serverName) if shouldRetry { scheduledRetries[serverName] = time.Now().Add(cooloffPeriod) - go inject(serverName, cooloffPeriod) // TODO: Backoff? + go inject(serverName, cooloffPeriod) } } }