diff --git a/federationapi/internal/api.go b/federationapi/internal/api.go index 778387bdc..99773a750 100644 --- a/federationapi/internal/api.go +++ b/federationapi/internal/api.go @@ -164,7 +164,7 @@ func (a *FederationInternalAPI) doRequestIfNotBackingOffOrBlacklisted( RetryAfter: retryAfter, } } - stats.Success(false) + stats.Success(statistics.SendDirect) return res, nil } diff --git a/federationapi/internal/perform.go b/federationapi/internal/perform.go index 6ddbacbef..1179801e6 100644 --- a/federationapi/internal/perform.go +++ b/federationapi/internal/perform.go @@ -14,6 +14,7 @@ import ( "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/federationapi/consumers" + "github.com/matrix-org/dendrite/federationapi/statistics" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/version" ) @@ -41,7 +42,7 @@ func (r *FederationInternalAPI) PerformDirectoryLookup( } response.RoomID = dir.RoomID response.ServerNames = dir.Servers - r.statistics.ForServer(request.ServerName).Success(false) + r.statistics.ForServer(request.ServerName).Success(statistics.SendDirect) return nil } @@ -174,7 +175,7 @@ func (r *FederationInternalAPI) performJoinUsingServer( r.statistics.ForServer(serverName).Failure() return fmt.Errorf("r.federation.MakeJoin: %w", err) } - r.statistics.ForServer(serverName).Success(false) + r.statistics.ForServer(serverName).Success(statistics.SendDirect) // Set all the fields to be what they should be, this should be a no-op // but it's possible that the remote server returned us something "odd" @@ -229,7 +230,7 @@ func (r *FederationInternalAPI) performJoinUsingServer( r.statistics.ForServer(serverName).Failure() return fmt.Errorf("r.federation.SendJoin: %w", err) } - r.statistics.ForServer(serverName).Success(false) + r.statistics.ForServer(serverName).Success(statistics.SendDirect) // If the remote server returned an event in the "event" key of // the send_join request 
then we should use that instead. It may @@ -461,7 +462,7 @@ func (r *FederationInternalAPI) performOutboundPeekUsingServer( r.statistics.ForServer(serverName).Failure() return fmt.Errorf("r.federation.Peek: %w", err) } - r.statistics.ForServer(serverName).Success(false) + r.statistics.ForServer(serverName).Success(statistics.SendDirect) // Work out if we support the room version that has been supplied in // the peek response. @@ -605,7 +606,7 @@ func (r *FederationInternalAPI) PerformLeave( continue } - r.statistics.ForServer(serverName).Success(false) + r.statistics.ForServer(serverName).Success(statistics.SendDirect) return nil } diff --git a/federationapi/queue/destinationqueue.go b/federationapi/queue/destinationqueue.go index c4f938577..19d41fe3d 100644 --- a/federationapi/queue/destinationqueue.go +++ b/federationapi/queue/destinationqueue.go @@ -400,7 +400,7 @@ func (oq *destinationQueue) backgroundSend() { func (oq *destinationQueue) nextTransaction( pdus []*queuedPDU, edus []*queuedEDU, -) (err error, relaySuccess bool) { +) (err error, sendMethod statistics.SendMethod) { // Create the transaction. t, pduReceipts, eduReceipts := oq.createTransaction(pdus, edus) logrus.WithField("server_name", oq.destination).Debugf("Sending transaction %q containing %d PDUs, %d EDUs", t.TransactionID, len(t.PDUs), len(t.EDUs)) @@ -411,11 +411,13 @@ func (oq *destinationQueue) nextTransaction( relayServers := oq.statistics.KnownRelayServers() if oq.statistics.AssumedOffline() && len(relayServers) > 0 { + sendMethod = statistics.SendViaRelay + relaySuccess := false logrus.Infof("Sending to relay servers: %v", relayServers) // TODO : how to pass through actual userID here?!?!?!?! 
userID, userErr := gomatrixserverlib.NewUserID("@user:"+string(oq.destination), false) if userErr != nil { - return userErr, false + return userErr, sendMethod } for _, relayServer := range relayServers { _, relayErr := oq.client.P2PSendTransactionToRelay(ctx, *userID, t, relayServer) @@ -429,6 +431,7 @@ func (oq *destinationQueue) nextTransaction( err = nil } } else { + sendMethod = statistics.SendDirect _, err = oq.client.SendTransaction(ctx, t) } switch errResponse := err.(type) { @@ -450,7 +453,7 @@ func (oq *destinationQueue) nextTransaction( oq.transactionIDMutex.Lock() oq.transactionID = "" oq.transactionIDMutex.Unlock() - return nil, relaySuccess + return nil, sendMethod case gomatrix.HTTPError: // Report that we failed to send the transaction and we // will retry again, subject to backoff. @@ -460,13 +463,13 @@ func (oq *destinationQueue) nextTransaction( // to a 400-ish error code := errResponse.Code logrus.Debug("Transaction failed with HTTP", code) - return err, false + return err, sendMethod default: logrus.WithFields(logrus.Fields{ "destination": oq.destination, logrus.ErrorKey: err, }).Debugf("Failed to send transaction %q", t.TransactionID) - return err, false + return err, sendMethod } } @@ -553,10 +556,11 @@ func (oq *destinationQueue) blacklistDestination() { // handleTransactionSuccess updates the cached event queues as well as the success and // backoff information for this server. -func (oq *destinationQueue) handleTransactionSuccess(pduCount int, eduCount int, relaySuccess bool) { +func (oq *destinationQueue) handleTransactionSuccess(pduCount int, eduCount int, sendMethod statistics.SendMethod) { // If we successfully sent the transaction then clear out // the pending events and EDUs, and wipe our transaction ID. 
- oq.statistics.Success(relaySuccess) + + oq.statistics.Success(sendMethod) oq.pendingMutex.Lock() defer oq.pendingMutex.Unlock() diff --git a/federationapi/statistics/statistics.go b/federationapi/statistics/statistics.go index 2803c891d..5395f70a3 100644 --- a/federationapi/statistics/statistics.go +++ b/federationapi/statistics/statistics.go @@ -89,6 +89,13 @@ func (s *Statistics) ForServer(serverName gomatrixserverlib.ServerName) *ServerS return server } +type SendMethod uint8 + +const ( + SendDirect SendMethod = iota + SendViaRelay +) + // ServerStatistics contains information about our interactions with a // remote federated host, e.g. how many times we were successful, how // many times we failed etc. It also manages the backoff time and black- @@ -139,16 +146,16 @@ func (s *ServerStatistics) AssignBackoffNotifier(notifier func()) { // attempt, which increases the sent counter and resets the idle and // failure counters. If a host was blacklisted at this point then // we will unblacklist it. -// `async` specifies whether the success was to the actual destination +// `method` specifies whether the success was to the actual destination // or one of their relay servers. -func (s *ServerStatistics) Success(async bool) { +func (s *ServerStatistics) Success(method SendMethod) { s.cancel() s.backoffCount.Store(0) // NOTE : Sending to the final destination vs. a relay server has // slightly different semantics. 
- if !async { + if method == SendDirect { s.successCounter.Inc() - if s.statistics.DB != nil { + if s.blacklisted.Load() && s.statistics.DB != nil { if err := s.statistics.DB.RemoveServerFromBlacklist(s.serverName); err != nil { logrus.WithError(err).Errorf("Failed to remove %q from blacklist", s.serverName) } diff --git a/federationapi/statistics/statistics_test.go b/federationapi/statistics/statistics_test.go index 24e654499..312c19a68 100644 --- a/federationapi/statistics/statistics_test.go +++ b/federationapi/statistics/statistics_test.go @@ -18,7 +18,7 @@ func TestBackoff(t *testing.T) { } // Start by checking that counting successes works. - server.Success(false) + server.Success(SendDirect) if successes := server.SuccessCount(); successes != 1 { t.Fatalf("Expected success count 1, got %d", successes) }