Fix issue with stale device lists (#2702)
We were only sending the last entry to the worker, so we most likely missed updates for the other entries.
This commit is contained in:
parent
d5876abbe9
commit
42a82091a8
|
@ -4,6 +4,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
|
||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
"github.com/matrix-org/gomatrix"
|
"github.com/matrix-org/gomatrix"
|
||||||
|
@ -235,9 +236,17 @@ func federationClientError(err error) error {
|
||||||
return &api.FederationClientError{
|
return &api.FederationClientError{
|
||||||
Code: ferr.Code,
|
Code: ferr.Code,
|
||||||
}
|
}
|
||||||
|
case *url.Error: // e.g. certificate error, unable to connect
|
||||||
|
return &api.FederationClientError{
|
||||||
|
Err: ferr.Error(),
|
||||||
|
Code: 400,
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
|
// We don't know what exactly failed, but we probably don't
|
||||||
|
// want to retry the request immediately in the device list updater
|
||||||
return &api.FederationClientError{
|
return &api.FederationClientError{
|
||||||
Err: err.Error(),
|
Err: err.Error(),
|
||||||
|
Code: 400,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -167,6 +167,7 @@ func (u *DeviceListUpdater) Start() error {
|
||||||
step = (time.Second * 120) / time.Duration(max)
|
step = (time.Second * 120) / time.Duration(max)
|
||||||
}
|
}
|
||||||
for _, userID := range staleLists {
|
for _, userID := range staleLists {
|
||||||
|
userID := userID // otherwise we are only sending the last entry
|
||||||
time.AfterFunc(offset, func() {
|
time.AfterFunc(offset, func() {
|
||||||
u.notifyWorkers(userID)
|
u.notifyWorkers(userID)
|
||||||
})
|
})
|
||||||
|
@ -396,11 +397,19 @@ userLoop:
|
||||||
if ctx.Err() != nil {
|
if ctx.Err() != nil {
|
||||||
// we've timed out, give up and go to the back of the queue to let another server be processed.
|
// we've timed out, give up and go to the back of the queue to let another server be processed.
|
||||||
failCount += 1
|
failCount += 1
|
||||||
|
waitTime = time.Minute * 10
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
res, err := u.fedClient.GetUserDevices(ctx, serverName, userID)
|
res, err := u.fedClient.GetUserDevices(ctx, serverName, userID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
failCount += 1
|
failCount += 1
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
// we've timed out, give up and go to the back of the queue to let another server be processed.
|
||||||
|
waitTime = time.Minute * 10
|
||||||
|
break userLoop
|
||||||
|
default:
|
||||||
|
}
|
||||||
switch e := err.(type) {
|
switch e := err.(type) {
|
||||||
case *fedsenderapi.FederationClientError:
|
case *fedsenderapi.FederationClientError:
|
||||||
if e.RetryAfter > 0 {
|
if e.RetryAfter > 0 {
|
||||||
|
@ -419,7 +428,7 @@ userLoop:
|
||||||
// It probably doesn't make sense to try further users.
|
// It probably doesn't make sense to try further users.
|
||||||
if !e.Timeout() {
|
if !e.Timeout() {
|
||||||
waitTime = time.Minute * 10
|
waitTime = time.Minute * 10
|
||||||
logrus.WithError(e).Error("GetUserDevices returned net.Error")
|
logger.WithError(e).Error("GetUserDevices returned net.Error")
|
||||||
break userLoop
|
break userLoop
|
||||||
}
|
}
|
||||||
case gomatrix.HTTPError:
|
case gomatrix.HTTPError:
|
||||||
|
@ -427,7 +436,7 @@ userLoop:
|
||||||
// This is to avoid spamming remote servers, which may not be Matrix servers anymore.
|
// This is to avoid spamming remote servers, which may not be Matrix servers anymore.
|
||||||
if e.Code >= 300 {
|
if e.Code >= 300 {
|
||||||
waitTime = time.Hour
|
waitTime = time.Hour
|
||||||
logrus.WithError(e).Error("GetUserDevices returned gomatrix.HTTPError")
|
logger.WithError(e).Error("GetUserDevices returned gomatrix.HTTPError")
|
||||||
break userLoop
|
break userLoop
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
|
|
Loading…
Reference in a new issue