From 802f1c96f804f7a146e4e12e25b20c980a6af870 Mon Sep 17 00:00:00 2001 From: Kegsay Date: Tue, 23 Mar 2021 15:22:00 +0000 Subject: [PATCH] Add more metrics (#1802) * Add more metrics * Linting --- federationapi/routing/send.go | 55 +++++++++++++++++++++--- keyserver/internal/device_list_update.go | 20 +++++++++ 2 files changed, 68 insertions(+), 7 deletions(-) diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index d43ed8324..1dd874d48 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -34,27 +34,55 @@ import ( "github.com/sirupsen/logrus" ) +const ( + // Event was passed to the roomserver + MetricsOutcomeOK = "ok" + // Event failed to be processed + MetricsOutcomeFail = "fail" + // Event failed auth checks + MetricsOutcomeRejected = "rejected" + // Terminated the transaction + MetricsOutcomeFatal = "fatal" + // The event has missing auth_events we need to fetch + MetricsWorkMissingAuthEvents = "missing_auth_events" + // No work had to be done as we had all prev/auth events + MetricsWorkDirect = "direct" + // The event has missing prev_events we need to call /g_m_e for + MetricsWorkMissingPrevEvents = "missing_prev_events" +) + var ( pduCountTotal = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: "dendrite", Subsystem: "federationapi", Name: "recv_pdus", + Help: "Number of incoming PDUs from remote servers with labels for success", }, - []string{"status"}, + []string{"status"}, // 'success' or 'total' ) eduCountTotal = prometheus.NewCounter( prometheus.CounterOpts{ Namespace: "dendrite", Subsystem: "federationapi", Name: "recv_edus", + Help: "Number of incoming EDUs from remote servers", }, ) + processEventSummary = prometheus.NewSummaryVec( + prometheus.SummaryOpts{ + Namespace: "dendrite", + Subsystem: "federationapi", + Name: "process_event", + Help: "How long it takes to process an incoming event and what work had to be done for it", + }, + []string{"work", "outcome"}, + ) ) func init() { prometheus.MustRegister( - pduCountTotal, eduCountTotal, + pduCountTotal, eduCountTotal, processEventSummary, ) } @@ -140,6 +168,7 @@ type txnReq struct { // new events which the roomserver does not know about newEvents map[string]bool newEventsMutex sync.RWMutex + work string // metrics } // A subset of FederationClient functionality that txn requires. Useful for testing. @@ -212,6 +241,7 @@ func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.Res // Process the events. for _, e := range pdus { + evStart := time.Now() if err := t.processEvent(ctx, e.Unwrap()); err != nil { // If the error is due to the event itself being bad then we skip // it and move onto the next event. We report an error so that the @@ -233,17 +263,25 @@ func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.Res // our server so we should bail processing the transaction entirely. util.GetLogger(ctx).Warnf("Processing %s failed fatally: %s", e.EventID(), err) jsonErr := util.ErrorResponse(err) + processEventSummary.WithLabelValues(t.work, MetricsOutcomeFatal).Observe( + float64(time.Since(evStart).Nanoseconds()) / 1000., + ) return nil, &jsonErr } else { // Auth errors mean the event is 'rejected' which have to be silent to appease sytest errMsg := "" + outcome := MetricsOutcomeRejected _, rejected := err.(*gomatrixserverlib.NotAllowed) if !rejected { errMsg = err.Error() + outcome = MetricsOutcomeFail } util.GetLogger(ctx).WithError(err).WithField("event_id", e.EventID()).WithField("rejected", rejected).Warn( "Failed to process incoming federation event, skipping", ) + processEventSummary.WithLabelValues(t.work, outcome).Observe( + float64(time.Since(evStart).Nanoseconds()) / 1000., + ) results[e.EventID()] = gomatrixserverlib.PDUResult{ Error: errMsg, } @@ -251,6 +289,9 @@ func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.Res } else { results[e.EventID()] = gomatrixserverlib.PDUResult{} pduCountTotal.WithLabelValues("success").Inc() + processEventSummary.WithLabelValues(t.work, MetricsOutcomeOK).Observe( + float64(time.Since(evStart).Nanoseconds()) / 1000., + ) } } @@ -452,6 +493,7 @@ func (t *txnReq) getServers(ctx context.Context, roomID string) []gomatrixserver func (t *txnReq) processEvent(ctx context.Context, e *gomatrixserverlib.Event) error { logger := util.GetLogger(ctx).WithField("event_id", e.EventID()).WithField("room_id", e.RoomID()) + t.work = "" // reset from previous event // Work out if the roomserver knows everything it needs to know to auth // the event. This includes the prev_events and auth_events. @@ -480,6 +522,7 @@ func (t *txnReq) processEvent(ctx context.Context, e *gomatrixserverlib.Event) e } if len(stateResp.MissingAuthEventIDs) > 0 { + t.work = MetricsWorkMissingAuthEvents logger.Infof("Event refers to %d unknown auth_events", len(stateResp.MissingAuthEventIDs)) if err := t.retrieveMissingAuthEvents(ctx, e, &stateResp); err != nil { return fmt.Errorf("t.retrieveMissingAuthEvents: %w", err) @@ -487,9 +530,11 @@ func (t *txnReq) processEvent(ctx context.Context, e *gomatrixserverlib.Event) e } if len(stateResp.MissingPrevEventIDs) > 0 { + t.work = MetricsWorkMissingPrevEvents logger.Infof("Event refers to %d unknown prev_events", len(stateResp.MissingPrevEventIDs)) return t.processEventWithMissingState(ctx, e, stateResp.RoomVersion) } + t.work = MetricsWorkDirect // pass the event to the roomserver which will do auth checks // If the event fail auth checks, gmsl.NotAllowed error will be returned which we be silently @@ -784,7 +829,7 @@ func (t *txnReq) lookupStateAfterEventLocally(ctx context.Context, roomID, event queryReq := api.QueryEventsByIDRequest{ EventIDs: missingEventList, } - util.GetLogger(ctx).Infof("Fetching missing auth events: %v", missingEventList) + util.GetLogger(ctx).WithField("count", len(missingEventList)).Infof("Fetching missing auth events") var queryRes api.QueryEventsByIDResponse if err = t.rsAPI.QueryEventsByID(ctx, &queryReq, &queryRes); err != nil { return nil @@ -854,10 +899,6 @@ retryAllowedState: }, nil } -// getMissingEvents returns a nil backwardsExtremity if missing events were fetched and handled, else returns the new backwards extremity which we should -// begin from. Returns an error only if we should terminate the transaction which initiated /get_missing_events -// This function recursively calls txnReq.processEvent with the missing events, which will be processed before this function returns. -// This means that we may recursively call this function, as we spider back up prev_events. func (t *txnReq) getMissingEvents(ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) (newEvents []*gomatrixserverlib.Event, err error) { logger := util.GetLogger(ctx).WithField("event_id", e.EventID()).WithField("room_id", e.RoomID()) needed := gomatrixserverlib.StateNeededForAuth([]*gomatrixserverlib.Event{e}) diff --git a/keyserver/internal/device_list_update.go b/keyserver/internal/device_list_update.go index 1a4d9818a..bd563ef35 100644 --- a/keyserver/internal/device_list_update.go +++ b/keyserver/internal/device_list_update.go @@ -26,9 +26,28 @@ import ( "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" + "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" ) +var ( + deviceListUpdateCount = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "dendrite", + Subsystem: "keyserver", + Name: "device_list_update", + Help: "Number of times we have attempted to update device lists from this server", + }, + []string{"server"}, + ) +) + +func init() { + prometheus.MustRegister( + deviceListUpdateCount, + ) +} + // DeviceListUpdater handles device list updates from remote servers. // // In the case where we have the prev_id for an update, the updater just stores the update (after acquiring a per-user lock). @@ -319,6 +338,7 @@ func (u *DeviceListUpdater) worker(ch chan gomatrixserverlib.ServerName) { } func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerName) (time.Duration, bool) { + deviceListUpdateCount.WithLabelValues(string(serverName)).Inc() requestTimeout := time.Second * 30 // max amount of time we want to spend on each request ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) defer cancel()