WIP: Eagerly sync device lists on /user/keys/query requests

Also notify servers when a user's device display name changes. Few
caveats:
 - sytest `Device deletion propagates over federation` fails
 - `populateResponseWithDeviceKeysFromDatabase` is called from multiple
   goroutines and hence is unsafe.
This commit is contained in:
Kegan Dougal 2020-08-12 16:01:35 +01:00
parent d98ec12422
commit a53030792b
5 changed files with 137 additions and 17 deletions

View file

@ -110,6 +110,11 @@ type OneTimeKeysCount struct {
type PerformUploadKeysRequest struct { type PerformUploadKeysRequest struct {
DeviceKeys []DeviceKeys DeviceKeys []DeviceKeys
OneTimeKeys []OneTimeKeys OneTimeKeys []OneTimeKeys
// OnlyDisplayNameUpdates should be `true` if ALL the DeviceKeys are present to update
// the display name for their respective device, and NOT to modify the keys. The key
// itself doesn't change but it's easier to pretend upload new keys and reuse the same code paths.
// Without this flag, requests to modify device display names would delete device keys.
OnlyDisplayNameUpdates bool
} }
// PerformUploadKeysResponse is the response to PerformUploadKeys // PerformUploadKeysResponse is the response to PerformUploadKeys

View file

@ -144,6 +144,20 @@ func (u *DeviceListUpdater) mutex(userID string) *sync.Mutex {
return u.userIDToMutex[userID] return u.userIDToMutex[userID]
} }
// ManualUpdate invalidates the device list for the given user and fetches the latest and tracks it.
// Blocks until the device list is synced or the timeout is reached.
func (u *DeviceListUpdater) ManualUpdate(ctx context.Context, serverName gomatrixserverlib.ServerName, userID string) error {
mu := u.mutex(userID)
mu.Lock()
err := u.db.MarkDeviceListStale(ctx, userID, true)
mu.Unlock()
if err != nil {
return fmt.Errorf("ManualUpdate: failed to mark device list for %s as stale: %w", userID, err)
}
u.notifyWorkers(userID)
return nil
}
// Update blocks until the update has been stored in the database. It blocks primarily for satisfying sytest, // Update blocks until the update has been stored in the database. It blocks primarily for satisfying sytest,
// which assumes when /send 200 OKs that the device lists have been updated. // which assumes when /send 200 OKs that the device lists have been updated.
func (u *DeviceListUpdater) Update(ctx context.Context, event gomatrixserverlib.DeviceListUpdateEvent) error { func (u *DeviceListUpdater) Update(ctx context.Context, event gomatrixserverlib.DeviceListUpdateEvent) error {

View file

@ -28,6 +28,7 @@ import (
userapi "github.com/matrix-org/dendrite/userapi/api" userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util" "github.com/matrix-org/util"
"github.com/sirupsen/logrus"
"github.com/tidwall/gjson" "github.com/tidwall/gjson"
"github.com/tidwall/sjson" "github.com/tidwall/sjson"
) )
@ -282,27 +283,21 @@ func (a *KeyInternalAPI) remoteKeysFromDatabase(
fetchRemote := make(map[string]map[string][]string) fetchRemote := make(map[string]map[string][]string)
for domain, userToDeviceMap := range domainToDeviceKeys { for domain, userToDeviceMap := range domainToDeviceKeys {
for userID, deviceIDs := range userToDeviceMap { for userID, deviceIDs := range userToDeviceMap {
keys, err := a.DB.DeviceKeysForUser(ctx, userID, deviceIDs) // we can't safely return keys from the db when all devices are requested as we don't
// if we can't query the db or there are fewer keys than requested, fetch from remote.
// Likewise, we can't safely return keys from the db when all devices are requested as we don't
// know if one has just been added. // know if one has just been added.
if len(deviceIDs) == 0 || err != nil || len(keys) < len(deviceIDs) { if len(deviceIDs) > 0 {
err := a.populateResponseWithDeviceKeysFromDatabase(ctx, res, userID, deviceIDs)
if err == nil {
continue
}
util.GetLogger(ctx).WithError(err).Error("populateResponseWithDeviceKeysFromDatabase")
}
// fetch device lists from remote
if _, ok := fetchRemote[domain]; !ok { if _, ok := fetchRemote[domain]; !ok {
fetchRemote[domain] = make(map[string][]string) fetchRemote[domain] = make(map[string][]string)
} }
fetchRemote[domain][userID] = append(fetchRemote[domain][userID], deviceIDs...) fetchRemote[domain][userID] = append(fetchRemote[domain][userID], deviceIDs...)
continue
}
if res.DeviceKeys[userID] == nil {
res.DeviceKeys[userID] = make(map[string]json.RawMessage)
}
for _, key := range keys {
// inject the display name
key.KeyJSON, _ = sjson.SetBytes(key.KeyJSON, "unsigned", struct {
DisplayName string `json:"device_display_name,omitempty"`
}{key.DisplayName})
res.DeviceKeys[userID][key.DeviceID] = key.KeyJSON
}
} }
} }
return fetchRemote return fetchRemote
@ -324,6 +319,45 @@ func (a *KeyInternalAPI) queryRemoteKeys(
defer wg.Done() defer wg.Done()
fedCtx, cancel := context.WithTimeout(ctx, timeout) fedCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel() defer cancel()
// for users who we do not have any knowledge about, try to start doing device list updates for them
// by hitting /users/devices - otherwise fallback to /keys/query which has nicer bulk properties but
// lack a stream ID.
var userIDsForAllDevices []string
for userID, deviceIDs := range devKeys {
if len(deviceIDs) == 0 {
userIDsForAllDevices = append(userIDsForAllDevices, userID)
delete(devKeys, userID)
}
}
for _, userID := range userIDsForAllDevices {
err := a.Updater.ManualUpdate(context.Background(), gomatrixserverlib.ServerName(serverName), userID)
if err != nil {
logrus.WithFields(logrus.Fields{
logrus.ErrorKey: err,
"user_id": userID,
"server": serverName,
}).Error("Failed to manually update device lists for user")
// try to do it via /keys/query
devKeys[userID] = []string{}
continue
}
// refresh entries from DB: unlike remoteKeysFromDatabase we know we previously had no device info for this
// user so the fact that we're populating all devices here isn't a problem so long as we have devices.
err = a.populateResponseWithDeviceKeysFromDatabase(ctx, res, userID, nil)
if err != nil {
logrus.WithFields(logrus.Fields{
logrus.ErrorKey: err,
"user_id": userID,
"server": serverName,
}).Error("Failed to manually update device lists for user")
// try to do it via /keys/query
devKeys[userID] = []string{}
continue
}
}
if len(devKeys) == 0 {
return
}
queryKeysResp, err := a.FedClient.QueryKeys(fedCtx, gomatrixserverlib.ServerName(serverName), devKeys) queryKeysResp, err := a.FedClient.QueryKeys(fedCtx, gomatrixserverlib.ServerName(serverName), devKeys)
if err != nil { if err != nil {
failMu.Lock() failMu.Lock()
@ -357,6 +391,33 @@ func (a *KeyInternalAPI) queryRemoteKeys(
} }
} }
func (a *KeyInternalAPI) populateResponseWithDeviceKeysFromDatabase(
ctx context.Context, res *api.QueryKeysResponse, userID string, deviceIDs []string,
) error {
keys, err := a.DB.DeviceKeysForUser(ctx, userID, deviceIDs)
// if we can't query the db or there are fewer keys than requested, fetch from remote.
if err != nil {
return fmt.Errorf("DeviceKeysForUser %s %v failed: %w", userID, deviceIDs, err)
}
if len(keys) < len(deviceIDs) {
return fmt.Errorf("DeviceKeysForUser %s returned fewer devices than requested, falling back to remote", userID)
}
if len(deviceIDs) == 0 && len(keys) == 0 {
return fmt.Errorf("DeviceKeysForUser %s returned no keys but wanted all keys, falling back to remote", userID)
}
if res.DeviceKeys[userID] == nil {
res.DeviceKeys[userID] = make(map[string]json.RawMessage)
}
for _, key := range keys {
// inject the display name
key.KeyJSON, _ = sjson.SetBytes(key.KeyJSON, "unsigned", struct {
DisplayName string `json:"device_display_name,omitempty"`
}{key.DisplayName})
res.DeviceKeys[userID][key.DeviceID] = key.KeyJSON
}
return nil
}
func (a *KeyInternalAPI) uploadLocalDeviceKeys(ctx context.Context, req *api.PerformUploadKeysRequest, res *api.PerformUploadKeysResponse) { func (a *KeyInternalAPI) uploadLocalDeviceKeys(ctx context.Context, req *api.PerformUploadKeysRequest, res *api.PerformUploadKeysResponse) {
var keysToStore []api.DeviceMessage var keysToStore []api.DeviceMessage
// assert that the user ID / device ID are not lying for each key // assert that the user ID / device ID are not lying for each key
@ -403,6 +464,10 @@ func (a *KeyInternalAPI) uploadLocalDeviceKeys(ctx context.Context, req *api.Per
} }
return return
} }
if req.OnlyDisplayNameUpdates {
// add the display name field from keysToStore into existingKeys
keysToStore = appendDisplayNames(existingKeys, keysToStore)
}
// store the device keys and emit changes // store the device keys and emit changes
err := a.DB.StoreLocalDeviceKeys(ctx, keysToStore) err := a.DB.StoreLocalDeviceKeys(ctx, keysToStore)
if err != nil { if err != nil {
@ -475,3 +540,16 @@ func (a *KeyInternalAPI) emitDeviceKeyChanges(existing, new []api.DeviceMessage)
} }
return a.Producer.ProduceKeyChanges(keysAdded) return a.Producer.ProduceKeyChanges(keysAdded)
} }
func appendDisplayNames(existing, new []api.DeviceMessage) []api.DeviceMessage {
for i, existingDevice := range existing {
for _, newDevice := range new {
if existingDevice.DeviceID != newDevice.DeviceID {
continue
}
existingDevice.DisplayName = newDevice.DisplayName
existing[i] = existingDevice
}
}
return existing
}

View file

@ -146,6 +146,8 @@ If remote user leaves room we no longer receive device updates
If a device list update goes missing, the server resyncs on the next one If a device list update goes missing, the server resyncs on the next one
Get left notifs in sync and /keys/changes when other user leaves Get left notifs in sync and /keys/changes when other user leaves
Can query remote device keys using POST after notification Can query remote device keys using POST after notification
Server correctly resyncs when client query keys and there is no remote cache
Server correctly resyncs when server leaves and rejoins a room
Can add account data Can add account data
Can add account data to room Can add account data to room
Can get account data without syncing Can get account data without syncing

View file

@ -180,6 +180,27 @@ func (a *UserInternalAPI) PerformDeviceUpdate(ctx context.Context, req *api.Perf
util.GetLogger(ctx).WithError(err).Error("deviceDB.UpdateDevice failed") util.GetLogger(ctx).WithError(err).Error("deviceDB.UpdateDevice failed")
return err return err
} }
if req.DisplayName != nil && dev.DisplayName != *req.DisplayName {
// display name has changed: update the device key
var uploadRes keyapi.PerformUploadKeysResponse
a.KeyAPI.PerformUploadKeys(context.Background(), &keyapi.PerformUploadKeysRequest{
DeviceKeys: []keyapi.DeviceKeys{
{
DeviceID: dev.ID,
DisplayName: *req.DisplayName,
KeyJSON: nil,
UserID: dev.UserID,
},
},
OnlyDisplayNameUpdates: true,
}, &uploadRes)
if uploadRes.Error != nil {
return fmt.Errorf("Failed to update device key display name: %v", uploadRes.Error)
}
if len(uploadRes.KeyErrors) > 0 {
return fmt.Errorf("Failed to update device key display name, key errors: %+v", uploadRes.KeyErrors)
}
}
return nil return nil
} }