mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-07 15:03:09 -06:00
Tweaks around the device list updater
This commit is contained in:
parent
94ecd95433
commit
688129fbcb
|
|
@ -180,11 +180,27 @@ func (u *DeviceListUpdater) Start() error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Filter out dupe domains, as processServer is going to get all users anyway
|
||||||
|
seenDomains := make(map[spec.ServerName]struct{})
|
||||||
|
newStateLists := make([]string, 0, len(staleLists))
|
||||||
|
for _, userID := range staleLists {
|
||||||
|
_, domain, err := gomatrixserverlib.SplitID('@', userID)
|
||||||
|
if err != nil {
|
||||||
|
// non-fatal and should not block starting up
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if _, ok := seenDomains[domain]; ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
newStateLists = append(newStateLists, userID)
|
||||||
|
seenDomains[domain] = struct{}{}
|
||||||
|
}
|
||||||
offset, step := time.Second*10, time.Second
|
offset, step := time.Second*10, time.Second
|
||||||
if max := len(staleLists); max > 120 {
|
if max := len(newStateLists); max > 120 {
|
||||||
step = (time.Second * 120) / time.Duration(max)
|
step = (time.Second * 120) / time.Duration(max)
|
||||||
}
|
}
|
||||||
for _, userID := range staleLists {
|
for _, userID := range newStateLists {
|
||||||
userID := userID // otherwise we are only sending the last entry
|
userID := userID // otherwise we are only sending the last entry
|
||||||
time.AfterFunc(offset, func() {
|
time.AfterFunc(offset, func() {
|
||||||
u.notifyWorkers(userID)
|
u.notifyWorkers(userID)
|
||||||
|
|
@ -467,6 +483,11 @@ func (u *DeviceListUpdater) processServer(serverName spec.ServerName) (time.Dura
|
||||||
func (u *DeviceListUpdater) processServerUser(ctx context.Context, serverName spec.ServerName, userID string) (time.Duration, error) {
|
func (u *DeviceListUpdater) processServerUser(ctx context.Context, serverName spec.ServerName, userID string) (time.Duration, error) {
|
||||||
ctx, cancel := context.WithTimeout(ctx, requestTimeout)
|
ctx, cancel := context.WithTimeout(ctx, requestTimeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
|
// If we are processing more than one user per server, this unblocks further calls to Update
|
||||||
|
// immediately instead of just after **all** users have been processed.
|
||||||
|
defer u.clearChannel(userID)
|
||||||
|
|
||||||
logger := util.GetLogger(ctx).WithFields(logrus.Fields{
|
logger := util.GetLogger(ctx).WithFields(logrus.Fields{
|
||||||
"server_name": serverName,
|
"server_name": serverName,
|
||||||
"user_id": userID,
|
"user_id": userID,
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue