Rejig backoff

This commit is contained in:
Kegan Dougal 2020-08-20 15:33:41 +01:00
parent 2331326392
commit a4083e07cd
3 changed files with 64 additions and 54 deletions

View file

@ -12,7 +12,7 @@ import (
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrix"
"github.com/matrix-org/gomatrixserverlib"
"go.uber.org/atomic"
"github.com/matrix-org/util"
)
// FederationSenderInternalAPI is an implementation of api.FederationSenderInternalAPI
@ -47,95 +47,95 @@ func NewFederationSenderInternalAPI(
func (a *FederationSenderInternalAPI) isBlacklistedOrBackingOff(s gomatrixserverlib.ServerName) (*statistics.ServerStatistics, error) {
stats := a.statistics.ForServer(s)
if stats.Blacklisted() {
until, blacklisted := stats.BackoffInfo()
if blacklisted {
return stats, &api.FederationClientError{
Blacklisted: true,
}
}
// Call BackoffIfRequired with a closed channel to make it return immediately.
// It will return the duration to backoff for.
var duration time.Duration
interrupt := make(chan bool)
close(interrupt)
var bo atomic.Bool
duration, _ = stats.BackoffIfRequired(bo, interrupt)
if duration > 0 {
now := time.Now()
if until != nil && now.Before(*until) {
return stats, &api.FederationClientError{
RetryAfter: duration,
RetryAfter: until.Sub(now),
}
}
return stats, nil
}
func failBlacklistableError(err error, stats *statistics.ServerStatistics) {
func failBlacklistableError(err error, stats *statistics.ServerStatistics) (until time.Time, blacklisted bool) {
if err == nil {
return
}
mxerr, ok := err.(gomatrix.HTTPError)
if !ok {
stats.Failure()
return
return stats.Failure()
}
if mxerr.Code >= 500 || mxerr.Code < 600 {
stats.Failure()
return
return stats.Failure()
}
return
}
func (a *FederationSenderInternalAPI) doRequest(
ctx context.Context, s gomatrixserverlib.ServerName, request func() (interface{}, error),
) (interface{}, error) {
stats, err := a.isBlacklistedOrBackingOff(s)
if err != nil {
util.GetLogger(ctx).Infof("isBlacklistedOrBackingOff %v", err)
return nil, err
}
res, err := request()
if err != nil {
until, blacklisted := failBlacklistableError(err, stats)
now := time.Now()
var retryAfter time.Duration
if until.After(now) {
retryAfter = until.Sub(now)
}
return res, &api.FederationClientError{
Err: err.Error(),
Blacklisted: blacklisted,
RetryAfter: retryAfter,
}
}
stats.Success()
return res, nil
}
func (a *FederationSenderInternalAPI) GetUserDevices(
ctx context.Context, s gomatrixserverlib.ServerName, userID string,
) (gomatrixserverlib.RespUserDevices, error) {
var res gomatrixserverlib.RespUserDevices
stats, err := a.isBlacklistedOrBackingOff(s)
ires, err := a.doRequest(ctx, s, func() (interface{}, error) {
util.GetLogger(ctx).Infof("GetUserDevices being called now")
return a.federation.GetUserDevices(ctx, s, userID)
})
if err != nil {
return res, err
return gomatrixserverlib.RespUserDevices{}, err
}
res, err = a.federation.GetUserDevices(ctx, s, userID)
if err != nil {
failBlacklistableError(err, stats)
return res, &api.FederationClientError{
Err: err.Error(),
}
}
stats.Success()
return res, nil
return ires.(gomatrixserverlib.RespUserDevices), nil
}
func (a *FederationSenderInternalAPI) ClaimKeys(
ctx context.Context, s gomatrixserverlib.ServerName, oneTimeKeys map[string]map[string]string,
) (gomatrixserverlib.RespClaimKeys, error) {
var res gomatrixserverlib.RespClaimKeys
stats, err := a.isBlacklistedOrBackingOff(s)
ires, err := a.doRequest(ctx, s, func() (interface{}, error) {
return a.federation.ClaimKeys(ctx, s, oneTimeKeys)
})
if err != nil {
return res, err
return gomatrixserverlib.RespClaimKeys{}, err
}
res, err = a.federation.ClaimKeys(ctx, s, oneTimeKeys)
if err != nil {
failBlacklistableError(err, stats)
return res, &api.FederationClientError{
Err: err.Error(),
}
}
stats.Success()
return res, nil
return ires.(gomatrixserverlib.RespClaimKeys), nil
}
func (a *FederationSenderInternalAPI) QueryKeys(
ctx context.Context, s gomatrixserverlib.ServerName, keys map[string][]string,
) (gomatrixserverlib.RespQueryKeys, error) {
var res gomatrixserverlib.RespQueryKeys
stats, err := a.isBlacklistedOrBackingOff(s)
ires, err := a.doRequest(ctx, s, func() (interface{}, error) {
return a.federation.QueryKeys(ctx, s, keys)
})
if err != nil {
return res, err
return gomatrixserverlib.RespQueryKeys{}, err
}
res, err = a.federation.QueryKeys(ctx, s, keys)
if err != nil {
failBlacklistableError(err, stats)
return res, &api.FederationClientError{
Err: err.Error(),
}
}
stats.Success()
return res, nil
return ires.(gomatrixserverlib.RespQueryKeys), nil
}

View file

@ -126,6 +126,16 @@ func (s *ServerStatistics) Failure() (time.Time, bool) {
return until, false
}
// BackoffInfo returns information about the current or previous backoff.
// Returns the last backoffUntil time and whether the server is currently blacklisted or not.
func (s *ServerStatistics) BackoffInfo() (*time.Time, bool) {
until, ok := s.backoffUntil.Load().(time.Time)
if ok {
return &until, s.blacklisted.Load()
}
return nil, s.blacklisted.Load()
}
// BackoffIfRequired will block for as long as the current
// backoff requires, if needed. Otherwise it will do nothing.
// Returns the amount of time to backoff for and whether to give up or not.

View file

@ -305,7 +305,7 @@ func (u *DeviceListUpdater) worker(ch chan gomatrixserverlib.ServerName) {
continue
} else {
scheduledRetries[serverName] = time.Now().Add(cooloffPeriod)
go inject(serverName, cooloffPeriod) // TODO: Backoff?
go inject(serverName, cooloffPeriod)
continue
}
}
@ -313,7 +313,7 @@ func (u *DeviceListUpdater) worker(ch chan gomatrixserverlib.ServerName) {
shouldRetry := u.processServer(serverName)
if shouldRetry {
scheduledRetries[serverName] = time.Now().Add(cooloffPeriod)
go inject(serverName, cooloffPeriod) // TODO: Backoff?
go inject(serverName, cooloffPeriod)
}
}
}