Allow more time for device list updates (#2749)
This updates the device list updater so that it has a context per-request, rather than a global 30 seconds for the entire server. This could mean that talking to a slow remote server or requesting a lot of user IDs was pretty much guaranteed to fail. It also uses the process context to allow correct cancellation when Dendrite wants to shut down cleanly.
This commit is contained in:
parent
9005e5b4a8
commit
8a82f10046
|
@ -17,6 +17,7 @@ package internal
|
|||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"hash/fnv"
|
||||
"net"
|
||||
|
@ -31,6 +32,7 @@ import (
|
|||
|
||||
fedsenderapi "github.com/matrix-org/dendrite/federationapi/api"
|
||||
"github.com/matrix-org/dendrite/keyserver/api"
|
||||
"github.com/matrix-org/dendrite/setup/process"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -45,6 +47,9 @@ var (
|
|||
)
|
||||
)
|
||||
|
||||
const defaultWaitTime = time.Minute
|
||||
const requestTimeout = time.Second * 30
|
||||
|
||||
func init() {
|
||||
prometheus.MustRegister(
|
||||
deviceListUpdateCount,
|
||||
|
@ -80,6 +85,7 @@ func init() {
|
|||
// In the event that the query fails, a lock is acquired and the server name along with the time to wait before retrying is
|
||||
// set in a map. A restarter goroutine periodically probes this map and injects servers which are ready to be retried.
|
||||
type DeviceListUpdater struct {
|
||||
process *process.ProcessContext
|
||||
// A map from user_id to a mutex. Used when we are missing prev IDs so we don't make more than 1
|
||||
// request to the remote server and race.
|
||||
// TODO: Put in an LRU cache to bound growth
|
||||
|
@ -131,10 +137,12 @@ type KeyChangeProducer interface {
|
|||
|
||||
// NewDeviceListUpdater creates a new updater which fetches fresh device lists when they go stale.
|
||||
func NewDeviceListUpdater(
|
||||
db DeviceListUpdaterDatabase, api DeviceListUpdaterAPI, producer KeyChangeProducer,
|
||||
process *process.ProcessContext, db DeviceListUpdaterDatabase,
|
||||
api DeviceListUpdaterAPI, producer KeyChangeProducer,
|
||||
fedClient fedsenderapi.KeyserverFederationAPI, numWorkers int,
|
||||
) *DeviceListUpdater {
|
||||
return &DeviceListUpdater{
|
||||
process: process,
|
||||
userIDToMutex: make(map[string]*sync.Mutex),
|
||||
mu: &sync.Mutex{},
|
||||
db: db,
|
||||
|
@ -234,7 +242,7 @@ func (u *DeviceListUpdater) update(ctx context.Context, event gomatrixserverlib.
|
|||
"prev_ids": event.PrevID,
|
||||
"display_name": event.DeviceDisplayName,
|
||||
"deleted": event.Deleted,
|
||||
}).Info("DeviceListUpdater.Update")
|
||||
}).Trace("DeviceListUpdater.Update")
|
||||
|
||||
// if we haven't missed anything update the database and notify users
|
||||
if exists || event.Deleted {
|
||||
|
@ -378,74 +386,99 @@ func (u *DeviceListUpdater) worker(ch chan gomatrixserverlib.ServerName) {
|
|||
}
|
||||
|
||||
func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerName) (time.Duration, bool) {
|
||||
deviceListUpdateCount.WithLabelValues(string(serverName)).Inc()
|
||||
requestTimeout := time.Second * 30 // max amount of time we want to spend on each request
|
||||
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
||||
defer cancel()
|
||||
ctx := u.process.Context()
|
||||
logger := util.GetLogger(ctx).WithField("server_name", serverName)
|
||||
waitTime := 2 * time.Second
|
||||
// fetch stale device lists
|
||||
deviceListUpdateCount.WithLabelValues(string(serverName)).Inc()
|
||||
|
||||
waitTime := defaultWaitTime // How long should we wait to try again?
|
||||
successCount := 0 // How many user requests failed?
|
||||
|
||||
userIDs, err := u.db.StaleDeviceLists(ctx, []gomatrixserverlib.ServerName{serverName})
|
||||
if err != nil {
|
||||
logger.WithError(err).Error("Failed to load stale device lists")
|
||||
return waitTime, true
|
||||
}
|
||||
failCount := 0
|
||||
|
||||
userLoop:
|
||||
defer func() {
|
||||
for _, userID := range userIDs {
|
||||
if ctx.Err() != nil {
|
||||
// we've timed out, give up and go to the back of the queue to let another server be processed.
|
||||
failCount += 1
|
||||
waitTime = time.Minute * 10
|
||||
// always clear the channel to unblock Update calls regardless of success/failure
|
||||
u.clearChannel(userID)
|
||||
}
|
||||
}()
|
||||
|
||||
for _, userID := range userIDs {
|
||||
userWait, err := u.processServerUser(ctx, serverName, userID)
|
||||
if err != nil {
|
||||
if userWait > waitTime {
|
||||
waitTime = userWait
|
||||
}
|
||||
break
|
||||
}
|
||||
successCount++
|
||||
}
|
||||
|
||||
allUsersSucceeded := successCount == len(userIDs)
|
||||
if !allUsersSucceeded {
|
||||
logger.WithFields(logrus.Fields{
|
||||
"total": len(userIDs),
|
||||
"succeeded": successCount,
|
||||
"failed": len(userIDs) - successCount,
|
||||
"wait_time": waitTime,
|
||||
}).Warn("Failed to query device keys for some users")
|
||||
}
|
||||
return waitTime, !allUsersSucceeded
|
||||
}
|
||||
|
||||
func (u *DeviceListUpdater) processServerUser(ctx context.Context, serverName gomatrixserverlib.ServerName, userID string) (time.Duration, error) {
|
||||
ctx, cancel := context.WithTimeout(ctx, requestTimeout)
|
||||
defer cancel()
|
||||
logger := util.GetLogger(ctx).WithFields(logrus.Fields{
|
||||
"server_name": serverName,
|
||||
"user_id": userID,
|
||||
})
|
||||
|
||||
res, err := u.fedClient.GetUserDevices(ctx, serverName, userID)
|
||||
if err != nil {
|
||||
failCount += 1
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
// we've timed out, give up and go to the back of the queue to let another server be processed.
|
||||
waitTime = time.Minute * 10
|
||||
break userLoop
|
||||
default:
|
||||
if errors.Is(err, context.DeadlineExceeded) {
|
||||
return time.Minute * 10, err
|
||||
}
|
||||
switch e := err.(type) {
|
||||
case *json.UnmarshalTypeError, *json.SyntaxError:
|
||||
logger.WithError(err).Debugf("Device list update for %q contained invalid JSON", userID)
|
||||
return defaultWaitTime, nil
|
||||
case *fedsenderapi.FederationClientError:
|
||||
if e.RetryAfter > 0 {
|
||||
waitTime = e.RetryAfter
|
||||
return e.RetryAfter, err
|
||||
} else if e.Blacklisted {
|
||||
waitTime = time.Hour * 8
|
||||
break userLoop
|
||||
return time.Hour * 8, err
|
||||
} else if e.Code >= 300 {
|
||||
// We didn't get a real FederationClientError (e.g. in polylith mode, where gomatrix.HTTPError
|
||||
// are "converted" to FederationClientError), but we probably shouldn't hit them every $waitTime seconds.
|
||||
waitTime = time.Hour
|
||||
break userLoop
|
||||
return time.Hour, err
|
||||
}
|
||||
case net.Error:
|
||||
// Use the default waitTime, if it's a timeout.
|
||||
// It probably doesn't make sense to try further users.
|
||||
if !e.Timeout() {
|
||||
waitTime = time.Minute * 10
|
||||
logger.WithError(e).Error("GetUserDevices returned net.Error")
|
||||
break userLoop
|
||||
logger.WithError(e).Debug("GetUserDevices returned net.Error")
|
||||
return time.Minute * 10, err
|
||||
}
|
||||
case gomatrix.HTTPError:
|
||||
// The remote server returned an error, give it some time to recover.
|
||||
// This is to avoid spamming remote servers, which may not be Matrix servers anymore.
|
||||
if e.Code >= 300 {
|
||||
waitTime = time.Hour
|
||||
logger.WithError(e).Error("GetUserDevices returned gomatrix.HTTPError")
|
||||
break userLoop
|
||||
logger.WithError(e).Debug("GetUserDevices returned gomatrix.HTTPError")
|
||||
return time.Hour, err
|
||||
}
|
||||
default:
|
||||
// Something else failed
|
||||
waitTime = time.Minute * 10
|
||||
logger.WithError(err).WithField("user_id", userID).Debugf("GetUserDevices returned unknown error type: %T", err)
|
||||
break userLoop
|
||||
logger.WithError(err).Debugf("GetUserDevices returned unknown error type: %T", err)
|
||||
return time.Minute * 10, err
|
||||
}
|
||||
continue
|
||||
}
|
||||
if res.UserID != userID {
|
||||
logger.WithError(err).Debugf("User ID %q in device list update response doesn't match expected %q", res.UserID, userID)
|
||||
return defaultWaitTime, nil
|
||||
}
|
||||
if res.MasterKey != nil || res.SelfSigningKey != nil {
|
||||
uploadReq := &api.PerformUploadDeviceKeysRequest{
|
||||
|
@ -466,23 +499,10 @@ userLoop:
|
|||
}
|
||||
err = u.updateDeviceList(&res)
|
||||
if err != nil {
|
||||
logger.WithError(err).WithField("user_id", userID).Error("Fetched device list but failed to store/emit it")
|
||||
failCount += 1
|
||||
logger.WithError(err).Error("Fetched device list but failed to store/emit it")
|
||||
return defaultWaitTime, err
|
||||
}
|
||||
}
|
||||
if failCount > 0 {
|
||||
logger.WithFields(logrus.Fields{
|
||||
"total": len(userIDs),
|
||||
"failed": failCount,
|
||||
"skipped": len(userIDs) - failCount,
|
||||
"waittime": waitTime,
|
||||
}).Warn("Failed to query device keys for some users")
|
||||
}
|
||||
for _, userID := range userIDs {
|
||||
// always clear the channel to unblock Update calls regardless of success/failure
|
||||
u.clearChannel(userID)
|
||||
}
|
||||
return waitTime, failCount > 0
|
||||
return defaultWaitTime, nil
|
||||
}
|
||||
|
||||
func (u *DeviceListUpdater) updateDeviceList(res *gomatrixserverlib.RespUserDevices) error {
|
||||
|
|
|
@ -30,6 +30,7 @@ import (
|
|||
"github.com/matrix-org/gomatrixserverlib"
|
||||
|
||||
"github.com/matrix-org/dendrite/keyserver/api"
|
||||
"github.com/matrix-org/dendrite/setup/process"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -146,7 +147,7 @@ func TestUpdateHavePrevID(t *testing.T) {
|
|||
}
|
||||
ap := &mockDeviceListUpdaterAPI{}
|
||||
producer := &mockKeyChangeProducer{}
|
||||
updater := NewDeviceListUpdater(db, ap, producer, nil, 1)
|
||||
updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, nil, 1)
|
||||
event := gomatrixserverlib.DeviceListUpdateEvent{
|
||||
DeviceDisplayName: "Foo Bar",
|
||||
Deleted: false,
|
||||
|
@ -218,7 +219,7 @@ func TestUpdateNoPrevID(t *testing.T) {
|
|||
`)),
|
||||
}, nil
|
||||
})
|
||||
updater := NewDeviceListUpdater(db, ap, producer, fedClient, 2)
|
||||
updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, fedClient, 2)
|
||||
if err := updater.Start(); err != nil {
|
||||
t.Fatalf("failed to start updater: %s", err)
|
||||
}
|
||||
|
@ -287,7 +288,7 @@ func TestDebounce(t *testing.T) {
|
|||
close(incomingFedReq)
|
||||
return <-fedCh, nil
|
||||
})
|
||||
updater := NewDeviceListUpdater(db, ap, producer, fedClient, 1)
|
||||
updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, fedClient, 1)
|
||||
if err := updater.Start(); err != nil {
|
||||
t.Fatalf("failed to start updater: %s", err)
|
||||
}
|
||||
|
|
|
@ -58,7 +58,7 @@ func NewInternalAPI(
|
|||
FedClient: fedClient,
|
||||
Producer: keyChangeProducer,
|
||||
}
|
||||
updater := internal.NewDeviceListUpdater(db, ap, keyChangeProducer, fedClient, 8) // 8 workers TODO: configurable
|
||||
updater := internal.NewDeviceListUpdater(base.ProcessContext, db, ap, keyChangeProducer, fedClient, 8) // 8 workers TODO: configurable
|
||||
ap.Updater = updater
|
||||
go func() {
|
||||
if err := updater.Start(); err != nil {
|
||||
|
|
Loading…
Reference in a new issue