Handle errors differently in the DeviceListUpdater
(#2695)
`If a device list update goes missing, the server resyncs on the next one` was failing because a previous test would receive a `waitTime` of 1h, resulting in the test timing out. This now tries to handle the returned errors differently, e.g. by using the default `waitTime` of 2s. Also doesn't try further users in the list, if one of the errors would cause a longer `waitTime`.
This commit is contained in:
parent
847032df36
commit
440eb0f3a2
|
@ -53,9 +53,9 @@ func (d *Database) AssociateEDUWithDestination(
|
||||||
// Keep EDUs for at least x minutes before deleting them
|
// Keep EDUs for at least x minutes before deleting them
|
||||||
expiresAt = gomatrixserverlib.AsTimestamp(time.Now().Add(duration))
|
expiresAt = gomatrixserverlib.AsTimestamp(time.Now().Add(duration))
|
||||||
}
|
}
|
||||||
// We forcibly set m.direct_to_device events to 0, as we always want them
|
// We forcibly set m.direct_to_device and m.device_list_update events
|
||||||
// to be delivered. (required for E2EE)
|
// to 0, as we always want them to be delivered. (required for E2EE)
|
||||||
if eduType == gomatrixserverlib.MDirectToDevice {
|
if eduType == gomatrixserverlib.MDirectToDevice || eduType == gomatrixserverlib.MDeviceListUpdate {
|
||||||
expiresAt = 0
|
expiresAt = 0
|
||||||
}
|
}
|
||||||
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||||
|
|
|
@ -24,7 +24,7 @@ import (
|
||||||
"net/url"
|
"net/url"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
opentracing "github.com/opentracing/opentracing-go"
|
"github.com/opentracing/opentracing-go"
|
||||||
"github.com/opentracing/opentracing-go/ext"
|
"github.com/opentracing/opentracing-go/ext"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -81,8 +81,8 @@ func PostJSON[reqtype, restype any, errtype error](
|
||||||
return fmt.Errorf("HTTP %d from %s (no response body)", res.StatusCode, apiURL)
|
return fmt.Errorf("HTTP %d from %s (no response body)", res.StatusCode, apiURL)
|
||||||
}
|
}
|
||||||
var reserr errtype
|
var reserr errtype
|
||||||
if err = json.Unmarshal(body, reserr); err != nil {
|
if err = json.Unmarshal(body, &reserr); err != nil {
|
||||||
return fmt.Errorf("HTTP %d from %s", res.StatusCode, apiURL)
|
return fmt.Errorf("HTTP %d from %s - %w", res.StatusCode, apiURL, err)
|
||||||
}
|
}
|
||||||
return reserr
|
return reserr
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,9 +19,11 @@ import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"hash/fnv"
|
"hash/fnv"
|
||||||
|
"net"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/matrix-org/gomatrix"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/matrix-org/util"
|
"github.com/matrix-org/util"
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
@ -388,6 +390,8 @@ func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerNam
|
||||||
return waitTime, true
|
return waitTime, true
|
||||||
}
|
}
|
||||||
failCount := 0
|
failCount := 0
|
||||||
|
|
||||||
|
userLoop:
|
||||||
for _, userID := range userIDs {
|
for _, userID := range userIDs {
|
||||||
if ctx.Err() != nil {
|
if ctx.Err() != nil {
|
||||||
// we've timed out, give up and go to the back of the queue to let another server be processed.
|
// we've timed out, give up and go to the back of the queue to let another server be processed.
|
||||||
|
@ -397,19 +401,35 @@ func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerNam
|
||||||
res, err := u.fedClient.GetUserDevices(ctx, serverName, userID)
|
res, err := u.fedClient.GetUserDevices(ctx, serverName, userID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
failCount += 1
|
failCount += 1
|
||||||
fcerr, ok := err.(*fedsenderapi.FederationClientError)
|
switch e := err.(type) {
|
||||||
if ok {
|
case *fedsenderapi.FederationClientError:
|
||||||
if fcerr.RetryAfter > 0 {
|
if e.RetryAfter > 0 {
|
||||||
waitTime = fcerr.RetryAfter
|
waitTime = e.RetryAfter
|
||||||
} else if fcerr.Blacklisted {
|
break userLoop
|
||||||
|
} else if e.Blacklisted {
|
||||||
waitTime = time.Hour * 8
|
waitTime = time.Hour * 8
|
||||||
} else {
|
break userLoop
|
||||||
// For all other errors (DNS resolution, network etc.) wait 1 hour.
|
|
||||||
waitTime = time.Hour
|
|
||||||
}
|
}
|
||||||
} else {
|
case net.Error:
|
||||||
waitTime = time.Hour
|
// Use the default waitTime, if it's a timeout.
|
||||||
logger.WithError(err).WithField("user_id", userID).Debug("GetUserDevices returned unknown error type")
|
// It probably doesn't make sense to try further users.
|
||||||
|
if !e.Timeout() {
|
||||||
|
waitTime = time.Minute * 10
|
||||||
|
logrus.WithError(e).Error("GetUserDevices returned net.Error")
|
||||||
|
break userLoop
|
||||||
|
}
|
||||||
|
case gomatrix.HTTPError:
|
||||||
|
// The remote server returned an error, give it some time to recover
|
||||||
|
if e.Code >= 500 {
|
||||||
|
waitTime = time.Minute * 10
|
||||||
|
logrus.WithError(e).Error("GetUserDevices returned gomatrix.HTTPError")
|
||||||
|
break userLoop
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
// Something else failed
|
||||||
|
waitTime = time.Minute * 10
|
||||||
|
logger.WithError(err).WithField("user_id", userID).Debugf("GetUserDevices returned unknown error type: %T", err)
|
||||||
|
break userLoop
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -437,7 +457,11 @@ func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerNam
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if failCount > 0 {
|
if failCount > 0 {
|
||||||
logger.WithField("total", len(userIDs)).WithField("failed", failCount).WithField("wait", waitTime).Warn("Failed to query device keys for some users")
|
logger.WithFields(logrus.Fields{
|
||||||
|
"total": len(userIDs),
|
||||||
|
"failed": failCount,
|
||||||
|
"skipped": len(userIDs) - failCount,
|
||||||
|
}).Warn("Failed to query device keys for some users")
|
||||||
}
|
}
|
||||||
for _, userID := range userIDs {
|
for _, userID := range userIDs {
|
||||||
// always clear the channel to unblock Update calls regardless of success/failure
|
// always clear the channel to unblock Update calls regardless of success/failure
|
||||||
|
|
|
@ -742,3 +742,4 @@ User in private room doesn't appear in user directory
|
||||||
User joining then leaving public room appears and dissappears from directory
|
User joining then leaving public room appears and dissappears from directory
|
||||||
User in remote room doesn't appear in user directory after server left room
|
User in remote room doesn't appear in user directory after server left room
|
||||||
User in shared private room does appear in user directory until leave
|
User in shared private room does appear in user directory until leave
|
||||||
|
If a device list update goes missing, the server resyncs on the next one
|
Loading…
Reference in a new issue