refactor handling of device list update pokes

This commit is contained in:
Kegan Dougal 2021-04-08 11:23:37 +01:00
parent 0530329c77
commit 844a8fcd76

View file

@ -72,10 +72,8 @@ func init() {
// we guarantee we will get around to it. Also, more users on a given server does not increase the number of requests // we guarantee we will get around to it. Also, more users on a given server does not increase the number of requests
// (as /keys/query allows multiple users to be specified) so being stuck behind matrix.org won't materially be any worse // (as /keys/query allows multiple users to be specified) so being stuck behind matrix.org won't materially be any worse
// than being stuck behind foo.bar // than being stuck behind foo.bar
// In the event that the query fails, the worker spins up a short-lived goroutine whose sole purpose is to inject the server // In the event that the query fails, a lock is acquired and the server name along with the time to wait before retrying is
// name back into the channel after a certain amount of time. If in the interim the device lists have been updated, then // set in a map. A restarter goroutine periodically probes this map and injects servers which are ready to be retried.
// the database query will return no stale lists. Reinjection into the channel continues until success or the server terminates,
// when it will be reloaded on startup.
type DeviceListUpdater struct { type DeviceListUpdater struct {
// A map from user_id to a mutex. Used when we are missing prev IDs so we don't make more than 1 // A map from user_id to a mutex. Used when we are missing prev IDs so we don't make more than 1
// request to the remote server and race. // request to the remote server and race.
@ -297,42 +295,38 @@ func (u *DeviceListUpdater) clearChannel(userID string) {
} }
func (u *DeviceListUpdater) worker(ch chan gomatrixserverlib.ServerName) { func (u *DeviceListUpdater) worker(ch chan gomatrixserverlib.ServerName) {
// It's possible to get many of the same server name in the channel, so in order retries := make(map[gomatrixserverlib.ServerName]time.Time)
// to prevent processing the same server over and over we keep track of when we retriesMu := &sync.Mutex{}
// last made a request to the server. If we get the server name during the cooloff // restarter goroutine which will inject failed servers into ch when it is time
// period, we'll ignore the poke. go func() {
lastProcessed := make(map[gomatrixserverlib.ServerName]time.Time) for {
// this can't be too long else sytest will give up trying to do a test var serversToRetry []gomatrixserverlib.ServerName
cooloffPeriod := 500 * time.Millisecond time.Sleep(time.Second)
shouldProcess := func(srv gomatrixserverlib.ServerName) bool { retriesMu.Lock()
// we should process requests when now is after the last process time + cooloff now := time.Now()
return time.Now().After(lastProcessed[srv].Add(cooloffPeriod)) for srv, retryAt := range retries {
} if retryAt.After(now) {
serversToRetry = append(serversToRetry, srv)
// on failure, spin up a short-lived goroutine to inject the server name again. }
scheduledRetries := make(map[gomatrixserverlib.ServerName]time.Time) }
inject := func(srv gomatrixserverlib.ServerName, duration time.Duration) { for _, srv := range serversToRetry {
time.Sleep(duration) delete(retries, srv)
ch <- srv }
} retriesMu.Unlock()
for _, srv := range serversToRetry {
for serverName := range ch { ch <- srv
if !shouldProcess(serverName) {
if time.Now().Before(scheduledRetries[serverName]) {
// do not inject into the channel as we know there will be a sleeping goroutine
// which will do it after the cooloff period expires
continue
} else {
scheduledRetries[serverName] = time.Now().Add(cooloffPeriod)
go inject(serverName, cooloffPeriod)
continue
} }
} }
lastProcessed[serverName] = time.Now() }()
for serverName := range ch {
waitTime, shouldRetry := u.processServer(serverName) waitTime, shouldRetry := u.processServer(serverName)
if shouldRetry { if shouldRetry {
scheduledRetries[serverName] = time.Now().Add(waitTime) retriesMu.Lock()
go inject(serverName, waitTime) _, exists := retries[serverName]
if !exists {
retries[serverName] = time.Now().Add(waitTime)
}
retriesMu.Unlock()
} }
} }
} }