From ea103886a9ae89af4d6e7c760f5cec0ec5040f6f Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Tue, 25 Jan 2022 16:28:47 +0000 Subject: [PATCH] Try backing off servers --- federationapi/internal/api.go | 16 ++++++++++++++- federationapi/internal/federationclient.go | 24 +++++++++++----------- federationapi/internal/query.go | 2 +- 3 files changed, 28 insertions(+), 14 deletions(-) diff --git a/federationapi/internal/api.go b/federationapi/internal/api.go index 22850179e..4e9fa8410 100644 --- a/federationapi/internal/api.go +++ b/federationapi/internal/api.go @@ -3,6 +3,7 @@ package internal import ( "crypto/ed25519" "encoding/base64" + "fmt" "sync" "time" @@ -141,7 +142,7 @@ func failBlacklistableError(err error, stats *statistics.ServerStatistics) (unti return } -func (a *FederationInternalAPI) doRequest( +func (a *FederationInternalAPI) doRequestIfNotBackingOffOrBlacklisted( s gomatrixserverlib.ServerName, request func() (interface{}, error), ) (interface{}, error) { stats, err := a.isBlacklistedOrBackingOff(s) @@ -165,3 +166,16 @@ func (a *FederationInternalAPI) doRequest( stats.Success() return res, nil } + +func (a *FederationInternalAPI) doRequestIfNotBlacklisted( + s gomatrixserverlib.ServerName, request func() (interface{}, error), +) (interface{}, error) { + stats := a.statistics.ForServer(s) + if _, blacklisted := stats.BackoffInfo(); blacklisted { + return stats, &api.FederationClientError{ + Err: fmt.Sprintf("server %q is blacklisted", s), + Blacklisted: true, + } + } + return request() +} diff --git a/federationapi/internal/federationclient.go b/federationapi/internal/federationclient.go index e2bec10ac..b31db466c 100644 --- a/federationapi/internal/federationclient.go +++ b/federationapi/internal/federationclient.go @@ -16,7 +16,7 @@ func (a *FederationInternalAPI) GetEventAuth( ) (res gomatrixserverlib.RespEventAuth, err error) { ctx, cancel := context.WithTimeout(ctx, time.Second*30) defer cancel() - ires, err := a.doRequest(s, func() (interface{}, error) { + ires, err := a.doRequestIfNotBlacklisted(s, func() (interface{}, error) { return a.federation.GetEventAuth(ctx, s, roomVersion, roomID, eventID) }) if err != nil { @@ -30,7 +30,7 @@ func (a *FederationInternalAPI) GetUserDevices( ) (gomatrixserverlib.RespUserDevices, error) { ctx, cancel := context.WithTimeout(ctx, time.Second*30) defer cancel() - ires, err := a.doRequest(s, func() (interface{}, error) { + ires, err := a.doRequestIfNotBlacklisted(s, func() (interface{}, error) { return a.federation.GetUserDevices(ctx, s, userID) }) if err != nil { @@ -44,7 +44,7 @@ func (a *FederationInternalAPI) ClaimKeys( ) (gomatrixserverlib.RespClaimKeys, error) { ctx, cancel := context.WithTimeout(ctx, time.Second*30) defer cancel() - ires, err := a.doRequest(s, func() (interface{}, error) { + ires, err := a.doRequestIfNotBackingOffOrBlacklisted(s, func() (interface{}, error) { return a.federation.ClaimKeys(ctx, s, oneTimeKeys) }) if err != nil { @@ -56,7 +56,7 @@ func (a *FederationInternalAPI) ClaimKeys( func (a *FederationInternalAPI) QueryKeys( ctx context.Context, s gomatrixserverlib.ServerName, keys map[string][]string, ) (gomatrixserverlib.RespQueryKeys, error) { - ires, err := a.doRequest(s, func() (interface{}, error) { + ires, err := a.doRequestIfNotBackingOffOrBlacklisted(s, func() (interface{}, error) { return a.federation.QueryKeys(ctx, s, keys) }) if err != nil { @@ -70,7 +70,7 @@ func (a *FederationInternalAPI) Backfill( ) (res gomatrixserverlib.Transaction, err error) { ctx, cancel := context.WithTimeout(ctx, time.Second*30) defer cancel() - ires, err := a.doRequest(s, func() (interface{}, error) { + ires, err := a.doRequestIfNotBlacklisted(s, func() (interface{}, error) { return a.federation.Backfill(ctx, s, roomID, limit, eventIDs) }) if err != nil { @@ -84,7 +84,7 @@ func (a *FederationInternalAPI) LookupState( ) (res gomatrixserverlib.RespState, err error) { ctx, cancel := context.WithTimeout(ctx, time.Second*30) defer cancel() - ires, err := a.doRequest(s, func() (interface{}, error) { + ires, err := a.doRequestIfNotBlacklisted(s, func() (interface{}, error) { return a.federation.LookupState(ctx, s, roomID, eventID, roomVersion) }) if err != nil { @@ -98,7 +98,7 @@ func (a *FederationInternalAPI) LookupStateIDs( ) (res gomatrixserverlib.RespStateIDs, err error) { ctx, cancel := context.WithTimeout(ctx, time.Second*30) defer cancel() - ires, err := a.doRequest(s, func() (interface{}, error) { + ires, err := a.doRequestIfNotBlacklisted(s, func() (interface{}, error) { return a.federation.LookupStateIDs(ctx, s, roomID, eventID) }) if err != nil { @@ -113,7 +113,7 @@ func (a *FederationInternalAPI) LookupMissingEvents( ) (res gomatrixserverlib.RespMissingEvents, err error) { ctx, cancel := context.WithTimeout(ctx, time.Second*30) defer cancel() - ires, err := a.doRequest(s, func() (interface{}, error) { + ires, err := a.doRequestIfNotBlacklisted(s, func() (interface{}, error) { return a.federation.LookupMissingEvents(ctx, s, roomID, missing, roomVersion) }) if err != nil { @@ -127,7 +127,7 @@ func (a *FederationInternalAPI) GetEvent( ) (res gomatrixserverlib.Transaction, err error) { ctx, cancel := context.WithTimeout(ctx, time.Second*30) defer cancel() - ires, err := a.doRequest(s, func() (interface{}, error) { + ires, err := a.doRequestIfNotBlacklisted(s, func() (interface{}, error) { return a.federation.GetEvent(ctx, s, eventID) }) if err != nil { @@ -141,7 +141,7 @@ func (a *FederationInternalAPI) LookupServerKeys( ) ([]gomatrixserverlib.ServerKeys, error) { ctx, cancel := context.WithTimeout(ctx, time.Minute) defer cancel() - ires, err := a.doRequest(s, func() (interface{}, error) { + ires, err := a.doRequestIfNotBlacklisted(s, func() (interface{}, error) { return a.federation.LookupServerKeys(ctx, s, keyRequests) }) if err != nil { @@ -156,7 +156,7 @@ func (a *FederationInternalAPI) MSC2836EventRelationships( ) (res gomatrixserverlib.MSC2836EventRelationshipsResponse, err error) { ctx, cancel := context.WithTimeout(ctx, time.Minute) defer cancel() - ires, err := a.doRequest(s, func() (interface{}, error) { + ires, err := a.doRequestIfNotBlacklisted(s, func() (interface{}, error) { return a.federation.MSC2836EventRelationships(ctx, s, r, roomVersion) }) if err != nil { @@ -170,7 +170,7 @@ func (a *FederationInternalAPI) MSC2946Spaces( ) (res gomatrixserverlib.MSC2946SpacesResponse, err error) { ctx, cancel := context.WithTimeout(ctx, time.Minute) defer cancel() - ires, err := a.doRequest(s, func() (interface{}, error) { + ires, err := a.doRequestIfNotBlacklisted(s, func() (interface{}, error) { return a.federation.MSC2946Spaces(ctx, s, roomID, r) }) if err != nil { diff --git a/federationapi/internal/query.go b/federationapi/internal/query.go index bac813331..ce57778be 100644 --- a/federationapi/internal/query.go +++ b/federationapi/internal/query.go @@ -28,7 +28,7 @@ func (f *FederationInternalAPI) QueryJoinedHostServerNamesInRoom( func (a *FederationInternalAPI) fetchServerKeysDirectly(ctx context.Context, serverName gomatrixserverlib.ServerName) (*gomatrixserverlib.ServerKeys, error) { ctx, cancel := context.WithTimeout(ctx, time.Second*30) defer cancel() - ires, err := a.doRequest(serverName, func() (interface{}, error) { + ires, err := a.doRequestIfNotBackingOffOrBlacklisted(serverName, func() (interface{}, error) { return a.federation.GetServerKeys(ctx, serverName) }) if err != nil {