Try backing off servers

This commit is contained in:
Neil Alexander 2022-01-25 16:28:47 +00:00
parent 7f77bf832a
commit ea103886a9
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
3 changed files with 28 additions and 14 deletions

View file

@ -3,6 +3,7 @@ package internal
import (
"crypto/ed25519"
"encoding/base64"
"fmt"
"sync"
"time"
@ -141,7 +142,7 @@ func failBlacklistableError(err error, stats *statistics.ServerStatistics) (unti
return
}
func (a *FederationInternalAPI) doRequest(
func (a *FederationInternalAPI) doRequestIfNotBackingOffOrBlacklisted(
s gomatrixserverlib.ServerName, request func() (interface{}, error),
) (interface{}, error) {
stats, err := a.isBlacklistedOrBackingOff(s)
@ -165,3 +166,16 @@ func (a *FederationInternalAPI) doRequest(
stats.Success()
return res, nil
}
func (a *FederationInternalAPI) doRequestIfNotBlacklisted(
s gomatrixserverlib.ServerName, request func() (interface{}, error),
) (interface{}, error) {
stats := a.statistics.ForServer(s)
if _, blacklisted := stats.BackoffInfo(); blacklisted {
return stats, &api.FederationClientError{
Err: fmt.Sprintf("server %q is blacklisted", s),
Blacklisted: true,
}
}
return request()
}

View file

@ -16,7 +16,7 @@ func (a *FederationInternalAPI) GetEventAuth(
) (res gomatrixserverlib.RespEventAuth, err error) {
ctx, cancel := context.WithTimeout(ctx, time.Second*30)
defer cancel()
ires, err := a.doRequest(s, func() (interface{}, error) {
ires, err := a.doRequestIfNotBlacklisted(s, func() (interface{}, error) {
return a.federation.GetEventAuth(ctx, s, roomVersion, roomID, eventID)
})
if err != nil {
@ -30,7 +30,7 @@ func (a *FederationInternalAPI) GetUserDevices(
) (gomatrixserverlib.RespUserDevices, error) {
ctx, cancel := context.WithTimeout(ctx, time.Second*30)
defer cancel()
ires, err := a.doRequest(s, func() (interface{}, error) {
ires, err := a.doRequestIfNotBlacklisted(s, func() (interface{}, error) {
return a.federation.GetUserDevices(ctx, s, userID)
})
if err != nil {
@ -44,7 +44,7 @@ func (a *FederationInternalAPI) ClaimKeys(
) (gomatrixserverlib.RespClaimKeys, error) {
ctx, cancel := context.WithTimeout(ctx, time.Second*30)
defer cancel()
ires, err := a.doRequest(s, func() (interface{}, error) {
ires, err := a.doRequestIfNotBackingOffOrBlacklisted(s, func() (interface{}, error) {
return a.federation.ClaimKeys(ctx, s, oneTimeKeys)
})
if err != nil {
@ -56,7 +56,7 @@ func (a *FederationInternalAPI) ClaimKeys(
func (a *FederationInternalAPI) QueryKeys(
ctx context.Context, s gomatrixserverlib.ServerName, keys map[string][]string,
) (gomatrixserverlib.RespQueryKeys, error) {
ires, err := a.doRequest(s, func() (interface{}, error) {
ires, err := a.doRequestIfNotBackingOffOrBlacklisted(s, func() (interface{}, error) {
return a.federation.QueryKeys(ctx, s, keys)
})
if err != nil {
@ -70,7 +70,7 @@ func (a *FederationInternalAPI) Backfill(
) (res gomatrixserverlib.Transaction, err error) {
ctx, cancel := context.WithTimeout(ctx, time.Second*30)
defer cancel()
ires, err := a.doRequest(s, func() (interface{}, error) {
ires, err := a.doRequestIfNotBlacklisted(s, func() (interface{}, error) {
return a.federation.Backfill(ctx, s, roomID, limit, eventIDs)
})
if err != nil {
@ -84,7 +84,7 @@ func (a *FederationInternalAPI) LookupState(
) (res gomatrixserverlib.RespState, err error) {
ctx, cancel := context.WithTimeout(ctx, time.Second*30)
defer cancel()
ires, err := a.doRequest(s, func() (interface{}, error) {
ires, err := a.doRequestIfNotBlacklisted(s, func() (interface{}, error) {
return a.federation.LookupState(ctx, s, roomID, eventID, roomVersion)
})
if err != nil {
@ -98,7 +98,7 @@ func (a *FederationInternalAPI) LookupStateIDs(
) (res gomatrixserverlib.RespStateIDs, err error) {
ctx, cancel := context.WithTimeout(ctx, time.Second*30)
defer cancel()
ires, err := a.doRequest(s, func() (interface{}, error) {
ires, err := a.doRequestIfNotBlacklisted(s, func() (interface{}, error) {
return a.federation.LookupStateIDs(ctx, s, roomID, eventID)
})
if err != nil {
@ -113,7 +113,7 @@ func (a *FederationInternalAPI) LookupMissingEvents(
) (res gomatrixserverlib.RespMissingEvents, err error) {
ctx, cancel := context.WithTimeout(ctx, time.Second*30)
defer cancel()
ires, err := a.doRequest(s, func() (interface{}, error) {
ires, err := a.doRequestIfNotBlacklisted(s, func() (interface{}, error) {
return a.federation.LookupMissingEvents(ctx, s, roomID, missing, roomVersion)
})
if err != nil {
@ -127,7 +127,7 @@ func (a *FederationInternalAPI) GetEvent(
) (res gomatrixserverlib.Transaction, err error) {
ctx, cancel := context.WithTimeout(ctx, time.Second*30)
defer cancel()
ires, err := a.doRequest(s, func() (interface{}, error) {
ires, err := a.doRequestIfNotBlacklisted(s, func() (interface{}, error) {
return a.federation.GetEvent(ctx, s, eventID)
})
if err != nil {
@ -141,7 +141,7 @@ func (a *FederationInternalAPI) LookupServerKeys(
) ([]gomatrixserverlib.ServerKeys, error) {
ctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()
ires, err := a.doRequest(s, func() (interface{}, error) {
ires, err := a.doRequestIfNotBlacklisted(s, func() (interface{}, error) {
return a.federation.LookupServerKeys(ctx, s, keyRequests)
})
if err != nil {
@ -156,7 +156,7 @@ func (a *FederationInternalAPI) MSC2836EventRelationships(
) (res gomatrixserverlib.MSC2836EventRelationshipsResponse, err error) {
ctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()
ires, err := a.doRequest(s, func() (interface{}, error) {
ires, err := a.doRequestIfNotBlacklisted(s, func() (interface{}, error) {
return a.federation.MSC2836EventRelationships(ctx, s, r, roomVersion)
})
if err != nil {
@ -170,7 +170,7 @@ func (a *FederationInternalAPI) MSC2946Spaces(
) (res gomatrixserverlib.MSC2946SpacesResponse, err error) {
ctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()
ires, err := a.doRequest(s, func() (interface{}, error) {
ires, err := a.doRequestIfNotBlacklisted(s, func() (interface{}, error) {
return a.federation.MSC2946Spaces(ctx, s, roomID, r)
})
if err != nil {

View file

@ -28,7 +28,7 @@ func (f *FederationInternalAPI) QueryJoinedHostServerNamesInRoom(
func (a *FederationInternalAPI) fetchServerKeysDirectly(ctx context.Context, serverName gomatrixserverlib.ServerName) (*gomatrixserverlib.ServerKeys, error) {
ctx, cancel := context.WithTimeout(ctx, time.Second*30)
defer cancel()
ires, err := a.doRequest(serverName, func() (interface{}, error) {
ires, err := a.doRequestIfNotBackingOffOrBlacklisted(serverName, func() (interface{}, error) {
return a.federation.GetServerKeys(ctx, serverName)
})
if err != nil {