Introduce a new config to enable relays explicitly

This commit is contained in:
Till Faelligen 2024-02-26 19:39:33 +01:00
parent f4e77453cb
commit 5100c564c2
No known key found for this signature in database
GPG key ID: 3DF82D8AB9211D4E
7 changed files with 43 additions and 30 deletions

View file

@ -113,10 +113,7 @@ func NewInternalAPI(
_ = federationDB.RemoveAllServersFromBlacklist()
}
stats := statistics.NewStatistics(
federationDB,
cfg.FederationMaxRetries+1,
cfg.P2PFederationRetriesUntilAssumedOffline+1)
stats := statistics.NewStatistics(federationDB, cfg.FederationMaxRetries+1, cfg.P2PFederationRetriesUntilAssumedOffline+1, false)
js, nats := natsInstance.Prepare(processContext, &cfg.Matrix.JetStream)

View file

@ -61,7 +61,7 @@ func TestFederationClientQueryKeys(t *testing.T) {
},
}
fedClient := &testFedClient{}
stats := statistics.NewStatistics(testDB, FailuresUntilBlacklist, FailuresUntilAssumedOffline)
stats := statistics.NewStatistics(testDB, FailuresUntilBlacklist, FailuresUntilAssumedOffline, false)
queues := queue.NewOutgoingQueues(
testDB, process.NewProcessContext(),
false,
@ -92,7 +92,7 @@ func TestFederationClientQueryKeysBlacklisted(t *testing.T) {
},
}
fedClient := &testFedClient{}
stats := statistics.NewStatistics(testDB, FailuresUntilBlacklist, FailuresUntilAssumedOffline)
stats := statistics.NewStatistics(testDB, FailuresUntilBlacklist, FailuresUntilAssumedOffline, false)
queues := queue.NewOutgoingQueues(
testDB, process.NewProcessContext(),
false,
@ -122,7 +122,7 @@ func TestFederationClientQueryKeysFailure(t *testing.T) {
},
}
fedClient := &testFedClient{shouldFail: true}
stats := statistics.NewStatistics(testDB, FailuresUntilBlacklist, FailuresUntilAssumedOffline)
stats := statistics.NewStatistics(testDB, FailuresUntilBlacklist, FailuresUntilAssumedOffline, false)
queues := queue.NewOutgoingQueues(
testDB, process.NewProcessContext(),
false,
@ -152,7 +152,7 @@ func TestFederationClientClaimKeys(t *testing.T) {
},
}
fedClient := &testFedClient{}
stats := statistics.NewStatistics(testDB, FailuresUntilBlacklist, FailuresUntilAssumedOffline)
stats := statistics.NewStatistics(testDB, FailuresUntilBlacklist, FailuresUntilAssumedOffline, false)
queues := queue.NewOutgoingQueues(
testDB, process.NewProcessContext(),
false,
@ -183,7 +183,7 @@ func TestFederationClientClaimKeysBlacklisted(t *testing.T) {
},
}
fedClient := &testFedClient{}
stats := statistics.NewStatistics(testDB, FailuresUntilBlacklist, FailuresUntilAssumedOffline)
stats := statistics.NewStatistics(testDB, FailuresUntilBlacklist, FailuresUntilAssumedOffline, false)
queues := queue.NewOutgoingQueues(
testDB, process.NewProcessContext(),
false,

View file

@ -66,7 +66,7 @@ func TestPerformWakeupServers(t *testing.T) {
},
}
fedClient := &testFedClient{}
stats := statistics.NewStatistics(testDB, FailuresUntilBlacklist, FailuresUntilAssumedOffline)
stats := statistics.NewStatistics(testDB, FailuresUntilBlacklist, FailuresUntilAssumedOffline, true)
queues := queue.NewOutgoingQueues(
testDB, process.NewProcessContext(),
false,
@ -112,7 +112,7 @@ func TestQueryRelayServers(t *testing.T) {
},
}
fedClient := &testFedClient{}
stats := statistics.NewStatistics(testDB, FailuresUntilBlacklist, FailuresUntilAssumedOffline)
stats := statistics.NewStatistics(testDB, FailuresUntilBlacklist, FailuresUntilAssumedOffline, false)
queues := queue.NewOutgoingQueues(
testDB, process.NewProcessContext(),
false,
@ -153,7 +153,7 @@ func TestRemoveRelayServers(t *testing.T) {
},
}
fedClient := &testFedClient{}
stats := statistics.NewStatistics(testDB, FailuresUntilBlacklist, FailuresUntilAssumedOffline)
stats := statistics.NewStatistics(testDB, FailuresUntilBlacklist, FailuresUntilAssumedOffline, false)
queues := queue.NewOutgoingQueues(
testDB, process.NewProcessContext(),
false,
@ -193,7 +193,7 @@ func TestPerformDirectoryLookup(t *testing.T) {
},
}
fedClient := &testFedClient{}
stats := statistics.NewStatistics(testDB, FailuresUntilBlacklist, FailuresUntilAssumedOffline)
stats := statistics.NewStatistics(testDB, FailuresUntilBlacklist, FailuresUntilAssumedOffline, false)
queues := queue.NewOutgoingQueues(
testDB, process.NewProcessContext(),
false,
@ -232,7 +232,7 @@ func TestPerformDirectoryLookupRelaying(t *testing.T) {
},
}
fedClient := &testFedClient{}
stats := statistics.NewStatistics(testDB, FailuresUntilBlacklist, FailuresUntilAssumedOffline)
stats := statistics.NewStatistics(testDB, FailuresUntilBlacklist, FailuresUntilAssumedOffline, true)
queues := queue.NewOutgoingQueues(
testDB, process.NewProcessContext(),
false,

View file

@ -117,7 +117,7 @@ func testSetup(failuresUntilBlacklist uint32, failuresUntilAssumedOffline uint32
txRelayCount: *atomic.NewUint32(0),
}
stats := statistics.NewStatistics(db, failuresUntilBlacklist, failuresUntilAssumedOffline)
stats := statistics.NewStatistics(db, failuresUntilBlacklist, failuresUntilAssumedOffline, false)
signingInfo := []*fclient.SigningIdentity{
{
KeyID: "ed21019:auto",

View file

@ -34,12 +34,15 @@ type Statistics struct {
// mark the destination as offline. At this point we should attempt
// to send messages to the user's async relay servers if we know them.
FailuresUntilAssumedOffline uint32
enableRelays bool
}
func NewStatistics(
db storage.Database,
failuresUntilBlacklist uint32,
failuresUntilAssumedOffline uint32,
enableRelays bool,
) Statistics {
return Statistics{
DB: db,
@ -47,6 +50,7 @@ func NewStatistics(
FailuresUntilAssumedOffline: failuresUntilAssumedOffline,
backoffTimers: make(map[spec.ServerName]*time.Timer),
servers: make(map[spec.ServerName]*ServerStatistics),
enableRelays: enableRelays,
}
}
@ -73,20 +77,25 @@ func (s *Statistics) ForServer(serverName spec.ServerName) *ServerStatistics {
} else {
server.blacklisted.Store(blacklisted)
}
assumedOffline, err := s.DB.IsServerAssumedOffline(context.Background(), serverName)
if err != nil {
logrus.WithError(err).Errorf("Failed to get assumed offline entry %q", serverName)
} else {
server.assumedOffline.Store(assumedOffline)
}
knownRelayServers, err := s.DB.P2PGetRelayServersForServer(context.Background(), serverName)
if err != nil {
logrus.WithError(err).Errorf("Failed to get relay server list for %q", serverName)
} else {
server.relayMutex.Lock()
server.knownRelayServers = knownRelayServers
server.relayMutex.Unlock()
// Don't bother hitting the database 2 additional times
// if we don't want to use relays.
if s.enableRelays {
assumedOffline, err := s.DB.IsServerAssumedOffline(context.Background(), serverName)
if err != nil {
logrus.WithError(err).Errorf("Failed to get assumed offline entry %q", serverName)
} else {
server.assumedOffline.Store(assumedOffline)
}
knownRelayServers, err := s.DB.P2PGetRelayServersForServer(context.Background(), serverName)
if err != nil {
logrus.WithError(err).Errorf("Failed to get relay server list for %q", serverName)
} else {
server.relayMutex.Lock()
server.knownRelayServers = knownRelayServers
server.relayMutex.Unlock()
}
}
}
return server

View file

@ -16,7 +16,7 @@ const (
)
func TestBackoff(t *testing.T) {
stats := NewStatistics(nil, FailuresUntilBlacklist, FailuresUntilAssumedOffline)
stats := NewStatistics(nil, FailuresUntilBlacklist, FailuresUntilAssumedOffline, false)
server := ServerStatistics{
statistics: &stats,
serverName: "test.com",
@ -106,7 +106,7 @@ func TestBackoff(t *testing.T) {
}
func TestRelayServersListing(t *testing.T) {
stats := NewStatistics(test.NewInMemoryFederationDatabase(), FailuresUntilBlacklist, FailuresUntilAssumedOffline)
stats := NewStatistics(test.NewInMemoryFederationDatabase(), FailuresUntilBlacklist, FailuresUntilAssumedOffline, false)
server := ServerStatistics{statistics: &stats}
server.AddRelayServers([]spec.ServerName{"relay1", "relay1", "relay2"})
relayServers := server.KnownRelayServers()

View file

@ -18,6 +18,13 @@ type FederationAPI struct {
// The default value is 16 if not specified, which is circa 18 hours.
FederationMaxRetries uint32 `yaml:"send_max_retries"`
// P2P Feature: Whether relaying to specific nodes should be enabled.
// Defaults to false.
// Note: Enabling relays introduces a huge startup delay, if you are not using
// relays and have many servers to re-hydrate on start. Only enable this
// if you are using relays!
EnableRelays bool `yaml:"enable_relays"`
// P2P Feature: How many consecutive failures that we should tolerate when
// sending federation requests to a specific server until we should assume they
// are offline. If we assume they are offline then we will attempt to send