mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-01-22 05:23:09 -06:00
Refactor stats success to clarify whether sent via relay or direct
This commit is contained in:
parent
75fd677081
commit
cba0644ec2
|
|
@ -164,7 +164,7 @@ func (a *FederationInternalAPI) doRequestIfNotBackingOffOrBlacklisted(
|
||||||
RetryAfter: retryAfter,
|
RetryAfter: retryAfter,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
stats.Success(false)
|
stats.Success(statistics.SendDirect)
|
||||||
return res, nil
|
return res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -14,6 +14,7 @@ import (
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/federationapi/api"
|
"github.com/matrix-org/dendrite/federationapi/api"
|
||||||
"github.com/matrix-org/dendrite/federationapi/consumers"
|
"github.com/matrix-org/dendrite/federationapi/consumers"
|
||||||
|
"github.com/matrix-org/dendrite/federationapi/statistics"
|
||||||
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
|
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/dendrite/roomserver/version"
|
"github.com/matrix-org/dendrite/roomserver/version"
|
||||||
)
|
)
|
||||||
|
|
@ -41,7 +42,7 @@ func (r *FederationInternalAPI) PerformDirectoryLookup(
|
||||||
}
|
}
|
||||||
response.RoomID = dir.RoomID
|
response.RoomID = dir.RoomID
|
||||||
response.ServerNames = dir.Servers
|
response.ServerNames = dir.Servers
|
||||||
r.statistics.ForServer(request.ServerName).Success(false)
|
r.statistics.ForServer(request.ServerName).Success(statistics.SendDirect)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -174,7 +175,7 @@ func (r *FederationInternalAPI) performJoinUsingServer(
|
||||||
r.statistics.ForServer(serverName).Failure()
|
r.statistics.ForServer(serverName).Failure()
|
||||||
return fmt.Errorf("r.federation.MakeJoin: %w", err)
|
return fmt.Errorf("r.federation.MakeJoin: %w", err)
|
||||||
}
|
}
|
||||||
r.statistics.ForServer(serverName).Success(false)
|
r.statistics.ForServer(serverName).Success(statistics.SendDirect)
|
||||||
|
|
||||||
// Set all the fields to be what they should be, this should be a no-op
|
// Set all the fields to be what they should be, this should be a no-op
|
||||||
// but it's possible that the remote server returned us something "odd"
|
// but it's possible that the remote server returned us something "odd"
|
||||||
|
|
@ -229,7 +230,7 @@ func (r *FederationInternalAPI) performJoinUsingServer(
|
||||||
r.statistics.ForServer(serverName).Failure()
|
r.statistics.ForServer(serverName).Failure()
|
||||||
return fmt.Errorf("r.federation.SendJoin: %w", err)
|
return fmt.Errorf("r.federation.SendJoin: %w", err)
|
||||||
}
|
}
|
||||||
r.statistics.ForServer(serverName).Success(false)
|
r.statistics.ForServer(serverName).Success(statistics.SendDirect)
|
||||||
|
|
||||||
// If the remote server returned an event in the "event" key of
|
// If the remote server returned an event in the "event" key of
|
||||||
// the send_join request then we should use that instead. It may
|
// the send_join request then we should use that instead. It may
|
||||||
|
|
@ -461,7 +462,7 @@ func (r *FederationInternalAPI) performOutboundPeekUsingServer(
|
||||||
r.statistics.ForServer(serverName).Failure()
|
r.statistics.ForServer(serverName).Failure()
|
||||||
return fmt.Errorf("r.federation.Peek: %w", err)
|
return fmt.Errorf("r.federation.Peek: %w", err)
|
||||||
}
|
}
|
||||||
r.statistics.ForServer(serverName).Success(false)
|
r.statistics.ForServer(serverName).Success(statistics.SendDirect)
|
||||||
|
|
||||||
// Work out if we support the room version that has been supplied in
|
// Work out if we support the room version that has been supplied in
|
||||||
// the peek response.
|
// the peek response.
|
||||||
|
|
@ -605,7 +606,7 @@ func (r *FederationInternalAPI) PerformLeave(
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
r.statistics.ForServer(serverName).Success(false)
|
r.statistics.ForServer(serverName).Success(statistics.SendDirect)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -400,7 +400,7 @@ func (oq *destinationQueue) backgroundSend() {
|
||||||
func (oq *destinationQueue) nextTransaction(
|
func (oq *destinationQueue) nextTransaction(
|
||||||
pdus []*queuedPDU,
|
pdus []*queuedPDU,
|
||||||
edus []*queuedEDU,
|
edus []*queuedEDU,
|
||||||
) (err error, relaySuccess bool) {
|
) (err error, sendMethod statistics.SendMethod) {
|
||||||
// Create the transaction.
|
// Create the transaction.
|
||||||
t, pduReceipts, eduReceipts := oq.createTransaction(pdus, edus)
|
t, pduReceipts, eduReceipts := oq.createTransaction(pdus, edus)
|
||||||
logrus.WithField("server_name", oq.destination).Debugf("Sending transaction %q containing %d PDUs, %d EDUs", t.TransactionID, len(t.PDUs), len(t.EDUs))
|
logrus.WithField("server_name", oq.destination).Debugf("Sending transaction %q containing %d PDUs, %d EDUs", t.TransactionID, len(t.PDUs), len(t.EDUs))
|
||||||
|
|
@ -411,11 +411,13 @@ func (oq *destinationQueue) nextTransaction(
|
||||||
|
|
||||||
relayServers := oq.statistics.KnownRelayServers()
|
relayServers := oq.statistics.KnownRelayServers()
|
||||||
if oq.statistics.AssumedOffline() && len(relayServers) > 0 {
|
if oq.statistics.AssumedOffline() && len(relayServers) > 0 {
|
||||||
|
sendMethod = statistics.SendViaRelay
|
||||||
|
relaySuccess := false
|
||||||
logrus.Infof("Sending to relay servers: %v", relayServers)
|
logrus.Infof("Sending to relay servers: %v", relayServers)
|
||||||
// TODO : how to pass through actual userID here?!?!?!?!
|
// TODO : how to pass through actual userID here?!?!?!?!
|
||||||
userID, userErr := gomatrixserverlib.NewUserID("@user:"+string(oq.destination), false)
|
userID, userErr := gomatrixserverlib.NewUserID("@user:"+string(oq.destination), false)
|
||||||
if userErr != nil {
|
if userErr != nil {
|
||||||
return userErr, false
|
return userErr, sendMethod
|
||||||
}
|
}
|
||||||
for _, relayServer := range relayServers {
|
for _, relayServer := range relayServers {
|
||||||
_, relayErr := oq.client.P2PSendTransactionToRelay(ctx, *userID, t, relayServer)
|
_, relayErr := oq.client.P2PSendTransactionToRelay(ctx, *userID, t, relayServer)
|
||||||
|
|
@ -429,6 +431,7 @@ func (oq *destinationQueue) nextTransaction(
|
||||||
err = nil
|
err = nil
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
sendMethod = statistics.SendDirect
|
||||||
_, err = oq.client.SendTransaction(ctx, t)
|
_, err = oq.client.SendTransaction(ctx, t)
|
||||||
}
|
}
|
||||||
switch errResponse := err.(type) {
|
switch errResponse := err.(type) {
|
||||||
|
|
@ -450,7 +453,7 @@ func (oq *destinationQueue) nextTransaction(
|
||||||
oq.transactionIDMutex.Lock()
|
oq.transactionIDMutex.Lock()
|
||||||
oq.transactionID = ""
|
oq.transactionID = ""
|
||||||
oq.transactionIDMutex.Unlock()
|
oq.transactionIDMutex.Unlock()
|
||||||
return nil, relaySuccess
|
return nil, sendMethod
|
||||||
case gomatrix.HTTPError:
|
case gomatrix.HTTPError:
|
||||||
// Report that we failed to send the transaction and we
|
// Report that we failed to send the transaction and we
|
||||||
// will retry again, subject to backoff.
|
// will retry again, subject to backoff.
|
||||||
|
|
@ -460,13 +463,13 @@ func (oq *destinationQueue) nextTransaction(
|
||||||
// to a 400-ish error
|
// to a 400-ish error
|
||||||
code := errResponse.Code
|
code := errResponse.Code
|
||||||
logrus.Debug("Transaction failed with HTTP", code)
|
logrus.Debug("Transaction failed with HTTP", code)
|
||||||
return err, false
|
return err, sendMethod
|
||||||
default:
|
default:
|
||||||
logrus.WithFields(logrus.Fields{
|
logrus.WithFields(logrus.Fields{
|
||||||
"destination": oq.destination,
|
"destination": oq.destination,
|
||||||
logrus.ErrorKey: err,
|
logrus.ErrorKey: err,
|
||||||
}).Debugf("Failed to send transaction %q", t.TransactionID)
|
}).Debugf("Failed to send transaction %q", t.TransactionID)
|
||||||
return err, false
|
return err, sendMethod
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -553,10 +556,11 @@ func (oq *destinationQueue) blacklistDestination() {
|
||||||
|
|
||||||
// handleTransactionSuccess updates the cached event queues as well as the success and
|
// handleTransactionSuccess updates the cached event queues as well as the success and
|
||||||
// backoff information for this server.
|
// backoff information for this server.
|
||||||
func (oq *destinationQueue) handleTransactionSuccess(pduCount int, eduCount int, relaySuccess bool) {
|
func (oq *destinationQueue) handleTransactionSuccess(pduCount int, eduCount int, sendMethod statistics.SendMethod) {
|
||||||
// If we successfully sent the transaction then clear out
|
// If we successfully sent the transaction then clear out
|
||||||
// the pending events and EDUs, and wipe our transaction ID.
|
// the pending events and EDUs, and wipe our transaction ID.
|
||||||
oq.statistics.Success(relaySuccess)
|
|
||||||
|
oq.statistics.Success(sendMethod)
|
||||||
oq.pendingMutex.Lock()
|
oq.pendingMutex.Lock()
|
||||||
defer oq.pendingMutex.Unlock()
|
defer oq.pendingMutex.Unlock()
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -89,6 +89,13 @@ func (s *Statistics) ForServer(serverName gomatrixserverlib.ServerName) *ServerS
|
||||||
return server
|
return server
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type SendMethod uint8
|
||||||
|
|
||||||
|
const (
|
||||||
|
SendDirect SendMethod = iota
|
||||||
|
SendViaRelay
|
||||||
|
)
|
||||||
|
|
||||||
// ServerStatistics contains information about our interactions with a
|
// ServerStatistics contains information about our interactions with a
|
||||||
// remote federated host, e.g. how many times we were successful, how
|
// remote federated host, e.g. how many times we were successful, how
|
||||||
// many times we failed etc. It also manages the backoff time and black-
|
// many times we failed etc. It also manages the backoff time and black-
|
||||||
|
|
@ -139,16 +146,16 @@ func (s *ServerStatistics) AssignBackoffNotifier(notifier func()) {
|
||||||
// attempt, which increases the sent counter and resets the idle and
|
// attempt, which increases the sent counter and resets the idle and
|
||||||
// failure counters. If a host was blacklisted at this point then
|
// failure counters. If a host was blacklisted at this point then
|
||||||
// we will unblacklist it.
|
// we will unblacklist it.
|
||||||
// `async` specifies whether the success was to the actual destination
|
// `relay` specifies whether the success was to the actual destination
|
||||||
// or one of their relay servers.
|
// or one of their relay servers.
|
||||||
func (s *ServerStatistics) Success(async bool) {
|
func (s *ServerStatistics) Success(method SendMethod) {
|
||||||
s.cancel()
|
s.cancel()
|
||||||
s.backoffCount.Store(0)
|
s.backoffCount.Store(0)
|
||||||
// NOTE : Sending to the final destination vs. a relay server has
|
// NOTE : Sending to the final destination vs. a relay server has
|
||||||
// slightly different semantics.
|
// slightly different semantics.
|
||||||
if !async {
|
if method == SendDirect {
|
||||||
s.successCounter.Inc()
|
s.successCounter.Inc()
|
||||||
if s.statistics.DB != nil {
|
if s.blacklisted.Load() && s.statistics.DB != nil {
|
||||||
if err := s.statistics.DB.RemoveServerFromBlacklist(s.serverName); err != nil {
|
if err := s.statistics.DB.RemoveServerFromBlacklist(s.serverName); err != nil {
|
||||||
logrus.WithError(err).Errorf("Failed to remove %q from blacklist", s.serverName)
|
logrus.WithError(err).Errorf("Failed to remove %q from blacklist", s.serverName)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -18,7 +18,7 @@ func TestBackoff(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start by checking that counting successes works.
|
// Start by checking that counting successes works.
|
||||||
server.Success(false)
|
server.Success(SendDirect)
|
||||||
if successes := server.SuccessCount(); successes != 1 {
|
if successes := server.SuccessCount(); successes != 1 {
|
||||||
t.Fatalf("Expected success count 1, got %d", successes)
|
t.Fatalf("Expected success count 1, got %d", successes)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue