log less for failed key queries, add counters for incoming pdus/edus

This commit is contained in:
Kegan Dougal 2021-03-23 10:16:23 +00:00
parent 01267a34b9
commit cd639b07c4
3 changed files with 43 additions and 8 deletions

View file

@ -30,9 +30,40 @@ import (
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util" "github.com/matrix-org/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
// Prometheus counters for federation traffic received in incoming
// transactions. All three are registered with the default Prometheus
// registry by this file's init function.
var (
	// pduCountTotal is incremented once per PDU in an incoming transaction,
	// at the top of the per-PDU loop in processTransaction (i.e. before the
	// PDU has been examined or processed).
	pduCountTotal = prometheus.NewCounter(
		prometheus.CounterOpts{
			Namespace: "dendrite",
			Subsystem: "federationapi",
			Name:      "recv_pdus",
			Help:      "Number of incoming PDUs from remote servers",
		},
	)
	// eduCountTotal is incremented once per EDU in an incoming transaction,
	// at the top of the per-EDU loop in processEDUs.
	eduCountTotal = prometheus.NewCounter(
		prometheus.CounterOpts{
			Namespace: "dendrite",
			Subsystem: "federationapi",
			Name:      "recv_edus",
			Help:      "Number of incoming EDUs from remote servers",
		},
	)
	// pduSuccessTotal is incremented in processTransaction when a PDU is
	// recorded in the transaction results with an empty (error-free)
	// PDUResult.
	pduSuccessTotal = prometheus.NewCounter(
		prometheus.CounterOpts{
			Namespace: "dendrite",
			Subsystem: "federationapi",
			Name:      "recv_pdus_success",
			Help:      "Number of incoming PDUs from remote servers that were processed successfully",
		},
	)
)
// init registers the federation receive counters with the default Prometheus
// registry. MustRegister panics if a collector cannot be registered (for
// example, on a duplicate metric name), so a misconfiguration surfaces at
// program start-up rather than at scrape time.
func init() {
	prometheus.MustRegister(pduCountTotal)
	prometheus.MustRegister(eduCountTotal)
	prometheus.MustRegister(pduSuccessTotal)
}
// Send implements /_matrix/federation/v1/send/{txnID} // Send implements /_matrix/federation/v1/send/{txnID}
func Send( func Send(
httpReq *http.Request, httpReq *http.Request,
@ -133,6 +164,7 @@ func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.Res
pdus := []*gomatrixserverlib.HeaderedEvent{} pdus := []*gomatrixserverlib.HeaderedEvent{}
for _, pdu := range t.PDUs { for _, pdu := range t.PDUs {
pduCountTotal.Inc()
var header struct { var header struct {
RoomID string `json:"room_id"` RoomID string `json:"room_id"`
} }
@ -224,6 +256,7 @@ func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.Res
} }
} else { } else {
results[e.EventID()] = gomatrixserverlib.PDUResult{} results[e.EventID()] = gomatrixserverlib.PDUResult{}
pduSuccessTotal.Inc()
} }
} }
@ -281,6 +314,7 @@ func (t *txnReq) haveEventIDs() map[string]bool {
func (t *txnReq) processEDUs(ctx context.Context) { func (t *txnReq) processEDUs(ctx context.Context) {
for _, e := range t.EDUs { for _, e := range t.EDUs {
eduCountTotal.Inc()
switch e.Type { switch e.Type {
case gomatrixserverlib.MTyping: case gomatrixserverlib.MTyping:
// https://matrix.org/docs/spec/server_server/latest#typing-notifications // https://matrix.org/docs/spec/server_server/latest#typing-notifications

View file

@ -212,8 +212,7 @@ func (t *OutputEDUConsumer) onReceiptEvent(msg *sarama.ConsumerMessage) error {
return nil return nil
} }
if receiptServerName != t.ServerName { if receiptServerName != t.ServerName {
log.WithField("other_server", receiptServerName).Info("Suppressing receipt notif: originated elsewhere") return nil // don't log, very spammy as it logs for each remote receipt
return nil
} }
joined, err := t.db.GetJoinedHosts(context.TODO(), receipt.RoomID) joined, err := t.db.GetJoinedHosts(context.TODO(), receipt.RoomID)

View file

@ -330,16 +330,16 @@ func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerNam
logger.WithError(err).Error("failed to load stale device lists") logger.WithError(err).Error("failed to load stale device lists")
return waitTime, true return waitTime, true
} }
hasFailures := false failCount := 0
for _, userID := range userIDs { for _, userID := range userIDs {
if ctx.Err() != nil { if ctx.Err() != nil {
// we've timed out, give up and go to the back of the queue to let another server be processed. // we've timed out, give up and go to the back of the queue to let another server be processed.
hasFailures = true failCount += 1
break break
} }
res, err := u.fedClient.GetUserDevices(ctx, serverName, userID) res, err := u.fedClient.GetUserDevices(ctx, serverName, userID)
if err != nil { if err != nil {
logger.WithError(err).WithField("user_id", userID).Error("failed to query device keys for user") failCount += 1
fcerr, ok := err.(*fedsenderapi.FederationClientError) fcerr, ok := err.(*fedsenderapi.FederationClientError)
if ok { if ok {
if fcerr.RetryAfter > 0 { if fcerr.RetryAfter > 0 {
@ -351,20 +351,22 @@ func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerNam
waitTime = time.Hour waitTime = time.Hour
logger.WithError(err).Warn("GetUserDevices returned unknown error type") logger.WithError(err).Warn("GetUserDevices returned unknown error type")
} }
hasFailures = true
continue continue
} }
err = u.updateDeviceList(&res) err = u.updateDeviceList(&res)
if err != nil { if err != nil {
logger.WithError(err).WithField("user_id", userID).Error("fetched device list but failed to store/emit it") logger.WithError(err).WithField("user_id", userID).Error("fetched device list but failed to store/emit it")
hasFailures = true failCount += 1
} }
} }
if failCount > 0 {
logger.WithField("total", len(userIDs)).WithField("failed", failCount).Error("failed to query device keys for some users")
}
for _, userID := range userIDs { for _, userID := range userIDs {
// always clear the channel to unblock Update calls regardless of success/failure // always clear the channel to unblock Update calls regardless of success/failure
u.clearChannel(userID) u.clearChannel(userID)
} }
return waitTime, hasFailures return waitTime, failCount > 0
} }
func (u *DeviceListUpdater) updateDeviceList(res *gomatrixserverlib.RespUserDevices) error { func (u *DeviceListUpdater) updateDeviceList(res *gomatrixserverlib.RespUserDevices) error {