mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-01-21 13:03:09 -06:00
Fed sending tweaks to make s&f more robust
This commit is contained in:
parent
ab8a3e44ef
commit
0d99625b86
|
|
@ -410,34 +410,49 @@ func (oq *destinationQueue) nextTransaction(
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
relayServers := oq.statistics.KnownRelayServers()
|
relayServers := oq.statistics.KnownRelayServers()
|
||||||
if oq.statistics.AssumedOffline() && len(relayServers) > 0 {
|
hasRelayServers := len(relayServers) > 0
|
||||||
sendMethod = statistics.SendViaRelay
|
shouldSendToRelays := oq.statistics.AssumedOffline() && hasRelayServers
|
||||||
relaySuccess := false
|
if !shouldSendToRelays {
|
||||||
logrus.Infof("Sending to relay servers: %v", relayServers)
|
|
||||||
// TODO : how to pass through actual userID here?!?!?!?!
|
|
||||||
userID, userErr := gomatrixserverlib.NewUserID("@user:"+string(oq.destination), false)
|
|
||||||
if userErr != nil {
|
|
||||||
return userErr, sendMethod
|
|
||||||
}
|
|
||||||
|
|
||||||
// Attempt sending to each known relay server.
|
|
||||||
for _, relayServer := range relayServers {
|
|
||||||
_, relayErr := oq.client.P2PSendTransactionToRelay(ctx, *userID, t, relayServer)
|
|
||||||
if relayErr != nil {
|
|
||||||
err = relayErr
|
|
||||||
} else {
|
|
||||||
// If sending to one of the relay servers succeeds, consider the send successful.
|
|
||||||
relaySuccess = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Clear the error if sending to any of the relay servers succeeded.
|
|
||||||
if relaySuccess {
|
|
||||||
err = nil
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
sendMethod = statistics.SendDirect
|
sendMethod = statistics.SendDirect
|
||||||
_, err = oq.client.SendTransaction(ctx, t)
|
_, err = oq.client.SendTransaction(ctx, t)
|
||||||
|
} else {
|
||||||
|
// Try sending directly to the destination first in case they came back online.
|
||||||
|
sendMethod = statistics.SendDirect
|
||||||
|
_, err = oq.client.SendTransaction(ctx, t)
|
||||||
|
if err != nil {
|
||||||
|
// The destination is still offline, try sending to relays.
|
||||||
|
sendMethod = statistics.SendViaRelay
|
||||||
|
relaySuccess := false
|
||||||
|
logrus.Infof("Sending %q to relay servers: %v", t.TransactionID, relayServers)
|
||||||
|
// TODO : how to pass through actual userID here?!?!?!?!
|
||||||
|
userID, userErr := gomatrixserverlib.NewUserID("@user:"+string(oq.destination), false)
|
||||||
|
if userErr != nil {
|
||||||
|
return userErr, sendMethod
|
||||||
|
}
|
||||||
|
|
||||||
|
// Attempt sending to each known relay server.
|
||||||
|
for _, relayServer := range relayServers {
|
||||||
|
_, relayErr := oq.client.P2PSendTransactionToRelay(ctx, *userID, t, relayServer)
|
||||||
|
if relayErr != nil {
|
||||||
|
err = relayErr
|
||||||
|
} else {
|
||||||
|
// If sending to one of the relay servers succeeds, consider the send successful.
|
||||||
|
relaySuccess = true
|
||||||
|
|
||||||
|
// TODO : what about if the dest comes back online but can't see their relay?
|
||||||
|
// How do I sync with the dest in that case?
|
||||||
|
// Should change the database to have a "relay success" flag on events and if
|
||||||
|
// I see the node back online, maybe directly send through the backlog of events
|
||||||
|
// with "relay success"... could lead to duplicate events, but only those that
|
||||||
|
// I sent. And will lead to a much more consistent experience.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Clear the error if sending to any of the relay servers succeeded.
|
||||||
|
if relaySuccess {
|
||||||
|
err = nil
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
switch errResponse := err.(type) {
|
switch errResponse := err.(type) {
|
||||||
case nil:
|
case nil:
|
||||||
|
|
|
||||||
|
|
@ -164,6 +164,8 @@ func (s *ServerStatistics) Success(method SendMethod) {
|
||||||
logrus.WithError(err).Errorf("Failed to remove %q from blacklist", s.serverName)
|
logrus.WithError(err).Errorf("Failed to remove %q from blacklist", s.serverName)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
s.removeAssumedOffline()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -49,7 +49,7 @@ func (c *FederationAPI) Defaults(opts DefaultOpts) {
|
||||||
c.Database.Defaults(10)
|
c.Database.Defaults(10)
|
||||||
}
|
}
|
||||||
c.FederationMaxRetries = 16
|
c.FederationMaxRetries = 16
|
||||||
c.P2PFederationRetriesUntilAssumedOffline = 2
|
c.P2PFederationRetriesUntilAssumedOffline = 1
|
||||||
c.DisableTLSValidation = false
|
c.DisableTLSValidation = false
|
||||||
c.DisableHTTPKeepalives = false
|
c.DisableHTTPKeepalives = false
|
||||||
if opts.Generate {
|
if opts.Generate {
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue