Make usage of relays optional, avoid DB roundtrips (#3337)

This should avoid 2 additional DB roundtrips if we don't want to use
relays.

So instead of possibly doing roughly 20k trips to the DB, we are now
"only" doing ~6600.

---------

Co-authored-by: devonh <devon.dmytro@gmail.com>
This commit is contained in:
Till 2024-02-28 20:59:34 +01:00 committed by GitHub
parent 4ccf6d6f67
commit 865fff5f03
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 32 additions and 17 deletions

View file

@ -113,10 +113,7 @@ func NewInternalAPI(
_ = federationDB.RemoveAllServersFromBlacklist() _ = federationDB.RemoveAllServersFromBlacklist()
} }
stats := statistics.NewStatistics( stats := statistics.NewStatistics(federationDB, cfg.FederationMaxRetries+1, cfg.P2PFederationRetriesUntilAssumedOffline+1, cfg.EnableRelays)
federationDB,
cfg.FederationMaxRetries+1,
cfg.P2PFederationRetriesUntilAssumedOffline+1)
js, nats := natsInstance.Prepare(processContext, &cfg.Matrix.JetStream) js, nats := natsInstance.Prepare(processContext, &cfg.Matrix.JetStream)

View file

@ -61,7 +61,7 @@ func TestFederationClientQueryKeys(t *testing.T) {
}, },
} }
fedClient := &testFedClient{} fedClient := &testFedClient{}
stats := statistics.NewStatistics(testDB, FailuresUntilBlacklist, FailuresUntilAssumedOffline) stats := statistics.NewStatistics(testDB, FailuresUntilBlacklist, FailuresUntilAssumedOffline, false)
queues := queue.NewOutgoingQueues( queues := queue.NewOutgoingQueues(
testDB, process.NewProcessContext(), testDB, process.NewProcessContext(),
false, false,
@ -92,7 +92,7 @@ func TestFederationClientQueryKeysBlacklisted(t *testing.T) {
}, },
} }
fedClient := &testFedClient{} fedClient := &testFedClient{}
stats := statistics.NewStatistics(testDB, FailuresUntilBlacklist, FailuresUntilAssumedOffline) stats := statistics.NewStatistics(testDB, FailuresUntilBlacklist, FailuresUntilAssumedOffline, false)
queues := queue.NewOutgoingQueues( queues := queue.NewOutgoingQueues(
testDB, process.NewProcessContext(), testDB, process.NewProcessContext(),
false, false,
@ -122,7 +122,7 @@ func TestFederationClientQueryKeysFailure(t *testing.T) {
}, },
} }
fedClient := &testFedClient{shouldFail: true} fedClient := &testFedClient{shouldFail: true}
stats := statistics.NewStatistics(testDB, FailuresUntilBlacklist, FailuresUntilAssumedOffline) stats := statistics.NewStatistics(testDB, FailuresUntilBlacklist, FailuresUntilAssumedOffline, false)
queues := queue.NewOutgoingQueues( queues := queue.NewOutgoingQueues(
testDB, process.NewProcessContext(), testDB, process.NewProcessContext(),
false, false,
@ -152,7 +152,7 @@ func TestFederationClientClaimKeys(t *testing.T) {
}, },
} }
fedClient := &testFedClient{} fedClient := &testFedClient{}
stats := statistics.NewStatistics(testDB, FailuresUntilBlacklist, FailuresUntilAssumedOffline) stats := statistics.NewStatistics(testDB, FailuresUntilBlacklist, FailuresUntilAssumedOffline, false)
queues := queue.NewOutgoingQueues( queues := queue.NewOutgoingQueues(
testDB, process.NewProcessContext(), testDB, process.NewProcessContext(),
false, false,
@ -183,7 +183,7 @@ func TestFederationClientClaimKeysBlacklisted(t *testing.T) {
}, },
} }
fedClient := &testFedClient{} fedClient := &testFedClient{}
stats := statistics.NewStatistics(testDB, FailuresUntilBlacklist, FailuresUntilAssumedOffline) stats := statistics.NewStatistics(testDB, FailuresUntilBlacklist, FailuresUntilAssumedOffline, false)
queues := queue.NewOutgoingQueues( queues := queue.NewOutgoingQueues(
testDB, process.NewProcessContext(), testDB, process.NewProcessContext(),
false, false,

View file

@ -66,7 +66,7 @@ func TestPerformWakeupServers(t *testing.T) {
}, },
} }
fedClient := &testFedClient{} fedClient := &testFedClient{}
stats := statistics.NewStatistics(testDB, FailuresUntilBlacklist, FailuresUntilAssumedOffline) stats := statistics.NewStatistics(testDB, FailuresUntilBlacklist, FailuresUntilAssumedOffline, true)
queues := queue.NewOutgoingQueues( queues := queue.NewOutgoingQueues(
testDB, process.NewProcessContext(), testDB, process.NewProcessContext(),
false, false,
@ -112,7 +112,7 @@ func TestQueryRelayServers(t *testing.T) {
}, },
} }
fedClient := &testFedClient{} fedClient := &testFedClient{}
stats := statistics.NewStatistics(testDB, FailuresUntilBlacklist, FailuresUntilAssumedOffline) stats := statistics.NewStatistics(testDB, FailuresUntilBlacklist, FailuresUntilAssumedOffline, false)
queues := queue.NewOutgoingQueues( queues := queue.NewOutgoingQueues(
testDB, process.NewProcessContext(), testDB, process.NewProcessContext(),
false, false,
@ -153,7 +153,7 @@ func TestRemoveRelayServers(t *testing.T) {
}, },
} }
fedClient := &testFedClient{} fedClient := &testFedClient{}
stats := statistics.NewStatistics(testDB, FailuresUntilBlacklist, FailuresUntilAssumedOffline) stats := statistics.NewStatistics(testDB, FailuresUntilBlacklist, FailuresUntilAssumedOffline, false)
queues := queue.NewOutgoingQueues( queues := queue.NewOutgoingQueues(
testDB, process.NewProcessContext(), testDB, process.NewProcessContext(),
false, false,
@ -193,7 +193,7 @@ func TestPerformDirectoryLookup(t *testing.T) {
}, },
} }
fedClient := &testFedClient{} fedClient := &testFedClient{}
stats := statistics.NewStatistics(testDB, FailuresUntilBlacklist, FailuresUntilAssumedOffline) stats := statistics.NewStatistics(testDB, FailuresUntilBlacklist, FailuresUntilAssumedOffline, false)
queues := queue.NewOutgoingQueues( queues := queue.NewOutgoingQueues(
testDB, process.NewProcessContext(), testDB, process.NewProcessContext(),
false, false,
@ -232,7 +232,7 @@ func TestPerformDirectoryLookupRelaying(t *testing.T) {
}, },
} }
fedClient := &testFedClient{} fedClient := &testFedClient{}
stats := statistics.NewStatistics(testDB, FailuresUntilBlacklist, FailuresUntilAssumedOffline) stats := statistics.NewStatistics(testDB, FailuresUntilBlacklist, FailuresUntilAssumedOffline, true)
queues := queue.NewOutgoingQueues( queues := queue.NewOutgoingQueues(
testDB, process.NewProcessContext(), testDB, process.NewProcessContext(),
false, false,

View file

@ -117,7 +117,7 @@ func testSetup(failuresUntilBlacklist uint32, failuresUntilAssumedOffline uint32
txRelayCount: *atomic.NewUint32(0), txRelayCount: *atomic.NewUint32(0),
} }
stats := statistics.NewStatistics(db, failuresUntilBlacklist, failuresUntilAssumedOffline) stats := statistics.NewStatistics(db, failuresUntilBlacklist, failuresUntilAssumedOffline, false)
signingInfo := []*fclient.SigningIdentity{ signingInfo := []*fclient.SigningIdentity{
{ {
KeyID: "ed21019:auto", KeyID: "ed21019:auto",

View file

@ -34,12 +34,15 @@ type Statistics struct {
// mark the destination as offline. At this point we should attempt // mark the destination as offline. At this point we should attempt
// to send messages to the user's async relay servers if we know them. // to send messages to the user's async relay servers if we know them.
FailuresUntilAssumedOffline uint32 FailuresUntilAssumedOffline uint32
enableRelays bool
} }
func NewStatistics( func NewStatistics(
db storage.Database, db storage.Database,
failuresUntilBlacklist uint32, failuresUntilBlacklist uint32,
failuresUntilAssumedOffline uint32, failuresUntilAssumedOffline uint32,
enableRelays bool,
) Statistics { ) Statistics {
return Statistics{ return Statistics{
DB: db, DB: db,
@ -47,6 +50,7 @@ func NewStatistics(
FailuresUntilAssumedOffline: failuresUntilAssumedOffline, FailuresUntilAssumedOffline: failuresUntilAssumedOffline,
backoffTimers: make(map[spec.ServerName]*time.Timer), backoffTimers: make(map[spec.ServerName]*time.Timer),
servers: make(map[spec.ServerName]*ServerStatistics), servers: make(map[spec.ServerName]*ServerStatistics),
enableRelays: enableRelays,
} }
} }
@ -73,6 +77,13 @@ func (s *Statistics) ForServer(serverName spec.ServerName) *ServerStatistics {
} else { } else {
server.blacklisted.Store(blacklisted) server.blacklisted.Store(blacklisted)
} }
// Don't bother hitting the database 2 additional times
// if we don't want to use relays.
if !s.enableRelays {
return server
}
assumedOffline, err := s.DB.IsServerAssumedOffline(context.Background(), serverName) assumedOffline, err := s.DB.IsServerAssumedOffline(context.Background(), serverName)
if err != nil { if err != nil {
logrus.WithError(err).Errorf("Failed to get assumed offline entry %q", serverName) logrus.WithError(err).Errorf("Failed to get assumed offline entry %q", serverName)

View file

@ -16,7 +16,7 @@ const (
) )
func TestBackoff(t *testing.T) { func TestBackoff(t *testing.T) {
stats := NewStatistics(nil, FailuresUntilBlacklist, FailuresUntilAssumedOffline) stats := NewStatistics(nil, FailuresUntilBlacklist, FailuresUntilAssumedOffline, false)
server := ServerStatistics{ server := ServerStatistics{
statistics: &stats, statistics: &stats,
serverName: "test.com", serverName: "test.com",
@ -106,7 +106,7 @@ func TestBackoff(t *testing.T) {
} }
func TestRelayServersListing(t *testing.T) { func TestRelayServersListing(t *testing.T) {
stats := NewStatistics(test.NewInMemoryFederationDatabase(), FailuresUntilBlacklist, FailuresUntilAssumedOffline) stats := NewStatistics(test.NewInMemoryFederationDatabase(), FailuresUntilBlacklist, FailuresUntilAssumedOffline, false)
server := ServerStatistics{statistics: &stats} server := ServerStatistics{statistics: &stats}
server.AddRelayServers([]spec.ServerName{"relay1", "relay1", "relay2"}) server.AddRelayServers([]spec.ServerName{"relay1", "relay1", "relay2"})
relayServers := server.KnownRelayServers() relayServers := server.KnownRelayServers()

View file

@ -18,6 +18,13 @@ type FederationAPI struct {
// The default value is 16 if not specified, which is circa 18 hours. // The default value is 16 if not specified, which is circa 18 hours.
FederationMaxRetries uint32 `yaml:"send_max_retries"` FederationMaxRetries uint32 `yaml:"send_max_retries"`
// P2P Feature: Whether relaying to specific nodes should be enabled.
// Defaults to false.
// Note: Enabling relays introduces a huge startup delay, if you are not using
// relays and have many servers to re-hydrate on start. Only enable this
// if you are using relays!
EnableRelays bool `yaml:"enable_relays"`
// P2P Feature: How many consecutive failures that we should tolerate when // P2P Feature: How many consecutive failures that we should tolerate when
// sending federation requests to a specific server until we should assume they // sending federation requests to a specific server until we should assume they
// are offline. If we assume they are offline then we will attempt to send // are offline. If we assume they are offline then we will attempt to send