log less for failed key queries, add counters for incoming pdus/edus (#1801)
* log less for failed key queries, add counters for incoming pdus/edus * use labels * Blacklist flakey test * Fix metrics
This commit is contained in:
parent
01267a34b9
commit
a1b7e4ef3f
|
@ -30,9 +30,34 @@ import (
|
||||||
"github.com/matrix-org/dendrite/setup/config"
|
"github.com/matrix-org/dendrite/setup/config"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/matrix-org/util"
|
"github.com/matrix-org/util"
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
pduCountTotal = prometheus.NewCounterVec(
|
||||||
|
prometheus.CounterOpts{
|
||||||
|
Namespace: "dendrite",
|
||||||
|
Subsystem: "federationapi",
|
||||||
|
Name: "recv_pdus",
|
||||||
|
},
|
||||||
|
[]string{"status"},
|
||||||
|
)
|
||||||
|
eduCountTotal = prometheus.NewCounter(
|
||||||
|
prometheus.CounterOpts{
|
||||||
|
Namespace: "dendrite",
|
||||||
|
Subsystem: "federationapi",
|
||||||
|
Name: "recv_edus",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
prometheus.MustRegister(
|
||||||
|
pduCountTotal, eduCountTotal,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
// Send implements /_matrix/federation/v1/send/{txnID}
|
// Send implements /_matrix/federation/v1/send/{txnID}
|
||||||
func Send(
|
func Send(
|
||||||
httpReq *http.Request,
|
httpReq *http.Request,
|
||||||
|
@ -133,6 +158,7 @@ func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.Res
|
||||||
|
|
||||||
pdus := []*gomatrixserverlib.HeaderedEvent{}
|
pdus := []*gomatrixserverlib.HeaderedEvent{}
|
||||||
for _, pdu := range t.PDUs {
|
for _, pdu := range t.PDUs {
|
||||||
|
pduCountTotal.WithLabelValues("total").Inc()
|
||||||
var header struct {
|
var header struct {
|
||||||
RoomID string `json:"room_id"`
|
RoomID string `json:"room_id"`
|
||||||
}
|
}
|
||||||
|
@ -224,6 +250,7 @@ func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.Res
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
results[e.EventID()] = gomatrixserverlib.PDUResult{}
|
results[e.EventID()] = gomatrixserverlib.PDUResult{}
|
||||||
|
pduCountTotal.WithLabelValues("success").Inc()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -281,6 +308,7 @@ func (t *txnReq) haveEventIDs() map[string]bool {
|
||||||
|
|
||||||
func (t *txnReq) processEDUs(ctx context.Context) {
|
func (t *txnReq) processEDUs(ctx context.Context) {
|
||||||
for _, e := range t.EDUs {
|
for _, e := range t.EDUs {
|
||||||
|
eduCountTotal.Inc()
|
||||||
switch e.Type {
|
switch e.Type {
|
||||||
case gomatrixserverlib.MTyping:
|
case gomatrixserverlib.MTyping:
|
||||||
// https://matrix.org/docs/spec/server_server/latest#typing-notifications
|
// https://matrix.org/docs/spec/server_server/latest#typing-notifications
|
||||||
|
|
|
@ -212,8 +212,7 @@ func (t *OutputEDUConsumer) onReceiptEvent(msg *sarama.ConsumerMessage) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if receiptServerName != t.ServerName {
|
if receiptServerName != t.ServerName {
|
||||||
log.WithField("other_server", receiptServerName).Info("Suppressing receipt notif: originated elsewhere")
|
return nil // don't log, very spammy as it logs for each remote receipt
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
joined, err := t.db.GetJoinedHosts(context.TODO(), receipt.RoomID)
|
joined, err := t.db.GetJoinedHosts(context.TODO(), receipt.RoomID)
|
||||||
|
|
|
@ -330,16 +330,16 @@ func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerNam
|
||||||
logger.WithError(err).Error("failed to load stale device lists")
|
logger.WithError(err).Error("failed to load stale device lists")
|
||||||
return waitTime, true
|
return waitTime, true
|
||||||
}
|
}
|
||||||
hasFailures := false
|
failCount := 0
|
||||||
for _, userID := range userIDs {
|
for _, userID := range userIDs {
|
||||||
if ctx.Err() != nil {
|
if ctx.Err() != nil {
|
||||||
// we've timed out, give up and go to the back of the queue to let another server be processed.
|
// we've timed out, give up and go to the back of the queue to let another server be processed.
|
||||||
hasFailures = true
|
failCount += 1
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
res, err := u.fedClient.GetUserDevices(ctx, serverName, userID)
|
res, err := u.fedClient.GetUserDevices(ctx, serverName, userID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.WithError(err).WithField("user_id", userID).Error("failed to query device keys for user")
|
failCount += 1
|
||||||
fcerr, ok := err.(*fedsenderapi.FederationClientError)
|
fcerr, ok := err.(*fedsenderapi.FederationClientError)
|
||||||
if ok {
|
if ok {
|
||||||
if fcerr.RetryAfter > 0 {
|
if fcerr.RetryAfter > 0 {
|
||||||
|
@ -351,20 +351,22 @@ func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerNam
|
||||||
waitTime = time.Hour
|
waitTime = time.Hour
|
||||||
logger.WithError(err).Warn("GetUserDevices returned unknown error type")
|
logger.WithError(err).Warn("GetUserDevices returned unknown error type")
|
||||||
}
|
}
|
||||||
hasFailures = true
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
err = u.updateDeviceList(&res)
|
err = u.updateDeviceList(&res)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.WithError(err).WithField("user_id", userID).Error("fetched device list but failed to store/emit it")
|
logger.WithError(err).WithField("user_id", userID).Error("fetched device list but failed to store/emit it")
|
||||||
hasFailures = true
|
failCount += 1
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if failCount > 0 {
|
||||||
|
logger.WithField("total", len(userIDs)).WithField("failed", failCount).Error("failed to query device keys for some users")
|
||||||
|
}
|
||||||
for _, userID := range userIDs {
|
for _, userID := range userIDs {
|
||||||
// always clear the channel to unblock Update calls regardless of success/failure
|
// always clear the channel to unblock Update calls regardless of success/failure
|
||||||
u.clearChannel(userID)
|
u.clearChannel(userID)
|
||||||
}
|
}
|
||||||
return waitTime, hasFailures
|
return waitTime, failCount > 0
|
||||||
}
|
}
|
||||||
|
|
||||||
func (u *DeviceListUpdater) updateDeviceList(res *gomatrixserverlib.RespUserDevices) error {
|
func (u *DeviceListUpdater) updateDeviceList(res *gomatrixserverlib.RespUserDevices) error {
|
||||||
|
|
|
@ -70,3 +70,6 @@ Can re-join room if re-invited
|
||||||
|
|
||||||
# Blacklisted due to flakiness after #1774
|
# Blacklisted due to flakiness after #1774
|
||||||
Local device key changes get to remote servers with correct prev_id
|
Local device key changes get to remote servers with correct prev_id
|
||||||
|
|
||||||
|
# Flakey
|
||||||
|
Local device key changes appear in /keys/changes
|
Loading…
Reference in a new issue