From a1b7e4ef3f9231ce6933cd2f5e7e93525e840293 Mon Sep 17 00:00:00 2001 From: Kegsay Date: Tue, 23 Mar 2021 11:33:36 +0000 Subject: [PATCH] log less for failed key querys, add counters for incoming pdus/edus (#1801) * log less for failed key querys, add counters for incoming pdus/edus * use labels * Blacklist flakey test * Fix metrics --- federationapi/routing/send.go | 28 ++++++++++++++++++++++++ federationsender/consumers/eduserver.go | 3 +-- keyserver/internal/device_list_update.go | 14 +++++++----- sytest-blacklist | 5 ++++- 4 files changed, 41 insertions(+), 9 deletions(-) diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index ea0b54b61..d43ed8324 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -30,9 +30,34 @@ import ( "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" + "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" ) +var ( + pduCountTotal = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "dendrite", + Subsystem: "federationapi", + Name: "recv_pdus", + }, + []string{"status"}, + ) + eduCountTotal = prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: "dendrite", + Subsystem: "federationapi", + Name: "recv_edus", + }, + ) +) + +func init() { + prometheus.MustRegister( + pduCountTotal, eduCountTotal, + ) +} + // Send implements /_matrix/federation/v1/send/{txnID} func Send( httpReq *http.Request, @@ -133,6 +158,7 @@ func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.Res pdus := []*gomatrixserverlib.HeaderedEvent{} for _, pdu := range t.PDUs { + pduCountTotal.WithLabelValues("total").Inc() var header struct { RoomID string `json:"room_id"` } @@ -224,6 +250,7 @@ func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.Res } } else { results[e.EventID()] = gomatrixserverlib.PDUResult{} + pduCountTotal.WithLabelValues("success").Inc() } } @@ -281,6 +308,7 @@ func (t *txnReq) haveEventIDs() map[string]bool { func (t *txnReq) processEDUs(ctx context.Context) { for _, e := range t.EDUs { + eduCountTotal.Inc() switch e.Type { case gomatrixserverlib.MTyping: // https://matrix.org/docs/spec/server_server/latest#typing-notifications diff --git a/federationsender/consumers/eduserver.go b/federationsender/consumers/eduserver.go index 639cd7315..9a1ec1e26 100644 --- a/federationsender/consumers/eduserver.go +++ b/federationsender/consumers/eduserver.go @@ -212,8 +212,7 @@ func (t *OutputEDUConsumer) onReceiptEvent(msg *sarama.ConsumerMessage) error { return nil } if receiptServerName != t.ServerName { - log.WithField("other_server", receiptServerName).Info("Suppressing receipt notif: originated elsewhere") - return nil + return nil // don't log, very spammy as it logs for each remote receipt } joined, err := t.db.GetJoinedHosts(context.TODO(), receipt.RoomID) diff --git a/keyserver/internal/device_list_update.go b/keyserver/internal/device_list_update.go index 7f6a14f49..1a4d9818a 100644 --- a/keyserver/internal/device_list_update.go +++ b/keyserver/internal/device_list_update.go @@ -330,16 +330,16 @@ func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerNam logger.WithError(err).Error("failed to load stale device lists") return waitTime, true } - hasFailures := false + failCount := 0 for _, userID := range userIDs { if ctx.Err() != nil { // we've timed out, give up and go to the back of the queue to let another server be processed. - hasFailures = true + failCount += 1 break } res, err := u.fedClient.GetUserDevices(ctx, serverName, userID) if err != nil { - logger.WithError(err).WithField("user_id", userID).Error("failed to query device keys for user") + failCount += 1 fcerr, ok := err.(*fedsenderapi.FederationClientError) if ok { if fcerr.RetryAfter > 0 { @@ -351,20 +351,22 @@ func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerNam waitTime = time.Hour logger.WithError(err).Warn("GetUserDevices returned unknown error type") } - hasFailures = true continue } err = u.updateDeviceList(&res) if err != nil { logger.WithError(err).WithField("user_id", userID).Error("fetched device list but failed to store/emit it") - hasFailures = true + failCount += 1 } } + if failCount > 0 { + logger.WithField("total", len(userIDs)).WithField("failed", failCount).Error("failed to query device keys for some users") + } for _, userID := range userIDs { // always clear the channel to unblock Update calls regardless of success/failure u.clearChannel(userID) } - return waitTime, hasFailures + return waitTime, failCount > 0 } func (u *DeviceListUpdater) updateDeviceList(res *gomatrixserverlib.RespUserDevices) error { diff --git a/sytest-blacklist b/sytest-blacklist index b635c9f0e..4d9587d00 100644 --- a/sytest-blacklist +++ b/sytest-blacklist @@ -69,4 +69,7 @@ Forgotten room messages cannot be paginated Can re-join room if re-invited # Blacklisted due to flakiness after #1774 -Local device key changes get to remote servers with correct prev_id \ No newline at end of file +Local device key changes get to remote servers with correct prev_id + +# Flakey +Local device key changes appear in /keys/changes \ No newline at end of file