From e87740fd7b0e1a5702a4191cef3f57442d944c62 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Wed, 12 Aug 2020 11:51:23 +0100 Subject: [PATCH] Add sync mechanism to block when updating device lists. The block has a timeout and exists mainly for sytest, to fix the test "Server correctly handles incoming m.device_list_update", which is flaky because it assumes that once `/send` returns 200 OK the server has already updated the device lists in preparation for `/keys/query`; that is not always true when using workers. --- keyserver/internal/device_list_update.go | 51 +++++++++++++++++++++--- 1 file changed, 45 insertions(+), 6 deletions(-) diff --git a/keyserver/internal/device_list_update.go b/keyserver/internal/device_list_update.go index 85785b07c..1c4f0b970 100644 --- a/keyserver/internal/device_list_update.go +++ b/keyserver/internal/device_list_update.go @@ -67,6 +67,11 @@ type DeviceListUpdater struct { producer KeyChangeProducer fedClient *gomatrixserverlib.FederationClient workerChans []chan gomatrixserverlib.ServerName + + // When device lists are stale for a user, they get inserted into this map with a channel which `Update` will + // block on or timeout via a select. + userIDToChan map[string]chan bool + userIDToChanMu *sync.Mutex } // DeviceListUpdaterDatabase is the subset of functionality from storage.Database required for the updater. 
@@ -98,12 +103,14 @@ func NewDeviceListUpdater( numWorkers int, ) *DeviceListUpdater { return &DeviceListUpdater{ - userIDToMutex: make(map[string]*sync.Mutex), - mu: &sync.Mutex{}, - db: db, - producer: producer, - fedClient: fedClient, - workerChans: make([]chan gomatrixserverlib.ServerName, numWorkers), + userIDToMutex: make(map[string]*sync.Mutex), + mu: &sync.Mutex{}, + db: db, + producer: producer, + fedClient: fedClient, + workerChans: make([]chan gomatrixserverlib.ServerName, numWorkers), + userIDToChan: make(map[string]chan bool), + userIDToChanMu: &sync.Mutex{}, } } @@ -137,6 +144,8 @@ func (u *DeviceListUpdater) mutex(userID string) *sync.Mutex { return u.userIDToMutex[userID] } +// Update blocks until the update has been stored in the database. It blocks primarily for satisfying sytest, +// which assumes when /send 200 OKs that the device lists have been updated. func (u *DeviceListUpdater) Update(ctx context.Context, event gomatrixserverlib.DeviceListUpdateEvent) error { isDeviceListStale, err := u.update(ctx, event) if err != nil { @@ -213,7 +222,35 @@ func (u *DeviceListUpdater) notifyWorkers(userID string) { hash := fnv.New32a() _, _ = hash.Write([]byte(remoteServer)) index := int(hash.Sum32()) % len(u.workerChans) + + ch := u.assignChannel(userID) u.workerChans[index] <- remoteServer + select { + case <-ch: + case <-time.After(10 * time.Second): + // we don't return an error in this case as it's not a failure condition. 
+ // we mainly block for the benefit of sytest anyway + } +} + +func (u *DeviceListUpdater) assignChannel(userID string) chan bool { + u.userIDToChanMu.Lock() + defer u.userIDToChanMu.Unlock() + if ch, ok := u.userIDToChan[userID]; ok { + return ch + } + ch := make(chan bool) + u.userIDToChan[userID] = ch + return ch +} + +func (u *DeviceListUpdater) clearChannel(userID string) { + u.userIDToChanMu.Lock() + defer u.userIDToChanMu.Unlock() + if ch, ok := u.userIDToChan[userID]; ok { + close(ch) + delete(u.userIDToChan, userID) + } } func (u *DeviceListUpdater) worker(ch chan gomatrixserverlib.ServerName) { @@ -285,6 +322,8 @@ func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerNam if err != nil { logger.WithError(err).WithField("user_id", userID).Error("fetched device list but failed to store/emit it") hasFailures = true + } else { + u.clearChannel(userID) } } return hasFailures