Mirror of https://github.com/matrix-org/dendrite.git
Review comments, check blacklisted when processing a server

commit 1408ccf3b8
parent 95c99bce90
@@ -381,6 +381,8 @@ func (u *DeviceListUpdater) notifyWorkers(userID string) {
 	index := int(int64(hash.Sum32()) % int64(len(u.workerChans)))
 
 	ch := u.assignChannel(userID)
+	// Since workerChans are buffered, we only increment here and let the worker
+	// decrement it once it is done processing.
 	deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(index)}).Inc()
 	u.workerChans[index] <- remoteServer
 	select {
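The new comment in this hunk documents a backpressure pattern: the producer bumps a gauge before sending into a buffered worker channel, and the worker drops it once the item is handled, so the gauge tracks queued plus in-flight work. A minimal stand-alone sketch of that pattern follows; the metric name, the worker function, and the item type are illustrative, not Dendrite's code.

// Sketch of the gauge-based backpressure pattern described by the new comment.
// Only the Inc-before-send / Dec-after-processing shape mirrors the diff; all
// names here are made up for the example.
package main

import (
	"fmt"
	"strconv"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

var backpressure = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Name: "example_worker_backpressure",
		Help: "Queued plus in-flight items per worker (example only).",
	},
	[]string{"worker_id"},
)

func worker(id int, ch chan string) {
	for item := range ch {
		// Simulate processing, then release the backpressure slot.
		time.Sleep(10 * time.Millisecond)
		fmt.Println("processed", item)
		backpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(id)}).Dec()
	}
}

func main() {
	prometheus.MustRegister(backpressure)
	ch := make(chan string, 8) // buffered, like workerChans in the diff
	go worker(0, ch)

	// Increment before the (possibly blocking) send; the worker decrements
	// once it is done, so the gauge never undercounts outstanding work.
	backpressure.With(prometheus.Labels{"worker_id": "0"}).Inc()
	ch <- "example.org"

	time.Sleep(50 * time.Millisecond)
}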
@@ -418,15 +420,13 @@ func (u *DeviceListUpdater) worker(ch chan spec.ServerName, workerID int) {
 	go func() {
 		var serversToRetry []spec.ServerName
 		for {
+			// nuke serversToRetry by re-slicing it to be "empty".
+			// The capacity of the slice is unchanged, which ensures we can reuse the memory.
+			serversToRetry = serversToRetry[:0]
+
 			deviceListUpdaterServersRetrying.With(prometheus.Labels{"worker_id": strconv.Itoa(workerID)}).Set(float64(len(retries)))
 			time.Sleep(time.Second * 2)
 
-			// The channel is at capacity, don't try to send more work
-			if len(ch) == cap(ch) {
-				continue
-			}
-			serversToRetry = serversToRetry[:0] // reuse memory
-
 			// -2, so we have space for incoming device list updates over federation
 			maxServers := (cap(ch) - len(ch)) - 2
 			if maxServers <= 0 {
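The comment added at the top of the loop explains the `s = s[:0]` idiom: it empties the slice while keeping its backing array, so later appends reuse the already-allocated memory. A tiny self-contained illustration, with made-up values:

// Illustration of the re-slicing idiom from the new comment: length resets to
// zero, capacity (and the backing array) is preserved.
package main

import "fmt"

func main() {
	serversToRetry := make([]string, 0, 4)
	serversToRetry = append(serversToRetry, "a.example", "b.example")
	fmt.Println(len(serversToRetry), cap(serversToRetry)) // 2 4

	// "Nuke" the slice by re-slicing it to be empty; capacity is unchanged.
	serversToRetry = serversToRetry[:0]
	fmt.Println(len(serversToRetry), cap(serversToRetry)) // 0 4

	// Appends within the old capacity reuse the same backing array.
	serversToRetry = append(serversToRetry, "c.example")
	fmt.Println(len(serversToRetry), cap(serversToRetry)) // 1 4
}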
@@ -459,9 +459,18 @@ func (u *DeviceListUpdater) worker(ch chan spec.ServerName, workerID int) {
 		retriesMu.Lock()
 		_, exists := retries[serverName]
 		retriesMu.Unlock()
-		if exists {
+
+		// If the serverName is coming from retries, maybe it was
+		// blacklisted in the meantime.
+		_, err := u.isBlacklistedOrBackingOffFn(serverName)
+		var federationClientError *fedsenderapi.FederationClientError
+		// unwrap errors and check for FederationClientError, if found, federationClientError will be not nil
+		errors.As(err, &federationClientError)
+		isBlacklisted := federationClientError != nil && federationClientError.Blacklisted
+
+		// Don't retry a server that we're already waiting for or is blacklisted by now.
+		if exists || isBlacklisted {
 			deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(workerID)}).Dec()
-			// Don't retry a server that we're already waiting for.
 			continue
 		}
 		waitTime, shouldRetry := u.processServer(serverName)
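The added lines rely on errors.As to detect whether the blacklist check's error wraps a typed error carrying a Blacklisted flag: errors.As walks the wrap chain and fills the target pointer when it finds a match, leaving it nil otherwise. Below is a self-contained sketch of that pattern; FederationClientError and isBlacklistedOrBackingOff here are stand-ins, not Dendrite's actual types.

// Sketch of the errors.As pattern used in the hunk above, with stand-in types.
package main

import (
	"errors"
	"fmt"
)

// Stand-in for fedsenderapi.FederationClientError.
type FederationClientError struct {
	Blacklisted bool
}

func (e *FederationClientError) Error() string { return "federation client error" }

// Stand-in for u.isBlacklistedOrBackingOffFn: returns a wrapped typed error
// when the server is blacklisted, nil otherwise.
func isBlacklistedOrBackingOff(server string) error {
	if server == "bad.example" {
		return fmt.Errorf("checking %s: %w", server, &FederationClientError{Blacklisted: true})
	}
	return nil
}

func main() {
	err := isBlacklistedOrBackingOff("bad.example")

	// If err wraps a *FederationClientError, errors.As sets fcErr to it;
	// otherwise fcErr stays nil, exactly like the check in the diff.
	var fcErr *FederationClientError
	errors.As(err, &fcErr)
	isBlacklisted := fcErr != nil && fcErr.Blacklisted
	fmt.Println("blacklisted:", isBlacklisted) // blacklisted: true
}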